// Start the server. ChannelFuturef= b.bind(PORT).sync();
// Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { // Shut down all event loops to terminate all threads. bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
children = newEventExecutor[nEventExecutors]; if (isPowerOfTwo(children.length)) { chooser = newPowerOfTwoEventExecutorChooser(); } else { chooser = newGenericEventExecutorChooser(); }
for (inti=0; i < nEventExecutors; i ++) { booleansuccess=false; try { children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type thrownewIllegalStateException("failed to create a child event loop", e); } finally { if (!success) { for (intj=0; j < i; j ++) { children[j].shutdownGracefully(); }
for (intj=0; j < i; j ++) { EventExecutore= children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } }
finalbooleanshutdownExecutor0= shutdownExecutor; finalExecutorexecutor0= executor; final FutureListener<Object> terminationListener = newFutureListener<Object>() { @Override publicvoidoperationComplete(Future<Object> future)throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); if (shutdownExecutor0) { // This cast is correct because shutdownExecutor0 is only try if // executor0 is of type ExecutorService. ((ExecutorService) executor0).shutdown(); } } } };
for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); }
// Ensure the current selector implementation is what we can instrument. if (!selectorImplClass.isAssignableFrom(selector.getClass())) { return selector; }
/**
 * Creates a new channel by calling {@code newInstance()} on {@code clazz}.
 *
 * @return the newly created instance
 * @throws ChannelException if instantiation fails for any reason; the original
 *         {@link Throwable} is preserved as the cause
 */
public T newChannel() {
    try {
        return clazz.newInstance();
    } catch (Throwable t) {
        // Any failure (including errors) is reported uniformly, with the class named in the message.
        throw new ChannelException("Unable to create Channel from class " + clazz, t);
    }
}
if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromisepromise= channel.newPromise(); //重要方法2,tag2 doBind0(regFuture, channel, localAddress, promise); return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. finalPendingRegistrationPromisepromise=newPendingRegistrationPromise(channel); regFuture.addListener(newChannelFutureListener() { @Override publicvoidoperationComplete(ChannelFuture future)throws Exception { Throwablecause= future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.executor = channel.eventLoop(); } doBind0(regFuture, channel, localAddress, promise); } }); return promise; } }
final ChannelFuture initAndRegister() { finalChannelchannel= channelFactory().newChannel(); //创建Channel实例,tag1.1 try { //初始化channel,tag1.2 init(channel); } catch (Throwable t) { channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor returnnewDefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); }
//把channel注册到selector上,tag1.3 ChannelFutureregFuture= group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } }
privatestatic ServerSocketChannel newSocket(SelectorProvider provider) { try { /** * Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in * {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise. * * See <a href="See https://github.com/netty/netty/issues/2308">#2308</a>. */ return provider.openServerSocketChannel(); } catch (IOException e) { thrownewChannelException( "Failed to open a server socket.", e); } }
接着继续调用父类AbstractNioChannel的构造方法:
/**
 * Creates a channel around an existing {@link SelectableChannel} and switches that
 * channel to non-blocking mode.
 *
 * @param parent         passed to the superclass constructor
 * @param ch             the underlying channel; stored in {@code this.ch}
 * @param readInterestOp stored in {@code this.readInterestOp}
 * @throws ChannelException if {@code ch} cannot be put into non-blocking mode; before
 *         throwing, an attempt is made to close {@code ch}
 */
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        // Best-effort cleanup: a failure to close is only logged so that the
        // original configureBlocking failure is the one reported to the caller.
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }
        }

        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}
privatevoidregister0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } booleanfirstRegistration= neverRegistered; //重要代码1 doRegister(); neverRegistered = false; registered = true; eventLoop.acceptNewTasks(); safeSetSuccess(promise); // 重要代码2 pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (firstRegistration && isActive()) { pipeline.fireChannelActive(); } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
@Override protectedvoiddoRegister()throws Exception { booleanselected=false; for (;;) { try { selectionKey = javaChannel().register(((NioEventLoop) eventLoop().unwrap()).selector, 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. ((NioEventLoop) eventLoop().unwrap()).selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } }