[TOC]
一. 什么是幂等性?
幂等性其实是消息的一致性,分为生产者幂等性和消费者幂等性.
使用Kafka时,需要保证exactly-once语义。要知道在分布式系统中,出现网络分区是不可避免的,如果kafka broker 在回复ack时,出现网络故障或者是full gc导致ack timeout,producer将会重发,如何保证producer重试时不造成重复or乱序?又或者producer 挂了,新的producer并没有old producer的状态数据,这个时候如何保证幂等?即使Kafka 发送消息满足了幂等,consumer拉取到消息后,把消息交给线程池workers,workers线程对message的处理可能包含异步操作,
就是用来解决数据重复问题,保证kafka单会话单分区内数据不会重复消费在kafka0.11之前通过isr+ack机制可保证数据不丢,却不能保证不重复 有一些情况可能会导致数据重复。比如:网络请求延时导致的重试操作,在发送请求重试时 Server 端并不知道这条请求是否已经处理(没有记录之前的状态信息),所以就会有可能导致数据请求的重复发送,这是 Kafka 自身的机制(异常时请求重试机制)导致的数据重复。
数据重复的解决方案就是加唯一id,通过id判断数据是否重复
原文链接:https://blog.csdn.net/qq_37923600/article/details/88583170
二. 生产者幂等性
保证在发送同一条消息时,在服务端只会被持久化一次,数据不丢不重。
但是是有条件的
- kafka的幂等性只能保证单会话有效,如果broker挂掉重启,幂等就无效了,因为无法获取之前的状态信息
- 幂等性不能跨多个Topic-Partition,只能保证单个partition的幂等性。
所以生产者分为了 幂等型producer 和 事务型producer,前者解决了单会话幂等性等问题,后者解决了多会话幂等性
单回话的意思大概是 发送一次消息表示一次回话吧
2.1 单回话幂等性
为解决producer重试引起的乱序和重复。Kafka增加了pid和seq。Producer中每个RecordBatch都有一个单调递增的seq; Broker上每个tp也会维护pid-seq的映射,并且每Commit都会更新lastSeq。这样recordBatch到来时,broker会先检查RecordBatch再保存数据:如果batch中 baseSeq(第一条消息的seq)比Broker维护的序号(lastSeq)大1,则保存数据,否则不保存(inSequence方法)。
简单的说,相当于把存一份标识符,来确定是否已生产了
在生产者配置文件中加入配置即可实现:
enable.idempotence=true
参考: https://blog.csdn.net/qq_37923600/article/details/88583170
2.2 多回话幂等性
kafka事务引入了transactionId 和Epoch,设置transactional.id后,一个transactionId只对应一个pid, 且Server 端会记录最新的 Epoch 值。这样有新的producer初始化时,会向TransactionCoordinator发送InitPIDRequest请求, TransactionCoordinator 已经有了这个 transactionId对应的 meta,会返回之前分配的 PID,并把 Epoch 自增 1 返回,这样当old producer恢复过来请求操作时,将被认为是无效producer抛出异常。 如果没有开启事务,TransactionCoordinator会为新的producer返回new pid,这样就起不到隔离效果,因此无法实现多会话幂等。
其实就是利用事务,只要没完成就不会自增(原子性),完成操作后手动提交
2.2.1 实现多会话幂等性
提供了API,使用API即可. 其中分为
- 只有写
- 有写有读(最常见)
- 只有读(没有实际意义,因为只有读不会发生异常)
这里只描述第二种
/**
* 在一个事务内,即有生产消息又有消费消息
*/
public void consumeTransferProduce() {
// 1.构建上产者
Producer producer = buildProducer();
// 2.初始化事务(生成productId),对于一个生产者,只能执行一次初始化事务操作
producer.initTransactions();
// 3.构建消费者和订阅主题
Consumer consumer = buildConsumer();
consumer.subscribe(Arrays.asList("test"));
while (true) {
// 4.开启事务
producer.beginTransaction();
// 5.1 接受消息
ConsumerRecords<String, String> records = consumer.poll(500);
try {
// 5.2 do业务逻辑;
System.out.println("customer Message---");
Map<TopicPartition, OffsetAndMetadata> commits = Maps.newHashMap();
for (ConsumerRecord<String, String> record : records) {
// 5.2.1 读取消息,并处理消息。print the offset,key and value for the consumer records.
System.out.printf("offset = %d, key = %s, value = %s
",
record.offset(), record.key(), record.value());
// 5.2.2 记录提交的偏移量
commits.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset()));
// 6.生产新的消息。比如外卖订单状态的消息,如果订单成功,则需要发送跟商家结转消息或者派送员的提成消息
producer.send(new ProducerRecord<String, String>("test", "data2"));
}
// 7.提交偏移量
producer.sendOffsetsToTransaction(commits, "group0323");
// 8.事务提交
producer.commitTransaction();
} catch (Exception e) {
// 7.放弃事务
producer.abortTransaction();
}
}
}
创建消费者代码,需要:
将配置中的自动提交属性(auto.commit)进行关闭
而且在代码里面也不能使用手动提交commitSync( )或者commitAsync( )
设置isolation.level
/** * 需要: * 1、关闭自动提交 enable.auto.commit * 2、isolation.level为 * @return */ public Consumer buildConsumer() { Properties props = new Properties(); // bootstrap.servers是Kafka集群的IP地址。多个时,使用逗号隔开 props.put("bootstrap.servers", "localhost:9092"); // 消费者群组 props.put("group.id", "group0323"); // 设置隔离级别 props.put("isolation.level","read_committed"); // 关闭自动提交 props.put("enable.auto.commit", "false"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer <String, String>(props); return consumer; }
更多参见:http://www.heartthinkdo.com/?p=2040
三. 消费者幂等性
指的是即使有重复消息,也不能重复处理
其做法就是做唯一id,相当于天启同步模块中,做个判断,如果项目编号已存在则不做消费处理
if(cache.contain(msgId)){
// cache中包含msgId,已经处理过
continue;
}else {
lock.lock();
cache.put(msgId,timeout);
commitSync();
lock.unLock();
}
// 后续完成所有操作后,删除cache中的msgId,只要msgId存在cache中,就认为已经处理过。Note:需要给cache设置有消息