Offset的默认维护位置

==0.9版本以前==:consumer默认将offset保存在Zookeeper中。

==0.9版本及以后==:consumer默认将offset保存在kafka一个内置的topic中,改topic为__consumer_offsets。

扩展

___consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是【group.id+topic+分区号】,value 就是当前offset的值。每隔一段时间,kafka内部会对这个topic进行compact(压缩),也就是每个【group.id+topic+分区号】只保留最新数据。

20220423143621.png!

20220423115337.png

自动提交offset

为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能。
自动提交offset的相关参数:

  • ==enable.auto.commit==:是否开启自动提交offset功能,默认是true。
  • ==auto.commit.interval.ms==:自动提交offset的时间间隔,默认是5s。

手动提交offset

虽然自动提交offset十分简单便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因此Kafka还提供了手动提交offset的API。

手动提交offset的方法有两种:分别是==commitSync(同步提交)== 和 ==commitAsync(异步提交)==。两者的相同点是,都会将==本次提交的一批数据最高的偏移量提交==;不同点是,==同步提交阻塞当前线程==,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而==异步提交则没有失败重试机制,故有可能提交失败==。

  • ==commitSync(同步提交)==:必须等待offset提交完毕,再去消费下一批数据。
  • ==commitAsync(异步提交)==:发送完提交offset请求后,就开始消费下一批数据了。