• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

seata源码全局事务id是传递的

武飞扬头像
Java识堂
帮助1

学新通

通过Dubbo方式进行调用

想在Dubbo应用之间进行参数传递,其实非常简单。通过Dubbo提供的隐式传递功能即可实现。使用方式如下所示。

// 1. A服务设置参数
RpcContext.getContext().setAttachment("name", "小明");

// 2. A服务通过RPC调用B服务

// 3. B服务可以获取到A服务设置的参数
RpcContext.getContext().getAttachment("name");

所以我们可以把全局事务id放在RpcContext(rpc)中,然后在下游取出来。但是seata是从RootContext取出事务id的,我们如何先把事务id从RpcContext取出来然后再放到RootContext中呢?其实可以用到Dubbo Fillter
学新通
我们先看一下Dubbo Filter在哪个环节起作用,当我们调用远程方法的时候,实际上是通过代理对象调用的,将调用信息通过网络发送到服务端,服务端也是通过代理对象来接收请求的,然后根据请求调用服务端的方法,并返回结果。

而Dubbo Filter的作用则可以让用户在发送请求之前或者执行本地方法之前执行用户自定义的逻辑,增加可扩展性

所以我们可以在Consumer端通过Filter将事务id放在RpcContext中,在Provider端将事务id从RpcContext中取出来然后放入到RootContext中

@Activate(group = {DubboConstants.PROVIDER, DubboConstants.CONSUMER}, order = 100)
public class ApacheDubboTransactionPropagationFilter implements Filter {

    private static final Logger LOGGER = LoggerFactory.getLogger(ApacheDubboTransactionPropagationFilter.class);

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        String xid = RootContext.getXID();
        BranchType branchType = RootContext.getBranchType();

        String rpcXid = getRpcXid();
        String rpcBranchType = RpcContext.getContext().getAttachment(RootContext.KEY_BRANCH_TYPE);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("xid in RootContext[{}] xid in RpcContext[{}]", xid, rpcXid);
        }
        boolean bind = false;
        // 事务id为空,说明是在客户端调用,将事务id设置进去
        if (xid != null) {
            RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);
            RpcContext.getContext().setAttachment(RootContext.KEY_BRANCH_TYPE, branchType.name());
        } else {
        	// rpcXid不为空,说明这是一个服务端的方法,将上游传递下来的事务id绑定到上下文
            if (rpcXid != null) {
            	// 将事务id绑定到上下文中
                RootContext.bind(rpcXid);
                if (StringUtils.equals(BranchType.TCC.name(), rpcBranchType)) {
                    RootContext.bindBranchType(BranchType.TCC);
                }
                bind = true;
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("bind xid [{}] branchType [{}] to RootContext", rpcXid, rpcBranchType);
                }
            }
        }
        try {
            return invoker.invoke(invocation);
        } finally {
            if (bind) {
                BranchType previousBranchType = RootContext.getBranchType();
				// 将事务id解绑
                String unbindXid = RootContext.unbind();
                if (BranchType.TCC == previousBranchType) {
                    RootContext.unbindBranchType();
                }
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("unbind xid [{}] branchType [{}] from RootContext", unbindXid, previousBranchType);
                }
                if (!rpcXid.equalsIgnoreCase(unbindXid)) {
                    LOGGER.warn("xid in change during RPC from {} to {},branchType from {} to {}", rpcXid, unbindXid,
                            rpcBranchType != null ? rpcBranchType : "AT", previousBranchType);
                    if (unbindXid != null) {
                        RootContext.bind(unbindXid);
                        LOGGER.warn("bind xid [{}] back to RootContext", unbindXid);
                        if (BranchType.TCC == previousBranchType) {
                            RootContext.bindBranchType(BranchType.TCC);
                            LOGGER.warn("bind branchType [{}] back to RootContext", previousBranchType);
                        }
                    }
                }
            }
        }
    }

    /**
     * get rpc xid
     * @return
     */
    private String getRpcXid() {
        String rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID);
        if (rpcXid == null) {
            rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID.toLowerCase());
        }
        return rpcXid;
    }

}
学新通

通过Http方式进行调用

  1. 当通过feign调用时,客户端在请求head中设置xid
  2. 服务端从请求head中获取到xid并设置到RootContext中

客户端在head中设置xid

// 这个类在spring-cloud-starter-alibaba-seata包中
public class SeataFeignClient implements Client {

	SeataFeignClient(BeanFactory beanFactory, Client delegate) {
		this.delegate = delegate;
		this.beanFactory = beanFactory;
	}

	@Override
	public Response execute(Request request, Request.Options options) throws IOException {

		Request modifiedRequest = getModifyRequest(request);
		return this.delegate.execute(modifiedRequest, options);
	}

	private Request getModifyRequest(Request request) {

		String xid = RootContext.getXID();

		if (StringUtils.isEmpty(xid)) {
			return request;
		}

		Map<String, Collection<String>> headers = new HashMap<>(MAP_SIZE);
		headers.putAll(request.headers());

		List<String> seataXid = new ArrayList<>();
		seataXid.add(xid);
		// 往head中设置xid
		headers.put(RootContext.KEY_XID, seataXid);

		return Request.create(request.method(), request.url(), headers, request.body(),
				request.charset());
	}

}
学新通

服务端在拦截器中从请求head中获取到xid并设置到RootContext中

学新通
服务端在HandlerInterceptor#preHandle方法中设置xid,调用完毕再清除xid

public class TransactionPropagationInterceptor extends HandlerInterceptorAdapter {


    private static final Logger LOGGER = LoggerFactory.getLogger(TransactionPropagationInterceptor.class);


    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
        String xid = RootContext.getXID();
        String rpcXid = request.getHeader(RootContext.KEY_XID);

        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("xid in RootContext[{}] xid in HttpContext[{}]", xid, rpcXid);
        }
        if (xid == null && rpcXid != null) {
            RootContext.bind(rpcXid);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("bind[{}] to RootContext", rpcXid);
            }
        }

        return true;
    }

    @Override
    public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler,
        ModelAndView modelAndView) {
        if (RootContext.inGlobalTransaction()) {
            XidResource.cleanXid(request.getHeader(RootContext.KEY_XID));
        }
    }

}
学新通

我在刚学seata的过程中,就遇到过分布式事务不回滚的情况,此时,你就需要查看一下XID是否正常传递。当XID不能正常传递,就不会认为方法处于分布式事务中,自然就不会注册分支事务,当全局事务失败时就不会回滚

参考博客

[1]

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgcbjjg
系列文章
更多 icon
同类精品
更多 icon
继续加载