[toc]

前言

RocketMQ为我们提供了事务消息的功能,它使得我们投放消息和其他的一些操作保持一个整体的原子性。比如:向数据库中插入数据,再向MQ中投放消息,把这两个动作作为一个原子性的操作。貌似其他的MQ是没有这种功能的。

RocketMQ系列(七)事务消息-阿里云开发者社区 (aliyun.com)

1. 事务消息实现思想

RocketMQ 事务消息的实现原理基于两阶段提交定时事务状态回查来决定消息最终是提交还是回滚

RocketMQ实现事务消息主要分为两个阶段:正常事务的发送及提交、事务信息的补偿流程

整体流程为:

  • 正常事务发送与提交阶段

    1. 生产者发送一个半消息给MQServer(半消息是指消费者暂时不能消费的消息)

    2. 服务端响应消息写入结果,半消息发送成功

    3. 开始执行本地事务

    4. 根据本地事务的执行状态执行Commit或者Rollback操作

      总结: 先发送MQ,再操作数据库, MQ发送半消息, 等数据库操作完成后,再决定那条半消息是成功还是失败

  • 事务信息的补偿流程

    1. 如果MQServer长时间没收到本地事务的执行状态会向生产者发起一个确认回查的操作请求

    2. 生产者收到确认回查请求后,检查本地事务的执行状态

    3. 根据检查后的结果执行Commit或者Rollback操作

      补偿阶段主要是用于解决生产者在发送Commit或者Rollback操作时发生超时或失败的情况。

2. RocketMQ事务流程关键

  1. 事务消息在一阶段对用户不可见 事务消息相对普通消息最大的特点就是一阶段发送的消息对用户是不可见的,也就是说消费者不能直接消费。这里RocketMQ的实现方法是原消息的主题与消息消费队列,然后把主题改成 RMQ_SYS_TRANS_HALF_TOPIC (消息体中存储了原来的主题和队列id),这样由于消费者没有订阅这个主题,所以不会被消费。

  2. 如何处理第二阶段的失败消息?

    在本地事务执行完成后会向MQServer发送Commit或Rollback操作,此时如果在发送消息的时候生产者出故障了,那么要保证这条消息最终被消费,MQServer会像服务端发送回查请求,确认本地事务的执行状态。当然了rocketmq并不会无休止的的信息事务状态回查,默认回查15次, 每60s发一次,如果15次回查还是无法得知事务状态,RocketMQ默认回滚该消息。

  3. 消息状态 事务消息有三种状态: TransactionStatus.CommitTransaction:提交事务消息,消费者可以消费此消息 TransactionStatus.RollbackTransaction:回滚事务,它代表该消息将被删除,不允许被消费。 TransactionStatus.Unknown :中间状态,它代表需要检查消息队列来确定状态。

配置文件 : org.apache.rocketmq.common.BrokerConfig

/**
     * The minimum time of the transactional message  to be checked firstly, one message only exceed this time interval
     * that can be checked.
     */
    @ImportantField
    private long transactionTimeOut = 6 * 1000;

    /**
     * The maximum number of times the message was checked, if exceed this value, this message will be discarded.
     */
    @ImportantField
    private int transactionCheckMax = 15;

    /**
     * Transaction message check interval.
     */
    @ImportantField
    private long transactionCheckInterval = 60 * 1000;

3. 提交或回滚事务

根据消息所属 的消息队列获取 Broker 的 IP 与端口 信息 ,然后发送结束事务命令 ,其关键就是根据本地执行事务的状态分别发送 **提交 、 回滚或 “不作为”**的命令

如果结束事务动作为提交事务 ,则执行提交事务逻辑,其关键 现如下

  1. 首先从结束事务请求命令中获取消息的物理偏移量( commitlogOffset ),其实现逻辑TransactionalMessageService#.commitMessage 实现

  2. 然后恢复消息的主题 消费队列,构建新的消息对象,由 TransactionalMessageService#endMessageTransaction 实现

  3. 然后将消息再次存储在 commitlog 文件中,此时的消息主题则为业务方发送的消息,将被转发到对应的消息消费队列,供消息消费者消费,其实现由 TransactionalMessageService#sendFinalMessage 实现

  4. 消息存储后,删除 prepare 消息,其实现方法并不是真正的删除,而是将 prepare消息存储到 RMQ_SYS_TRANS_OP_HALF TOPIC 主题中,表示该事务消息( prepare 状态的消息)已经处理过(提交或回滚),为未处理的事务进行事务回查提供查找依据

事务的回滚与提交的唯一差别是无须将消息恢复原主题,直接删除 prepare 消息即可,同样是将预处理消息存储在 RMQ_SYS_TRANS_OP_HALF _TOPIC 主题中,表示已处理过该消息

4. springCloud 整合 RocketMQ

RocketMQ 与 Spring Cloud Stream整合(八、事务消息) - 简书 (jianshu.com)

Spring Cloud Stream + RocketMq实现事务性消息_简单,坚持-CSDN博客

因为 Spring Cloud Stream 在设计时,并没有考虑事务消息, 所以还是用springCloud-rocketMQ的方式

RocketMQ进阶 - 事务消息 - 知乎 (zhihu.com)

RocketMQ入门教程(五):可靠消息最终一致性(事务消息)_monday的博客-CSDN博客

RocketMQ:(6) 事务消息 - 湮天霸神666 - 博客园 (cnblogs.com)

RocketMQ事务型消息 (aliyun.com)

RocketMQ RMQ_SYS_TRANS_HALF_TOPIC 爆掉的问题 | 赵烧鸡腿饭的个人博客 (jingzhouzhao.github.io)