程序员开发实例大全宝库

网站首页 > 编程文章 正文

阅读代码深入原理21——Spring Cloud Sleuth

zazugpt 2024-08-13 13:14:49 编程文章 27 ℃ 0 评论

在分布式追踪的实现上,开源界有两种路线,一种是以pinpoint/skywalking等为代表的javaagent方式,另一种是zipkin这种结合程序钩子和AOP的实现方式。

Sleuth借鉴谷歌Dapper的理念,以span为基本单元,多个span可以构成1个树状的trace,根span的id和trace的id相同。span除了有id外,还可以有描述、时间戳、tags。tags可以附加在span上,而baggage能在一个trace传递,Sleuth会自动识别出HTTP以"baggage-"开始的请求头,或"baggage_"开始的消息请求头。

Sleuth与OpenTracing兼容,使用Brave/Zipkin作为分布式跟踪的类库。

假设用户发起一个请求,以HTTP协议被微服务接收。按照java的Servlet规范,我们可以使用到的方式有Filter。按照spring mvc的方式,我们可以采取的措施有HandlerInterceptor。那我们看下spring本尊会用到哪种方式:

# spring-cloud-sleuth-autoconfigure-3.1.0.jar!/org.springframework.cloud.sleuth.autoconfig.instrument.web.TraceWebAutoConfiguration
@Configuration(proxyBeanMethods = false)
@ConditionalOnBean(Tracer.class)
@ConditionalOnSleuthWeb
@Import({ SkipPatternConfiguration.class, TraceWebFluxConfiguration.class, TraceWebServletConfiguration.class }) // 1. 引入其它配置做具体处理
@EnableConfigurationProperties(SleuthWebProperties.class)
@AutoConfigureAfter(BraveAutoConfiguration.class) // 2. 在BraveAutoConfiguration之后生效
public class TraceWebAutoConfiguration {
}

在TraceWebAutoConfiguration自动配置类引入的SkipPatternConfiguration定义需要被忽略的路径格式,比如actuator/management相关接口、swagger等api文档接口、静态资源、hystrix.stream等。

先只看传统servlet形式如何处理:

# spring-cloud-sleuth-autoconfigure-3.1.0.jar!/org.springframework.cloud.sleuth.autoconfig.instrument.web.TraceWebServletConfiguration
	@Bean
	TraceWebAspect traceWebAspect(Tracer tracer, CurrentTraceContext currentTraceContext, SpanNamer spanNamer) {
		return new TraceWebAspect(tracer, currentTraceContext, spanNamer); // 1. TraceWebAspect对类上有@Controller/@RestController注解,且方法返回值类型为Callable的方法,将返回值包装成TraceCallable;如果返回值类型是WebAsyncTask,则设置字段callable为包装后的TraceCallable
	}
	@Bean
	FilterRegistrationBean traceWebFilter(BeanFactory beanFactory, SleuthWebProperties webProperties) {
		FilterRegistrationBean filterRegistrationBean = new FilterRegistrationBean(new LazyTracingFilter(beanFactory)); // 2. 注册LazyTracingFilter来过滤所有请求
		filterRegistrationBean.setDispatcherTypes(DispatcherType.ASYNC, DispatcherType.ERROR, DispatcherType.FORWARD,
				DispatcherType.INCLUDE, DispatcherType.REQUEST);
		filterRegistrationBean.setOrder(webProperties.getFilterOrder());
		return filterRegistrationBean;
	}
	/**
	 * Nested config that configures Web MVC if it's present (without adding a runtime
	 * dependency to it).
	 */
	@Configuration(proxyBeanMethods = false)
	@ConditionalOnClass(WebMvcConfigurer.class)
	@Import(TraceWebMvcConfigurer.class) // 3. 引入TraceWebMvcConfigurer,通过此配置类从BeanFactory中获取SpanCustomizingAsyncHandlerInterceptor(已在TraceWebServletConfiguration引入),添加到Spring MVC的InterceptorRegistry
	protected static class TraceWebMvcAutoConfiguration {
	}

目前还是不够明朗,既有Filter,又有HandlerInterceptor,还有之前没有提到的对异步的处理。需要继续拿LazyTracingFilter和SpanCustomizingAsyncHandlerInterceptor看个究竟:

// LazyTracingFilter主要是以CurrentTraceContext(在BraveAutoConfiguration定义)和HttpServerHandler(由BraveAutoConfiguration通过BraveHttpConfiguration间接引入的BraveHttpBridgeConfiguration定义),来创建TracingFilter,将过滤功能由TracingFilter代理
# spring-cloud-sleuth-autoconfigure-3.1.0.jar!/org.springframework.cloud.sleuth.instrument.web.servlet.TracingFilter
	public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
			throws IOException, ServletException {
		HttpServletRequest req = (HttpServletRequest) request;
		HttpServletResponse res = servlet.httpServletResponse(response);
		// Prevent duplicate spans for the same request
		TraceContext context = (TraceContext) request.getAttribute(TraceContext.class.getName());
		if (context != null) { // 1. 结束掉可能的转发跟踪
			// A forwarded request might end up on another thread, so make sure it is
			// scoped
			CurrentTraceContext.Scope scope = currentTraceContext.maybeScope(context);
			try {
				chain.doFilter(request, response);
			}
			finally {
				scope.close();
			}
			return;
		}
		Span span = handler.handleReceive(new HttpServletRequestWrapper(req)); // 2. sr(server receive),如果当前TraceContext已存在则取出Span,否则创建MutableSpan并包装成PendingSpan,最后以RealSpan形式返回
		// Add attributes for explicit access to customization or span context
		request.setAttribute(SpanCustomizer.class.getName(), span);
		request.setAttribute(TraceContext.class.getName(), span.context());
		SendHandled sendHandled = new SendHandled();
		request.setAttribute(SendHandled.class.getName(), sendHandled);
		Throwable error = null;
		CurrentTraceContext.Scope scope = currentTraceContext.newScope(span.context());
		try {
			// any downstream code can see Tracer.currentSpan() or use
			// Tracer.currentSpanCustomizer()
			chain.doFilter(req, res);
		}
		catch (Throwable e) {
			error = e;
			throw e;
		}
		finally {
			// When async, even if we caught an exception, we don't have the final
			// response: defer
			if (servlet.isAsync(req)) {
				servlet.handleAsync(handler, req, res, span); // 3. 异步处理,使用TracingAsyncListener来监听处理完毕事件,处理方式与同步相同
			}
			else if (sendHandled.compareAndSet(false, true)) {
				// we have a synchronous response or error: finish the span
				HttpServerResponse responseWrapper = HttpServletResponseWrapper.create(req, res, error);
				handler.handleSend(responseWrapper, span); // 4. ss(server send)同步处理,移出TraceContext,使用SpanHandler(参考tracer定义时的PendingSpans处理,BraveBaggageConfiguration有定义baggageTagSpanHandler,ZipkinBraveConfiguration有定义zipkinSpanHandler)将PendingSpan结束
			}
			scope.close();
		}
	}

```

接着看下SpanCustomizingAsyncHandlerInterceptor:

# spring-cloud-sleuth-instrumentation-3.1.0.jar!/org.springframework.cloud.sleuth.instrument.web.mvc.SpanCustomizingAsyncHandlerInterceptor
	@Autowired(required = false) // 1. 默认没有定义HandlerParser
	HandlerParser handlerParser = new HandlerParser();
	@Override
	public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object o) {
		Object span = request.getAttribute(SpanCustomizer.class.getName()); // 2. TracingFilter里放入,此处取出
		if (span instanceof SpanCustomizer) {
			handlerParser.preHandle(request, o, (SpanCustomizer) span); // 3. 在Span上添加类和方法的tag(mvc.controller.class、mvc.controller.method)
		}
		return true;
	}
	@Override
	public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler,
			ModelAndView modelAndView) {
		Object span = request.getAttribute(SpanCustomizer.class.getName());
		if (span instanceof SpanCustomizer) {
			handlerParser.postHandle(request, handler, modelAndView, (SpanCustomizer) span); // 4. 无任何处理(空方法)
		}
	}
	@Override
	public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler,
			Exception ex) {
		Object span = request.getAttribute(SpanCustomizer.class.getName());
		if (span instanceof SpanCustomizer) {
			setErrorAttribute(request, ex); // 5. 设置"error"属性
			setHttpRouteAttribute(request); // 6. 设置"http.route"属性
		}
	}

综上,对于HTTP请求,Filter负责主要的Span创建或延续,而HandlerInterceptor了解更多的信息,则是为Span添加tag。

那么对于微服务发出的请求如何处理呢?

以feign为例,自动配置类为TraceFeignClientAutoConfiguration。该类定义了切面类TraceFeignAspect,将feign.Client对象以TraceFeignObjectWrapper包装,代码如下:

# spring-cloud-sleuth-instrumentation-3.1.0.jar!/org.springframework.cloud.sleuth.instrument.web.client.feign.TraceFeignObjectWrapper
	Object wrap(Object bean) {
		if (bean instanceof Client && !(bean instanceof TracingFeignClient)
				&& !(bean instanceof LazyTracingFeignClient)) {
			if (loadBalancerPresent && bean instanceof FeignBlockingLoadBalancerClient
					&& !(bean instanceof TraceFeignBlockingLoadBalancerClient)) {// 1. 阻塞时的feign负载均衡存在,且该bean未被TraceFeignObjectWrapper处理时
				return instrumentedFeignLoadBalancerClient(bean); 
			}
			if (loadBalancerPresent && bean instanceof RetryableFeignBlockingLoadBalancerClient
					&& !(bean instanceof TraceRetryableFeignBlockingLoadBalancerClient)) {
				return instrumentedRetryableFeignLoadBalancerClient(bean);
			}
			return new LazyTracingFeignClient(this.beanFactory, (Client) bean); // 2. 普通的feign.Client,返回包装的TracingFeignClient
		}
		return this.traceFeignBuilderBeanPostProcessor.postProcessAfterInitialization(bean, null); // 使用TraceFeignBuilderBeanPostProcessor处理Feign.Builder(实质仍是在获取Client前使用TraceFeignObjectWrapper返回包装的Client)
	}
	private Object instrumentedFeignLoadBalancerClient(Object bean) {
		if (AopUtils.getTargetClass(bean).equals(FeignBlockingLoadBalancerClient.class)) {
			FeignBlockingLoadBalancerClient client = ProxyUtils.getTargetObject(bean); // 1.1 已经经过AOP处理的FeignBlockingLoadBalancerClient,获得真实对象
			return new TraceFeignBlockingLoadBalancerClient( // 1.1 将FeignBlockingLoadBalancerClient的delegate作为Client包装,再包装成支持负载均衡的TraceFeignBlockingLoadBalancerClient
					(Client) new TraceFeignObjectWrapper(this.beanFactory).wrap(client.getDelegate()),
					(LoadBalancerClient) loadBalancerClient(), loadBalancerProperties(),
					(LoadBalancerClientFactory) loadBalancerClientFactory(), this.beanFactory);
		}
		else {
			FeignBlockingLoadBalancerClient client = ProxyUtils.getTargetObject(bean);
			try {
				Field delegate = FeignBlockingLoadBalancerClient.class.getDeclaredField(DELEGATE);
				delegate.setAccessible(true);
				delegate.set(client, new TraceFeignObjectWrapper(this.beanFactory).wrap(client.getDelegate()));
			}
			catch (NoSuchFieldException | IllegalArgumentException | IllegalAccessException | SecurityException e) {
				log.warn(EXCEPTION_WARNING, e);
			}
			return new TraceFeignBlockingLoadBalancerClient(client, (LoadBalancerClient) loadBalancerClient(),
					loadBalancerProperties(), (LoadBalancerClientFactory) loadBalancerClientFactory(),
					this.beanFactory);
		}
	}

了解spring-cloud-loadbalancer或ribbon机制的一定了解,请求先被负载均衡处理,那么先看TraceFeignBlockingLoadBalancerClient。

# spring-cloud-sleuth-instrumentation-3.1.0.jar!/org.springframework.cloud.sleuth.instrument.web.client.feign.TraceFeignBlockingLoadBalancerClient
	public Response execute(Request request, Request.Options options) throws IOException {
		if (LOG.isDebugEnabled()) {
			LOG.debug("Before send");
		}
		Response response = null;
		Span fallbackSpan = tracer().nextSpan().start(); // 1. 从BeanFactory获取tracer,如果当前线程有TraceContext则以之为父TraceContext创建Span,记录开始时间
		try {
			if (delegateIsALoadBalancer()) { // 2. 如果委托对象支持负载均衡,使用委托对象,否则使用FeignBlockingLoadBalancerClient
				response = getDelegate().execute(request, options);
			}
			else {
				response = super.execute(request, options);
			}
			if (LOG.isDebugEnabled()) {
				LOG.debug("After receive");
			}
			return response;
		}
		catch (Exception e) {
			if (LOG.isDebugEnabled()) {
				LOG.debug("Exception thrown", e);
			}
			if (e instanceof IOException) {
				if (LOG.isDebugEnabled()) {
					LOG.debug(
							"IOException was thrown, so most likely the traced client wasn't called. Falling back to a manual span.");
				}
				tracingFeignClient().handleSendAndReceive(fallbackSpan, request, response, e); // 3. 出现异常时,使用TracingFeignClient处理cs(client send)和cr(client receive)
			}
			throw e;
		}
		finally {
			fallbackSpan.abandon(); // 4. 移出TraceContext,并以放弃信号结束Span
		}
	}

看完了外层的处理,再看内层的请求处理。内层的LazyTracingFeignClient本质是延迟获取TracingFeignClient来执行:

# spring-cloud-sleuth-instrumentation-3.1.0.jar!/org.springframework.cloud.sleuth.instrument.web.client.feign.TracingFeignClient
	public Response execute(Request req, Request.Options options) throws IOException {
		RequestWrapper request = new RequestWrapper(req);
		Span span = this.handler.handleSend(request); // 1. cs(client send),handler为从BeanFactory获取的HttpClientHandler(同HttpServerHandler,由BraveAutoConfiguration引入BraveHttpConfiguration,再经过BraveHttpBridgeConfiguration间接定义braveHttpClientHandler)
		if (log.isDebugEnabled()) {
			log.debug("Handled send of " + span);
		}
		Response res = null;
		Throwable error = null;
		try (CurrentTraceContext.Scope ws = this.currentTraceContext.newScope(span.context())) { // 2. 从BeanFactory获取CurrentTraceContext(由BraveAutoConfiguration定义的sleuthCurrentTraceContext),重设线程本地变量的TraceContext
			res = this.delegate.execute(request.build(), options);
			if (res == null) { // possibly null on bad implementation or mocks
				res = Response.builder().request(req).build();
			}
			return res;
		}
		catch (Throwable e) {
			error = e;
			throw e;
		}
		finally {
			ResponseWrapper response = new ResponseWrapper(request, res, error);
			this.handler.handleReceive(response, span); // 3. cr(client receive)
			if (log.isDebugEnabled()) {
				log.debug("Handled receive of " + span);
			}
		}
	}

值得一提的是client send时,必须将目前的traceId和spanId给到下一个微服务,此处如何做到呢?

此处先做简单说明,在HttpClientHandler的handleSend时,通过tracer创建新的span,并通过Injector将TraceContext注入到HttpClientRequest。

回顾bean定义,BraveHttpConfiguration定义httpTracing,HttpTracing的propagation来自tracing,tracing的propagation来自BraveBaggageConfiguration定义的sleuthPropagationWithB3Baggage提供,而Propagation.Factory类型为BaggageFactoryWrapper,Propagation的创建是通过Propagation.Factory的create方法:

# spring-cloud-sleuth-autoconfigure-3.1.0.jar!/org.springframework.cloud.sleuth.autoconfig.brave.BraveBaggageConfiguration.BaggageFactoryWrapper
		public <K> Propagation<K> create(Propagation.KeyFactory<K> keyFactory) {
			Propagation<K> propagation = delegate.create(keyFactory); // 1. 定义的委托对象类型为brave.baggage.BaggagePropagation.Factory,通过StringPropagationAdapter创建Propagation,然后包装成BaggagePropagation
			return delegateWithoutB3Baggage(factoryFromSupplier, propagation);
		}
		private <K> Propagation<K> delegateWithoutB3Baggage(Propagation.Factory factoryFromSupplier,
				Propagation<K> propagation) {
			return new Propagation<K>() {
				@Override
				public List<K> keys() {
					return propagation.keys();
				}
				@Override
				public <R> TraceContext.Injector<R> injector(Setter<R, K> setter) {
					// We don't want to inject baggage in the Brave way
					return factoryFromSupplier.get().injector((Setter<R, String>) setter); // 2. factoryFromSupplier由BraveBaggageConfiguration定义的defaultPropagationFactorySupplier,类型为brave.propagation.B3Propagation.Factory
				}
				@Override
				public <R> TraceContext.Extractor<R> extractor(Getter<R, K> getter) {
					return propagation.extractor(getter);
				}
			};
		}

HttpClientHandler.defaultInjector由HttpTracing.Propagation来调用injector方法,参数为HttpClientRequest.SETTER。

B3Propagation.Factory的injector方法相当简单,根据RemoteSetter的spanKind,返回RemoteInjector。(包含setter和InjectorFunction,InjectorFunction来源于定义defaultPropagationFactorySupplier的B3Propagation.Format.SINGLE_NO_PARENT)

则处理关系如下:


brave.http.HttpClientHandler.handleSend
	  brave.internal.propagation.InjectorFactory.RemoteInjector.inject
		    brave.propagation.B3Propagation.Format#inject

代码如下:

# brave-5.13.2.jar!/brave.propagation.B3Propagation.Format
    SINGLE_NO_PARENT() {
      @Override public List<String> keyNames() {
        return SINGLE_KEY_NAMES;
      }
      @Override public <R> void inject(Setter<R, String> setter, TraceContext context, R request) {
        setter.put(request, B3, writeB3SingleFormatWithoutParentId(context)); // 1. 从TraceContext取traceId、spanId和sampled合并为一个值,通过定义的RemoteSetter写到HTTP请求头"B3"
      }
    };

既然客户端发送时通过请求头"B3"来传递,那么服务端必然会有接收代码。

首先是TracingFilter在过滤时,HttpServerHandler处理服务端接收,委托给brave:

# brave-instrumentation-http-5.13.2.jar!/brave.http.HttpServerHandler
  public Span handleReceive(HttpServerRequest request) {
    Span span = nextSpan(defaultExtractor.extract(request), request); // 1. 从请求提取信息
    return handleStart(request, span);
  }
  
  Span nextSpan(TraceContextOrSamplingFlags extracted, HttpServerRequest request) {
    Boolean sampled = extracted.sampled();
    // only recreate the context if the http sampler made a decision
    if (sampled == null && (sampled = sampler.trySample(request)) != null) { // 2. 入口服务或者采样器返回结果不为空(sampler由httpTracing得来,需要开发者定义名为sleuthHttpServerSampler的SamplerFunction,否则使用SampleFunctions.deferDecision会返回null)
      extracted = extracted.sampled(sampled.booleanValue()); // 3. 添加采样标识
    }
    return extracted.context() != null
      ? tracer.joinSpan(extracted.context()) // 4. TraceContext存在时,且支持共享时使用之前的traceId、spanId,否则创建新的TraceContext、Span
      : tracer.nextSpan(extracted);
  }  

再看下请求头如何处理。defaultExtractor与injector类似,只不过是通过Propagation.extractor方法而已,参数为HttpServerRequest.GETTER。看B3Extractor如何处理:

# brave-5.13.2.jar!/brave.propagation.B3Propagation.B3Extractor
	@Override public TraceContextOrSamplingFlags extract(R request) {
      if (request == null) throw new NullPointerException("request == null");
      // try to extract single-header format
      String b3 = getter.get(request, B3); // 1. 获取HTTP请求头的"B3"值
      TraceContextOrSamplingFlags extracted = b3 != null ? parseB3SingleFormat(b3) : null; // 2. 解析B3值成TraceContext,再创建成TraceContextOrSamplingFlags(type为1)
      if (extracted != null) return extracted;
      // Start by looking at the sampled state as this is used regardless
      // Official sampled value is 1, though some old instrumentation send true
      String sampled = getter.get(request, SAMPLED);
      Boolean sampledV;
      if (sampled == null) {
        sampledV = null; // defer decision
      } else if (sampled.length() == 1) { // handle fast valid paths
        char sampledC = sampled.charAt(0);
        if (sampledC == '1') {
          sampledV = true;
        } else if (sampledC == '0') {
          sampledV = false;
        } else {
          Platform.get().log(SAMPLED_MALFORMED, sampled, null);
          return TraceContextOrSamplingFlags.EMPTY; // trace context is malformed so return empty
        }
      } else if (sampled.equalsIgnoreCase("true")) { // old clients
        sampledV = true;
      } else if (sampled.equalsIgnoreCase("false")) { // old clients
        sampledV = false;
      } else {
        Platform.get().log(SAMPLED_MALFORMED, sampled, null);
        return TraceContextOrSamplingFlags.EMPTY; // Restart trace instead of propagating false
      }
      // The only flag we action is 1, but it could be that any integer is present.
      // Here, we leniently parse as debug is not a primary consideration of the trace context.
      boolean debug = "1".equals(getter.get(request, FLAGS));
      String traceIdString = getter.get(request, TRACE_ID);
      // It is ok to go without a trace ID, if sampling or debug is set
      if (traceIdString == null) {
        if (debug) return TraceContextOrSamplingFlags.DEBUG;
        if (sampledV != null) {
          return sampledV
              ? TraceContextOrSamplingFlags.SAMPLED
              : TraceContextOrSamplingFlags.NOT_SAMPLED;
        }
      }
      // Try to parse the trace IDs into the context
      TraceContext.Builder result = TraceContext.newBuilder();
      if (result.parseTraceId(traceIdString, TRACE_ID)
          && result.parseSpanId(getter, request, SPAN_ID)
          && result.parseParentId(getter, request, PARENT_SPAN_ID)) {
        if (sampledV != null) result.sampled(sampledV.booleanValue());
        if (debug) result.debug(true);
        return TraceContextOrSamplingFlags.create(result.build());
      }
      return TraceContextOrSamplingFlags.EMPTY; // trace context is malformed so return empty
    }

以上便是RPC请求的处理,请求前判断是延续还是开启新的Trace,请求成功或异常时结束Span,如果过程中有新的请求发出,那么就从TraceContext获取信息,塞入到请求头。

对于过程中有发消息的,比如Kafka,类似的,是将TraceContext放入到ProducerRecord.headers;对于过程中有调用数据库的,比如使用hikari,将数据源包装成有多种事件监听的新数据源,在执行前后调用对应的事件监听方法;

对于线程池,只要被spring管理,通过ExecutorBeanPostProcessor,将Executor/ThreadPoolTaskExecutor/ScheduledExecutorService/ExecutorService/AsyncTaskExecutor包装成可跟踪的线程池,在提交/执行任务时,线程池将任务包装成TraceRunnable或TraceRunnable再真正提交/执行,TraceRunnable/TraceRunnable持有tracer和span等信息。

关于TraceRunnable代码如下:

# spring-cloud-sleuth-instrumentation-3.1.0.jar!/org.springframework.cloud.sleuth.instrument.async.TraceRunnable
	public void run() {
		Span childSpan = SleuthAsyncSpan.ASYNC_RUNNABLE_SPAN.wrap(this.tracer.nextSpan(this.parent)) // 1. 以提交线程池的线程里的Span作为父Span创建新的Span
				.name(this.spanName);
		try (Tracer.SpanInScope ws = this.tracer.withSpan(childSpan.start())) {
			this.delegate.run(); // 2. 任务执行
		}
		catch (Exception | Error e) {
			childSpan.error(e); // 3. 任务出现异常时,记录异常
			throw e;
		}
		finally {
			childSpan.end(); // 4. 任务正常结束
		}
	}

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表