protectedvoidimplRegister(SelectionKeyImpl ski){ synchronized (closeLock) { if (closed) thrownew ClosedSelectorException();
// Check to see if the array is large enough if (channelArray.length == totalChannels) { // Make a larger array int newSize = pollWrapper.totalChannels * 2; SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize]; // Copy over for (int i=channelOffset; i<totalChannels; i++) temp[i] = channelArray[i]; channelArray = temp; // Grow the NativeObject poll array pollWrapper.grow(newSize); } channelArray[totalChannels] = ski; ski.setIndex(totalChannels); pollWrapper.addEntry(ski.channel); totalChannels++; keys.add(ski); // 注解@1 } }
/** * Copy the information in the pollfd structs into the opss * of the corresponding Channels. Add the ready keys to the * ready queue. */ protectedintupdateSelectedKeys(){ int numKeysUpdated = 0; // Skip zeroth entry; it is for interrupts only for (int i=channelOffset; i<totalChannels; i++) { // 获取通道就绪操作类型(可读、可写、错误等) int rOps = pollWrapper.getReventOps(i); if (rOps != 0) { SelectionKeyImpl sk = channelArray[i]; pollWrapper.putReventOps(i, 0); if (selectedKeys.contains(sk)) { // 将ReventOps就绪的操作类型转换到SelectionKeyImpl if (sk.channel.translateAndSetReadyOps(rOps, sk)) { numKeysUpdated++; } } else { sk.channel.translateAndSetReadyOps(rOps, sk); if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) { selectedKeys.add(sk); // 注解@4 numKeysUpdated++; } } } } return numKeysUpdated; }
privatevoidprocessSelectedKey(SelectionKey k, AbstractNioChannel ch){ final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); // ... try { int readyOps = k.readyOps(); // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise // the NIO JDK channel implementation may throw a NotYetConnectedException. if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // 注解@13 // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops);
unsafe.finishConnect(); }
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory. if ((readyOps & SelectionKey.OP_WRITE) != 0) { // 注解@14 // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); }
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { // 注解@15 unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }