kafka客户端生产者

Posted by CHuiL on July 18, 2021

客户端发送消息

java代码连接简单示例

public class KafkaProducerAnalysis {
    public static final String brokerList = "HOSTNAME:9092";
    public static final String topic = "quickstart-events";

    public static Properties initConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("client.id", "producer.client.id.demo");
        return props;
    }

    public static Properties initNewConfig() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo");
        return props;
    }

    public static Properties initPerferConfig() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        return props;
    }

    public static void main(String[] args) throws InterruptedException {
        Properties props = initNewConfig();
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

//        KafkaProducer<String, String> producer = new KafkaProducer<>(props,
//                new StringSerializer(), new StringSerializer());

        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "hello, Kafka!");
        try {
//            producer.send(record);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("get:"+metadata.partition() + ":" + metadata.offset());
                    }
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }

        TimeUnit.SECONDS.sleep(5);
    }

}

image

主线程

主线程部分,就是我们代码中发送消息的部分,上面的代码只是最简单的demo,实际上我们在发送消息的时候,可以设定消息发送的拦截器,序列号器和分区器。

  • 拦截器,在消息发送之前可以做一些工作,比如哪些消息要发送,哪些不发送,或者全局附带一些额外的信息等
  • 序列化器:我们需要将所发送的消息最终序列化为字节数组才能发送
  • 分区器:一个topic消息具体发送到哪个分区上,可以根据一些额外的参数来进行计算,比如分区数量,指定的key值等,计算得到一个Integer。

以上的这些工具都可以自定义,实现我们的消息发送逻辑。

RecordAccumulator

这部分主要用来缓存消息以便Sender线程可以批量发送。

这里为每个分区都维护了一个双端队列,来保证按分区消息的顺序发送。

这里根据分区将我们产生的消息按顺序组装成一个个producerBatch,注意,这里一个producerBatch是由一个或者多个ProducerRecord组成,这样可以使得字节更紧凑,减少网络请求的次数以提升整体的吞吐量。

该缓存是有大小限制的,默认是32MB,如果生产的速度超过发送的速度,则该缓存可能会满,将导致新的消息发送被阻塞,或者超时抛出异常,超时时间默认60s。

sender线程

发送线程在从缓存中获取消息之后,需要先将消息由 <分区,Deque>的关系 转换为 <node,List>。 node即对应分区所在broker的节点。

我们在发送消息时,只关心将消息发送到指定topic上,在这之后,需要计算得到发送到哪个分区,之后还需要通过获取该分区leader副本在哪个broker哪个node上,将消息真正发送出去。

InFlightRequest

这部分内容会缓存已经发出去但还没有收到响应的请求。默认每个连接最多只能缓存5个未响应的请求,超过之后就不能再像这个连接发送更多的请求了。
通过比较该队列的大小可以判断对应的node是否积压了很多未响应的请求,依次来判断该node的负载。

元数据

元数据值得是我们发送消息时所有必要的数据。比如我们通过topic发送消息时候,需要获得该topic有哪些分区,分区数量,分区leader副本在哪个broker,broker在哪个node上,node的地址和端口,有了这些才能建立连接发送消息。

而且我们观察前面的代码,我们配置只配置了部分broker节点的地址,不需要配置所有的broker节点地址。 因为客户端可以自己发现其他broker节点的地址。客户端可以捕获到元数据的动态变化。

更新元数据时,是通过前面的InFlightRequest中判断得到的负债最小的节点进行更新的。