博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
kafka提交消息offset
阅读量:4108 次
发布时间:2019-05-25

本文共 1679 字,大约阅读时间需要 5 分钟。

自动提交

 

手动同步提交

while (true) {            ConsumerRecords
records = consumer.poll(Duration.ofSeconds(1)); process(records); // 处理消息 try { consumer.commitSync(); } catch (CommitFailedException e) { handle(e); // 处理提交失败异常 }}

手动异步提交

while (true) {            ConsumerRecords
records = consumer.poll(Duration.ofSeconds(1)); process(records); // 处理消息 consumer.commitAsync((offsets, exception) -> { if (exception != null) handle(exception); });}

手动提交位移

try {           while(true) {                        ConsumerRecords
records = consumer.poll(Duration.ofSeconds(1)); process(records); // 处理消息 commitAysnc(); // 使用异步提交规避阻塞 }} catch(Exception e) { handle(e); // 处理异常} finally { try { consumer.commitSync(); // 最后一次提交使用同步阻塞式提交 } finally { consumer.close();}}

手动自由提交偏移量

private Map
offsets = new HashMap<>();int count = 0;……while (true) { ConsumerRecords
records = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord
record: records) { process(record); // 处理消息 offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1); if(count % 100 == 0) consumer.commitAsync(offsets, null); // 回调处理逻辑是 null count++; }}

 

转载地址:http://owtsi.baihongyu.com/

你可能感兴趣的文章
Node.js-模块和包
查看>>
(python版)《剑指Offer》JZ01:二维数组中的查找
查看>>
(python版)《剑指Offer》JZ28:数组中出现次数超过一半的数字
查看>>
(python版)《剑指Offer》JZ30:连续子数组的最大和
查看>>
管理用户状态——Cookie与Session
查看>>
通过Spring Boot三分钟创建Spring Web项目
查看>>
Java编程基础:static的用法
查看>>
Java编程基础:抽象类和接口
查看>>
Java编程基础:异常处理
查看>>
Spring MVC中使用Thymeleaf模板引擎
查看>>
Spring处理表单提交
查看>>
Spring MVC异常处理
查看>>
PHP 7 的五大新特性
查看>>
PHP底层的运行机制与原理
查看>>
深入了解php底层机制
查看>>
PHP中的stdClass 【转】
查看>>
XHProf-php轻量级的性能分析工具
查看>>
PHP7新特性 What will be in PHP 7/PHPNG
查看>>
比较strtr, str_replace和preg_replace三个函数的效率
查看>>
PHP编译configure时常见错误 debian centos
查看>>