seata源码全局事务id是传递的
通过Dubbo方式进行调用
想在Dubbo应用之间进行参数传递,其实非常简单。通过Dubbo提供的隐式传递功能即可实现。使用方式如下所示。
// 1. A服务设置参数
RpcContext.getContext().setAttachment("name", "小明");
// 2. A服务通过RPC调用B服务
// 3. B服务可以获取到A服务设置的参数
RpcContext.getContext().getAttachment("name");
所以我们可以把全局事务id放在RpcContext(rpc)中,然后在下游取出来。但是seata是从RootContext取出事务id的,我们如何先把事务id从RpcContext取出来然后再放到RootContext中呢?其实可以用到Dubbo Fillter
我们先看一下Dubbo Filter在哪个环节起作用,当我们调用远程方法的时候,实际上是通过代理对象调用的,将调用信息通过网络发送到服务端,服务端也是通过代理对象来接收请求的,然后根据请求调用服务端的方法,并返回结果。
而Dubbo Filter的作用则可以让用户在发送请求之前或者执行本地方法之前执行用户自定义的逻辑,增加可扩展性
所以我们可以在Consumer端通过Filter将事务id放在RpcContext中,在Provider端将事务id从RpcContext中取出来然后放入到RootContext中
@Activate(group = {DubboConstants.PROVIDER, DubboConstants.CONSUMER}, order = 100)
public class ApacheDubboTransactionPropagationFilter implements Filter {
private static final Logger LOGGER = LoggerFactory.getLogger(ApacheDubboTransactionPropagationFilter.class);
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
String xid = RootContext.getXID();
BranchType branchType = RootContext.getBranchType();
String rpcXid = getRpcXid();
String rpcBranchType = RpcContext.getContext().getAttachment(RootContext.KEY_BRANCH_TYPE);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("xid in RootContext[{}] xid in RpcContext[{}]", xid, rpcXid);
}
boolean bind = false;
// 事务id为空,说明是在客户端调用,将事务id设置进去
if (xid != null) {
RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);
RpcContext.getContext().setAttachment(RootContext.KEY_BRANCH_TYPE, branchType.name());
} else {
// rpcXid不为空,说明这是一个服务端的方法,将上游传递下来的事务id绑定到上下文
if (rpcXid != null) {
// 将事务id绑定到上下文中
RootContext.bind(rpcXid);
if (StringUtils.equals(BranchType.TCC.name(), rpcBranchType)) {
RootContext.bindBranchType(BranchType.TCC);
}
bind = true;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("bind xid [{}] branchType [{}] to RootContext", rpcXid, rpcBranchType);
}
}
}
try {
return invoker.invoke(invocation);
} finally {
if (bind) {
BranchType previousBranchType = RootContext.getBranchType();
// 将事务id解绑
String unbindXid = RootContext.unbind();
if (BranchType.TCC == previousBranchType) {
RootContext.unbindBranchType();
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("unbind xid [{}] branchType [{}] from RootContext", unbindXid, previousBranchType);
}
if (!rpcXid.equalsIgnoreCase(unbindXid)) {
LOGGER.warn("xid in change during RPC from {} to {},branchType from {} to {}", rpcXid, unbindXid,
rpcBranchType != null ? rpcBranchType : "AT", previousBranchType);
if (unbindXid != null) {
RootContext.bind(unbindXid);
LOGGER.warn("bind xid [{}] back to RootContext", unbindXid);
if (BranchType.TCC == previousBranchType) {
RootContext.bindBranchType(BranchType.TCC);
LOGGER.warn("bind branchType [{}] back to RootContext", previousBranchType);
}
}
}
}
}
}
/**
* get rpc xid
* @return
*/
private String getRpcXid() {
String rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID);
if (rpcXid == null) {
rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID.toLowerCase());
}
return rpcXid;
}
}
通过Http方式进行调用
- 当通过feign调用时,客户端在请求head中设置xid
- 服务端从请求head中获取到xid并设置到RootContext中
客户端在head中设置xid
// 这个类在spring-cloud-starter-alibaba-seata包中
public class SeataFeignClient implements Client {
SeataFeignClient(BeanFactory beanFactory, Client delegate) {
this.delegate = delegate;
this.beanFactory = beanFactory;
}
@Override
public Response execute(Request request, Request.Options options) throws IOException {
Request modifiedRequest = getModifyRequest(request);
return this.delegate.execute(modifiedRequest, options);
}
private Request getModifyRequest(Request request) {
String xid = RootContext.getXID();
if (StringUtils.isEmpty(xid)) {
return request;
}
Map<String, Collection<String>> headers = new HashMap<>(MAP_SIZE);
headers.putAll(request.headers());
List<String> seataXid = new ArrayList<>();
seataXid.add(xid);
// 往head中设置xid
headers.put(RootContext.KEY_XID, seataXid);
return Request.create(request.method(), request.url(), headers, request.body(),
request.charset());
}
}
服务端在拦截器中从请求head中获取到xid并设置到RootContext中
服务端在HandlerInterceptor#preHandle方法中设置xid,调用完毕再清除xid
public class TransactionPropagationInterceptor extends HandlerInterceptorAdapter {
private static final Logger LOGGER = LoggerFactory.getLogger(TransactionPropagationInterceptor.class);
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
String xid = RootContext.getXID();
String rpcXid = request.getHeader(RootContext.KEY_XID);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("xid in RootContext[{}] xid in HttpContext[{}]", xid, rpcXid);
}
if (xid == null && rpcXid != null) {
RootContext.bind(rpcXid);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("bind[{}] to RootContext", rpcXid);
}
}
return true;
}
@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler,
ModelAndView modelAndView) {
if (RootContext.inGlobalTransaction()) {
XidResource.cleanXid(request.getHeader(RootContext.KEY_XID));
}
}
}
我在刚学seata的过程中,就遇到过分布式事务不回滚的情况,此时,你就需要查看一下XID是否正常传递。当XID不能正常传递,就不会认为方法处于分布式事务中,自然就不会注册分支事务,当全局事务失败时就不会回滚
参考博客
[1]
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhgcbjjg
系列文章
更多
同类精品
更多
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
怎样阻止微信小程序自动打开
PHP中文网 06-13 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01