本文共 1679 字,大约阅读时间需要 5 分钟。
自动提交
手动同步提交
while (true) { ConsumerRecordsrecords = consumer.poll(Duration.ofSeconds(1)); process(records); // 处理消息 try { consumer.commitSync(); } catch (CommitFailedException e) { handle(e); // 处理提交失败异常 }}
手动异步提交
while (true) { ConsumerRecordsrecords = consumer.poll(Duration.ofSeconds(1)); process(records); // 处理消息 consumer.commitAsync((offsets, exception) -> { if (exception != null) handle(exception); });}
手动提交位移
try { while(true) { ConsumerRecordsrecords = consumer.poll(Duration.ofSeconds(1)); process(records); // 处理消息 commitAysnc(); // 使用异步提交规避阻塞 }} catch(Exception e) { handle(e); // 处理异常} finally { try { consumer.commitSync(); // 最后一次提交使用同步阻塞式提交 } finally { consumer.close();}}
手动自由提交偏移量
private Mapoffsets = new HashMap<>();int count = 0;……while (true) { ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord record: records) { process(record); // 处理消息 offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1); if(count % 100 == 0) consumer.commitAsync(offsets, null); // 回调处理逻辑是 null count++; }}
转载地址:http://owtsi.baihongyu.com/