• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

[微服务的绊脚石--分布式事务] SEATA解决方案

武飞扬头像
翠云山柠檬丸
帮助1

微服务模式面临的挑战

在微服务大行其道的今天,很多企业都采用了这种架构模式来提供服务。采用微服务模式会带来很多好处,比如高度可扩展,更短的开发周期,更加易于部署,更加开放的技术栈等等。但同样也带来了很多问题,比如我们今天的主题:分布式事务处理(Distributed Transaction Processing, DTP)。

我司的系统采用了微服务架构,将一个庞大的单体系统拆分出了多个独立的微服务,由于存在一些需要跨服务的系统功能,服务间的调用必不可少。那么问题来了,在单体系统中,事务的管理相对简单,在SpringBoot的应用中一个@Transactional注解就可以轻松搞定。而在微服务中,下游的服务出现问题时,上游的服务可能已经提交了事务,如何管理分布式事务成了微服务不得不面对的绊脚石。

应对方法

业界有几种相对成熟的应对之法,下面简单介绍3种:

  • 2PC/XA模式: 二阶段提交(2 Phase Commit)以及基于2PC的XA规范
  • SAGA模式: 长篇小说模式
  • TCC模式: Try, Confirm / Cancel

2PC/XA

在分布式系统中,每个节点虽然可以知晓自己的操作时成功或者失败,却无法知道其他节点的操作的成功或失败。2PC认为,当一个事务跨越多个节点时,为了保持事务的ACID特性,需要引入一个作为协调者的组件来统一掌控所有节点(称作参与者)的操作结果并最终指示这些节点是否要把操作结果进行真正的提交(比如将更新后的数据写入磁盘等等)。因此,二阶段提交的思路可以概括为: 参与者将操作成败通知协调者,再由协调者根据所有参与者的反馈情报决定各参与者是否要提交操作还是中止操作。

XA规范是OpenGroup组织关于分布式事务处理(DTP)的规范。该规范引入了全局的事务管理器(TM) 和局部的资源管理器(RM),类似2PC中的协调者和参与者。XA规范主要定义了二者之间的接口,它采用二阶段提交来保证所有资源同时提交或回滚特定的事务。目前主流数据库都是支持XA协议的,比如Oracle, MySQL, DB2。在Java中使用MysqlXAConnection可以很容易地实现分布式事务处理。

SAGA

SAGA模式最早出现于1987年普林斯顿大学的一篇论文,目的是为了处理计算机系统中的长事务(Long Lived Transactions, LLTs)的问题。 那么究竟多长的事务才算LLT呢,按论文中的说法,长事务是按小时、天计算的事务。

Saga的命名源于单词本身的含义,即长篇小说。核心是将长事务分解为多个子事务的集合,失败时不做回滚,而是采用补偿动作。补偿动作从语义角度撤消了事务Ti的行为,但未必能将数据库返回到执行Ti时的状态。(例如,如果事务触发导弹发射, 则可能无法撤消此操作)

  • 每个Saga由一系列sub-transaction Ti 组成
  • 每个Ti 都有对应的补偿动作Ci,补偿动作用于撤销Ti造成的结果

Saga定义了两种恢复策略:

  • Backward recovery,向后恢复,补偿所有已完成的事务,如果任一子事务失败。执行顺序是T1, T2, ..., Tj, Cj,..., C2, C1,其中j是发生错误的sub-transaction,这种做法的效果是撤销掉之前所有成功的sub-transation,使得整个Saga的执行结果撤销。
  • Forward recovery,向前恢复,重试失败的事务,假设每个子事务最终都会成功。适用于必须要成功的场景,执行顺序是类似于这样的:T1, T2, ..., Tj(失败), Tj(重试),..., Tn,其中j是发生错误的sub-transaction。该情况下不需要Ci。

Saga有两种实现方式

  • Orchestration-based saga中央协调型

学新通

使用中央协调型SAGA创建的EC订单包含以下步骤:

  1. Order Service 接收 POST /orders 请求并创建 Create Order saga Orchestrator
  2. saga 协调器(Orchestrator)创建一个处于 PENDING 状态的订单
  3. 然后它向客户服务发送一个 Reserve Credit 命令
  4. 客户服务部尝试保留Credit
  5. 然后它会发回一条指示结果的回复消息
  6. saga 协调器批准或拒绝订单
  • Choreography-based saga地方自治型

学新通

使用地方自治型SAGA创建的EC订单包含以下步骤:

  1. OrderService接收 POST /orders 请求并创建一个处于 PENDING 状态的订单
  2. 然后它发出一个 Order Created 事件
  3. 客户服务的事件处理程序尝试保留Credit
  4. 然后它发出一个指示结果的事件
  5. OrderService 的事件处理程序批准或拒绝订单

Saga不提供ACID保证,因为原子性和隔离性不能得到满足。原论文描述如下:

full atomicity is not provided. That is, sagas may view the partial results of other sagas

TCC

TCC是三个单词的首字母缩写:Try, Confirm / Cancel。TCC模式需要准备一个协调器(orchestrator)来控制一系列进程,实现对多个资源(服务)的调用控制。 要调用的服务必须实现 Try / Confirm / Cancel 三个 API。

需要注意 TCC 模式可能需要两倍的时间。 这是因为 TCC 模式要对每个服务进行两次通信,并且需要在收到所有服务的Try响应后才开始确认。

TCC 模式的特点是服务会经过一个临时(pending)状态,确认后才进入最终状态,并且取消过程很容易。 例如,电子邮件服务发送请求将电子邮件标记为准备发送,确认请求发送电子邮件。 相应的取消请求只会被标记。 而在 Saga 模式中如果发送一封电子邮件,相应的补偿动作会发送另一封解释取消的电子邮件。

和SAGA对比

Saga相比TCC的缺点是缺少预留动作,导致补偿动作的实现比较麻烦:Ti就是commit,比如一个业务是发送邮件,在TCC模式下,先保存草稿(Try)再发送(Confirm),撤销的话直接删除草稿(Cancel)就行了。而Saga则就直接发送邮件了(Ti),如果要撤销则得再发送一份邮件说明撤销(Ci),实现起来有一些麻烦。

如果把上面的发邮件的例子换成:A服务在完成Ti后立即发送Event到ESB(企业服务总线,可以认为是一个消息中间件),下游服务监听到这个Event做自己的一些工作然后再发送Event到ESB,如果A服务执行补偿动作Ci,那么整个补偿动作的层级就很深。

不过没有预留动作也可以认为是优点:

  • 有些业务很简单,套用TCC需要修改原来的业务逻辑,而Saga只需要添加一个补偿动作就行了。
  • TCC最少通信次数为2n,而Saga为n(n=sub-transaction的数量)。
  • 有些第三方服务没有Try接口,TCC模式实现起来就比较tricky了,而Saga则很简单。
  • 没有预留动作就意味着不必担心资源释放的问题,异常处理起来也更简单(请对比Saga的恢复策略和TCC的异常处理)。

SEATA方案

Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。在 Seata 开源之前,Seata 对应的内部版本在阿里经济体内部一直扮演着分布式一致性中间件的角色,帮助经济体平稳的度过历年的双11,对各BU业务进行了有力的支撑。经过多年沉淀与积累,商业化产品先后在阿里云、金融云进行售卖。2019.1 为了打造更加完善的技术生态和普惠技术成果,Seata 正式宣布对外开源,未来 Seata 将以社区共建的形式帮助其技术更加可靠与完备。

AT模式

基于SEATA的用户调查,我发现在各大企业中使用最多的还是AT模式。AT这两个字母的含义我猜可能是AlibabaTransaction的缩写吧。AT是基于2PC模式和JDBC实现的。详细的实现原理请参考官方文档

使用前提

  • 基于支持本地 ACID 事务的关系型数据库。
  • Java 应用,通过 JDBC 访问数据库。

整体机制

两阶段提交协议的演变:

  • 一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。

  • 二阶段:

    • 提交异步化,非常快速地完成。
    • 回滚通过一阶段的回滚日志进行反向补偿。

下面我们来看一个采用AT模式的实例。

服务调用

以经典的EC订单场景为例,Business服务接受用户请求后,调用库存服务扣减库存,然后调用订单服务创建订单,订单服务调用账户服务扣减账户余额。例子中的4个服务属于Seata Client端(其中Business服务是TM,其他服务属于RM)。另外还有一个Seata Server(TC),与TC的交互完全由Seata库负责,业务开发人员并无感知。

sequenceDiagram
User->>BusinessService: GET /purchase/commit
BusinessService->>StorageService: /deduct
StorageService-->>BusinessService: success
BusinessService->>OrderService: /debit
OrderService->>AccountService: /?money=5
AccountService-->>OrderService: success
OrderService-->>BusinessService: success
BusinessService-->>User: true

如果在调用链的某一处发生异常,则需要回滚前面已经执行的步骤。

sequenceDiagram
User->>BusinessService: GET /purchase/rollback
BusinessService->>StorageService: /deduct
StorageService-->>BusinessService: success
BusinessService->>OrderService: /debit
OrderService->>AccountService: /?money=5
note over AccountService: ❌ exception
AccountService-->>OrderService: failure
OrderService-->>BusinessService: failure
par
note over AccountService: rollback
note over OrderService: rollback
note over StorageService: rollback
end
BusinessService-->>User: false

代码实现

首先我们来看BusinessController,它实现了两个API,一个用于正常提交,用户ID是1001,一个用于回滚全局事务,用户ID是1002。

/**
 * 购买下单,模拟全局事务提交
 *
 * @return
 */
@RequestMapping("/purchase/commit")
public Boolean purchaseCommit(HttpServletRequest request) {
    businessService.purchase("1001", "2001", 1);
    return true;
}

/**
 * 购买下单,模拟全局事务回滚
 *
 * @return
 */
@RequestMapping("/purchase/rollback")
public Boolean purchaseRollback() {
    try {
        businessService.purchase("1002", "2001", 1);
    } catch (Exception e) {
        e.printStackTrace();
        return false;
    }

    return true;
}

接下来是BusinessService的实现,是不是太简单了!只用了一个@GlobalTransactional就实现了分布式事务处理! 其中xxxClient中利用了RestTemplate来进行远程API访问。

@GlobalTransactional
public void purchase(String userId, String commodityCode, int orderCount) {
    LOGGER.info("purchase begin ... xid: "   RootContext.getXID());
    storageClient.deduct(commodityCode, orderCount);
    orderClient.create(userId, commodityCode, orderCount);
}

然后是StorageService的实现,这里只是简单地操作数据库,扣减库存。我们注意到Provider的实现中可以不使用@GlobalTransactional注解。

public void deduct(String commodityCode, int count) {
    //select   for update
    Storage storage = storageMapper.findByCommodityCode(commodityCode);
    storage.setCount(storage.getCount() - count);
    storageMapper.updateById(storage);
}

接下来是OrderService的实现,将创建的订单保存到数据库之后,调用了账户服务。

public void create(String userId, String commodityCode, Integer count) {
    BigDecimal orderMoney = new BigDecimal(count).multiply(new BigDecimal(5));
    Order order = new Order();
    order.setUserId(userId);
    order.setCommodityCode(commodityCode);
    order.setCount(count);
    order.setMoney(orderMoney);

    orderMapper.insert(order);

    accountClient.debit(userId, orderMoney);

}

最后是AccountService的实现,这里扣减了账户余额,另外判断用户ID,如果是ERROR_USER_ID(1002)则抛出异常,用于模拟回滚场景。

public void debit(String userId, BigDecimal num) {
    Account account = accountMapper.selectByUserId(userId);
    account.setMoney(account.getMoney().subtract(num));
    accountMapper.updateById(account);

    if (ERROR_USER_ID.equals(userId)) {
        throw new RuntimeException("account branch exception");
    }
}

需要注意的是,在Common模块中,实现了一个RestTemplate拦截器和一个过滤器。拦截器用于在跨服务调用时,在请求Header中添加事务ID。而拦截器则是在收到请求后,获取Header中的事务ID并绑定到本地。

public class SeataRestTemplateInterceptor implements ClientHttpRequestInterceptor {

    public ClientHttpResponse intercept(HttpRequest httpRequest, byte[] bytes, ClientHttpRequestExecution clientHttpRequestExecution) throws IOException {
        HttpRequestWrapper requestWrapper = new HttpRequestWrapper(httpRequest);
        String xid = RootContext.getXID();
        if (StringUtils.isNotEmpty(xid)) {
            requestWrapper.getHeaders().add(RootContext.KEY_XID, xid);
        }

        return clientHttpRequestExecution.execute(requestWrapper, bytes);
    }
}
@Component
public class SeataFilter implements Filter {

    @Override
    public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
        HttpServletRequest req = (HttpServletRequest) servletRequest;
        String xid = req.getHeader(RootContext.KEY_XID.toLowerCase());
        boolean isBind = false;
        if (StringUtils.isNotBlank(xid)) {
            RootContext.bind(xid);
            isBind = true;
        }
        try {
            filterChain.doFilter(servletRequest, servletResponse);
        } finally {
            if (isBind) {
                RootContext.unbind();
            }
        }
    }

}

数据库的定义在local-seata-env/initdb.d/all_in_one.sql中,实例相关代码已经传到Github上,有兴趣的同学可以参考这里: github.com/ningmengwan…

使用 AT 模式需要的注意事项有哪些 ?

  1. 必须使用代理数据源,有 3 种形式可以代理数据源:
  • 依赖 seata-spring-boot-starter 时,自动代理数据源,无需额外处理。
  • 依赖 seata-all 时,使用 @EnableAutoDataSourceProxy (since 1.1.0) 注解,注解参数可选择 jdk 代理或者 cglib 代理。
  • 依赖 seata-all 时,也可以手动使用 DatasourceProxy 来包装 DataSource。
  1. 配置 GlobalTransactionScanner,使用 seata-all 时需要手动配置,使用 seata-spring-boot-starter 时无需额外处理。
  2. 业务表中必须包含单列主键,若存在复合主键,暂时只支持mysql,其他类型数据库建议先建一列自增id主键,原复合主键改为唯一键来规避下。
  3. 每个业务库中必须包含 undo_log 表,若与分库分表组件联用,分库不分表。
  4. 跨微服务链路的事务需要对相应 RPC 框架支持,目前 seata-all 中已经支持:Apache Dubbo、Alibaba Dubbo、sofa-RPC、Motan、gRpc、httpClient,对于 Spring Cloud 的支持,请大家引用 spring-cloud-alibaba-seata。其他自研框架、异步模型、消息消费事务模型请结合 API 自行支持。
  5. 目前AT模式支持的数据库有:MySQL、Oracle、PostgreSQL和 TiDB。
  6. 使用注解开启分布式事务时,若默认服务 provider 端加入 consumer 端的事务,provider 可不标注注解。但是,provider 同样需要相应的依赖和配置,仅可省略注解。
  7. 使用注解开启分布式事务时,若要求事务回滚,必须将异常抛出到事务的发起方,被事务发起方的 @GlobalTransactional 注解感知到。provide 直接抛出异常 或 定义错误码由 consumer 判断再抛出异常。

 AT 模式和 Spring @Transactional 注解连用时需要注意什么 ?

@Transactional 可与 DataSourceTransactionManager 和 JTATransactionManager 连用分别表示本地事务和XA分布式事务,大家常用的是与本地事务结合。当与本地事务结合时,@Transactional和@GlobalTransaction连用,@Transactional 只能位于标注在@GlobalTransaction的同一方法层次或者位于@GlobalTransaction 标注方法的内层。这里分布式事务的概念要大于本地事务,若将 @Transactional 标注在外层会导致分布式事务空提交,当@Transactional 对应的 connection 提交时会报全局事务正在提交或者全局事务的xid不存在。

其他解决方案

我司在采用SEATA之前,也调研过基于Uber Cadence和Kafka实现的SAGA解决方案。Cadence是一个工作流平台,号称可以让你专注于的业务逻辑,把分布式系统的复杂性交给Cadence来处理。Cadence很强大,但是学习成本和定制成本相对较高,不像SEATA这般开箱即用。有兴趣的同学可以了解一下

另外SEATA的SAGA模式也是一个不错的选择,有兴趣的同学可以看看这篇文章,作者是屹远(陈龙),蚂蚁金服分布式事务核心研发,Seata Committer。

下一篇[微服务的绊脚石--分布式事务] Seata-AT模式深入分析中,我们会针对在使用Seata过程中遇到的各种问题,结合当前最新的版本Seata 1.4.2的代码实现,跟大家一起深入了解一下Seata。

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhefijfj
系列文章
更多 icon
同类精品
更多 icon
继续加载