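Netty Source Code Analysis, Part 2

Continued from the previous post

The previous post analyzed the initAndRegister process. This post continues with the doBind0 process at tag2: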

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

Internally, channel.bind(localAddress, promise) delegates first to the pipeline and then to the pipeline's tail context:

public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.bind(localAddress, promise);
}

public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return tail.bind(localAddress, promise);
}

ChannelPipeline

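The previous post left pipeline.fireChannelRegistered() unexamined, so this is a good point to introduce ChannelPipeline, one of Netty's key abstractions. DefaultChannelPipeline is the default implementation of ChannelPipeline. It keeps two references, final AbstractChannelHandlerContext head and final AbstractChannelHandlerContext tail, which point to the first and last nodes of a linked list. Each AbstractChannelHandlerContext is a node of that doubly linked list: volatile AbstractChannelHandlerContext next and volatile AbstractChannelHandlerContext prev link it to its neighbors. DefaultChannelHandlerContext is the default implementation of AbstractChannelHandlerContext and holds a ChannelHandler. Together they form a typical Intercepting Filter pattern.

With that in mind, look again at ServerBootstrap.init(). It calls addLast() on the NioServerSocketChannel's ChannelPipeline up to twice: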

ChannelPipeline p = channel.pipeline();
if (handler() != null) {
    p.addLast(handler());
}

p.addLast(new ChannelInitializer<Channel>() {
    @Override
    public void initChannel(Channel ch) throws Exception {
        ch.pipeline().addLast(new ServerBootstrapAcceptor(
                currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
    }
});

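The first call adds the handler configured on the ServerBootstrap (if there is one), and the second adds an anonymous subclass of ChannelInitializer. At this point the handler chain of the listening NioServerSocketChannel is: HEAD->AbstractServerBootstrap$handler->ServerBootstrap$1->TAIL.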
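
The linked-list structure and the effect of the two addLast() calls can be sketched in plain Java. This is a simplified model under stated assumptions, not Netty's implementation: MiniPipeline and Node are hypothetical names, and handlers are reduced to strings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of a handler chain; not Netty's real classes.
final class MiniPipeline {
    // Each node knows both neighbours, like AbstractChannelHandlerContext.
    static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    private final Node head = new Node("HEAD");
    private final Node tail = new Node("TAIL");

    MiniPipeline() {
        head.next = tail;
        tail.prev = head;
    }

    // Insert the new node just before the TAIL sentinel.
    MiniPipeline addLast(String name) {
        Node node = new Node(name);
        Node last = tail.prev;
        node.prev = last;
        node.next = tail;
        last.next = node;
        tail.prev = node;
        return this;
    }

    // Walk from HEAD to TAIL and collect the names in order.
    List<String> names() {
        List<String> out = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            out.add(n.name);
        }
        return out;
    }

    public static void main(String[] args) {
        MiniPipeline p = new MiniPipeline()
                .addLast("handler")
                .addLast("ServerBootstrap$1");
        System.out.println(String.join("->", p.names()));
    }
}
```

Keeping HEAD and TAIL as permanent sentinel nodes means addLast() never has to handle an empty list as a special case.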

Now return to pipeline.fireChannelRegistered(), which runs the following methods in turn:

public ChannelPipeline fireChannelRegistered() {
    head.fireChannelRegistered();
    return this;
}

public ChannelHandlerContext fireChannelRegistered() {
    AbstractChannelHandlerContext next = findContextInbound();
    next.invoker().invokeChannelRegistered(next);
    return this;
}

private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while ((ctx.skipFlags & MASKGROUP_INBOUND) == MASKGROUP_INBOUND);
    return ctx;
}

public void invokeChannelRegistered(final ChannelHandlerContext ctx) {
    if (executor.inEventLoop()) {
        invokeChannelRegisteredNow(ctx);
    } else {
        executor.execute(new OneTimeTask() {
            @Override
            public void run() {
                invokeChannelRegisteredNow(ctx);
            }
        });
    }
}

public static void invokeChannelRegisteredNow(ChannelHandlerContext ctx) {
    try {
        ctx.handler().channelRegistered(ctx);
    } catch (Throwable t) {
        notifyHandlerException(ctx, t);
    }
}
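
The inEventLoop() check in invokeChannelRegistered() follows a common pattern: run the call inline when already on the loop thread, otherwise hand it to that thread. Below is a minimal sketch of the pattern using a JDK single-thread executor. MiniEventLoop and its methods are hypothetical names chosen for illustration, not Netty classes.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical single-threaded "event loop"; not a Netty class.
final class MiniEventLoop {
    private volatile Thread loopThread;
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "mini-loop");
        loopThread = t; // remember which thread is the loop thread
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Run inline when already on the loop thread, otherwise hand the task over.
    void invoke(Runnable task) {
        if (inEventLoop()) {
            task.run();
        } else {
            executor.execute(task);
        }
    }

    // Let queued tasks finish, then stop the loop thread.
    void shutdown() {
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    static List<String> demo() {
        MiniEventLoop loop = new MiniEventLoop();
        List<String> trace = new CopyOnWriteArrayList<>();
        loop.invoke(() -> {
            trace.add("outer start on " + Thread.currentThread().getName());
            // Already on the loop thread, so this nested call runs inline.
            loop.invoke(() -> trace.add("inner on " + Thread.currentThread().getName()));
            trace.add("outer end");
        });
        loop.shutdown();
        return trace;
    }
}
```

Because the nested call runs inline, the inner entry appears between the two outer entries. Had it been queued, it would appear after "outer end".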

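As shown above, the handler chain is HEAD->AbstractServerBootstrap$handler->ServerBootstrap$1->TAIL. The next context returned by findContextInbound() is therefore AbstractServerBootstrap$handler: the loop walks forward from the current context and returns the first one whose handler does not have all of its inbound methods marked with the Skip annotation. A handler method annotated with Skip is bypassed when an event travels along the chain. The context's skipFlags field is initialized in its constructor through the skipFlags() method: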

static int skipFlags(ChannelHandler handler) {
    WeakHashMap<Class<?>, Integer> cache = skipFlagsCache.get();
    Class<? extends ChannelHandler> handlerType = handler.getClass();
    int flagsVal;
    Integer flags = cache.get(handlerType);
    if (flags != null) {
        flagsVal = flags;
    } else {
        flagsVal = skipFlags0(handlerType);
        cache.put(handlerType, Integer.valueOf(flagsVal));
    }

    return flagsVal;
}
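
To make the idea of per-class skip flags concrete, here is a small sketch that computes a bit mask by reflection and caches it per handler class. It rests on assumptions: the Skip annotation, the two mask constants, and the Adapter and Logging classes are hypothetical stand-ins, and the body of skipFlags0 is a guess at the general approach rather than Netty's code.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Map;
import java.util.WeakHashMap;

final class SkipFlagsSketch {
    // Hypothetical stand-in for the Skip annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Skip {}

    static final int MASK_REGISTERED = 1;
    static final int MASK_ACTIVE = 1 << 1;
    static final int MASKGROUP_INBOUND = MASK_REGISTERED | MASK_ACTIVE;

    // Default implementations are marked as skippable.
    public static class Adapter {
        @Skip public void channelRegistered() {}
        @Skip public void channelActive() {}
    }

    // Overrides one method; method annotations are not inherited by overrides.
    public static class Logging extends Adapter {
        @Override public void channelRegistered() {}
    }

    private static final ThreadLocal<Map<Class<?>, Integer>> CACHE =
            ThreadLocal.withInitial(WeakHashMap::new);
    static int reflectionRuns = 0; // counts cache misses, for the demo only

    // Look up the flags for the handler's class, computing them at most once per thread.
    static int skipFlags(Object handler) {
        return CACHE.get().computeIfAbsent(handler.getClass(), SkipFlagsSketch::skipFlags0);
    }

    private static int skipFlags0(Class<?> type) {
        reflectionRuns++;
        int flags = 0;
        if (isSkipped(type, "channelRegistered")) {
            flags |= MASK_REGISTERED;
        }
        if (isSkipped(type, "channelActive")) {
            flags |= MASK_ACTIVE;
        }
        return flags;
    }

    // A method counts as skipped when its most specific declaration carries @Skip.
    private static boolean isSkipped(Class<?> type, String method) {
        try {
            return type.getMethod(method).isAnnotationPresent(Skip.class);
        } catch (NoSuchMethodException e) {
            return false;
        }
    }
}
```

In this model, Adapter yields a mask equal to MASKGROUP_INBOUND, so a loop like the one in findContextInbound() would step over it. Logging overrides one method, so its mask differs and it would be selected.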

For EchoServer the chain is HEAD->LoggingHandler->ServerBootstrap$1->TAIL, so the first handler found is LoggingHandler, which executes the following method:

public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    log(Event.REGISTERED);
    ctx.fireChannelRegistered();
}

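After logging, the method calls ctx.fireChannelRegistered(), so the lookup described above repeats and finds the next handler in the chain, the anonymous class ServerBootstrap$1. Because this anonymous class extends ChannelInitializer, ChannelInitializer.channelRegistered() is executed: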

public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    ChannelPipeline pipeline = ctx.pipeline();
    boolean success = false;
    try {
        initChannel((C) ctx.channel());
        pipeline.remove(this);
        ctx.fireChannelRegistered();
        success = true;
    } catch (Throwable t) {
        logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
    } finally {
        if (pipeline.context(this) != null) {
            pipeline.remove(this);
        }
        if (!success) {
            ctx.close();
        }
    }
}

This method calls back into initChannel() of ServerBootstrap$1:

public void initChannel(Channel ch) throws Exception {
    ch.pipeline().addLast(new ServerBootstrapAcceptor(
            currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}

initChannel() adds ServerBootstrapAcceptor to the pipeline, so the chain becomes HEAD->LoggingHandler->ServerBootstrap$1->ServerBootstrapAcceptor->TAIL. The pipeline.remove(this) statement then removes the initializer itself, leaving HEAD->LoggingHandler->ServerBootstrapAcceptor->TAIL. The event keeps propagating until it reaches the TAIL handler, whose channelRegistered method is executed. This completes pipeline.fireChannelRegistered().
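
The three states of the chain can be reproduced with a small model of a one-shot initializer. This is a sketch under stated assumptions: the pipeline is reduced to a java.util.List, and InitializerSketch, Handler and Initializer are hypothetical names rather than Netty types.

```java
import java.util.ArrayList;
import java.util.List;

final class InitializerSketch {
    // Hypothetical minimal handler: only its name matters here.
    interface Handler {
        String name();
    }

    // A one-shot handler: installs the real handler, then removes itself.
    static final class Initializer implements Handler {
        @Override
        public String name() {
            return "ServerBootstrap$1";
        }

        void channelRegistered(List<Handler> pipeline, List<String> trace) {
            pipeline.add(() -> "ServerBootstrapAcceptor"); // plays the role of initChannel()
            trace.add(describe(pipeline));
            pipeline.remove(this);                          // plays the role of pipeline.remove(this)
            trace.add(describe(pipeline));
        }
    }

    // Render the chain with the HEAD and TAIL sentinels around it.
    static String describe(List<Handler> pipeline) {
        StringBuilder sb = new StringBuilder("HEAD");
        for (Handler h : pipeline) {
            sb.append("->").append(h.name());
        }
        return sb.append("->TAIL").toString();
    }

    static List<String> demo() {
        List<Handler> pipeline = new ArrayList<>();
        Initializer initializer = new Initializer();
        pipeline.add(() -> "LoggingHandler");
        pipeline.add(initializer);

        List<String> trace = new ArrayList<>();
        trace.add(describe(pipeline));
        initializer.channelRegistered(pipeline, trace);
        return trace;
    }
}
```

The initializer exists only to defer handler setup until the channel is registered, which is why it has no reason to stay in the chain afterwards.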

doBind

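With the pipeline covered, return to pipeline.bind(localAddress, promise). It ends up calling tail.bind(localAddress, promise). Note that it starts at tail, not head: bind is an outbound operation and travels through the chain in reverse, from tail toward head.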

public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();
    next.invoker().invokeBind(next, localAddress, promise);
    return promise;
}

public void invokeBind(
        final ChannelHandlerContext ctx, final SocketAddress localAddress, final ChannelPromise promise) {
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    if (!validatePromise(ctx, promise, false)) {
        // promise cancelled
        return;
    }

    if (executor.inEventLoop()) {
        invokeBindNow(ctx, localAddress, promise);
    } else {
        safeExecuteOutbound(new OneTimeTask() {
            @Override
            public void run() {
                invokeBindNow(ctx, localAddress, promise);
            }
        }, promise);
    }
}

public static void invokeBindNow(
        final ChannelHandlerContext ctx, final SocketAddress localAddress, final ChannelPromise promise) {
    try {
        ctx.handler().bind(ctx, localAddress, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}
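
The difference between the two directions can be shown on a toy chain. The sketch below is illustrative only: DirectionSketch and Ctx are hypothetical names, and the inbound/outbound flag assigned to each handler is an assumption made for the example, not something read from Netty's source.

```java
import java.util.ArrayList;
import java.util.List;

final class DirectionSketch {
    // Hypothetical context node with flags saying which direction it takes part in.
    static final class Ctx {
        final String name;
        final boolean inbound;
        final boolean outbound;
        Ctx prev;
        Ctx next;

        Ctx(String name, boolean inbound, boolean outbound) {
            this.name = name;
            this.inbound = inbound;
            this.outbound = outbound;
        }
    }

    // Link the contexts in order and return them (index 0 is HEAD, last is TAIL).
    static Ctx[] chain() {
        Ctx[] c = {
            new Ctx("HEAD", false, true),                    // assumed outbound only
            new Ctx("LoggingHandler", true, true),           // assumed both directions
            new Ctx("ServerBootstrapAcceptor", true, false), // assumed inbound only
            new Ctx("TAIL", true, false)                     // assumed inbound only
        };
        for (int i = 0; i + 1 < c.length; i++) {
            c[i].next = c[i + 1];
            c[i + 1].prev = c[i];
        }
        return c;
    }

    // Inbound events start at head and follow the next pointers.
    static List<String> inboundPath(Ctx head) {
        List<String> visited = new ArrayList<>();
        for (Ctx ctx = head.next; ctx != null; ctx = ctx.next) {
            if (ctx.inbound) {
                visited.add(ctx.name);
            }
        }
        return visited;
    }

    // Outbound operations start at tail and follow the prev pointers.
    static List<String> outboundPath(Ctx tail) {
        List<String> visited = new ArrayList<>();
        for (Ctx ctx = tail.prev; ctx != null; ctx = ctx.prev) {
            if (ctx.outbound) {
                visited.add(ctx.name);
            }
        }
        return visited;
    }
}
```

With these flags, an inbound event visits LoggingHandler, ServerBootstrapAcceptor and TAIL, while an outbound call such as bind visits LoggingHandler and then HEAD.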

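From the analysis above, EchoServer's chain after registration is HEAD->LoggingHandler->ServerBootstrapAcceptor->TAIL. Starting at TAIL and moving backward, the bind method of each handler is invoked in turn. The only bind that does real work is the one in head, at the end of this walk: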

public void bind(
        ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
        throws Exception {
    unsafe.bind(localAddress, promise);
}

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }

    // See: https://github.com/netty/netty/issues/576
    if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
        localAddress instanceof InetSocketAddress &&
        !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
        !PlatformDependent.isWindows() && !PlatformDependent.isRoot()) {
        // Warn a user about the fact that a non-root user can't receive a
        // broadcast packet on *nix if the socket is bound on non-wildcard address.
        logger.warn(
                "A non-root user can't receive a broadcast packet if the socket " +
                "is not bound to a wildcard address; binding to a non-wildcard " +
                "address (" + localAddress + ") anyway as requested.");
    }

    boolean wasActive = isActive();
    try {
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }

    if (!wasActive && isActive()) {
        invokeLater(new OneTimeTask() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }

    safeSetSuccess(promise);
}

The actual bind happens in doBind:

protected void doBind(SocketAddress localAddress) throws Exception {
    javaChannel().socket().bind(localAddress, config.getBacklog());
}

After the port is bound, invokeLater is called:

private void invokeLater(Runnable task) {
    try {
        // This method is used by outbound operation implementations to trigger an inbound event later.
        // They do not trigger an inbound event immediately because an outbound operation might have been
        // triggered by another inbound event handler method.  If fired immediately, the call stack
        // will look like this for example:
        //
        //   handlerA.inboundBufferUpdated() - (1) an inbound handler method closes a connection.
        //   -> handlerA.ctx.close()
        //      -> channel.unsafe.close()
        //         -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet
        //
        // which means the execution of two inbound handler methods of the same handler overlap undesirably.
        eventLoop().unwrap().execute(task);
    } catch (RejectedExecutionException e) {
        logger.warn("Can't invoke task later as EventLoop rejected it", e);
    }
}

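invokeLater only adds the task to the task queue of the channel's EventLoop (not of the EventLoopGroup); the task does not run immediately. It runs on a later iteration of the event loop, after the current bind call, including safeSetSuccess(promise), has finished.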
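
The ordering that results from queueing the task can be seen in a toy loop. This is a minimal sketch, not Netty's event loop: InvokeLaterSketch is a hypothetical class, and the strings in the trace merely label the steps named in the text.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class InvokeLaterSketch {
    private final Queue<Runnable> tasks = new ArrayDeque<>();
    final List<String> trace = new ArrayList<>();

    // Queue the task; nothing runs until the loop reaches it.
    void execute(Runnable task) {
        tasks.add(task);
    }

    // Drain the queue; tasks added while one is running are picked up afterwards.
    void runAllTasks() {
        Runnable task;
        while ((task = tasks.poll()) != null) {
            task.run();
        }
    }

    // Models the tail of unsafe.bind(): defer the inbound event, then complete the promise.
    void bind() {
        trace.add("doBind");
        execute(() -> trace.add("fireChannelActive")); // plays the role of invokeLater(...)
        trace.add("safeSetSuccess");
    }

    static List<String> demo() {
        InvokeLaterSketch loop = new InvokeLaterSketch();
        loop.execute(loop::bind);
        loop.runAllTasks();
        return loop.trace;
    }
}
```

The trace shows fireChannelActive after safeSetSuccess even though it was scheduled first, which is the behavior the comment in invokeLater aims for.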

protected final void safeSetSuccess(ChannelPromise promise) {
    if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
        logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
    }
}

public boolean trySuccess() {
    return trySuccess(null);
}

public boolean trySuccess(V result) {
    if (setSuccess0(result)) {
        notifyListeners();
        return true;
    }
    return false;
}

private boolean setSuccess0(V result) {
    if (isDone()) {
        return false;
    }

    synchronized (this) {
        // Allow only once.
        if (isDone()) {
            return false;
        }
        if (result == null) {
            this.result = SUCCESS;
        } else {
            this.result = result;
        }
        if (hasWaiters()) {
            notifyAll();
        }
    }
    return true;
}

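Once safeSetSuccess has marked the promise as successful, trySuccess calls notifyListeners(). No listener has been attached at this point (the CLOSE_ON_FAILURE listener in doBind0 is added only after channel.bind() returns), so listeners is null and the method returns immediately.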

private void notifyListeners() {
    // This method doesn't need synchronization because:
    // 1) This method is always called after synchronized (this) block.
    //    Hence any listener list modification happens-before this method.
    // 2) This method is called only when 'done' is true.  Once 'done'
    //    becomes true, the listener list is never modified - see add/removeListener()

    Object listeners = this.listeners;
    if (listeners == null) {
        return;
    }
    // remainder omitted
}
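
The complete-once logic and the early return on a null listener list can be condensed into a small promise. This is a simplified sketch and not DefaultPromise: MiniPromise is a hypothetical class, listeners are plain Runnables, and running a late listener immediately is an assumption about reasonable behavior for a promise that is already done.

```java
import java.util.ArrayList;
import java.util.List;

final class MiniPromise<V> {
    private static final Object SUCCESS = new Object(); // marker for a null result
    private volatile Object result;
    private List<Runnable> listeners; // stays null until a listener is added

    boolean isDone() {
        return result != null;
    }

    boolean trySuccess(V value) {
        if (setSuccess0(value)) {
            notifyListeners();
            return true;
        }
        return false;
    }

    // Check, lock, check again: only the first caller may set the result.
    private boolean setSuccess0(V value) {
        if (isDone()) {
            return false;
        }
        synchronized (this) {
            if (isDone()) {
                return false;
            }
            result = (value == null) ? SUCCESS : value;
            notifyAll(); // wake up threads blocked in a wait on this promise
        }
        return true;
    }

    void addListener(Runnable listener) {
        synchronized (this) {
            if (!isDone()) {
                if (listeners == null) {
                    listeners = new ArrayList<>();
                }
                listeners.add(listener);
                return;
            }
        }
        listener.run(); // already done: notify the late listener right away
    }

    private void notifyListeners() {
        List<Runnable> current = this.listeners;
        if (current == null) {
            return; // nothing registered yet, as in the bind flow above
        }
        this.listeners = null;
        for (Runnable l : current) {
            l.run();
        }
    }
}
```

A listener added after completion, like the one doBind0 attaches to the return value of channel.bind(), is handled by addListener rather than by notifyListeners.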

With the promise marked successful, the task queued by invokeLater runs, that is, pipeline.fireChannelActive():

public ChannelPipeline fireChannelActive() {
    head.fireChannelActive(); // invokes channelActive on each handler

    if (channel.config().isAutoRead()) {
        channel.read();
    }

    return this;
}

channel.config().isAutoRead() returns true by default, so channel.read() is executed:

public Channel read() {
    pipeline.read();
    return this;
}

As with bind, the pipeline invokes each handler's read() method from tail to head, finishing in head.read():

public void read(ChannelHandlerContext ctx) {
    unsafe.beginRead();
}

public final void beginRead() {
    if (!isActive()) {
        return;
    }

    try {
        doBeginRead();
    } catch (final Exception e) {
        invokeLater(new OneTimeTask() {
            @Override
            public void run() {
                pipeline.fireExceptionCaught(e);
            }
        });
        close(voidPromise());
    }
}

protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    if (inputShutdown) {
        return;
    }

    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

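When NioServerSocketChannel was initialized, readInterestOp was set to SelectionKey.OP_ACCEPT, whose value is 16. The channel was registered with an interest set of 0, so the final call selectionKey.interestOps(interestOps | readInterestOp) is equivalent to selectionKey.interestOps(SelectionKey.OP_ACCEPT): it tells the selector to watch for accept events. The bind operation is now complete, and the next step is sync():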
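
The bit arithmetic can be checked with the JDK's own NIO classes, without Netty. The sketch below is an illustration under stated assumptions: InterestOpsSketch and its two methods are hypothetical names, and the server channel is deliberately left unbound because only the selection key matters here.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

final class InterestOpsSketch {
    // Mirrors the check in doBeginRead(): add the read interest only if it is missing.
    static int withReadInterest(int interestOps, int readInterestOp) {
        if ((interestOps & readInterestOp) == 0) {
            return interestOps | readInterestOp;
        }
        return interestOps;
    }

    // Registers a server channel with interest 0, then enables OP_ACCEPT on its key.
    static int registerThenAccept() {
        try (Selector selector = Selector.open();
             ServerSocketChannel channel = ServerSocketChannel.open()) {
            channel.configureBlocking(false); // required before registering with a selector
            SelectionKey key = channel.register(selector, 0);
            key.interestOps(withReadInterest(key.interestOps(), SelectionKey.OP_ACCEPT));
            return key.interestOps();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(SelectionKey.OP_ACCEPT);
        System.out.println(registerThenAccept());
    }
}
```

Registering with 0 first and enabling OP_ACCEPT later separates attaching the channel to the selector from the moment it starts accepting connections.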

public Promise<V> sync() throws InterruptedException {
    await();
    rethrowIfFailed();
    return this;
}

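Because safeSetSuccess has already marked the promise as successful, await() returns without blocking once the bind has completed, and ChannelFuture f = b.bind(PORT).sync() is done. The last step is f.channel().closeFuture().sync(). The closeFuture is not completed until the channel is closed, so this call keeps waiting. The main thread therefore stays blocked while the event loop threads run their loops and serve client requests.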
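
The contrast between the two sync() calls can be imitated with the JDK's CompletableFuture standing in for ChannelFuture. This is only an analogy: SyncSketch is a hypothetical class, and the 200 ms timeout exists solely so that the demo returns instead of waiting forever as the real server does.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class SyncSketch {
    static String demo() {
        CompletableFuture<Void> bindFuture = new CompletableFuture<>();
        CompletableFuture<Void> closeFuture = new CompletableFuture<>();

        // The "event loop" thread completes the bind, as safeSetSuccess(promise) does.
        Thread loop = new Thread(() -> bindFuture.complete(null), "mini-loop");
        loop.start();

        try {
            bindFuture.get();                            // like b.bind(PORT).sync()
            closeFuture.get(200, TimeUnit.MILLISECONDS); // like closeFuture().sync(), with a timeout
            return "closed";
        } catch (TimeoutException e) {
            // Nobody completed closeFuture, so the wait did not finish.
            return "still waiting on closeFuture";
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Blocking the main thread on the close future is what keeps the server process alive while the other threads do the work.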

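Summary

  • channel() selects the type of socket channel to create, and childHandler() sets the handler chain of each accepted SocketChannel.
  • bind() differs from the bind of a plain Java socket: it mainly consists of initAndRegister() and doBind0().
  • initAndRegister breaks down into three steps: createChannel(), init(channel), and channel.unsafe().register(), which is where javaChannel().register(eventLoop().selector, 0, this) takes place.
  • doBind0 mainly performs javaChannel().socket().bind(localAddress, config.getBacklog()), fires the channelActive() event, and sets selectionKey.interestOps(SelectionKey.OP_ACCEPT).

To be continued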