Kafka生产者
生产者发送流程

- RecordAccumulator:在内存中,大小默认为32M;
- 每个分区都会对应的创建一个队列(DQuene);
- 队列为双端队列;
- 内存池:每次从内存池中获取一个批次的大小进行使用;
- 每个分区都会对应的创建一个队列(DQuene);
- 每个批次的大小默认为16K(包含N条消息);
- Sender线程主动拉取数据,满足
batch.size和linger.ms两个条件之一就会开始发送数据; - 发送线程中的NetworkClient最多可以缓存5个请求,如果5个请求一个都没返回结果,则第六个请求不在发送,等待返回结果;
- 发送成功之后==清理==请求和队列中的数据;
- Selector用于打通通道;
- 重试次数(retries)的次数默认为Integer的最大值;
异步发送
只发送消息,不关心消息是否发送成功。消息先存储在缓冲区中,达到设定条件后批量发送。当然这是 kafka 吞吐量最高的一种方式,并配合参数 acks=0,这样生产者不需要等待服务器的响应,以网络能支持的最大速度发送消息。但是也是消息最不可靠的一种方式,因为对于发送失败的消息没有做任何处理。
在发送消息之前有可能会发生异常,例如是序列化消息失败的 SerializationException、缓冲区满的 BufferExhaustedException、发送超时的 TimeoutException 或者发送的线程被中断的 InterruptException。发送消息之后并没有异常处理。
回调异步
异步发送,在调用 send()方法的时候指定一个 callback 函数,当 broker 接收到返回的时候,该 callback 函数会被触发执行。如果业务需要知道消息发送是否成功,并且对消息的顺序不关心,那么可以用异步 + 回调的方式来发送消息,配合参数 retries=0,并将发送失败的消息记录到日志文件中;要使用 callback 函数,先要实现 org.apache.kafka.clients.producer.Callback 接口,该接口只有一个 onCompletion 方法。如果发送异常,onCompletion 的参数 Exception e 会为非空。
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
producer.send(myRecord,
new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if(e != null) {
e.printStackTrace();
} else {
System.out.println("The offset of the record we just sent is: " + metadata.offset());
}
}
});
同步发送
同步发送,send()方法会返回 Futrue 对象,通过调用 Futrue 对象的 get()方法,等待直到结果返回,根据返回的结果可以判断是否发送成功。如果业务要求消息必须是按顺序发送的,那么可以使用同步的方式,并且只能在一个 partation 上,结合参数设置 retries 的值让发送失败时重试,设置 max_in_flight_requests_per_connection=1,可以控制生产者在收到服务器晌应之前只能发送 1 个消息,在消息发送成功后立刻 flush,从而控制消息顺序发送。
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
try {
RecordMetadata metadata = producer.send(record).get();
} catch (Exception e) {
e.printStackTrace();
}
producer.flush();
producer.close();
在调用 send()方法后再调用 get()方法等待结果返回。如果发送失败会抛出异常,如果发送成功会返回一个 RecordMetadata 对象,然后可以调用 offset()方法获取该消息在当前分区的偏移量。
KafkaProducer 有两种类型的异常,第一种是可以重试的 Retriable,该类异常可以通过重新发送消息解决。例如是连接异常后重新连接、“no leader”异常后重新选取新的 leader。KafkaProducer 可以配置为遇到该类异常后自动重新发送消息直到超过重试次数。第二类是不可重试的,例如是“message size too large”(消息太大),该类异常会马上返回错误。
参考/引用: