[TOC]

前言

一图解读分布式事务

在这里插入图片描述

名词解释

  • 事务:事务是由一组操作构成的可靠的独立的工作单元,事务具备ACID的特性,即原子性、一致性、隔离性和持久性。
  • 本地事务:当事务由资源管理器本地管理时被称作本地事务。本地事务的优点就是支持严格的ACID特性,高效,可靠,状态可以只在资源管理器中维护,而且应用编程模型简单。但是本地事务不具备分布式事务的处理能力,隔离的最小单位受限于资源管理器。
  • 全局事务:当事务由全局事务管理器进行全局管理时成为全局事务,事务管理器负责管理全局的事务状态和参与的资源,协同资源的一致提交回滚。
  • TX协议:应用或者应用服务器与事务管理器的接口。
  • XA协议:全局事务管理器与资源管理器的接口。XA是由X/Open组织提出的分布式事务规范。该规范主要定义了全局事务管理器和局部资源管理器之间的接口。主流的数据库产品都实现了XA接口。XA接口是一个双向的系统接口,在事务管理器以及多个资源管理器之间作为通信桥梁。之所以需要XA是因为在分布式系统中从理论上讲两台机器是无法达到一致性状态的,因此引入一个单点进行协调。由全局事务管理器管理和协调的事务可以跨越多个资源和进程。全局事务管理器一般使用XA二阶段协议与数据库进行交互。
  • AP:应用程序,可以理解为使用DTP(Data Tools Platform)的程序。
  • RM:资源管理器,这里可以是一个DBMS或者消息服务器管理系统,应用程序通过资源管理器对资源进行控制,资源必须实现XA定义的接口。资源管理器负责控制和管理实际的资源。
  • TM:事务管理器,负责协调和管理事务,提供给AP编程接口以及管理资源管理器。事务管理器控制着全局事务,管理事务的生命周期,并且协调资源。
  • 两阶段提交协议:XA用于在全局事务中协调多个资源的机制。TM和RM之间采取两阶段提交的方案来解决一致性问题。两节点提交需要一个协调者(TM)来掌控所有参与者(RM)节点的操作结果并且指引这些节点是否需要最终提交。两阶段提交的局限在于协议成本,准备阶段的持久成本,全局事务状态的持久成本,潜在故障点多带来的脆弱性,准备后,提交前的故障引发一系列隔离与恢复难题。
  • BASE理论:BA指的是基本业务可用性,支持分区失败,S表示柔性状态,也就是允许短时间内不同步,E表示最终一致性,数据最终是一致的,但是实时是不一致的。原子性和持久性必须从根本上保障,为了可用性、性能和服务降级的需要,只有降低一致性和隔离性的要求。
  • CAP定理:对于共享数据系统,最多只能同时拥有CAP其中的两个,任意两个都有其适应的场景,真是的业务系统中通常是ACID与CAP的混合体。分布式系统中最重要的是满足业务需求,而不是追求高度抽象,绝对的系统特性。C表示一致性,也就是所有用户看到的数据是一样的。A表示可用性,是指总能找到一个可用的数据副本。P表示分区容错性,能够容忍网络中断等故障。

CAP定理

CAP定理是由加州大学伯克利分校Eric Brewer教授提出来的,他指出WEB服务无法同时满足一下3个属性:

  • 一致性(Consistency) : 意味着所有客户端无论连接到哪个节点,都能在同一时间看到相同的数据。为此,每当数据写入一个节点时,必须立即将其转发或复制到系统中的所有其他节点,然后才会视之为“成功”写入
  • 可用性(Availability) : 意味着任何发出数据请求的客户端都会得到响应,即使一个或多个节点发生故障也是如此
  • 分区容错性(Partition tolerance) : 系统存在网络分区的情况下,仍然可以接受请求(即满足一致性和可用性)。网络分区指的是由于网络抖动等等原因网络被分成若干个孤立的区域,而区域之间互不相通。分区容错性可理解为系统对结点动态加人和离开的处理能力,因为结点的加入和离开可认为是集群内部的网络分区

什么是 CAP 定理?| IBM

CAP中的分区容错性和可用性有什么区别? - 知乎 (zhihu.com)

具体地讲在分布式系统中,一个Web应用至多只能同时支持上面的两个属性。

在这里插入图片描述

CAP 权衡

通过 CAP 理论,我们知道无法同时满足一致性、可用性和分区容错性这三个特性,那要舍弃哪个呢?

对于多数大型互联网应用的场景,主机众多、部署分散,而且现在的集群规模越来越大,所以节点故障、网络故障是常态,而且要保证服务可用性达到 N 个 9,即保证 P 和 A,舍弃C(退而求其次保证最终一致性)。虽然某些地方会影响客户体验,但没达到造成用户流程的严重程度。

对于涉及到钱财这样不能有一丝让步的场景,C 必须保证。网络发生故障宁可停止服务,这是保证 CA,舍弃 P。貌似这几年国内银行业发生了不下 10 起事故,但影响面不大,报道也不多,广大群众知道的少。还有一种是保证 CP,舍弃 A。例如网络故障是只读不写。

BASE定理

CAP是分布式系统设计理论,BASE是CAP理论中AP方案的延伸,对于C我们采用的方式和策略就是保证最终一致性;

eBay的架构师Dan Pritchett源于对大规模分布式系统的实践总结,在ACM上发表文章提出BASE理论,BASE理论是对CAP理论的延伸,核心思想是即使无法做到强一致性(StrongConsistency,CAP的一致性就是强一致性),但应用可以采用适合的方式达到最终一致性(Eventual Consitency)。

BASE是Basically Available(基本可用)、Soft state(软状态)和Eventually consistent(最终一致性)三个短语的缩写。BASE基于CAP定理演化而来,核心思想是即时无法做到强一致性,但每个应用都可以根据自身业务特点,采用适当的方式来使系统达到最终一致性。

分布式事务分类

刚性事务

刚性事务:通常无业务改造,强一致性,原生支持回滚/隔离性,低并发,适合短事务。

原则:刚性事务满足足CAP的CP理论

刚性事务指的是,要使分布式事务,达到像本地式事务一样,具备数据强一致性,从CAP来看,就是说,要达到CP状态。

刚性事务:XA 协议(2PC、JTA、JTS)、3PC,但由于同步阻塞,处理效率低,不适合大型网站分布式场景。

柔性事务

柔性事务指的是,不要求强一致性,而是要求最终一致性,允许有中间状态,也就是Base理论,换句话说,就是AP状态。

与刚性事务相比,柔性事务的特点为:有业务改造,最终一致性,实现补偿接口,实现资源锁定接口,高并发,适合长事务。

柔性事务分为:

  • 补偿型
  • 异步确保型
  • 最大努力通知型。

柔型事务:TCC/FMT、Saga(状态机模式、Aop模式)、本地事务消息、消息事务(半消息)

刚性事务:XA

X/Open DTP(Distributed Transaction Process) 是一个分布式事务模型。这个模型主要使用了两段提交(2PC - Two-Phase-Commit)来保证分布式事务的完整性。

在X/Open **DTP(Distributed Transaction Process)**模型里面,有三个角色:

AP: Application,应用程序。也就是业务层。哪些操作属于一个事务,就是AP定义的。

TM: Transaction Manager,事务管理器。接收AP的事务请求,对全局事务进行管理,管理事务分支状态,协调RM的处理,通知RM哪些操作属于哪些全局事务以及事务分支等等。这个也是整个事务调度模型的核心部分。

RM:Resource Manager,资源管理器。一般是数据库,也可以是其他的资源管理器,如消息队列(如JMS数据源),文件系统等。

在这里插入图片描述

XA之所以需要引入事务管理器是因为,在分布式系统中,从理论上讲(参考Fischer等的论文),两台机器理论上无法达到一致的状态,需要引入一个单点进行协调。事务管理器控制着全局事务,管理事务生命周期,并协调资源。资源管理器负责控制和管理实际资源(如数据库或JMS队列)

XA规范

XA规范(XA Specification) 是X/OPEN 提出的分布式事务处理规范。XA则规范了TM与RM之间的通信接口,在TM与多个RM之间形成一个双向通信桥梁,从而在多个数据库资源下保证ACID四个特性。目前知名的数据库,如Oracle, DB2,mysql等,都是实现了XA接口的,都可以作为RM。

XA协议的实现

2PC/3PC协议

两阶段提交(2PC)协议是XA规范定义的 数据一致性协议。

三阶段提交(3PC)协议对 2PC协议的一种扩展。

Seata

Seata 是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式

Seata AT 模式

Seata AT 模式是增强型2pc模式。

AT 模式: 两阶段提交协议的演变,没有一直锁表

  • 一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源
  • 二阶段:提交异步化,非常快速地完成。或回滚通过一阶段的回滚日志进行反向补偿

XA的主要限制

  • 必须要拿到所有数据源,而且数据源还要支持XA协议。目前MySQL中只有InnoDB存储引擎支持XA协议。
  • 性能比较差,要把所有涉及到的数据都要锁定,是强一致性的,会产生长事务。

2PC(标准XA模型)

2PC即Two-Phase Commit,二阶段提交。

2PC节点角色

二阶段提交协议将节点分为:

  • 协调者角色(事务管理器Coordinator)
  • 参与者角色(资源管理器Participant)

2PC角色中,事务管理器的角色,负责协调多个数据库(资源管理器)的事务,

协调者角色(事务管理器Coordinator),负责向参与者发送指令,收集参与者反馈,做出提交或者回滚决策

参与者角色(资源管理器Participant),接收协调者的指令执行事务操作,向协调者反馈操作结果,并继续执行协调者发送的最终指令

详解:两个阶段

广泛应用在数据库领域,为了使得基于分布式架构的所有节点可以在进行事务处理时能够保持原子性和一致性。

顾名思义,2PC分为两个阶段处理,

  • 阶段一:提交事务请求、
  • 阶段二:执行事务提交,或者执行中断事务;

如果阶段一超时或者出现异常,2PC的阶段二为:执行中断事务

说明:绝大部分关系型数据库,都是基于2PC完成分布式的事务处理。

阶段一:提交事务请求

  1. 事务询问。协调者向所有参与者发送事务内容,询问是否可以执行提交操作,并开始等待各参与者进行响应;
  2. 执行事务。各参与者节点,执行事务操作,并将Undo和Redo操作计入本机事务日志;
  3. 各参与者向协调者反馈事务问询的响应。成功执行返回Yes,否则返回No。

阶段二:提交或者中断事务

这一阶段包含两种情形:

  • 执行事务提交,
  • 执行中断事务

协调者在阶段二决定是否最终执行事务提交操作。

  • 所有参与者reply Yes,那么执行事务提交。
  • 当存在某一参与者向协调者发送No响应,或者等待超时。协调者只要无法收到所有参与者的Yes响应,就会中断事务。
执行事务提交

所有参与者reply Yes,那么执行事务提交。

  1. 发送提交请求。协调者向所有参与者发送Commit请求;
  2. 事务提交。参与者收到Commit请求后,会正式执行事务提交操作,并在完成提交操作之后,释放在整个事务执行期间占用的资源;
  3. 反馈事务提交结果。参与者在完成事务提交后,写协调者发送Ack消息确认;
  4. 完成事务。协调者在收到所有参与者的Ack后,完成事务。

事务提交示意图

执行事务中断

事情总会出现意外,当存在某一参与者向协调者发送No响应,或者等待超时。协调者只要无法收到所有参与者的Yes响应,就会中断事务。

  1. 发送回滚请求。协调者向所有参与者发送Rollback请求;
  2. 回滚。参与者收到请求后,利用本机Undo信息,执行Rollback操作。并在回滚结束后释放该事务所占用的系统资源;
  3. 反馈回滚结果。参与者在完成回滚操作后,向协调者发送Ack消息;
  4. 中断事务。协调者收到所有参与者的回滚Ack消息后,完成事务中断。

事务中断示意图

2PC二阶段提交的特点

2PC 方案比较适合单体应用里,跨多个库的分布式事务,而且因为严重依赖于数据库层面来搞定复杂的事务,效率很低

2PC具有明显的优缺点:

优点:

  • 主要体现在实现原理简单;

缺点比较多:

  1. 同步阻塞导致性能问题

    执行过程中,所有参与节点都是事务阻塞型的。所有participant 都处于阻塞状态,各个participant 都在等待其他参与者响应,无法进行其他操作。

    所有分支的资源锁定时间,由最长的分支事务决定。

    另外当参与者锁定公共资源时,处于事务之外的其他第三方访问者,也不得不处于阻塞状态。

  2. 单点故障导致高可用(HA)问题:

    协调者是个单点,一旦出现问题,各个participant 将无法释放事务资源,也无法完成事务操作;

    并且,由于协调者的重要性,一旦协调者发生故障,参与者会一直阻塞下去。

    尤其在第二阶段,协调者发生故障,那么所有的参与者还都处于锁定事务资源的状态中,而无法继续完成事务操作。

  3. 丢失消息导致的数据不一致问题:

    如果协调者向所有参与者发送Commit请求后,发生局部网络异常,或者协调者在尚未给全部的participant发送完Commit请求即出现崩溃,最终导致只有部分participant收到、执行请求。部分参与者接到commit请求之后就会执行commit操作。但是其他部分未接到commit请求的机器则无法执行事务提交。于是整个分布式系统便出现了数据部一致性的现象。

  4. 过于保守:

    二阶段提交协议没有设计较为完善的容错机制,任意一个节点的失败都会导致整个事务的失败

在这里插入图片描述

3PC

针对2PC的缺点,研究者提出了3PC,即Three-Phase Commit。

作为2PC的改进版,3PC将原有的两阶段过程,重新划分为CanCommit、PreCommit和do Commit三个阶段。

3PC 协议将 2PC 协议的准备阶段一分为二,从而形成了三个阶段:

在这里插入图片描述

详解:三个阶段

所谓的三个阶段分别是:询问,然后再锁资源,最后真正提交

  • 第一阶段:CanCommit
  • 第二阶段:PreCommit
  • 第三阶段:Do Commit

三阶段提交示意图

阶段一:CanCommit

  1. 事务询问。协调者向所有参与者发送包含事务内容的canCommit的请求,询问是否可以执行事务提交,并等待应答;
  2. 各参与者反馈事务询问。正常情况下,如果参与者认为可以顺利执行事务,则返回Yes,否则返回No。

在这里插入图片描述

阶段二:PreCommit

在本阶段,协调者会根据上一阶段的反馈情况来决定是否可以执行事务的PreCommit操作。有以下两种可能:

  • 执行事务预提交
  • 中断事务
执行事务预提交
  1. 发送预提交请求。协调者向所有节点发出PreCommit请求,并进入prepared阶段;
  2. 事务预提交。参与者收到PreCommit请求后,会开始事务操作,并将Undo和Redo日志写入本机事务日志;
  3. 各参与者成功执行事务操作,同时将反馈以Ack响应形式发送给协调者,同事等待最终的Commit或Abort指令。

在这里插入图片描述

中断事务

如果任意一个参与者向协调者发送No响应,或者等待超时,协调者在没有得到所有参与者响应时,即可以中断事务。

中断事务的操作为:

  1. 发送中断请求。 协调者向所有参与者发送Abort请求;
  2. 中断事务。无论是participant 收到协调者的Abort请求,还是participant 等待协调者请求过程中出现超时,参与者都会中断事务;

coordinator发送Abort的两个场景

场景1: 任意一个参与者向协调者发送No响应

场景2: 协调者在没有得到所有参与者响应时

在这里插入图片描述

阶段三:doCommit

在这个阶段,会真正的进行事务提交,同样存在两种可能。

  • 执行提交
  • 回滚事务
执行提交
  1. coordinator发送提交请求。假如coordinator协调者收到了所有参与者的Ack响应,那么将从预提交转换到提交状态,并向所有参与者,发送doCommit请求;
  2. 事务提交。参与者收到doCommit请求后,会正式执行事务提交操作,并在完成提交操作后释放占用资源;
  3. 反馈事务提交结果。参与者将在完成事务提交后,向协调者发送Ack消息;
  4. 完成事务。协调者接收到所有参与者的Ack消息后,完成事务。

在这里插入图片描述

回滚事务

在该阶段,假设正常状态的协调者接收到任一个参与者发送的No响应,或在超时时间内,仍旧没收到反馈消息,就会回滚事务:

  1. 发送中断请求。协调者向所有的参与者发送rollback请求;
  2. 事务回滚。参与者收到rollback请求后,会利用阶段二中的Undo消息执行事务回滚,并在完成回滚后释放占用资源;
  3. 反馈事务回滚结果。参与者在完成回滚后向协调者发送Ack消息;
  4. 回滚事务。协调者接收到所有参与者反馈的Ack消息后,完成事务回滚。

在这里插入图片描述

2PC和3PC的区别:

三阶段提交协议在协调者和参与者中都引入 超时机制,并且把两阶段提交协议的第一个阶段拆分成了两步:询问,然后再锁资源,最后真正提交。

三阶段提交的三个阶段分别为:can_commit,pre_commit,do_commit。

img

在doCommit阶段,如果参与者无法及时接收到来自协调者的doCommit或者rollback请求时,会在等待超时之后,继续进行事务的提交。

其实这个应该是基于概率来决定的,

当进入第三阶段时,说明参与者在第二阶段已经收到了PreCommit请求,什么场景会产生PreCommit请求呢?

协调者产生PreCommit请求的前提条件比较严格:是在第二阶段开始之前,收到所有参与者的CanCommit响应都是Yes。

所以,一旦参与者收到了PreCommit,意味他知道大家其实都同意修改了

一句话概括就是:

当进入第三阶段时,由于网络超时/网络分区等原因,虽然参与者没有收到commit或者abort响应,但是他有理由相信:成功提交的几率很大

3PC主要解决的单点故障问题:

相对于2PC,3PC主要解决的单点故障问题,并减少阻塞,

因为一旦参与者无法及时收到来自协调者的信息之后,他会默认执行commit。而不会一直持有事务资源并处于阻塞状态

由于在docommit阶段,participant参与者如果超时,能自己决定提交本地事务,所以,3pc没有2pc那么保守或者说悲观,或者说3pc更加的乐观

3PC主要没有解决的数据一致性问题:

但是这种机制,还是有数据一致性问题,或者说,没有彻底解决数据一致性问题。

‘因为,由于网络原因,协调者发送的rollback命令没有及时被参与者接收到,那么参与者在等待超时之后执行了commit操作。

这样就和其他接到rollback命令并执行回滚的参与者之间存在数据不一致的情况。

“3PC相对于2PC而言到底优化了什么地方呢?”

相比较2PC而言,3PC对于协调者(Coordinator)和参与者(Participant)都设置了超时时间,而2PC只有协调者才拥有超时机制。

这解决了一个什么问题呢?

这个优化点,主要是避免了Participant 参与者在长时间无法与协调者节点通讯(协调者挂掉了)的情况下,无法释放资源的问题,

因为参与者自身拥有超时机制会在超时后,自动进行本地commit从而进行释放资源。

而这种机制也侧面降低了整个事务的阻塞时间和范围。

另外,通过CanCommit、PreCommit、DoCommit三个阶段的设计,相较于2PC而言,多设置了一个缓冲阶段保证了在最后提交阶段之前各参与节点的状态是一致的。

以上就是3PC相对于2PC的一个提高(相对缓解了2PC中的前两个问题),但是3PC依然没有完全解决数据不一致的问题。

假如在 DoCommit 过程,参与者A无法接收协调者的通信,那么参与者A会自动提交,但是提交失败了,其他参与者成功了,此时数据就会不一致。

柔性事务的分类

在电商领域等互联网场景下,刚性事务在数据库性能和处理能力上都暴露出了瓶颈。

柔性事务有两个特性:基本可用和柔性状态。

  • 基本可用是指分布式系统出现故障的时候允许损失一部分的可用性。
  • 柔性状态是指允许系统存在中间状态,这个中间状态不会影响系统整体的可用性,比如数据库读写分离的主从同步延迟等。柔性事务的一致性指的是最终一致性。

柔性事务主要分为补偿型通知型

补偿型事务又分:TCC、Saga;

通知型事务分:MQ事务消息、最大努力通知型。

补偿型事务都是同步的,通知型事务都是异步的。

通知型事务

通知型事务的主流实现是通过MQ(消息队列)来通知其他事务参与者自己事务的执行状态,引入MQ组件,有效的将事务参与者进行解耦,各参与者都可以异步执行,所以通知型事务又被称为异步事务

通知型事务主要适用于那些需要异步更新数据,并且对数据的实时性要求较低的场景,主要包含:

异步确保型事务最大努力通知事务两种。

  • 异步确保型事务:主要适用于内部系统的数据最终一致性保障,因为内部相对比较可控,如订单和购物车、收货与清算、支付与结算等等场景;
  • 最大努力通知:主要用于外部系统,因为外部的网络环境更加复杂和不可信,所以只能尽最大努力去通知实现数据最终一致性,比如充值平台与运营商、支付对接等等跨网络系统级别对接;

在这里插入图片描述

异步确保型事务

指将一系列同步的事务操作修改为基于消息队列异步执行的操作,来避免分布式事务中同步阻塞带来的数据操作性能的下降。

MQ事务消息方案

基于MQ的事务消息方案主要依靠MQ的半消息机制来实现投递消息和参与者自身本地事务的一致性保障。半消息机制实现原理其实借鉴的2PC的思路,是二阶段提交的广义拓展。

半消息:在原有队列消息执行后的逻辑,如果后面的本地逻辑出错,则不发送该消息,如果通过则告知MQ发送;

有一些第三方的MQ是支持事务消息的,这些消息队列,支持半消息机制,比如RocketMQ,ActiveMQ。但是有一些常用的MQ也不支持事务消息,比如 RabbitMQ 和 Kafka 都不支持。

在这里插入图片描述

  1. 事务发起方首先发送半消息到MQ;
  2. MQ通知发送方消息发送成功;
  3. 在发送半消息成功后执行本地事务;
  4. 根据本地事务执行结果返回commit或者是rollback;
  5. 如果消息是rollback, MQ将丢弃该消息不投递;如果是commit,MQ将会消息发送给消息订阅方;
  6. 订阅方根据消息执行本地事务;
  7. 订阅方执行本地事务成功后再从MQ中将该消息标记为已消费;
  8. 如果执行本地事务过程中,执行端挂掉,或者超时,MQ服务器端将不停的询问producer来获取事务状态;
  9. Consumer端的消费成功机制有MQ保证;

本地消息表方案

有时候我们目前的MQ组件并不支持事务消息,或者我们想尽量少的侵入业务方。这时我们需要另外一种方案“基于DB本地消息表“。

本地消息表最初由eBay 提出来解决分布式事务的问题。是目前业界使用的比较多的方案之一,它的核心思想就是将分布式事务拆分成本地事务进行处理。

img

发送消息方:

  • 需要有一个消息表,记录着消息状态相关信息。
  • 业务数据和消息表在同一个数据库,要保证它俩在同一个本地事务。直接利用本地事务,将业务数据和事务消息直接写入数据库。
  • 在本地事务中处理完业务数据和写消息表操作后,通过写消息到 MQ 消息队列。使用专门的投递工作线程进行事务消息投递到MQ,根据投递ACK去删除事务消息表记录
  • 消息会发到消息消费方,如果发送失败,即进行重试。

消息消费方:

  • 处理消息队列中的消息,完成自己的业务逻辑。
  • 如果本地事务处理成功,则表明已经处理成功了。
  • 如果本地事务处理失败,那么就会重试执行。
  • 如果是业务层面的失败,给消息生产方发送一个业务补偿消息,通知进行回滚等操作。

生产方和消费方定时扫描本地消息表,把还没处理完成的消息或者失败的消息再发送一遍。如果有靠谱的自动对账补账逻辑,这种方案还是非常实用的。

本地消息表优缺点:

优点:

  • 本地消息表建设成本比较低,实现了可靠消息的传递确保了分布式事务的最终一致性。
  • 无需提供回查方法,进一步减少的业务的侵入。
  • 在某些场景下,还可以进一步利用注解等形式进行解耦,有可能实现无业务代码侵入式的实现。

缺点:

  • 本地消息表与业务耦合在一起,难于做成通用性,不可独立伸缩。
  • 本地消息表是基于数据库来做的,而数据库是要读写磁盘IO的,因此在高并发下是有性能瓶颈的

MQ事务消息 VS 本地消息表

二者的共性: 1、 事务消息都依赖MQ进行事务通知,所以都是异步的。 2、 事务消息在投递方都是存在重复投递的可能,需要有配套的机制去降低重复投递率,实现更友好的消息投递去重。 3、 事务消息的消费方,因为投递重复的无法避免,因此需要进行消费去重设计或者服务幂等设计。

二者的区别:

MQ事务消息:

  • 需要MQ支持半消息机制或者类似特性,在重复投递上具有比较好的去重处理;
  • 具有比较大的业务侵入性,需要业务方进行改造,提供对应的本地操作成功的回查功能;

DB本地消息表:

  • 使用了数据库来存储事务消息,降低了对MQ的要求,但是增加了存储成本;
  • 事务消息使用了异步投递,增大了消息重复投递的可能性;

在这里插入图片描述

最大努力通知

最大努力通知方案的目标,就是发起通知方通过一定的机制,最大努力将业务处理结果通知到接收方。

最大努力通知型的最终一致性:

本质是通过引入定期校验机制实现最终一致性,对业务的侵入性较低,适合于对最终一致性敏感度比较低、业务链路较短的场景。

最大努力通知事务主要用于外部系统,因为外部的网络环境更加复杂和不可信,所以只能尽最大努力去通知实现数据最终一致性,比如充值平台与运营商、支付对接、商户通知等等跨平台、跨企业的系统间业务交互场景

异步确保型事务主要适用于内部系统的数据最终一致性保障,因为内部相对比较可控,比如订单和购物车、收货与清算、支付与结算等等场景。

普通消息是无法解决本地事务执行和消息发送的一致性问题的。因为消息发送是一个网络通信的过程,发送消息的过程就有可能出现发送失败、或者超时的情况。超时有可能发送成功了,有可能发送失败了,消息的发送方是无法确定的,所以此时消息发送方无论是提交事务还是回滚事务,都有可能不一致性出现。

所以,通知型事务的难度在于: 投递消息和参与者本地事务的一致性保障

因为核心要点一致,都是为了保证消息的一致性投递,所以,最大努力通知事务在投递流程上跟异步确保型是一样的,因此也有两个分支

  • 基于MQ自身的事务消息方案
  • 基于DB的本地事务消息表方案

最大努力通知事务在于第三方系统的对接,所以最大努力通知事务有几个特性:

  • 业务主动方在完成业务处理后,向业务被动方(第三方系统)发送通知消息,允许存在消息丢失。
  • 业务主动方提供递增多挡位时间间隔(5min、10min、30min、1h、24h),用于失败重试调用业务被动方的接口;在通知N次之后就不再通知,报警+记日志+人工介入。
  • 业务被动方提供幂等的服务接口,防止通知重复消费。
  • 业务主动方需要有定期校验机制,对业务数据进行兜底;防止业务被动方无法履行责任时进行业务回滚,确保数据最终一致性。

MQ事务消息方案

要实现最大努力通知,可以采用 MQ 的 ACK 机制。

最大努力通知事务在投递之前,跟异步确保型流程都差不多,关键在于投递后的处理。

因为异步确保型在于内部的事务处理,所以MQ和系统是直连并且无需严格的权限、安全等方面的思路设计。

img

  1. 业务活动的主动方,在完成业务处理之后,向业务活动的被动方发送消息,允许消息丢失。
  2. 主动方可以设置时间阶梯型通知规则,在通知失败后按规则重复通知,直到通知N次后不再通知。
  3. 主动方提供校对查询接口给被动方按需校对查询,用于恢复丢失的业务消息。
  4. 业务活动的被动方如果正常接收了数据,就正常返回响应,并结束事务。
  5. 如果被动方没有正常接收,根据定时策略,向业务活动主动方查询,恢复丢失的业务消息。

本地消息表方案

要实现最大努力通知,可以采用 定期检查本地消息表的机制 。

在这里插入图片描述

发送消息方:

  • 需要有一个消息表,记录着消息状态相关信息。
  • 业务数据和消息表在同一个数据库,要保证它俩在同一个本地事务。直接利用本地事务,将业务数据和事务消息直接写入数据库。
  • 在本地事务中处理完业务数据和写消息表操作后,通过写消息到 MQ 消息队列。使用专门的投递工作线程进行事务消息投递到MQ,根据投递ACK去删除事务消息表记录
  • 消息会发到消息消费方,如果发送失败,即进行重试。
  • 生产方定时扫描本地消息表,把还没处理完成的消息或者失败的消息再发送一遍。如果有靠谱的自动对账补账逻辑,这种方案还是非常实用的。

最大努力通知事务 VS 异步确保型事务

最大努力通知事务在我认知中,其实是基于异步确保型事务发展而来适用于外部对接的一种业务实现。他们主要有的是业务差别,如下: • 从参与者来说:最大努力通知事务适用于跨平台、跨企业的系统间业务交互;异步确保型事务更适用于同网络体系的内部服务交付。 • 从消息层面说:最大努力通知事务需要主动推送并提供多档次时间的重试机制来保证数据的通知;而异步确保型事务只需要消息消费者主动去消费。 • 从数据层面说:最大努力通知事务还需额外的定期校验机制对数据进行兜底,保证数据的最终一致性;而异步确保型事务只需保证消息的可靠投递即可,自身无需对数据进行兜底处理。

通知型事务的问题

通知型事务,是无法解决本地事务执行和消息发送的一致性问题的。

因为消息发送是一个网络通信的过程,发送消息的过程就有可能出现发送失败、或者超时的情况。超时有可能发送成功了,有可能发送失败了,消息的发送方是无法确定的,所以此时消息发送方无论是提交事务还是回滚事务,都有可能不一致性出现。

消息发送一致性

消息中间件在分布式系统中的核心作用就是异步通讯、应用解耦和并发缓冲(也叫作流量削峰)。在分布式环境下,需要通过网络进行通讯,就引入了数据传输的不确定性,也就是CAP理论中的分区容错性。

1233356-44395b81aed884dd.png

消息发送一致性是指产生消息的业务动作与消息发送动作一致,也就是说如果业务操作成功,那么由这个业务操作所产生的消息一定要发送出去,否则就丢失。所以,需要借助半消息、本地消息表,保障一致性。

消息重复发送问题和业务接口幂等性设计

1233356-7f06f451b8bfdb8c.png

对于未确认的消息,采用按规则重新投递的方式进行处理。

对于以上流程,消息重复发送会导致业务处理接口出现重复调用的问题。消息消费过程中消息重复发送的主要原因就是消费者成功接收处理完消息后,消息中间件没有及时更新投递状态导致的。如果允许消息重复发送,那么消费方应该实现业务接口的幂等性设计。

补偿型

什么是补偿模式?

补偿模式使用一个额外的协调服务来协调各个需要保证一致性的业务服务,协调服务按顺序调用各个业务微服务,如果某个业务服务调用异常(包括业务异常和技术异常)就取消之前所有已经调用成功的业务服务。

补偿模式大致有TCC,和Saga两种细分的方案

TCC 事务模型

什么是TCC 事务模型

TCC(Try-Confirm-Cancel)的概念来源于 Pat Helland 发表的一篇名为“Life beyond Distributed Transactions:an Apostate’s Opinion”的论文。

TCC 分布式事务模型包括三部分:

1.主业务服务:主业务服务为整个业务活动的发起方,服务的编排者,负责发起并完成整个业务活动。

2.从业务服务:从业务服务是整个业务活动的参与方,负责提供 TCC 业务操作,实现初步操作(Try)、确认操作(Confirm)、取消操作(Cancel)三个接口,供主业务服务调用。

3.业务活动管理器:业务活动管理器管理控制整个业务活动,包括记录维护 TCC 全局事务的事务状态和每个从业务服务的子事务状态,并在业务活动提交时调用所有从业务服务的 Confirm 操作,在业务活动取消时调用所有从业务服务的 Cancel 操作。

TCC 提出了一种新的事务模型,基于业务层面的事务定义,锁粒度完全由业务自己控制,目的是解决复杂业务中,跨表跨库等大颗粒度资源锁定的问题。

TCC 把事务运行过程分成 Try、Confirm / Cancel 两个阶段,每个阶段的逻辑由业务代码控制,避免了长事务,可以获取更高的性能。

TCC的工作流程

TCC(Try-Confirm-Cancel)分布式事务模型相对于 XA 等传统模型,其特征在于它不依赖资源管理器(RM)对分布式事务的支持,而是通过对业务逻辑的分解来实现分布式事务

TCC 模型认为对于业务系统中一个特定的业务逻辑,其对外提供服务时,必须接受一些不确定性,即对业务逻辑初步操作的调用仅是一个临时性操作,调用它的主业务服务保留了后续的取消权。如果主业务服务认为全局事务应该回滚,它会要求取消之前的临时性操作,这就对应从业务服务的取消操作。而当主业务服务认为全局事务应该提交时,它会放弃之前临时性操作的取消权,这对应从业务服务的确认操作。每一个初步操作,最终都会被确认或取消。

TCC 分布式事务模型包括三部分:

image.png

Try 阶段: 调用 Try 接口,尝试执行业务,完成所有业务检查,预留业务资源。

Confirm 或 Cancel 阶段: 两者是互斥的,只能进入其中一个,并且都满足幂等性,允许失败重试。

Confirm 操作: 对业务系统做确认提交,确认执行业务操作,不做其他业务检查,只使用 Try 阶段预留的业务资源。

Cancel 操作: 在业务执行错误,需要回滚的状态下执行业务取消,释放预留资源。

Try 阶段失败可以 Cancel,如果 Confirm 和 Cancel 阶段失败了怎么办?

TCC 中会添加事务日志,如果 Confirm 或者 Cancel 阶段出错,则会进行重试,所以这两个阶段需要支持幂等;如果重试失败,则需要人工介入进行恢复和处理等。

TCC事务模型的要求:

  1. 可查询操作:服务操作具有全局唯一的标识,操作唯一的确定的时间。
  2. 幂等操作:重复调用多次产生的业务结果与调用一次产生的结果相同。一是通过业务操作实现幂等性,二是系统缓存所有请求与处理的结果,最后是检测到重复请求之后,自动返回之前的处理结果。
  3. TCC操作:Try阶段,尝试执行业务,完成所有业务的检查,实现一致性;预留必须的业务资源,实现准隔离性。Confirm阶段:真正的去执行业务,不做任何检查,仅适用Try阶段预留的业务资源,Confirm操作还要满足幂等性。Cancel阶段:取消执行业务,释放Try阶段预留的业务资源,Cancel操作要满足幂等性。TCC与2PC(两阶段提交)协议的区别:TCC位于业务服务层而不是资源层,TCC没有单独准备阶段,Try操作兼备资源操作与准备的能力,TCC中Try操作可以灵活的选择业务资源,锁定粒度。TCC的开发成本比2PC高。实际上TCC也属于两阶段操作,但是TCC不等同于2PC操作。
  4. 可补偿操作:Do阶段:真正的执行业务处理,业务处理结果外部可见。Compensate阶段:抵消或者部分撤销正向业务操作的业务结果,补偿操作满足幂等性。约束:补偿操作在业务上可行,由于业务执行结果未隔离或者补偿不完整带来的风险与成本可控。实际上,TCC的Confirm和Cancel操作可以看做是补偿操作。

TCC与2PC对比

TCC其实本质和2PC是差不多的:

  • T就是Try,两个C分别是Confirm和Cancel。
  • Try就是尝试,请求链路中每个参与者依次执行Try逻辑,如果都成功,就再执行Confirm逻辑,如果有失败,就执行Cancel逻辑。

TCC与XA两阶段提交有着异曲同工之妙,下图列出了二者之间的对比

img

  1. 在阶段1:
  • 在XA中,各个RM准备提交各自的事务分支,事实上就是准备提交资源的更新操作(insert、delete、update等);
  • 而在TCC中,是主业务活动请求(try)各个从业务服务预留资源。
  1. 在阶段2:
  • XA根据第一阶段每个RM是否都prepare成功,判断是要提交还是回滚。如果都prepare成功,那么就commit每个事务分支,反之则rollback每个事务分支。
  • TCC中,如果在第一阶段所有业务资源都预留成功,那么confirm各个从业务服务,否则取消(cancel)所有从业务服务的资源预留请求。

TCC和2PC不同的是:

  • XA是资源层面的分布式事务,强一致性,在两阶段提交的整个过程中,一直会持有资源的锁。基于数据库锁实现,需要数据库支持XA协议,由于在执行事务的全程都需要对相关数据加锁,一般高并发性能会比较差
  • TCC是业务层面的分布式事务,最终一致性,不会一直持有资源的锁,性能较好。但是对微服务的侵入性强,微服务的每个事务都必须实现try、confirm、cancel等3个方法,开发成本高,今后维护改造的成本也高为了达到事务的一致性要求,try、confirm、cancel接口必须实现幂等性操作由于事务管理器要记录事务日志,必定会损耗一定的性能,并使得整个TCC事务时间拉长

TCC它会弱化每个步骤中对于资源的锁定,以达到一个能承受高并发的目的(基于最终一致性)。

SAGA长事务模型

SAGA可以看做一个异步的、利用队列实现的补偿事务。

Saga相关概念

Saga模型是把一个分布式事务拆分为多个本地事务,每个本地事务都有相应的执行模块和补偿模块(对应TCC中的Confirm和Cancel),当Saga事务中任意一个本地事务出错时,可以通过调用相关的补偿方法恢复之前的事务,达到事务最终一致性。

这样的SAGA事务模型,是牺牲了一定的隔离性和一致性的,但是提高了long-running事务的可用性。

Saga 模型由三部分组成

  • LLT(Long Live Transaction):由一个个本地事务组成的事务链**。**
  • 本地事务:事务链由一个个子事务(本地事务)组成,LLT = T1+T2+T3+…+Ti。
  • 补偿:每个本地事务 Ti 有对应的补偿 Ci。

Saga的执行顺序有两种:

  • T1, T2, T3, …, Tn
  • T1, T2, &mldr;, Tj, Cj,&mldr;, C2, C1,其中0 < j < n

Saga 两种恢复策略:

  • 向后恢复(Backward Recovery):撤销掉之前所有成功子事务。如果任意本地子事务失败,则补偿已完成的事务。如异常情况的执行顺序T1,T2,T3,..Ti,Ci,&mldr;C3,C2,C1。

在这里插入图片描述

  • 向前恢复(Forward Recovery):即重试失败的事务,适用于必须要成功的场景,该情况下不需要Ci。执行顺序:T1,T2,&mldr;,Tj(失败),Tj(重试),&mldr;,Ti。

显然,向前恢复没有必要提供补偿事务,如果你的业务中,子事务(最终)总会成功,或补偿事务难以定义或不可能,向前恢复更符合你的需求。

Saga的使用条件

Saga看起来很有希望满足我们的需求。所有长活事务都可以这样做吗?这里有一些限制:

  1. Saga只允许两个层次的嵌套,顶级的Saga和简单子事务
  2. 在外层,全原子性不能得到满足。也就是说,sagas可能会看到其他sagas的部分结果
  3. 每个子事务应该是独立的原子行为
  4. 在我们的业务场景下,各个业务环境(如:航班预订、租车、酒店预订和付款)是自然独立的行为,而且每个事务都可以用对应服务的数据库保证原子操作。

补偿也有需考虑的事项:

  • 补偿事务从语义角度撤消了事务Ti的行为,但未必能将数据库返回到执行Ti时的状态。(例如,如果事务触发导弹发射, 则可能无法撤消此操作)

但这对我们的业务来说不是问题。其实难以撤消的行为也有可能被补偿。例如,发送电邮的事务可以通过发送解释问题的另一封电邮来补偿。

SAGA模型的解决方案

SAGA模型的核心思想是,通过某种方案,将分布式事务转化为本地事务,从而降低问题的复杂性。

比如以DB和MQ的场景为例,业务逻辑如下:

  1. 向DB中插入一条数据。
  2. 向MQ中发送一条消息。

由于上述逻辑中,对应了两种存储端,即DB和MQ,所以,简单的通过本地事务是无法解决的。那么,依照SAGA模型,可以有两种解决方案。

方案一:半消息模式。

RocketMQ新版本中,就支持了这种模式。

方案二:本地消息表

在DB中,新增一个消息表,用于存放消息。如下:

  1. 在DB业务表中插入数据。
  2. 在DB消息表中插入数据。
  3. 异步将消息表中的消息发送到MQ,收到ack后,删除消息表中的消息。

Saga和TCC对比

Saga和TCC都是补偿型事务,他们的区别为:

劣势:

  • Saga无法保证隔离性;

优势:

  • 一阶段提交本地事务,无锁,高性能;
  • 事件驱动模式,参与者可异步执行,高吞吐;
  • Saga 对业务侵入较小,只需要提供一个逆向操作的Cancel即可;而TCC需要对业务进行全局性的流程改造;
  • TCC最少通信次数为2n,而Saga为n(n=sub-transaction的数量)。
  • 有些第三方服务没有Try接口,TCC模式实现起来就比较tricky了,而Saga则很简单。

实际案例

背景

下单过程中, 需要创建订单-扣库存-使用优惠券-扣除G豆等等操作, 整个过程需要使用分布式事务(因为订单和这些业务不在同一个数据库中), 此处采用 TCC + 本地消息表 来保证事务, 再加上MQ来异步解耦

虽然该例子不是完美解决方案, 但其是一个可落地方案

方案大致描述

  1. 先行预扣 库存/优惠券等 订单前置事务因素, 如果失败就写入本地消息表, 并发送mq(异步回退锁定资源) - Try

    库存采用 锁定库存的方式 (锁定库存与可用库存)

    优惠券采用了 分布式锁的方式(也可直接落库)

    库存等资源不够或者网络异常等问题, 只要是预扣不正常都会写入消息表并发送mq

  2. 如果预扣全部成功, 则订单正常流转完成.

    该案例中, 订单状态的变更亦是异步处理, 因为前置因素处理完, 则订单是允许生成的

    所以消费时要注意区分 回退和订单状态处理

  3. 确认收款后, 预扣资源才会真正扣除, 此处也采用MQ解耦 - 此代码与下单类似,不重复描述 - Confirm

  4. 消费MQ数据,异步回退锁定的资源, 并修改本地消息表的数据 - cancal

    考虑代码的幂等性和并发性

  5. 定时扫描本地消息表, 做数据兜底, 处理丢mq消息或者写数据库失败的场景 - 此代码暂未完全实现

CREATE TABLE `order_create_message_0` (
  `id` bigint NOT NULL,
  `user_id` bigint NOT NULL COMMENT '用户id',
  `order_id` bigint NOT NULL COMMENT '订单号',
  `type` tinyint DEFAULT '1' COMMENT '类型 1-创建订单 2-确认收款',
  `message_status` tinyint NOT NULL DEFAULT '0' COMMENT '消息状态(0:未处理,1:处理完成)',
  `step_status` char(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT '' COMMENT '步骤状态(A:未处理,S:处理成功,F:处理失败)(第1位:订单;第2位:库存;第3位:优惠券;第4位:g豆;第5位:兑换卡)',
  `num` int NOT NULL DEFAULT '0' COMMENT '消息处理次数',
  `created_time` datetime DEFAULT NULL COMMENT '创建时间',
  `created_by` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '创建人',
  `modify_time` datetime DEFAULT NULL COMMENT '更新时间',
  `modify_by` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '修改人',
  `enable_flag` tinyint DEFAULT NULL COMMENT '是否可用 10-可用 20-删除',
  `version` bigint DEFAULT NULL COMMENT '版本号--乐观锁预留字段',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='创建订单消息表'

核心代码

订单处理类

 public ServerResult createOrder(CreateOrderDTO createOrderDTO) {

     ....// 业务代码处理

         // 查询库存/优惠券等资源, 查看是否充足等校验

          // 先在redis把优惠券锁定住
        log.debug("用户{}创建订单,冻结优惠券", createOrderDTO.getUserId());
        Map<String, String> lockedCoupon = lockUseCoupon(createOrderDTO.getUserId(), couponInfoMap, couponIdList);

     ....// 业务代码处理

           // 先将订单预写到订单数据库,理论上这段保存逻辑也可以往后写的,但是为了方便追踪问题,先把订单先给预写进订单库
        transactionHelper.run(() -> {
           orderService.saveBatch(orderDoList);
           orderItemService.saveBatch(allOrderItemDOList);
        });

      // 处理扣减订单的资源:库存、优惠券、g豆、兑换卡等
        processOrder(createOrderDTO, couponInfoMap, parentOrderId, orderDoList, allOrderItemDOList);


 }


    /**
     * 锁定用户下单使用的优惠券
     *
     * @param userId        用户id
     * @param couponInfoMap 优惠券信息Map
     * @param couponIdList  优惠券id列表
     * @return {@link Map}<{@link String}, {@link String}>
     */
    private Map<String, String> lockUseCoupon(String userId, Map<String, CouponInfoDTO> couponInfoMap, List<String> couponIdList) {
        if (CollUtil.isNotEmpty(couponIdList)) {
            return new HashMap<>();
        }
        Map<String, String> lockedCouponIdMap = new HashMap<>();
        for (String couponId : couponIdList) {
            CouponInfoDTO couponInfoDTO = couponInfoMap.get(couponId);
            couponInfoDTO.setCouponCode(couponInfoDTO.getCouponCodeList().get(0));
            String lockUserCouponKey = getLockUserCouponKey(userId, couponInfoDTO.getCouponCode());
            String value = IdUtil.fastUUID();
            boolean lock = redisUtil.tryLock(lockUserCouponKey, value, refreshScopeManage.getLockUserCouponTime());
            if (!lock) {
                // 锁失败了,尽快的解锁已经锁定的券
                orderPoolTaskExecutor.execute(() -> unLockInLockedCoupon(lockedCouponIdMap));
                // 正在使用
                throw new BusinessException(ServerResultCode.ORDER_COUPON_IN_USE);
            }
            lockedCouponIdMap.put(lockUserCouponKey, value);
        }
        return lockedCouponIdMap;
    }


/**
     * 处理扣减订单的资源
     * 1. 扣减库存
     * 2. 扣减 用户优惠券
     * 3. 扣减g豆、积分
     * 4. 扣减兑换卡
     *
     * @param createOrderDTO     创建订单dto
     * @param couponInfoMap      优惠券信息Map
     * @param parentOrderId      父订单id
     * @param orderDoList        订单做列表
     * @param allOrderItemDOList 所有订单项list
     */
    private void processOrder(CreateOrderVo createOrderDTO, Map<String, CouponInfoDTO> couponInfoMap,
                              long parentOrderId, List<OrderDO> orderDoList, List<OrderItemDO> allOrderItemDOList) {
        // 第1位:订单;第2位:库存;第3位:优惠券;第4位:g豆;第5位:兑换卡
        boolean occupyStock = false;
        boolean occupyCoupon = false;
        boolean occupyGBean = false;
        boolean occupyCard = false;
        try {
            // 预扣库存
            List<SkuInventoryDTO> withholdingList = allOrderItemDOList.stream()
                    .map(orderItemDO -> SkuInventoryDTO.builder().skuId(orderItemDO.getSkuId()).withholding(orderItemDO.getQuantity()).build())
                    .collect(Collectors.toList());
            ServerResult withholdingServerResult = skuInventoryServiceFeign.withholding(SkuInventoryBatchDTO.builder().parentOrderId(parentOrderId).list(withholdingList).build());
            occupyStock = true;
            if (withholdingServerResult.checkNotSuccess()) {
                log.error("用户{}创建订单异常,父订单号{},批量扣减库存失败", createOrderDTO.getUserId(), parentOrderId);
                // 批量扣减失败,库存的回退由商品服务保证回退成功
                throw new BusinessException(withholdingServerResult);
            }
            // 扣减优惠券
            if (CollUtil.isNotEmpty(couponInfoMap)) {
                boolean batchUpdateCoupon = batchDeductionCoupon(createOrderDTO, couponInfoMap, parentOrderId);
                occupyCoupon = true;
                if (!batchUpdateCoupon) {
                    log.error("用户{}创建订单异常,父订单号{},批量扣减优惠券失败", createOrderDTO.getUserId(), parentOrderId);
                    throw new BusinessException(ServerResultCode.ORDER_COUPON_UPDATE_STATE_FAILED);
                }
            }
            // 处理g豆
            BigDecimal gBeanAmount = StringUtils.isBlank(createOrderDTO.getGBeansAmount()) ? BigDecimal.ZERO : new BigDecimal(createOrderDTO.getGBeansAmount());
            if (gBeanAmount.compareTo(BigDecimal.ZERO) > 0) {
                // 使用了G豆
                boolean useGbean = deduceUseGbean(createOrderDTO);
                occupyGBean = true;
                if (useGbean) {
                    log.error("用户{}创建订单异常,父订单号{},扣减G豆失败", createOrderDTO.getUserId(), parentOrderId);
                    throw new BusinessException(ServerResultCode.ORDER_DEDUCE_GBEAN_ERROR);
                }
            }
            // 处理兑换卡
            if (StringUtils.isNotBlank(createOrderDTO.getCardAmount()) && StringUtils.isNotBlank(createOrderDTO.getCardId())) {
                // 使用了兑换卡
                boolean useCard = deduceUseCard(createOrderDTO);
                occupyCard = true;
                if (useCard) {
                    log.error("用户{}创建订单异常,父订单号{},扣减兑换卡失败", createOrderDTO.getUserId(), parentOrderId);
                    throw new BusinessException(ServerResultCode.ORDER_DEDUCE_CARD_ERROR);
                }
            }
            // 下单成功,对应的优惠资产、库存、g豆已经扣减
            OrderCreateMessageDO orderCreateMessageDO = buildOrderCreateMessageDO(parentOrderId, Long.parseLong(createOrderDTO.getUserId()));
            transactionHelper.run(() -> {
                // 预创建(用户不可见) 转换为 待付款(用户可见)
                orderMapper.updateBatchByIdAndUserId(orderDoList, Long.parseLong(createOrderDTO.getUserId()), GetTableNameUtil.getOrderTableName(Long.parseLong(createOrderDTO.getUserId())));
                orderItemMapper.updateBatchByIdAndUserId(allOrderItemDOList, Long.parseLong(createOrderDTO.getUserId()), GetTableNameUtil.getOrderItemTableName(Long.parseLong(createOrderDTO.getUserId())));
                orderCreateMessageService.save(orderCreateMessageDO);
            });
            // 同步到ES
            orderListenerService.onCreateOrderSuccess(orderDoList,allOrderItemDOList);
            // 发送mq消息 -> 处理订单状态(上面线程处理可能失败)
            rocketMQTemplate.syncSend(refreshScopeManage.getOrderGlobalAsyncTopicName(), JSON.toJSONString(orderCreateMessageDO));
        } catch (Exception e) {
            log.error(StrFormatter.format("用户{}创建订单异常,父订单号{},失败原因", createOrderDTO.getUserId(), parentOrderId), e);
            rollBackGlobalTransaction(parentOrderId, Long.parseLong(createOrderDTO.getUserId()), occupyStock, occupyCoupon, occupyGBean, occupyCard);
            if (e instanceof BusinessException) {
                throw e;
            }
            throw new BusinessException("创建订单异常");
        }
    }

   /**
     * 全局事务回退
     *
     * @param parentOrderId 父订单id
     * @param userId        用户id
     * @param occupyStock   占用库存
     * @param occupyCoupon  占领优惠券
     * @param occupyGbean   占领gbean
     * @param occupyCard    占领卡
     */
    public void rollBackGlobalTransaction(long parentOrderId, long userId, boolean occupyStock, boolean occupyCoupon, boolean occupyGbean, boolean occupyCard) {
        OrderCreateMessageDO orderCreateMessageDO = buildOrderCreateMessageDO(parentOrderId, userId);
        // 默认不需要回退
        // 第1位:订单;第2位:库存;第3位:优惠券;第4位:g豆;第5位:兑换卡
        StringBuilder sb = new StringBuilder("SSSSS");
        if (occupyStock) {
            // 占用了库存,需要回退库存
            sb.replace(1, 2, "A");
        }
        if (occupyCoupon) {
            // 占用了优惠券,需要回退优惠券
            sb.replace(2, 3, "A");
        }
        if (occupyGbean) {
            // 占用了G豆,需要回退G豆
            sb.replace(3, 4, "A");
        }
        if (occupyCard) {
            // 占用了兑换卡,需要回退兑换卡
            sb.replace(4, 5, "A");
        }
        orderCreateMessageDO.setStepStatus(sb.toString());
        // 发送mq消息,异步回退 库存、 优惠券、g豆、兑换卡
        try {
            rocketMQTemplate.syncSend(refreshScopeManage.getOrderGlobalAsyncTopicName(), JSON.toJSONString(orderCreateMessageDO));
        } catch (Exception e) {
            // 尽可能的把消息投递出去
            log.error(StrFormatter.format("订单号{},生产消息失败", parentOrderId), e);
        }
        orderCreateMessageService.save(orderCreateMessageDO);
    }


    /**
     * 预扣库存,即占用库存 +n, 可用库存 -n
     *
     * @param query SkuInventoryBatchDTO
     * @return ServerResult
     */
    @Override
    public ServerResult withholding(SkuInventoryBatchDTO query) {
        log.info("订单:{} 开始预扣库存", query.getParentOrderId());
        LocalDateTime now = LocalDateTime.now();
        // 构建扣减日志列表
        List<SkuInventoryChangeRecordDO> logs = query.getList()
                .stream()
                .map(sku -> SkuInventoryChangeRecordDO.builder()
                        .parentOrderId(query.getParentOrderId())
                        .skuId(sku.getSkuId())
                        .quantity(sku.getWithholding())
                        .status(CommodityConstant.WITHHOLDING_SKU_INVENTORY_SUCCESS)
                        .enableFlag(CommodityConstant.ENABLE)
                        .createTime(now)
                        .modifyTime(now)
                        .version(1L)
                        .build())
                .collect(Collectors.toList());

        List<SkuInventoryDO> list = getInventoryBySkuId(query);
        Map<Long, Integer> withholdingMap = query.getList().stream()
                .collect(Collectors.toMap(SkuInventoryDTO::getSkuId, SkuInventoryDTO::getWithholding));

        for (SkuInventoryDO inventoryDO : list) {
            int withholding = withholdingMap.get(inventoryDO.getSkuId());
            if (inventoryDO.getAvailable() < withholding) {
                return ServerResult.fail(ServerResultCode.WITHHOLDING_SKU_INVENTORY_ERROR);
            }
        }

        transactionHelper.run(() -> {
            for (SkuInventoryDO inventoryDO : list) {
                int withholding = withholdingMap.get(inventoryDO.getSkuId());
                boolean updateResult = lambdaUpdate()
                        .eq(SkuInventoryDO::getId, inventoryDO.getId())
                        .eq(SkuInventoryDO::getSkuId, inventoryDO.getSkuId())
                        .eq(SkuInventoryDO::getLocked, inventoryDO.getLocked())
                        .eq(SkuInventoryDO::getEnableFlag, CommodityConstant.ENABLE)
                        .ge(SkuInventoryDO::getAvailable, withholding)
                        .set(SkuInventoryDO::getLocked, inventoryDO.getLocked() + withholding)
                        .set(SkuInventoryDO::getAvailable, inventoryDO.getAvailable() - withholding)
                        .set(SkuInventoryDO::getModifyTime, now)
                        .update();
                if (!updateResult) {
                    log.error("订单:{} skuId:{} 预扣库存:{} 失败, 开始回退库存", query.getParentOrderId(), inventoryDO.getSkuId(), withholding);
                    throw new BusinessException(ServerResultCode.WITHHOLDING_SKU_INVENTORY_ERROR);
                }
                // 短信提醒
                skuInventoryEarlyWarningMessageProducer.send(generateMessageDTO(inventoryDO));
            }
            skuInventoryChangeRecordService.saveBatch(logs);
        });

        return ServerResult.simpleSuccess();
    }

异步处理回退资源类

MQ 的消费者异步处理


    @Override
    public void onMessage(OrderCreateMessageDO orderCreateMessage) {
        log.info("开始异步处理下单信息:父订单号{}", orderCreateMessage.getOrderId());
        // 步骤状态(A:未处理,S:处理成功,F:处理失败)
        // (第1位:订单;第2位:库存;第3位:店铺优惠券;第4位:g豆;第5位:兑换卡)
        String stepStatus = orderCreateMessage.getStepStatus().trim();
        if (orderCreateMessage.getMessageStatus() == 1 && stepStatus.equals("SSSSS")) {
            log.info("开始异步处理下单信息:父订单号{},订单已经被处理,暂不需要处理", orderCreateMessage.getOrderId());
            return;
        }
        // 处理失败次数过多,可能需要人工干预
        if (orderCreateMessage.getNum() > 20) {
            // TODO: 2022/2/25  人工干预
            return;
        }
        String uuid = IdUtil.simpleUUID();
        String key = LOCK_KEY + ":" + orderCreateMessage.getOrderId();
        // todo 锁时间需要改成动态配置的
        boolean lock = redisUtil.tryLock(key, uuid, 20000);
        if (!lock) {
            log.warn("开始异步处理下单信息:父订单号{},已经有消费者在处理当前订单,放弃本次处理", orderCreateMessage.getOrderId());
            return;
        }
        // 查询出父订单下的所有子订单
        LambdaQueryWrapper<OrderDO> orderQueryWrapper = Wrappers.lambdaQuery();
        orderQueryWrapper.eq(OrderDO::getParentOrderId, orderCreateMessage.getOrderId())
                .eq(OrderDO::getUserId,orderCreateMessage.getUserId());
        List<OrderDO> subOrderDOList = orderService.list(orderQueryWrapper);
        // 查询出子订单下的所有订单行
        List<Long> orderIds = subOrderDOList.stream().map(OrderDO::getId).collect(Collectors.toList());
        LambdaQueryWrapper<OrderItemDO> orderItemDOWrapper = Wrappers.lambdaQuery();
        orderItemDOWrapper.in(OrderItemDO::getOrderId, orderIds);
        List<OrderItemDO> orderItemDOList = orderItemService.list(orderItemDOWrapper);
        StringBuilder sb = new StringBuilder(stepStatus);
        try {
            // oms
            if (sb.charAt(0) == 'A') {
                // 如果订单状态全是0,表示下单失败,不同步oms
                boolean orderFailed = subOrderDOList.stream().noneMatch(orderDO -> orderDO.getState() > 0);
                if (orderFailed) {
                    // 同步oms
                }
                sb.replace(0, 1, "S");
            }
            // 库存
            if (sb.charAt(1) == 'A') {
                List<SkuInventoryDTO> skuInventoryDTOList = orderItemDOList.stream()
                        .map(orderItemDO -> SkuInventoryDTO.builder().skuId(orderItemDO.getSkuId()).returnLocked(orderItemDO.getQuantity()).build())
                        .collect(Collectors.toList());
                ServerResult inventoryResult = skuInventoryServiceFeign.returnLocked(SkuInventoryBatchDTO.builder().list(skuInventoryDTOList).build());
                if (inventoryResult.checkSuccess()) {
                    sb.replace(1, 2, "S");
                }
            }
            // 优惠券
            if (sb.charAt(2) == 'A') {

            }
        } finally {
            redisUtil.unLock(key, uuid);
        }

        orderCreateMessage.setMessageStatus("SSSSSS".equals(sb.toString()) ? 1 : 0);
        orderCreateMessage.setStepStatus(sb.toString());
        orderCreateMessage.setNum(orderCreateMessage.getNum() + 1);
        LambdaUpdateWrapper<OrderCreateMessageDO> updateWrapper = Wrappers.lambdaUpdate(OrderCreateMessageDO.class);
        updateWrapper.eq(OrderCreateMessageDO::getId,orderCreateMessage.getId())
                .eq(OrderCreateMessageDO::getUserId,orderCreateMessage.getUserId());

        orderCreateMessageService.update(orderCreateMessage,updateWrapper);
    }

分布式事务( 图解 + 秒懂 + 史上最全 ) - 疯狂创客圈 - 博客园 (cnblogs.com)