分布式调用追踪系统Zipkin集成

首先给出demo地址,有问题也可以直接看代码。https://github.com/licheng-xd/dubbo-sleuth

准备

sleuth是springcloud对zipkin接入的封装,从2.0开始直接采用了brave提供的zipkin接入实现。

由于sleuth对dubbo的支持是从2.0开始的,所以一咬牙直接把整个项目升级成了springboot2.0,springboot2.0和springboot1.x有很多不兼容的地方,这里就不一一细说了,同时springboot升级到2.0以后,dubbo也必须要从0.1.0升级到0.2.0(Apache旗下的dubbo版本),整个过踩坑踩了一个星期。

因为springcloud的微服务框架是基于http构建的,所以sleuth默认是只支持http。在2.0版本中提供了对dubbo的支持,其实看一下源码就知道,就只是一个DubboFilter。后面实现自己的rpc接入sleuth就是参考了它的实现。zipkin本身支持多种collector和storage,默认采用异步http的collector,存储默认在内存中。考虑到对应用本身的性能影响,我们采用kafka来做collector,最大程度的解耦以及减少性能影响。存储选用es,用mysql的话当数据量较大时会影响zipkin-server的查询速度。

本地测试环境部署:
本地采用vagrant管理开发环境,在debian的虚拟机中装好,zookeeper,kafka,es,其中es建议用docker镜像直接运行,不需要修改很多系统参数。

启动zipkinserver,采用kafka-collector和es-storage。

我本地的vagrant虚拟机ip是192.168.100.101,首先下载zipkin-server,启动命令:

1
KAFKA_BOOTSTRAP_SERVERS=192.168.100.101:9092 STORAGE_TYPE=elasticsearch ES_HOSTS=http://192.168.100.101:9200  java -jar zipkin-server-2.9.4-exec.jar

项目添加sleuth依赖

由于采用kafka作为collector所以需要原来spring-kafka,同时添加了brave-dubbo的依赖。

1
2
3
4
5
6
7
8
9
10
// sleuth
compile "org.springframework.kafka:spring-kafka:2.1.7.RELEASE"
compile "org.springframework.cloud:spring-cloud-starter-sleuth:${SleuthVersion}"
compile "org.springframework.cloud:spring-cloud-starter-zipkin:${SleuthVersion}"
compile "org.springframework.cloud:spring-cloud-sleuth-core:${SleuthVersion}"
compile "org.springframework.cloud:spring-cloud-sleuth-zipkin:${SleuthVersion}"
compile "org.springframework.cloud:spring-cloud-sleuth-dependencies:${SleuthVersion}"
compile ("io.zipkin.brave:brave-instrumentation-dubbo-rpc:5.1.0") {
exclude(group: 'com.alibaba', module: 'dubbo')
}

配置

1
2
3
4
5
6
7
8
9
10
11
12
13
spring:
application:
name: nim-bima-service-user
zipkin:
sender:
type: kafka
sleuth:
enabled: true
traceId128: true
sampler:
probability: 1.0
kafka:
bootstrap-servers: xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092

dubbo服务和消费端都需要加上对应的tracing filter

1
2
3
4
5
dubbo:
provider:
filter: 'tracing'
consumer:
filter: 'tracing'

扩展

自研RPC通过filter接入sleuth追踪。filter逻辑的核心在于filterchain的实现,而filterchain的核心逻辑其实就是一个链式的递归调用:

1
2
3
4
5
6
7
8
if (this.currentPosition == this.realFilters.size()) {
invocation.invoke();
}
else {
this.currentPosition++;
NrpcFilter nextFilter = this.realFilters.get(this.currentPosition - 1);
nextFilter.doFilter(invocation, this);
}

然后通过实现filter来加入追踪功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@FilterActivate
public class TracingNrpcFilter implements NrpcFilter {
private static final Logger logger = LoggerFactory.getLogger(TracingNrpcFilter.class);

private Tracer tracer;

private TraceContext.Injector<Map<String, String>> injector;

@Autowired
public void setTracing(Tracing tracing) {
tracer = tracing.tracer();
injector = tracing.propagation().injector(SETTER);
}

@Override
public void doFilter(TaskInvocation invocation, NrpcFilterChain filterChain) throws Exception {
if (tracer == null) {
filterChain.doFilter(invocation, filterChain);
return;
}

logger.debug("NrpcTracingFilter hit ...");
String className = invocation.getTask().getMethod().getDeclaringClass().getSimpleName();
String methodName = invocation.getTask().getMethod().getName();
final Span span = tracer.nextSpan();
injector.inject(span.context(), invocation.getAttachments());
if (!span.isNoop()) {
span.kind(Span.Kind.SERVER).start();
span.name(className + "/" + methodName);
InetSocketAddress remoteAddress = invocation.getRemoteAddress();
Endpoint.Builder remoteEndpoint = Endpoint.newBuilder().port(remoteAddress.getPort());
if (!remoteEndpoint.parseIp(remoteAddress.getAddress())) {
remoteEndpoint.parseIp(remoteAddress.getHostName());
}
span.remoteEndpoint(remoteEndpoint.build());
}
String traceid = span.context().traceIdString();
DubboTraceFilter.initTraceid(traceid);
invocation.getTask().setTraceid(traceid);
try (Tracer.SpanInScope scope = tracer.withSpanInScope(span)) {
filterChain.doFilter(invocation, filterChain);
} catch (Error | RuntimeException e) {
onError(e, span);
throw e;
} finally {
span.finish();
}
}

static void onError(Throwable error, Span span) {
span.error(error);
if (error instanceof RpcException) {
span.tag("nrpc.error_code", Integer.toString(((RpcException) error).getCode()));
}
}

static final Propagation.Setter<Map<String, String>, String> SETTER =
new Propagation.Setter<Map<String, String>, String>() {
@Override
public void put(Map<String, String> carrier, String key, String value) {
carrier.put(key, value);
}

@Override
public String toString() {
return "Map::set";
}
};
}

小技巧:通过tracer.currentSpan().context().traceIdString();把tracing中的traceid拿出来放到业务日志中,结合elk日志收集可以非常容易定位问题。

结果

可以从zipkin的数据分析中清楚的看出来整个请求中的rpc调用链,以及每次rpc消耗的时间。

依赖分析

你会发现依赖分析页面没有数据。对了,只有当数据在内存中存储的时候,zipkin-server才能直接分析出调用依赖关系。当采用外部存储的时候,就需要单独的依赖分析来实现了。下载zipkin-dependencies.jar并在crontab中创建定时任务,定时对存储数据进行分析,这样就可以看到依赖关系了。

奇怪的小问题

  1. 由于引入了springcloud-sleuth,而springcloud会在进程启动时启动一个自己的bootstrap context,作为当前应用ApplicationContext的父ApplicationContext,由于每初始化一个ApplicationContext就会加载一遍spring.factories配置文件中的ApplicationListener,所以配置在spring.factories中的listener都会被执行两次。因此dubbo的WelcomeLogoListener会被执行两次。

  2. 使用feign作为http客户端时,build传入的client必须是spring的bean,否则sleuth无法做拦截。