• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

链路日志追踪traceId

武飞扬头像
MikeTeas
帮助1

一,使用traceId概述

平时出现问题查询日志是程序员的解决方式,日志一般会从服务器的日志文件,然后查找自己需要的日志,或者日志输出到es中,在es中进行搜索日志,可是对于目前流行的微服务或者单体服务,将日志串起来去查看并不是一件容易的事情,一般微服务会调用多个系统,有http请求的,有mq的等会产生大量的日志,根据日志快速定位到具体的问题才是我们想要的解决方案,毕竟要用最短的时间找到问题所在,并快速解决。目前的elk搜集日志,也只是把所有的日志搜集起来,并没有将具体的日志按照请求串起来,所以这个目前需要在日志中添加traceId进行日志的追踪。

二,请求的源头

1,http请求
思路
在最开始请求系统时候生成一个全局唯一的traceId,放在http 请求header中,系统接收到请求后,从header中取出这个traceId,放入MDC中,这个traceId伴随着这整个请求的调用周期,即当一个服务调用另外一个服务的时候,需要将traceId往下传递,从而形成一条链路。
实现
@Service
public class TraceInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
String traceId = request.getHeader(ElkConstants.TRACE_ID);
if (StringUtils.isNotEmpty(traceId)) {
MDC.put(ElkConstants.TRACE_ID, traceId);
} else {
MDC.put(ElkConstants.TRACE_ID, UUID.randomUUID().toString());
}
return true;
}

    @Override
    public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
        MDC.remove(ElkConstants.TRACE_ID);
    }
}

@Configuration
public class InterceptorConfig implements WebMvcConfigurer {

    @Resource
    private TraceInterceptor traceInterceptor;

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(traceInterceptor).addPathPatterns("/**");
    }
}

public class ElkConstants {

/**
 * TRACE_ID
 */
public static final String TRACE_ID = "traceId";

}
2,定时任务Task
思路
在定时任务的时候生成一个全局唯一的traceId,放入MDC中,这个traceId伴随着这整个请求的调用周期
实现
@Component
@Slf4j
public class SyncTask extends BaseTask {

/**
 * 定时任务
 */
public void sync() {
    //设置traceId
    setTraceId();
    //自己的业务逻辑
}

}
public abstract class BaseTask {

public void setTraceId() {
    MDC.put(ElkConstants.TRACE_ID, UUID.randomUUID().toString());
}

}
3,微服务(Feign)
思路
在请求的时候生成一个全局唯一的traceId,放入MDC中,这个traceId伴随着这整个请求的调用周期
实现
@Configuration
public class FeignInterceptor implements RequestInterceptor {
@Override
public void apply(RequestTemplate requestTemplate) {
requestTemplate.header(ElkConstants.TRACE_ID, (String) MDC.get(ElkConstants.TRACE_ID));
}
}
4,Mq的方式
思路
使用mq进行消息发送的时候,可以将请求发送消息的traceId从mdc中取出来,我们可以放到消息体中,当做一个字段,然后在消息端消费的时候,从消息体中获取到traceId,并放入到MDC中,伴随着此次的请求
实现(kafka为列子,其他也可以按照思路进行不同的实现)
@Component
public class KafkaSender implements MessageSender {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

@Autowired
private KafkaConfig kafkaConfig;

private static final Logger log = LoggerFactory.getLogger(KafkaSender.class);

/**
 * 异步发送
 *
 * @param mqSendEvent
 * @return
 */
@Override
public boolean sendAsyncMessage(MqSendEvent mqSendEvent) {
   try {
        mqSendEvent.setTraceId(MDC.get("traceId"));
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(mqSendEvent.getTopic(),
                JSON.toJSONString(mqSendEvent));
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable ex) {
                log.error("kafka sendAsyncMessage error, ex = {}, topic = {}, data = {}", ex, mqSendEvent.getTopic(),
                        JSON.toJSONString(mqSendEvent));
            }

            @Override
            public void onSuccess(SendResult<String, Object> result) {
                log.info("kafka sendAsyncMessage success topic = {}, data = {}", mqSendEvent.getTopic(),
                        JSON.toJSONString(mqSendEvent));
            }
        });
    } finally {
        MDC.clear();
    }
    return true;
}
学新通

}

public interface MessageSender {

/**
 * 异步发送
 *
 * @param mqSendEvent
 */
public boolean sendAsyncMessage(MqSendEvent mqSendEvent);

}
public class MqSendEvent implements Serializable {
//topic
private String topic;
//数据
private String data;
//traceId数据
private String traceId;

public MqSendEvent(String topic, String data) {
    this.topic = topic;
    this.data = data;
}

public String getTopic() {
    return topic;
}

public void setTopic(String topic) {
    this.topic = topic;
}

public String getData() {
    return data;
}

public void setData(String data) {
    this.data = data;
}

public String getTraceId() {
    return traceId;
}

public void setTraceId(String traceId) {
    this.traceId = traceId;
}
学新通

}
@Component
@Slf4j
public class UserRegisterListener extends MessageBaseListener {

/**
 * 监听消息
 *
 * @param consumerRecord
 * @param ack
 */
@Override
@KafkaListener(topics = {MessageConstants.REGISTER_USER_TOPIC}, containerFactory = MessageConstants.CONSUMER_CONTAINER_FACTORY_NAME)
protected void receiverMessage(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
    try {
        log.info("consumer message record:{}", consumerRecord);
        Optional<?> optional = Optional.ofNullable(consumerRecord.value());
        if (optional.isPresent()) {
            Object value = optional.get();
            MqSendEvent mqSendEvent = JSON.parseObject((String) value, MqSendEvent.class);
            JSONObject jsonObject = JSON.parseObject(mqSendEvent.getData());
            MDC.put(ElkConstants.TRACE_ID, mqSendEvent.getTraceId());
         //具体业务逻辑
        }
    } finally {
        MDC.clear();
    }
    ack.acknowledge();
}
学新通

}
5,多线程方式
思路
在定时任务的时候生成一个全局唯一的traceId,放入MDC中,这个traceId伴随着这整个请求的调用周期
实现

三,MDC实现链路追踪的原理

1,概述
MDC(Mapped Diagnostic Contexts),是Slf4J类日志系统中实现分布式多线程日志数据传递的重要工具,用户可利用MDC将一些运行时的上下文数据打印出来。目前只有log4j和logback提供原生的MDC支持
2,使用(具体不同场景使用见(二,请求源头))
在logback配置文件中配置MDC容器中的变量%X{trackId}


%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level %X{trackId} [.15t] %class{39}.%method[%L] : %m%n
UTF-8f


3,源码分析
MDC中的put方法
public static void put(String key, String val) throws IllegalArgumentException {
if (key == null) {
throw new IllegalArgumentException(“key parameter cannot be null”);
} else if (mdcAdapter == null) {
throw new IllegalStateException(“MDCAdapter cannot be null. See also http://www.slf4j.org/codes.html#null_MDCA”);
} else {
mdcAdapter.put(key, val);
}
}
MDCAdapter接口
public interface MDCAdapter {
//设置当前线程MDC上下文中指定key的值和value的值
void put(String var1, String var2);
// 获取当前线程MDC上下文中指定key的值
String get(String var1);
// 移除当前线程MDC上下文中指定key的值
void remove(String var1);
// 清空MDC上下文
void clear();
// 获取MDC上下文
Map<String, String> getCopyOfContextMap();
// 设置MDC上下文
void setContextMap(Map<String, String> var1);
}
LogbackMDCAdapter实现

public class LogbackMDCAdapter implements MDCAdapter {
final ThreadLocal<Map<String, String>> copyOnThreadLocal = new ThreadLocal();
private static final int WRITE_OPERATION = 1;
private static final int MAP_COPY_OPERATION = 2;
//threadlocal线程级别的,这就是为什么需要remove或者clear的原因所在,
//防止内存泄露,具体为啥,可以研究一下ThreadLocal底层原理就明白了
final ThreadLocal lastOperation = new ThreadLocal();

public LogbackMDCAdapter() {
}
......

可以看到LogbackMDCAdapter声明了类型为ThreadLocal的map。ThreadLocal 提供了线程本地的实例。ThreadLocal变量在线程之间隔离而在方法或类间能够共享,是属于线程单独私有的,线程之间相互隔离,到这里就可以大致理解为其实使用的就是ThreadLocal的私有线程的特性,大概就可以明白其中的原理了。
LogbackMDCAdapter 的put方法
public void put(String key, String val) throws IllegalArgumentException {
if (key == null) {
throw new IllegalArgumentException(“key cannot be null”);
} else {
//复制当前线程的threadLocal
Map<String, String> oldMap = (Map)this.copyOnThreadLocal.get();
//设置当前操作为写操作,并返回上一次的操作
Integer lastOp = this.getAndSetLastOperation(1);
//上一次不是读操作或者null,已经初始化了,有内容的话在里面设置内容
if (!this.wasLastOpReadOrNull(lastOp) && oldMap != null) {
oldMap.put(key, val);
} else {
// 复制一个当前线程的副本
Map<String, String> newMap = this.duplicateAndInsertNewMap(oldMap);
newMap.put(key, val);
}

    }
}

private Integer getAndSetLastOperation(int op) {
    //设置写操作,并返回上一次操作
    Integer lastOp = (Integer)this.lastOperation.get();
    this.lastOperation.set(op);
    return lastOp;
}

private boolean wasLastOpReadOrNull(Integer lastOp) {
    //判断操作类型 是null或者读操作
    return lastOp == null || lastOp == 2;
}

创建线程安全的map放到threadLocal中
private Map<String, String> duplicateAndInsertNewMap(Map<String, String> oldMap) {
Map<String, String> newMap = Collections.synchronizedMap(new HashMap());
if (oldMap != null) {
synchronized(oldMap) {
newMap.putAll(oldMap);
}
}

    this.copyOnThreadLocal.set(newMap);
    return newMap;
}

get方法
public String get(String key) {
//从当前线程获取map
Map<String, String> map = (Map)this.copyOnThreadLocal.get();
return map != null && key != null ? (String)map.get(key) : null;
}
remove方法
public void remove(String key) {
if (key != null) {
Map<String, String> oldMap = (Map)this.copyOnThreadLocal.get();
if (oldMap != null) {
//设置为写操作
Integer lastOp = this.getAndSetLastOperation(1);
if (this.wasLastOpReadOrNull(lastOp)) {
//读操作或者null的时候,复制新map并移除当前key
Map<String, String> newMap = this.duplicateAndInsertNewMap(oldMap);
newMap.remove(key);
} else {
oldMap.remove(key);
}

        }
    }
}

clear方法
public void clear() {
//设置为写操作
this.lastOperation.set(1);
//移除复制
this.copyOnThreadLocal.remove();
}

四,MDC使用总结

mdc就是基于Threadlocal进行的一个流程周转的标志物的传递,就是根据这种标志,可以追踪到日志的整体请求记录,便于进行定位到问题所在,而且对于用户的影响极小

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhfckeaf
系列文章
更多 icon
同类精品
更多 icon
继续加载