[TOC]

一. 什么是幂等性?

幂等性其实是消息的一致性,分为生产者幂等性和消费者幂等性.

使用Kafka时,需要保证exactly-once语义。要知道在分布式系统中,出现网络分区是不可避免的,如果kafka broker 在回复ack时,出现网络故障或者是full gc导致ack timeout,producer将会重发,如何保证producer重试时不造成重复or乱序?又或者producer 挂了,新的producer并没有old producer的状态数据,这个时候如何保证幂等?即使Kafka 发送消息满足了幂等,consumer拉取到消息后,把消息交给线程池workers,workers线程对message的处理可能包含异步操作,

来自 https://www.cnblogs.com/jingangtx/p/11330338.html

就是用来解决数据重复问题,保证kafka单会话单分区内数据不会重复消费在kafka0.11之前通过isr+ack机制可保证数据不丢,却不能保证不重复 有一些情况可能会导致数据重复。比如:网络请求延时导致的重试操作,在发送请求重试时 Server 端并不知道这条请求是否已经处理(没有记录之前的状态信息),所以就会有可能导致数据请求的重复发送,这是 Kafka 自身的机制(异常时请求重试机制)导致的数据重复。

数据重复的解决方案就是加唯一id,通过id判断数据是否重复

原文链接:https://blog.csdn.net/qq_37923600/article/details/88583170

二. 生产者幂等性

保证在发送同一条消息时,在服务端只会被持久化一次,数据不丢不重。

但是是有条件的

  1. kafka的幂等性只能保证单会话有效,如果broker挂掉重启,幂等就无效了,因为无法获取之前的状态信息
  2. 幂等性不能跨多个Topic-Partition,只能保证单个partition的幂等性。

所以生产者分为了 幂等型producer事务型producer,前者解决了单会话幂等性等问题,后者解决了多会话幂等性

单回话的意思大概是 发送一次消息表示一次回话吧

2.1 单回话幂等性

为解决producer重试引起的乱序和重复。Kafka增加了pid和seq。Producer中每个RecordBatch都有一个单调递增的seq; Broker上每个tp也会维护pid-seq的映射,并且每Commit都会更新lastSeq。这样recordBatch到来时,broker会先检查RecordBatch再保存数据:如果batch中 baseSeq(第一条消息的seq)比Broker维护的序号(lastSeq)大1,则保存数据,否则不保存(inSequence方法)。

简单的说,相当于把存一份标识符,来确定是否已生产了

在生产者配置文件中加入配置即可实现: enable.idempotence=true

参考: https://blog.csdn.net/qq_37923600/article/details/88583170

2.2 多回话幂等性

kafka事务引入了transactionId 和Epoch,设置transactional.id后,一个transactionId只对应一个pid, 且Server 端会记录最新的 Epoch 值。这样有新的producer初始化时,会向TransactionCoordinator发送InitPIDRequest请求, TransactionCoordinator 已经有了这个 transactionId对应的 meta,会返回之前分配的 PID,并把 Epoch 自增 1 返回,这样当old producer恢复过来请求操作时,将被认为是无效producer抛出异常。 如果没有开启事务,TransactionCoordinator会为新的producer返回new pid,这样就起不到隔离效果,因此无法实现多会话幂等。

其实就是利用事务,只要没完成就不会自增(原子性),完成操作后手动提交

2.2.1 实现多会话幂等性

提供了API,使用API即可. 其中分为

  1. 只有写
  2. 有写有读(最常见)
  3. 只有读(没有实际意义,因为只有读不会发生异常)

这里只描述第二种

/**
     * 在一个事务内,即有生产消息又有消费消息
     */
    public void consumeTransferProduce() {
        // 1.构建上产者
        Producer producer = buildProducer();
        // 2.初始化事务(生成productId),对于一个生产者,只能执行一次初始化事务操作
        producer.initTransactions();

        // 3.构建消费者和订阅主题
        Consumer consumer = buildConsumer();
        consumer.subscribe(Arrays.asList("test"));
        while (true) {
            // 4.开启事务
            producer.beginTransaction();

            // 5.1 接受消息
            ConsumerRecords<String, String> records = consumer.poll(500);

            try {
                // 5.2 do业务逻辑;
                System.out.println("customer Message---");
                Map<TopicPartition, OffsetAndMetadata> commits = Maps.newHashMap();
                for (ConsumerRecord<String, String> record : records) {
                    // 5.2.1 读取消息,并处理消息。print the offset,key and value for the consumer records.
                    System.out.printf("offset = %d, key = %s, value = %s
",
                            record.offset(), record.key(), record.value());

                    // 5.2.2 记录提交的偏移量
                    commits.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset()));


                    // 6.生产新的消息。比如外卖订单状态的消息,如果订单成功,则需要发送跟商家结转消息或者派送员的提成消息
                    producer.send(new ProducerRecord<String, String>("test", "data2"));
                }

                // 7.提交偏移量
                producer.sendOffsetsToTransaction(commits, "group0323");

                // 8.事务提交
                producer.commitTransaction();

            } catch (Exception e) {
                // 7.放弃事务
                producer.abortTransaction();
            }
        }
    }

创建消费者代码,需要:

  • 将配置中的自动提交属性(auto.commit)进行关闭

  • 而且在代码里面也不能使用手动提交commitSync( )或者commitAsync( )

  • 设置isolation.level

    /**
     * 需要:
     * 1、关闭自动提交 enable.auto.commit
     * 2、isolation.level为
     * @return
     */
    public Consumer buildConsumer() {
        Properties props = new Properties();
        // bootstrap.servers是Kafka集群的IP地址。多个时,使用逗号隔开
        props.put("bootstrap.servers", "localhost:9092");
        // 消费者群组
        props.put("group.id", "group0323");
        // 设置隔离级别
        props.put("isolation.level","read_committed");
        // 关闭自动提交
        props.put("enable.auto.commit", "false");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer
                <String, String>(props);
    
        return consumer;
    }
    

更多参见:http://www.heartthinkdo.com/?p=2040

三. 消费者幂等性

指的是即使有重复消息,也不能重复处理

其做法就是做唯一id,相当于天启同步模块中,做个判断,如果项目编号已存在则不做消费处理

if(cache.contain(msgId)){
  // cache中包含msgId,已经处理过
        continue;
}else {
  lock.lock();
  cache.put(msgId,timeout);
  commitSync();
  lock.unLock();
}
// 后续完成所有操作后,删除cache中的msgId,只要msgId存在cache中,就认为已经处理过。Note:需要给cache设置有消息

参考: https://www.cnblogs.com/jingangtx/p/11330338.html