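Open-source distributed tracing follows two routes. One is the javaagent approach, represented by pinpoint/skywalking. The other, taken by zipkin, combines in-application hooks with AOP.
Sleuth borrows the ideas of Google's Dapper and uses the span as its basic unit. Multiple spans form a tree-shaped trace, and the root span's id is the same as the trace id. Besides an id, a span can carry a description, timestamps and tags. Tags are attached to an individual span, while baggage is propagated across a whole trace. Sleuth automatically recognizes HTTP request headers starting with "baggage-" and messaging headers starting with "baggage_".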
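The following plain-Java sketch makes the span/trace relationship concrete. It is not Sleuth or Brave code. `SketchSpanContext` is a hypothetical class, and it assumes 64-bit random ids. It only illustrates the rule stated above: a root span reuses its span id as the trace id, and every child keeps the trace id and records its parent's span id.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical, simplified span context (illustration only, not Brave's TraceContext)
final class SketchSpanContext {
    final long traceId;
    final long spanId;
    final Long parentId; // null for the root span

    private SketchSpanContext(long traceId, long spanId, Long parentId) {
        this.traceId = traceId;
        this.spanId = spanId;
        this.parentId = parentId;
    }

    // Root span: its span id doubles as the trace id
    static SketchSpanContext newRoot() {
        long id = nextId();
        return new SketchSpanContext(id, id, null);
    }

    // Child span: same trace, fresh span id, parent pointer to this span
    SketchSpanContext newChild() {
        return new SketchSpanContext(traceId, nextId(), spanId);
    }

    boolean isRoot() {
        return parentId == null;
    }

    private static long nextId() {
        long id;
        do {
            id = ThreadLocalRandom.current().nextLong();
        } while (id == 0L); // 0 is treated as "no id"
        return id;
    }
}
```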
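Sleuth is compatible with OpenTracing and uses Brave/Zipkin as its distributed tracing library.
Suppose a user sends a request that a microservice receives over HTTP. The Java Servlet specification gives us a Filter for this. Spring MVC gives us a HandlerInterceptor. Let's see which one Spring itself uses: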
```java
// spring-cloud-sleuth-autoconfigure-3.1.0.jar!/org.springframework.cloud.sleuth.autoconfig.instrument.web.TraceWebAutoConfiguration
@Configuration(proxyBeanMethods = false)
@ConditionalOnBean(Tracer.class)
@ConditionalOnSleuthWeb
@Import({ SkipPatternConfiguration.class, TraceWebFluxConfiguration.class, TraceWebServletConfiguration.class }) // 1. import other configurations that do the actual work
@EnableConfigurationProperties(SleuthWebProperties.class)
@AutoConfigureAfter(BraveAutoConfiguration.class) // 2. applied after BraveAutoConfiguration
public class TraceWebAutoConfiguration {
}
```
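The TraceWebAutoConfiguration auto-configuration class imports SkipPatternConfiguration, which defines the path patterns that are not traced. These include actuator/management endpoints, API documentation endpoints such as swagger, static resources, and hystrix.stream.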
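A skip pattern is essentially a regular expression matched against the request path. The sketch below is an illustrative assumption. Its pattern is written in the spirit of the categories listed above and is not copied from Sleuth's real default value. `SkipPatternSketch` is a hypothetical class.

```java
import java.util.regex.Pattern;

// Hypothetical skip-pattern check (illustration only): a path that fully matches gets no span
final class SkipPatternSketch {
    // Illustrative pattern, NOT Sleuth's actual default
    static final Pattern SKIP = Pattern.compile(
            "/actuator.*|/api-docs.*|/swagger.*|.*\\.png|.*\\.css|.*\\.js|.*\\.html|/favicon\\.ico|/hystrix\\.stream");

    static boolean shouldSkip(String path) {
        // matches() requires the whole path to match one alternative
        return SKIP.matcher(path).matches();
    }
}
```

`matches()` requires a whole-path match, so `/v1/actuator-proxy` would still be traced. `find()` would have skipped it, because it only needs `/actuator` to appear somewhere in the path.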
Let's first look only at how the traditional servlet stack is handled:
```java
// spring-cloud-sleuth-autoconfigure-3.1.0.jar!/org.springframework.cloud.sleuth.autoconfig.instrument.web.TraceWebServletConfiguration
@Bean
TraceWebAspect traceWebAspect(Tracer tracer, CurrentTraceContext currentTraceContext, SpanNamer spanNamer) {
    // 1. For methods on classes annotated with @Controller/@RestController that return a Callable,
    //    TraceWebAspect wraps the return value in a TraceCallable; if the return type is WebAsyncTask,
    //    it sets the task's callable field to the wrapped TraceCallable
    return new TraceWebAspect(tracer, currentTraceContext, spanNamer);
}

@Bean
FilterRegistrationBean traceWebFilter(BeanFactory beanFactory, SleuthWebProperties webProperties) {
    // 2. Register a LazyTracingFilter that filters all requests
    FilterRegistrationBean filterRegistrationBean = new FilterRegistrationBean(new LazyTracingFilter(beanFactory));
    filterRegistrationBean.setDispatcherTypes(DispatcherType.ASYNC, DispatcherType.ERROR, DispatcherType.FORWARD,
            DispatcherType.INCLUDE, DispatcherType.REQUEST);
    filterRegistrationBean.setOrder(webProperties.getFilterOrder());
    return filterRegistrationBean;
}

/**
 * Nested config that configures Web MVC if it's present (without adding a runtime
 * dependency to it).
 */
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(WebMvcConfigurer.class)
// 3. Import TraceWebMvcConfigurer, which fetches SpanCustomizingAsyncHandlerInterceptor (already declared
//    in TraceWebServletConfiguration) from the BeanFactory and adds it to Spring MVC's InterceptorRegistry
@Import(TraceWebMvcConfigurer.class)
protected static class TraceWebMvcAutoConfiguration {
}
```
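This still isn't entirely clear. There is a Filter, a HandlerInterceptor, and also asynchronous handling that hasn't come up before. Let's examine LazyTracingFilter and SpanCustomizingAsyncHandlerInterceptor to find out.
LazyTracingFilter takes two beans and uses them to create a TracingFilter, which then does the actual filtering. The first is CurrentTraceContext, defined in BraveAutoConfiguration. The second is HttpServerHandler, defined in BraveHttpBridgeConfiguration, which BraveAutoConfiguration imports indirectly through BraveHttpConfiguration: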
```java
// spring-cloud-sleuth-instrumentation-3.1.0.jar!/org.springframework.cloud.sleuth.instrument.web.servlet.TracingFilter
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
        throws IOException, ServletException {
    HttpServletRequest req = (HttpServletRequest) request;
    HttpServletResponse res = servlet.httpServletResponse(response);

    // Prevent duplicate spans for the same request
    TraceContext context = (TraceContext) request.getAttribute(TraceContext.class.getName());
    if (context != null) { // 1. probably a forwarded request: re-scope the existing context instead of starting a new span
        // A forwarded request might end up on another thread, so make sure it is
        // scoped
        CurrentTraceContext.Scope scope = currentTraceContext.maybeScope(context);
        try {
            chain.doFilter(request, response);
        }
        finally {
            scope.close();
        }
        return;
    }

    // 2. sr (server receive): if a TraceContext was propagated its Span is continued, otherwise a new one is created;
    //    a MutableSpan is tracked as a PendingSpan and finally returned as a RealSpan
    Span span = handler.handleReceive(new HttpServletRequestWrapper(req));

    // Add attributes for explicit access to customization or span context
    request.setAttribute(SpanCustomizer.class.getName(), span);
    request.setAttribute(TraceContext.class.getName(), span.context());
    SendHandled sendHandled = new SendHandled();
    request.setAttribute(SendHandled.class.getName(), sendHandled);

    Throwable error = null;
    CurrentTraceContext.Scope scope = currentTraceContext.newScope(span.context());
    try {
        // any downstream code can see Tracer.currentSpan() or use
        // Tracer.currentSpanCustomizer()
        chain.doFilter(req, res);
    }
    catch (Throwable e) {
        error = e;
        throw e;
    }
    finally {
        // When async, even if we caught an exception, we don't have the final
        // response: defer
        if (servlet.isAsync(req)) {
            // 3. Async: a TracingAsyncListener listens for the completion event and then
            //    finishes the span the same way as the synchronous path
            servlet.handleAsync(handler, req, res, span);
        }
        else if (sendHandled.compareAndSet(false, true)) {
            // we have a synchronous response or error: finish the span
            HttpServerResponse responseWrapper = HttpServletResponseWrapper.create(req, res, error);
            // 4. ss (server send), synchronous: removes the TraceContext and finishes the PendingSpan through the
            //    SpanHandlers (see how PendingSpans are handled where the tracer is defined; BraveBaggageConfiguration
            //    defines baggageTagSpanHandler and ZipkinBraveConfiguration defines zipkinSpanHandler)
            handler.handleSend(responseWrapper, span);
        }
        scope.close();
    }
}
```
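The filter's control flow comes down to three ideas:

- Mark the request with its context, so a forward or include does not create a second span.
- Keep the span in scope while the rest of the chain runs.
- Finish the span exactly once, guarded by an `AtomicBoolean` the way `SendHandled` is used.

The sketch below models that flow in plain Java. The request is reduced to a `Map` of attributes. `FilterFlowSketch`, `SketchSpan` and the attribute keys are hypothetical names, not the Servlet or Brave APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical span (illustration only): records how it ended
final class SketchSpan {
    int finishCount;
    Throwable error;
}

// Hypothetical model of the filter flow; the request is reduced to its attribute map
final class FilterFlowSketch {
    static final String CONTEXT_ATTR = "sketch.context";  // stands in for TraceContext.class.getName()
    static final String SEND_ATTR = "sketch.sendHandled"; // stands in for SendHandled.class.getName()
    static final ThreadLocal<SketchSpan> CURRENT = new ThreadLocal<>();

    final List<SketchSpan> started = new ArrayList<>();

    // 'chain' stands in for FilterChain.doFilter(request, response)
    void doFilter(Map<String, Object> requestAttrs, Runnable chain) {
        SketchSpan existing = (SketchSpan) requestAttrs.get(CONTEXT_ATTR);
        if (existing != null) {
            // Forwarded or included dispatch: re-scope the existing span, do not start a new one
            runInScope(existing, chain);
            return;
        }
        SketchSpan span = new SketchSpan();
        started.add(span);
        AtomicBoolean sendHandled = new AtomicBoolean();
        requestAttrs.put(CONTEXT_ATTR, span);
        requestAttrs.put(SEND_ATTR, sendHandled); // an async completion listener could race on this same flag
        Throwable error = null;
        try {
            runInScope(span, chain);
        } catch (RuntimeException | Error e) {
            error = e;
            throw e;
        } finally {
            if (sendHandled.compareAndSet(false, true)) { // finish at most once
                span.error = error;
                span.finishCount++;
            }
        }
    }

    private static void runInScope(SketchSpan span, Runnable chain) {
        SketchSpan previous = CURRENT.get();
        CURRENT.set(span);
        try {
            chain.run();
        } finally {
            if (previous == null) CURRENT.remove(); else CURRENT.set(previous);
        }
    }
}
```

Storing the flag as a request attribute is presumably what lets an async completion listener and the synchronous path share it safely. Whichever side wins `compareAndSet(false, true)` finishes the span, and the other side does nothing.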
Next, let's look at SpanCustomizingAsyncHandlerInterceptor:
```java
// spring-cloud-sleuth-instrumentation-3.1.0.jar!/org.springframework.cloud.sleuth.instrument.web.mvc.SpanCustomizingAsyncHandlerInterceptor
@Autowired(required = false) // 1. no HandlerParser bean is defined by default, so this default instance is kept
HandlerParser handlerParser = new HandlerParser();

@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object o) {
    Object span = request.getAttribute(SpanCustomizer.class.getName()); // 2. stored by TracingFilter, read back here
    if (span instanceof SpanCustomizer) {
        // 3. tag the Span with the controller class and method (mvc.controller.class, mvc.controller.method)
        handlerParser.preHandle(request, o, (SpanCustomizer) span);
    }
    return true;
}

@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler,
        ModelAndView modelAndView) {
    Object span = request.getAttribute(SpanCustomizer.class.getName());
    if (span instanceof SpanCustomizer) {
        handlerParser.postHandle(request, handler, modelAndView, (SpanCustomizer) span); // 4. does nothing (empty method)
    }
}

@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler,
        Exception ex) {
    Object span = request.getAttribute(SpanCustomizer.class.getName());
    if (span instanceof SpanCustomizer) {
        setErrorAttribute(request, ex); // 5. set the "error" request attribute
        setHttpRouteAttribute(request); // 6. set the "http.route" request attribute
    }
}
```
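In summary, for HTTP requests the Filter does the main work of creating or continuing the Span. The HandlerInterceptor knows more about the matched handler, so it adds tags to the Span.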
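This division of labour can be pictured as two hooks that share the span through a request attribute. The filter-side hook creates the span and publishes it. The interceptor-side hook looks it up and only adds tags. The tag names `mvc.controller.class` and `mvc.controller.method` come from the walkthrough above. Everything else (`SpanTags`, `InterceptorSketch`, the attribute key, the use of the simple class name) is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a SpanCustomizer (illustration only): it can only add tags
final class SpanTags {
    final Map<String, String> tags = new LinkedHashMap<>();

    void tag(String key, String value) {
        tags.put(key, value);
    }
}

final class InterceptorSketch {
    static final String CUSTOMIZER_ATTR = "sketch.SpanCustomizer"; // stands in for SpanCustomizer.class.getName()

    // Filter side: create the span and publish it for later hooks
    static SpanTags filterCreatesSpan(Map<String, Object> requestAttrs) {
        SpanTags span = new SpanTags();
        requestAttrs.put(CUSTOMIZER_ATTR, span);
        return span;
    }

    // Interceptor side: never creates a span, only enriches an existing one with handler details
    static boolean preHandle(Map<String, Object> requestAttrs, Class<?> controller, String methodName) {
        Object span = requestAttrs.get(CUSTOMIZER_ATTR);
        if (span instanceof SpanTags) {
            ((SpanTags) span).tag("mvc.controller.class", controller.getSimpleName());
            ((SpanTags) span).tag("mvc.controller.method", methodName);
        }
        return true; // never blocks the request
    }
}
```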
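So how are requests sent out by the microservice handled?
Take feign as an example. Its auto-configuration class is TraceFeignClientAutoConfiguration, which defines the aspect TraceFeignAspect. The aspect wraps feign.Client objects with TraceFeignObjectWrapper, whose code is as follows: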
```java
// spring-cloud-sleuth-instrumentation-3.1.0.jar!/org.springframework.cloud.sleuth.instrument.web.client.feign.TraceFeignObjectWrapper
Object wrap(Object bean) {
    if (bean instanceof Client && !(bean instanceof TracingFeignClient)
            && !(bean instanceof LazyTracingFeignClient)) {
        // 1. The blocking Feign load balancer is present and this bean has not yet been
        //    processed by TraceFeignObjectWrapper
        if (loadBalancerPresent && bean instanceof FeignBlockingLoadBalancerClient
                && !(bean instanceof TraceFeignBlockingLoadBalancerClient)) {
            return instrumentedFeignLoadBalancerClient(bean);
        }
        if (loadBalancerPresent && bean instanceof RetryableFeignBlockingLoadBalancerClient
                && !(bean instanceof TraceRetryableFeignBlockingLoadBalancerClient)) {
            return instrumentedRetryableFeignLoadBalancerClient(bean);
        }
        // 2. A plain feign.Client: wrap it in a LazyTracingFeignClient, which delegates to a TracingFeignClient
        return new LazyTracingFeignClient(this.beanFactory, (Client) bean);
    }
    // Feign.Builder beans are handled by TraceFeignBuilderBeanPostProcessor (which, in essence, still uses
    // TraceFeignObjectWrapper to return a wrapped Client before the Client is obtained)
    return this.traceFeignBuilderBeanPostProcessor.postProcessAfterInitialization(bean, null);
}

private Object instrumentedFeignLoadBalancerClient(Object bean) {
    if (AopUtils.getTargetClass(bean).equals(FeignBlockingLoadBalancerClient.class)) {
        // 1.1 The FeignBlockingLoadBalancerClient may already be AOP-proxied: obtain the actual target object
        FeignBlockingLoadBalancerClient client = ProxyUtils.getTargetObject(bean);
        // 1.2 Wrap the client's delegate as a traced Client, then wrap that in a
        //     load-balancing-aware TraceFeignBlockingLoadBalancerClient
        return new TraceFeignBlockingLoadBalancerClient(
                (Client) new TraceFeignObjectWrapper(this.beanFactory).wrap(client.getDelegate()),
                (LoadBalancerClient) loadBalancerClient(), loadBalancerProperties(),
                (LoadBalancerClientFactory) loadBalancerClientFactory(), this.beanFactory);
    }
    else {
        FeignBlockingLoadBalancerClient client = ProxyUtils.getTargetObject(bean);
        try {
            Field delegate = FeignBlockingLoadBalancerClient.class.getDeclaredField(DELEGATE);
            delegate.setAccessible(true);
            delegate.set(client, new TraceFeignObjectWrapper(this.beanFactory).wrap(client.getDelegate()));
        }
        catch (NoSuchFieldException | IllegalArgumentException | IllegalAccessException | SecurityException e) {
            log.warn(EXCEPTION_WARNING, e);
        }
        return new TraceFeignBlockingLoadBalancerClient(client, (LoadBalancerClient) loadBalancerClient(),
                loadBalancerProperties(), (LoadBalancerClientFactory) loadBalancerClientFactory(),
                this.beanFactory);
    }
}
```
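Without the load-balancer special cases, `wrap` behaves like an idempotent decorator. It wraps any client that is not already a tracing client and leaves tracing clients alone, so a bean that passes through the wrapper twice is not instrumented twice. Here is a minimal sketch; `SketchClient`, `TracingClientSketch` and `ClientWrapperSketch` are hypothetical types, not the feign API.

```java
import java.util.List;

// Hypothetical client abstraction standing in for feign.Client (illustration only)
interface SketchClient {
    String execute(String request);
}

// Decorator that records client send/receive around every call
final class TracingClientSketch implements SketchClient {
    final SketchClient delegate;
    final List<String> spans;

    TracingClientSketch(SketchClient delegate, List<String> spans) {
        this.delegate = delegate;
        this.spans = spans;
    }

    @Override
    public String execute(String request) {
        spans.add("client send: " + request);
        try {
            return delegate.execute(request);
        } finally {
            spans.add("client receive: " + request);
        }
    }
}

final class ClientWrapperSketch {
    // Wrap plain clients; return already-instrumented clients unchanged
    static SketchClient wrap(SketchClient client, List<String> spans) {
        if (client instanceof TracingClientSketch) {
            return client;
        }
        return new TracingClientSketch(client, spans);
    }
}
```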
As anyone familiar with spring-cloud-loadbalancer or ribbon knows, load balancing handles a request first, so let's start with TraceFeignBlockingLoadBalancerClient.
```java
// spring-cloud-sleuth-instrumentation-3.1.0.jar!/org.springframework.cloud.sleuth.instrument.web.client.feign.TraceFeignBlockingLoadBalancerClient
public Response execute(Request request, Request.Options options) throws IOException {
    if (LOG.isDebugEnabled()) {
        LOG.debug("Before send");
    }
    Response response = null;
    // 1. Get the tracer from the BeanFactory; if the current thread has a TraceContext, it becomes
    //    the parent of the new Span; start() records the start time
    Span fallbackSpan = tracer().nextSpan().start();
    try {
        // 2. If the delegate is itself a load balancer, call it; otherwise use FeignBlockingLoadBalancerClient (super)
        if (delegateIsALoadBalancer()) {
            response = getDelegate().execute(request, options);
        }
        else {
            response = super.execute(request, options);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("After receive");
        }
        return response;
    }
    catch (Exception e) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Exception thrown", e);
        }
        if (e instanceof IOException) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(
                        "IOException was thrown, so most likely the traced client wasn't called. Falling back to a manual span.");
            }
            // 3. On an IOException, use TracingFeignClient to record cs (client send) and
            //    cr (client receive) on the fallback span
            tracingFeignClient().handleSendAndReceive(fallbackSpan, request, response, e);
        }
        throw e;
    }
    finally {
        // 4. Abandon the fallback span: unless step 3 already finished it, it is discarded without being reported
        fallbackSpan.abandon();
    }
}
```
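The outer client uses a fallback span. The span is started before the call. It is explicitly finished only when an `IOException` suggests the traced inner client was never reached, and it is abandoned in `finally`. A successful call is therefore reported by the inner client alone, not twice. The plain-Java sketch below shows that bookkeeping. `FallbackSpanSketch`, `Call` and `FallbackSpan` are hypothetical. The "abandon drops the span unless it was already finished" behaviour is a simplified assumption, not Sleuth's API.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the fallback-span pattern (illustration only)
final class FallbackSpanSketch {
    // A call that may fail before reaching the traced inner client
    interface Call {
        String run() throws IOException;
    }

    final List<String> reported = new ArrayList<>();

    final class FallbackSpan {
        private boolean done; // finished or abandoned

        void finishWithError(Throwable t) {
            if (!done) {
                done = true;
                reported.add("fallback span, error=" + t.getMessage());
            }
        }

        void abandon() {
            done = true; // dropped without being reported; no effect if already finished
        }
    }

    String execute(Call call) throws IOException {
        FallbackSpan fallback = new FallbackSpan(); // "started" before the call
        try {
            return call.run();
        } catch (IOException e) {
            // Most likely the inner traced client never ran, so report the fallback span instead
            fallback.finishWithError(e);
            throw e;
        } finally {
            fallback.abandon();
        }
    }
}
```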
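With the outer layer covered, let's turn to the inner request handling. The inner LazyTracingFeignClient essentially obtains a TracingFeignClient lazily and lets it execute the request: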
```java
// spring-cloud-sleuth-instrumentation-3.1.0.jar!/org.springframework.cloud.sleuth.instrument.web.client.feign.TracingFeignClient
public Response execute(Request req, Request.Options options) throws IOException {
    RequestWrapper request = new RequestWrapper(req);
    // 1. cs (client send). handler is the HttpClientHandler obtained from the BeanFactory; like HttpServerHandler,
    //    BraveAutoConfiguration imports BraveHttpConfiguration, which defines braveHttpClientHandler
    //    indirectly through BraveHttpBridgeConfiguration
    Span span = this.handler.handleSend(request);
    if (log.isDebugEnabled()) {
        log.debug("Handled send of " + span);
    }
    Response res = null;
    Throwable error = null;
    // 2. CurrentTraceContext from the BeanFactory (sleuthCurrentTraceContext, defined by BraveAutoConfiguration)
    //    sets the thread-local TraceContext to the new span's context for the duration of the call
    try (CurrentTraceContext.Scope ws = this.currentTraceContext.newScope(span.context())) {
        res = this.delegate.execute(request.build(), options);
        if (res == null) { // possibly null on bad implementation or mocks
            res = Response.builder().request(req).build();
        }
        return res;
    }
    catch (Throwable e) {
        error = e;
        throw e;
    }
    finally {
        ResponseWrapper response = new ResponseWrapper(request, res, error);
        this.handler.handleReceive(response, span); // 3. cr (client receive)
        if (log.isDebugEnabled()) {
            log.debug("Handled receive of " + span);
        }
    }
}
```
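TracingFeignClient follows the classic client-side shape. It creates the span (cs), makes it current while the delegate runs so that anything underneath can see it, and records cr in `finally` with whatever outcome came back. The sketch below shows that scoping with a try-with-resources block over a plain `ThreadLocal`. `ClientSpanSketch` and its `Scope` are hypothetical and only mimic what `CurrentTraceContext.newScope` is used for here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical client-side span lifecycle (illustration only)
final class ClientSpanSketch {
    static final ThreadLocal<String> CURRENT = new ThreadLocal<>();
    final List<String> events = new ArrayList<>();

    // Restores the previous value on close, like the scope returned by newScope
    static final class Scope implements AutoCloseable {
        private final String previous;

        Scope(String spanId) {
            previous = CURRENT.get();
            CURRENT.set(spanId);
        }

        @Override
        public void close() {
            if (previous == null) CURRENT.remove(); else CURRENT.set(previous);
        }
    }

    <T> T execute(String spanId, Supplier<T> delegate) {
        events.add("cs " + spanId); // client send
        Throwable error = null;
        try (Scope ignored = new Scope(spanId)) {
            return delegate.get(); // the span is current only while the delegate runs
        } catch (RuntimeException | Error e) {
            error = e; // the scope has already been closed at this point
            throw e;
        } finally {
            events.add("cr " + spanId + (error == null ? " ok" : " error")); // client receive
        }
    }
}
```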
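It is worth noting that on client send, the current traceId and spanId must be handed to the next microservice. How is that done?
In brief: in HttpClientHandler.handleSend, the tracer creates a new span, and an Injector injects the TraceContext into the HttpClientRequest.
The propagation comes from a chain of bean definitions:

- BraveHttpConfiguration defines httpTracing.
- HttpTracing's propagation comes from tracing.
- Tracing's propagation is provided by sleuthPropagationWithB3Baggage, defined in BraveBaggageConfiguration.

The Propagation.Factory is of type BaggageFactoryWrapper, and the Propagation is created through the create method of Propagation.Factory: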
```java
// spring-cloud-sleuth-autoconfigure-3.1.0.jar!/org.springframework.cloud.sleuth.autoconfig.brave.BraveBaggageConfiguration.BaggageFactoryWrapper
public <K> Propagation<K> create(Propagation.KeyFactory<K> keyFactory) {
    // 1. The delegate is a brave.baggage.BaggagePropagation.Factory: it creates a Propagation through
    //    StringPropagationAdapter and then wraps it as a BaggagePropagation
    Propagation<K> propagation = delegate.create(keyFactory);
    return delegateWithoutB3Baggage(factoryFromSupplier, propagation);
}

private <K> Propagation<K> delegateWithoutB3Baggage(Propagation.Factory factoryFromSupplier,
        Propagation<K> propagation) {
    return new Propagation<K>() {
        @Override
        public List<K> keys() {
            return propagation.keys();
        }

        @Override
        public <R> TraceContext.Injector<R> injector(Setter<R, K> setter) {
            // We don't want to inject baggage in the Brave way
            // 2. factoryFromSupplier is provided by defaultPropagationFactorySupplier, defined in
            //    BraveBaggageConfiguration; its type is brave.propagation.B3Propagation.Factory
            return factoryFromSupplier.get().injector((Setter<R, String>) setter);
        }

        @Override
        public <R> TraceContext.Extractor<R> extractor(Getter<R, K> getter) {
            return propagation.extractor(getter);
        }
    };
}
```
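Structurally, the wrapper is a composite. The extractor (and `keys()`) come from the baggage-aware propagation, while the injector is borrowed from the plain B3 factory, so outgoing requests are written the B3 way. The sketch below shows only that injector/extractor split and omits `keys()`. It uses hypothetical interfaces (`InjectorSketch`, `ExtractorSketch`, `PropagationSketch`, `SplitPropagation`) over a header map, not Brave's `Propagation`, `Setter` and `Getter`.

```java
import java.util.Map;

// Hypothetical, simplified propagation contracts over a header map (illustration only)
interface InjectorSketch {
    void inject(String traceContext, Map<String, String> headers);
}

interface ExtractorSketch {
    String extract(Map<String, String> headers);
}

interface PropagationSketch {
    InjectorSketch injector();

    ExtractorSketch extractor();
}

// Injects with one propagation, extracts with another
final class SplitPropagation implements PropagationSketch {
    private final PropagationSketch injectFrom;  // e.g. plain B3
    private final PropagationSketch extractFrom; // e.g. B3 plus baggage

    SplitPropagation(PropagationSketch injectFrom, PropagationSketch extractFrom) {
        this.injectFrom = injectFrom;
        this.extractFrom = extractFrom;
    }

    @Override
    public InjectorSketch injector() {
        return injectFrom.injector();
    }

    @Override
    public ExtractorSketch extractor() {
        return extractFrom.extractor();
    }
}
```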
HttpClientHandler.defaultInjector is obtained by calling injector on HttpTracing's propagation, with HttpClientRequest.SETTER as the argument.
The injector method of B3Propagation.Factory is quite simple. Based on the spanKind of the RemoteSetter, it returns a RemoteInjector, which holds the setter and an InjectorFunction. The InjectorFunction comes from B3Propagation.Format.SINGLE_NO_PARENT, the format chosen where defaultPropagationFactorySupplier is defined.
The resulting call chain is:
```
brave.http.HttpClientHandler.handleSend
  -> brave.internal.propagation.InjectorFactory.RemoteInjector.inject
    -> brave.propagation.B3Propagation.Format#inject
```
The code is as follows:
```java
// brave-5.13.2.jar!/brave.propagation.B3Propagation.Format
SINGLE_NO_PARENT() {
    @Override public List<String> keyNames() {
        return SINGLE_KEY_NAMES;
    }

    @Override public <R> void inject(Setter<R, String> setter, TraceContext context, R request) {
        // 1. combine traceId, spanId and the sampled flag from the TraceContext into one value and write it
        //    to the "b3" HTTP request header (the B3 constant) through the configured RemoteSetter
        setter.put(request, B3, writeB3SingleFormatWithoutParentId(context));
    }
};
```
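The single-header value is a dash-separated string. It contains the trace id, the span id and, when a decision exists, a sampling state: `1` for sampled, `0` for not sampled, `d` for debug. `SINGLE_NO_PARENT` leaves out the parent span id. Below is a plain-Java writer in that shape. It is a re-implementation for illustration, not Brave's `writeB3SingleFormatWithoutParentId`, and `B3SingleSketch` is a hypothetical name. Ids are lower-hex and zero-padded: 16 characters per 64-bit id, 32 for a 128-bit trace id.

```java
// Hypothetical re-implementation of a B3 single-header writer without parent id (illustration only)
final class B3SingleSketch {
    /**
     * @param traceIdHigh upper 64 bits of a 128-bit trace id, or 0 for a 64-bit trace id
     * @param sampled     null when no sampling decision has been made
     */
    static String write(long traceIdHigh, long traceId, long spanId, Boolean sampled, boolean debug) {
        StringBuilder b = new StringBuilder();
        if (traceIdHigh != 0L) b.append(toLowerHex(traceIdHigh));
        b.append(toLowerHex(traceId)).append('-').append(toLowerHex(spanId));
        if (debug) {
            b.append("-d"); // debug implies sampled
        } else if (sampled != null) {
            b.append(sampled ? "-1" : "-0");
        }
        return b.toString();
    }

    // Fixed-width, zero-padded lower-hex encoding of a 64-bit id (treated as unsigned)
    static String toLowerHex(long v) {
        String hex = Long.toHexString(v);
        StringBuilder padded = new StringBuilder(16);
        for (int i = hex.length(); i < 16; i++) padded.append('0');
        return padded.append(hex).toString();
    }
}
```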
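Since the client sends the context in the "b3" request header, the server side must have corresponding code to receive it.
First, while TracingFilter is filtering, HttpServerHandler handles server receive, and this work is delegated to brave: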
```java
// brave-instrumentation-http-5.13.2.jar!/brave.http.HttpServerHandler
public Span handleReceive(HttpServerRequest request) {
    Span span = nextSpan(defaultExtractor.extract(request), request); // 1. extract trace information from the request
    return handleStart(request, span);
}

Span nextSpan(TraceContextOrSamplingFlags extracted, HttpServerRequest request) {
    Boolean sampled = extracted.sampled();
    // only recreate the context if the http sampler made a decision
    // 2. No decision was propagated (e.g. this is the entry service) AND the HTTP sampler returned a non-null result.
    //    The sampler comes from httpTracing: unless the developer defines a SamplerFunction named
    //    sleuthHttpServerSampler, SamplerFunctions.deferDecision() is used, which returns null
    if (sampled == null && (sampled = sampler.trySample(request)) != null) {
        extracted = extracted.sampled(sampled.booleanValue()); // 3. record the sampling decision
    }
    // 4. If a TraceContext was extracted, join it: when span sharing is supported the incoming traceId and spanId
    //    are reused, otherwise a child span is created; without a context, a new TraceContext and Span are created
    return extracted.context() != null
            ? tracer.joinSpan(extracted.context())
            : tracer.nextSpan(extracted);
}
```
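The decision in `nextSpan` follows a fixed order:

1. A propagated sampling decision wins.
2. Without one, the HTTP sampler is asked, and it may defer by returning null.
3. The extracted context is joined if present; otherwise a new trace starts.

The plain-Java sketch below follows that order. `Extracted` and `SamplingDecisionSketch` are hypothetical stand-ins for `TraceContextOrSamplingFlags` and the sampler, and the "join"/"new trace" strings only describe the outcome.

```java
import java.util.function.Function;

// Hypothetical model of the server-side sampling decision (illustration only)
final class SamplingDecisionSketch {
    // Stand-in for TraceContextOrSamplingFlags
    static final class Extracted {
        final String context;  // null when no trace context was propagated
        final Boolean sampled; // null when the decision is deferred

        Extracted(String context, Boolean sampled) {
            this.context = context;
            this.sampled = sampled;
        }
    }

    // Describes what the server would do with this request
    static String decide(Extracted extracted, String path, Function<String, Boolean> httpSampler) {
        Boolean sampled = extracted.sampled;
        // Only consult the HTTP sampler when nothing was propagated; it may defer by returning null
        if (sampled == null && (sampled = httpSampler.apply(path)) != null) {
            extracted = new Extracted(extracted.context, sampled);
        }
        String action = extracted.context != null ? "join " + extracted.context : "new trace";
        return action + ", sampled=" + extracted.sampled;
    }
}
```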
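Now let's see how the request headers are processed. defaultExtractor is obtained the same way as the injector, except through Propagation.extractor, with HttpServerRequest.GETTER as the argument. Here is how B3Extractor handles it: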
```java
// brave-5.13.2.jar!/brave.propagation.B3Propagation.B3Extractor
@Override public TraceContextOrSamplingFlags extract(R request) {
    if (request == null) throw new NullPointerException("request == null");

    // try to extract single-header format
    String b3 = getter.get(request, B3); // 1. read the "b3" HTTP request header
    // 2. parse the b3 value into a TraceContext and wrap it in a TraceContextOrSamplingFlags (type 1)
    TraceContextOrSamplingFlags extracted = b3 != null ? parseB3SingleFormat(b3) : null;
    if (extracted != null) return extracted;

    // Start by looking at the sampled state as this is used regardless
    // Official sampled value is 1, though some old instrumentation send true
    String sampled = getter.get(request, SAMPLED);
    Boolean sampledV;
    if (sampled == null) {
        sampledV = null; // defer decision
    } else if (sampled.length() == 1) { // handle fast valid paths
        char sampledC = sampled.charAt(0);
        if (sampledC == '1') {
            sampledV = true;
        } else if (sampledC == '0') {
            sampledV = false;
        } else {
            Platform.get().log(SAMPLED_MALFORMED, sampled, null);
            return TraceContextOrSamplingFlags.EMPTY; // trace context is malformed so return empty
        }
    } else if (sampled.equalsIgnoreCase("true")) { // old clients
        sampledV = true;
    } else if (sampled.equalsIgnoreCase("false")) { // old clients
        sampledV = false;
    } else {
        Platform.get().log(SAMPLED_MALFORMED, sampled, null);
        return TraceContextOrSamplingFlags.EMPTY; // Restart trace instead of propagating false
    }

    // The only flag we action is 1, but it could be that any integer is present.
    // Here, we leniently parse as debug is not a primary consideration of the trace context.
    boolean debug = "1".equals(getter.get(request, FLAGS));

    String traceIdString = getter.get(request, TRACE_ID);
    // It is ok to go without a trace ID, if sampling or debug is set
    if (traceIdString == null) {
        if (debug) return TraceContextOrSamplingFlags.DEBUG;
        if (sampledV != null) {
            return sampledV
                    ? TraceContextOrSamplingFlags.SAMPLED
                    : TraceContextOrSamplingFlags.NOT_SAMPLED;
        }
    }

    // Try to parse the trace IDs into the context
    TraceContext.Builder result = TraceContext.newBuilder();
    if (result.parseTraceId(traceIdString, TRACE_ID)
            && result.parseSpanId(getter, request, SPAN_ID)
            && result.parseParentId(getter, request, PARENT_SPAN_ID)) {
        if (sampledV != null) result.sampled(sampledV.booleanValue());
        if (debug) result.debug(true);
        return TraceContextOrSamplingFlags.create(result.build());
    }
    return TraceContextOrSamplingFlags.EMPTY; // trace context is malformed so return empty
}
```
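Two details of the extractor are worth isolating. First, the single `b3` header is tried before anything else. Second, the legacy sampled header is parsed leniently: `1`/`0`, plus `true`/`false` from old clients, with anything else treated as malformed. The sketch below re-implements just those two steps under simplifying assumptions. `B3ExtractSketch` and `Parsed` are hypothetical. The sketch keeps no separate debug flag (it maps `d` to sampled), does not handle sampling-only values like `b3: 0`, and throws on a malformed sampled value where the real extractor returns an empty result.

```java
import java.util.Locale;

// Hypothetical B3 parsing helpers (illustration only, not Brave's parser)
final class B3ExtractSketch {
    // sampled == null means "defer the decision"
    static final class Parsed {
        final String traceId;
        final String spanId;
        final Boolean sampled;

        Parsed(String traceId, String spanId, Boolean sampled) {
            this.traceId = traceId;
            this.spanId = spanId;
            this.sampled = sampled;
        }
    }

    /** Parses "traceId-spanId[-samplingState[-parentId]]"; returns null when malformed. */
    static Parsed parseSingle(String b3) {
        if (b3 == null) return null;
        String[] parts = b3.split("-", -1);
        if (parts.length < 2 || parts.length > 4) return null;
        String traceId = parts[0], spanId = parts[1];
        if ((traceId.length() != 16 && traceId.length() != 32) || !isLowerHex(traceId)) return null;
        if (spanId.length() != 16 || !isLowerHex(spanId)) return null;
        Boolean sampled = null;
        if (parts.length >= 3) {
            switch (parts[2]) {
                case "1": case "d": sampled = Boolean.TRUE; break; // debug implies sampled
                case "0": sampled = Boolean.FALSE; break;
                default: return null;
            }
        }
        if (parts.length == 4 && (parts[3].length() != 16 || !isLowerHex(parts[3]))) return null;
        return new Parsed(traceId, spanId, sampled);
    }

    /** Lenient sampled-header parsing: null when absent, throws when malformed. */
    static Boolean parseSampled(String sampled) {
        if (sampled == null) return null; // defer decision
        if (sampled.equals("1")) return Boolean.TRUE;
        if (sampled.equals("0")) return Boolean.FALSE;
        String lower = sampled.toLowerCase(Locale.ROOT); // old clients send true/false
        if (lower.equals("true")) return Boolean.TRUE;
        if (lower.equals("false")) return Boolean.FALSE;
        throw new IllegalArgumentException("malformed sampled value: " + sampled);
    }

    private static boolean isLowerHex(String s) {
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (!((c >= '0' && c <= '9') || (c >= 'a' && c <= 'f'))) return false;
        }
        return true;
    }
}
```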
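That covers RPC requests. Before handling a request, decide whether to continue the existing Trace or start a new one. When the request succeeds or fails, finish the Span. If a new request is sent out along the way, take the information from the TraceContext and put it into the request headers.
Messaging works similarly: with Kafka, for example, the TraceContext is put into ProducerRecord.headers. For database calls, e.g. with hikari, the data source is wrapped into a new data source carrying several event listeners, and the corresponding listener methods are invoked before and after each execution.
For thread pools, ExecutorBeanPostProcessor handles any pool managed by Spring. It wraps Executor/ThreadPoolTaskExecutor/ScheduledExecutorService/ExecutorService/AsyncTaskExecutor beans into traceable executors. When a task is submitted or executed, the wrapped pool first wraps it into a TraceRunnable or TraceCallable, then actually submits or executes it. TraceRunnable/TraceCallable hold the tracer, the parent span and related information.
The code of TraceRunnable is as follows: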
```java
// spring-cloud-sleuth-instrumentation-3.1.0.jar!/org.springframework.cloud.sleuth.instrument.async.TraceRunnable
public void run() {
    // 1. Create a new Span whose parent is the span of the thread that submitted the task to the pool
    Span childSpan = SleuthAsyncSpan.ASYNC_RUNNABLE_SPAN.wrap(this.tracer.nextSpan(this.parent))
            .name(this.spanName);
    try (Tracer.SpanInScope ws = this.tracer.withSpan(childSpan.start())) {
        this.delegate.run(); // 2. run the task
    }
    catch (Exception | Error e) {
        childSpan.error(e); // 3. if the task throws, record the error on the span
        throw e;
    }
    finally {
        childSpan.end(); // 4. end the span, whether the task succeeded or failed
    }
}
```
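TraceRunnable and the executor wrappers that create it both rely on one move. They capture the parent context on the submitting thread. On the worker thread they re-establish it as the parent of a new child span, and they end the child span whether or not the task throws. The plain-Java sketch below uses a `ThreadLocal` as the tracer's current span. `ContextRunnableSketch` and its string-based "span" (`parent/child`) are hypothetical simplifications, not Sleuth's classes.

```java
import java.util.concurrent.Executor;

// Hypothetical context-propagating Runnable (illustration only)
final class ContextRunnableSketch implements Runnable {
    static final ThreadLocal<String> CURRENT_SPAN = new ThreadLocal<>();

    private final Runnable delegate;
    private final String parent; // captured at construction time, on the submitting thread
    private final String childName;
    volatile boolean finished;
    volatile Throwable error;

    ContextRunnableSketch(Runnable delegate, String childName) {
        this.delegate = delegate;
        this.parent = CURRENT_SPAN.get();
        this.childName = childName;
    }

    @Override
    public void run() {
        String previous = CURRENT_SPAN.get();
        // The child "span" records its parent, mirroring tracer.nextSpan(parent)
        CURRENT_SPAN.set(parent + "/" + childName);
        try {
            delegate.run();
        } catch (RuntimeException | Error e) {
            error = e; // like childSpan.error(e)
            throw e;
        } finally {
            finished = true; // like childSpan.end(), in both outcomes
            if (previous == null) CURRENT_SPAN.remove(); else CURRENT_SPAN.set(previous);
        }
    }

    // Executor decorator in the spirit of what ExecutorBeanPostProcessor installs (hypothetical)
    static Executor wrap(Executor executor, String childName) {
        return task -> executor.execute(new ContextRunnableSketch(task, childName));
    }
}
```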