客户端发送消息
java代码连接简单示例
public class KafkaProducerAnalysis {
public static final String brokerList = "HOSTNAME:9092";
public static final String topic = "quickstart-events";
public static Properties initConfig() {
Properties props = new Properties();
props.put("bootstrap.servers", brokerList);
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "producer.client.id.demo");
return props;
}
public static Properties initNewConfig() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo");
return props;
}
public static Properties initPerferConfig() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
return props;
}
public static void main(String[] args) throws InterruptedException {
Properties props = initNewConfig();
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// KafkaProducer<String, String> producer = new KafkaProducer<>(props,
// new StringSerializer(), new StringSerializer());
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "hello, Kafka!");
try {
// producer.send(record);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println("get:"+metadata.partition() + ":" + metadata.offset());
}
}
});
} catch (Exception e) {
e.printStackTrace();
}
TimeUnit.SECONDS.sleep(5);
}
}
主线程
主线程部分,就是我们代码中发送消息的部分,上面的代码只是最简单的demo,实际上我们在发送消息的时候,可以设定消息发送的拦截器,序列号器和分区器。
- 拦截器,在消息发送之前可以做一些工作,比如哪些消息要发送,哪些不发送,或者全局附带一些额外的信息等
- 序列化器:我们需要将所发送的消息最终序列化为字节数组才能发送
- 分区器:一个topic消息具体发送到哪个分区上,可以根据一些额外的参数来进行计算,比如分区数量,指定的key值等,计算得到一个Integer。
以上的这些工具都可以自定义,实现我们的消息发送逻辑。
RecordAccumulator
这部分主要用来缓存消息以便Sender线程可以批量发送。
这里为每个分区都维护了一个双端队列,来保证按分区消息的顺序发送。
这里根据分区将我们产生的消息按顺序组装成一个个producerBatch
,注意,这里一个producerBatch是由一个或者多个ProducerRecord组成,这样可以使得字节更紧凑,减少网络请求的次数以提升整体的吞吐量。
该缓存是有大小限制的,默认是32MB,如果生产的速度超过发送的速度,则该缓存可能会满,将导致新的消息发送被阻塞,或者超时抛出异常,超时时间默认60s。
sender线程
发送线程在从缓存中获取消息之后,需要先将消息由 <分区,Deque
我们在发送消息时,只关心将消息发送到指定topic上,在这之后,需要计算得到发送到哪个分区,之后还需要通过获取该分区leader副本在哪个broker哪个node上,将消息真正发送出去。
InFlightRequest
这部分内容会缓存已经发出去但还没有收到响应的请求。默认每个连接最多只能缓存5个未响应的请求,超过之后就不能再像这个连接发送更多的请求了。
通过比较该队列的大小可以判断对应的node是否积压了很多未响应的请求,依次来判断该node的负载。
元数据
元数据值得是我们发送消息时所有必要的数据。比如我们通过topic发送消息时候,需要获得该topic有哪些分区,分区数量,分区leader副本在哪个broker,broker在哪个node上,node的地址和端口,有了这些才能建立连接发送消息。
而且我们观察前面的代码,我们配置只配置了部分broker节点的地址,不需要配置所有的broker节点地址。 因为客户端可以自己发现其他broker节点的地址。客户端可以捕获到元数据的动态变化。
更新元数据时,是通过前面的InFlightRequest中判断得到的负债最小的节点进行更新的。