• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Spring方法调用异步方法进行事务控制

武飞扬头像
收藏=学会了
帮助1

Spring 异步事务控制

前言:这里的异步方法事务控制指的是:A 方法中异步调用 B 方法,使 A 方法和 B 方法的提交 / 回滚能保持一致;而不是对 @Async 单独的方法进行事务控制。

Spring 事务源码逻辑

在进行 Spring 异步事务控制编码前,先了解下 Spring 是如何进行事务控制的。

一、事务拦截器拦截

定义了@Transactional的方法会被代理,由代理对象执行方法。会进入TransactionInterceptor#invoke()方法

public Object invoke(MethodInvocation invocation) throws Throwable {
    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

    // Adapt to TransactionAspectSupport's invokeWithinTransaction...
    return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {
	// ...
        }
    });
}

二、进行事务控制

进入invokeWithinTransaction()方法

protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
                                         final InvocationCallback invocation) throws Throwable {

    // If the transaction attribute is null, the method is non-transactional.
    TransactionAttributeSource tas = getTransactionAttributeSource();
    final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
    // 获取事务管理器
    final TransactionManager tm = determineTransactionManager(txAttr);
    // ...

    PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

    if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
        // 定义事务相关信息,如隔离级别、传播机制等
        TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

        Object retVal;
        try {
            // 调用被代理方法
            retVal = invocation.proceedWithInvocation();
        }
        catch (Throwable ex) {
            // target invocation exception
            completeTransactionAfterThrowing(txInfo, ex);
            throw ex;
        }
        finally {
            // 设置ThraedLocal数据为null
            cleanupTransactionInfo(txInfo);
        }
       	// ...
        
        // 提交/回滚事务
        commitTransactionAfterReturning(txInfo);
        return retVal;
    }
    // ...
}
学新通

进入 createTransactionIfNecessary()方法

protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
                                                       @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {

    // ...

    TransactionStatus status = null;
    if (txAttr != null) {
        if (tm != null) {
            // 构建事务状态,如:启动事务、设置数据库连接信息等
            status = tm.getTransaction(txAttr);
        }
        // ...
    }
    // 返回事务信息
    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
学新通

getTransaction() 方法

public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
    throws TransactionException {

    // 使用默认的事务定义
    TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
	// 获取事务对象,进入这个方法可以看到事务信息都是存在 ThreadLocal 中的
    Object transaction = doGetTransaction();
    boolean debugEnabled = logger.isDebugEnabled();

    // ...
    else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
             def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
             def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        SuspendedResourcesHolder suspendedResources = suspend(null);
        
        try {
            // 开启事务
            return startTransaction(def, transaction, debugEnabled, suspendedResources);
        }
        // ...
    }
}
学新通

进入 DataSourceTransactionManager # doGetTransaction()方法

protected Object doGetTransaction() {
    // DataSourceTransactionObject 对象是 DataSourceTransactionManager 的私有静态内部类
    DataSourceTransactionObject txObject = new DataSourceTransactionObject();
    txObject.setSavepointAllowed(isNestedTransactionAllowed());
    ConnectionHolder conHolder =
        (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
    txObject.setConnectionHolder(conHolder, false);
    return txObject;
}

// 进入 TransactionSynchronizationManager # getResource()方法
private static Object doGetResource(Object actualKey) {
    Map<Object, Object> map = resources.get();
    if (map == null) {
        return null;
    }
    Object value = map.get(actualKey);
    // Transparently remove ResourceHolder that was marked as void...
    if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
        map.remove(actualKey);
        // Remove entire ThreadLocal if empty...
        if (map.isEmpty()) {
            resources.remove();
        }
        value = null;
    }
    return value;
}

// 观察 TransactionSynchronizationManager 对象
public abstract class TransactionSynchronizationManager {

	private static final ThreadLocal<Map<Object, Object>> resources =
			new NamedThreadLocal<>("Transactional resources");

	private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
			new NamedThreadLocal<>("Transaction synchronizations");

	private static final ThreadLocal<String> currentTransactionName =
			new NamedThreadLocal<>("Current transaction name");

	private static final ThreadLocal<Boolean> currentTransactionReadOnly =
			new NamedThreadLocal<>("Current transaction read-only status");

	private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
			new NamedThreadLocal<>("Current transaction isolation level");

	private static final ThreadLocal<Boolean> actualTransactionActive =
			new NamedThreadLocal<>("Actual transaction active");
}
学新通

三、事务开启 / 回滚 /提交操作

事务的控制操作都是由 DataSourceTransactionManager 来进行的,如:

  • doBegin:开启事务
  • doCommit:提交事务
  • doRollback:回滚事务

其中主要是从 ThreadLocal 中取出 Connection 对象进行事务控制,以 doCommit 为例:

protected void doCommit(DefaultTransactionStatus status) {
    // 获取连接对象
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
    Connection con = txObject.getConnectionHolder().getConnection();
    if (status.isDebug()) {
        logger.debug("Committing JDBC transaction on Connection ["   con   "]");
    }
    try {
        // 提交
        con.commit();
    }
    catch (SQLException ex) {
        throw translateException("JDBC commit", ex);
    }
}

四、事务完成,清除事务信息

在第二步结尾,进入commitTransactionAfterReturning()

protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
    if (txInfo != null && txInfo.getTransactionStatus() != null) {
        if (logger.isTraceEnabled()) {
            logger.trace("Completing transaction for ["   txInfo.getJoinpointIdentification()   "]");
        }
        // 提交/回滚事务
        txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
    }
}

public final void commit(TransactionStatus status) throws TransactionException {
    if (status.isCompleted()) {
        throw new IllegalTransactionStateException(
            "Transaction is already completed - do not call commit or rollback more than once per transaction");
    }

    DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
    if (defStatus.isLocalRollbackOnly()) {
        if (defStatus.isDebug()) {
            logger.debug("Transactional code has requested rollback");
        }
        // 回滚
        processRollback(defStatus, false);
        return;
    }

    if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
        if (defStatus.isDebug()) {
            logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
        }
        // 回滚
        processRollback(defStatus, true);
        return;
    }
	// 提交
    processCommit(defStatus);
}
学新通

以提交为例,进入processCommit()

private void processCommit(DefaultTransactionStatus status) throws TransactionException {
    try {
        boolean beforeCompletionInvoked = false;

        try {
            boolean unexpectedRollback = false;
            // 各种异常、判断等前置处理
        }
        finally {
            // 清除事务信息
            cleanupAfterCompletion(status);
        }
    }
}


private void cleanupAfterCompletion(DefaultTransactionStatus status) {
    status.setCompleted();
    if (status.isNewSynchronization()) {
        TransactionSynchronizationManager.clear();
    }
    if (status.isNewTransaction()) {
        doCleanupAfterCompletion(status.getTransaction());
    }
    if (status.getSuspendedResources() != null) {
        if (status.isDebug()) {
            logger.debug("Resuming suspended transaction after completion of inner transaction");
        }
        Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
        resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
    }
}
学新通

简单总结

由上面的源码可以知道,Spring 的事务控制主要是一下步骤:

  1. 对标注了事务注解的方法进行动态代理
  2. 代理方法的前置处理是获取数据库连接,定义事务信息等,存储在 ThreadLocal 中
  3. 开启事务
  4. 执行方法逻辑
  5. 提交 / 回滚事务
  6. 清除事务信息

异步方法事务控制

这里的异步方法事务控制指的是:A 方法中异步调用 B 方法,使 A 方法和 B 方法的提交 / 回滚能保持一致;而不是对 @Async 单独的方法进行事务控制。

方案一:自身编码控制数据库连接

这个方法是自己结合对 Spring 事务控制理解想到的,许多细节并没有考虑到,只提供一种思路。

说到底 Spring 的事务控制还是基于数据库连接的,只不过它帮助我们简化了操作。所以如果我们自己去维护这个数据库连接,然后再对它进行手动的事务控制即可。

PS:要注意的是,使用 Mybatis 结合 Spring 时,Mybatis 获取的数据库连接,是通过 Spring 获得的,所以如果自己在执行 Mybatis 方法前创建数据库连接,再手动控制是没有用的,因为自己创建的数据库连接和 Mybatis 使用的不是同一个。

由于如上情况,如果完全的靠自己获取事务连接进行事务控制,那么就还需要改写 Mybatis 执行的逻辑,这样就很麻烦。所以这里使用的方案是依旧利用 Spring 获取数据库连接对象,只不过要将这个对象拿出来,自己维护。

编码

通过之前的源码知道,Spring 获取数据库连接获取后,将其存储到 DataSourceTransactionObject 类中的

protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;

    try {
        if (!txObject.hasConnectionHolder() ||
            txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
            Connection newCon = obtainDataSource().getConnection();
            if (logger.isDebugEnabled()) {
                logger.debug("Acquired Connection ["   newCon   "] for JDBC transaction");
            }
            // 将获取的 Connection 对象设置到 DataSourceTransactionObject
            txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
        }
        // ...
    }
}
学新通

DataSourceTransactionObject 对象是 DataSourceTransactionManager 的私有静态内部类,所以没法在外部使用它,所以这里利用了反射来使用这个对象。

方法 A

需要注意下面的代码获取到数据库连接对象后只进行了简单处理,将其设置到了该类的一个属性中,实际使用需要考虑如何维护问题

public void asyncTrans(Boolean flag) {
    DefaultTransactionStatus status = null;
    try {
        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
        // 设置事务信息(事务名、传播方式)
        def.setName("SomeTxName");
        def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
        // 获取事务状态
        status = (DefaultTransactionStatus) transactionManager.getTransaction(def);
        // 获取事务信息(实际类型 DataSourceTransactionObject,是DataSourceTransactionManager的私有静态内部类)
        Object transaction = status.getTransaction();
        // 由于是私有静态内部类,所以根据反射调用,获取数据库连接
        Method[] methods = ReflectionUtils.getAllDeclaredMethods(transaction.getClass());

        for (Method method : methods) {
            // 获取数据库连接
            if ("getConnectionHolder".equals(method.getName())) {
                ConnectionHolder connectionHolder = (ConnectionHolder) ReflectionUtils.invokeMethod(method, transaction);
                Connection connection = connectionHolder.getConnection();
                // 将该连接自己维护(这里简单处理,只是设置到了该类的一个属性中,实际使用需要考虑如何维护问题)
                this.connection = connection;
            }
        }

        // 执行方法逻辑
        AreaZoneRel zoneRel = new AreaZoneRel();
        zoneRel.setZoneName("华北");
        zoneRel.setAreaName("北京");
        zoneRelMapper.insert(zoneRel);
        // 执行异步方法
        asyncMethod(flag);

    } catch (Exception ex) {
        // 回滚
        transactionManager.rollback(status);
        throw new RuntimeException(ex);
    } finally {
        status.setCompleted();
        if (status.isNewSynchronization()) {
            TransactionSynchronizationManager.clear();
        }
    }
}
学新通

异步方法 B

PS:下面针对事务 A 的回滚 / 提交异常只是简单处理,直接将连接关闭(事务会自动回滚),实际使用需要考量。

此外下面的实现有许多细节没有注意,如:A 提交失败,B 的回滚处理问题。

private void asyncMethod(Boolean flag) {
    // 执行异步方法
    Callable callable = () -> {
        try {
            // 子线程本身的事务控制使用Spring的事务控制
            transactionTemplate.execute(status -> {

                AreaGraphRel graphRel = new AreaGraphRel();
                graphRel.setAreaName("上海");
                graphRel.setGraphName("上海.svg");
                graphRelMapper.insert(graphRel);
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                // 异常模拟
                if (flag) {
                    int a = 1 / 0;
                }
                return null;
            });
        } catch (Exception e) {
            // 如果异步方法抛出异常,则回滚外部线程的事务
            try {
                connection.rollback();
            } catch (Exception exception) {
                // 粗暴处理,异常直接关闭连接
                connection.close();
                throw new RuntimeException(exception);
            }
            throw new RuntimeException(e);
        }
        try {
            // 提交
            connection.commit();
        } catch (Exception exception) {
            // 粗暴处理,异常直接关闭连接
            connection.close();
            throw new RuntimeException(exception);
        }
        return 200;
    };
    FutureTask task = new FutureTask(callable);
    new Thread(task).start();
}
学新通

方案二:基于快照实现

这个方案是根据 Seata AT 模式的思路实现的

这种方式就是对需要进行控制的方法生成数据快照,如:将 A 方法提交事务前数据信息记录下来,作为前一个版本的数据快照;这时执行 B 方法时,当 B 方法执行失败,其事务回滚,那么就根据快照将 A 方法的数据更新为上一个版本。

PS:这里有如下几点需要注意:

  1. 当利用快照回滚时,需要考虑是否要保证当前数据信息和快照修改后的数据一致问题。
    因为如果在基于快照进行更新时,该数据被其他调用进行数据修改,此时再基于快照更新数据则是将期间其他的正常操作结果都覆盖了,那么要考虑这样是否会对系统数据造成影响。
  2. 当利用快照回滚时,调用失败的异常处理。
    如果是上面 1 比较不一致导致回滚异常,那么只能通过告警,人为处理;如果是超时等情况可以利用 MQ 等消息中间件进行不断的失败重试,最后再结合异常报警通知,来通知人手动处理。

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgjcfaj
系列文章
更多 icon
同类精品
更多 icon
继续加载