View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import org.jboss.netty.channel.Channel;
19  import org.jboss.netty.channel.ChannelFuture;
20  import org.jboss.netty.channel.MessageEvent;
21  import org.jboss.netty.channel.socket.Worker;
22  import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
23  import org.jboss.netty.util.ThreadNameDeterminer;
24  import org.jboss.netty.util.ThreadRenamingRunnable;
25  
26  import java.io.IOException;
27  import java.nio.channels.AsynchronousCloseException;
28  import java.nio.channels.CancelledKeyException;
29  import java.nio.channels.ClosedChannelException;
30  import java.nio.channels.NotYetConnectedException;
31  import java.nio.channels.SelectionKey;
32  import java.nio.channels.Selector;
33  import java.nio.channels.WritableByteChannel;
34  import java.util.Iterator;
35  import java.util.Queue;
36  import java.util.Set;
37  import java.util.concurrent.Executor;
38  
39  
40  import static org.jboss.netty.channel.Channels.*;
41  
42  abstract class AbstractNioWorker extends AbstractNioSelector implements Worker {
43  
44      protected final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool();
45  
46      AbstractNioWorker(Executor executor) {
47          super(executor);
48      }
49  
50      AbstractNioWorker(Executor executor, ThreadNameDeterminer determiner) {
51          super(executor, determiner);
52      }
53  
54      public void executeInIoThread(Runnable task) {
55          executeInIoThread(task, false);
56      }
57  
58      /**
59       * Execute the {@link Runnable} in a IO-Thread
60       *
61       * @param task
62       *            the {@link Runnable} to execute
63       * @param alwaysAsync
64       *            {@code true} if the {@link Runnable} should be executed
65       *            in an async fashion even if the current Thread == IO Thread
66       */
67      public void executeInIoThread(Runnable task, boolean alwaysAsync) {
68          if (!alwaysAsync && isIoThread()) {
69              task.run();
70          } else {
71              registerTask(task);
72          }
73      }
74  
75      @Override
76      protected void close(SelectionKey k) {
77          AbstractNioChannel<?> ch = (AbstractNioChannel<?>) k.attachment();
78          close(ch, succeededFuture(ch));
79      }
80  
81      @Override
82      protected ThreadRenamingRunnable newThreadRenamingRunnable(int id, ThreadNameDeterminer determiner) {
83          return new ThreadRenamingRunnable(this, "New I/O worker #" + id, determiner);
84      }
85  
86      @Override
87      public void run() {
88          super.run();
89          sendBufferPool.releaseExternalResources();
90      }
91  
92      @Override
93      protected void process(Selector selector) throws IOException {
94          Set<SelectionKey> selectedKeys = selector.selectedKeys();
95          // check if the set is empty and if so just return to not create garbage by
96          // creating a new Iterator every time even if there is nothing to process.
97          // See https://github.com/netty/netty/issues/597
98          if (selectedKeys.isEmpty()) {
99              return;
100         }
101         for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
102             SelectionKey k = i.next();
103             i.remove();
104             try {
105                 int readyOps = k.readyOps();
106                 if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
107                     if (!read(k)) {
108                         // Connection already closed - no need to handle write.
109                         continue;
110                     }
111                 }
112                 if ((readyOps & SelectionKey.OP_WRITE) != 0) {
113                     writeFromSelectorLoop(k);
114                 }
115             } catch (CancelledKeyException e) {
116                 close(k);
117             }
118 
119             if (cleanUpCancelledKeys()) {
120                 break; // break the loop to avoid ConcurrentModificationException
121             }
122         }
123     }
124 
125     void writeFromUserCode(final AbstractNioChannel<?> channel) {
126         if (!channel.isConnected()) {
127             cleanUpWriteBuffer(channel);
128             return;
129         }
130 
131         if (scheduleWriteIfNecessary(channel)) {
132             return;
133         }
134 
135         // From here, we are sure Thread.currentThread() == workerThread.
136 
137         if (channel.writeSuspended) {
138             return;
139         }
140 
141         if (channel.inWriteNowLoop) {
142             return;
143         }
144 
145         write0(channel);
146     }
147 
148     void writeFromTaskLoop(AbstractNioChannel<?> ch) {
149         if (!ch.writeSuspended) {
150             write0(ch);
151         }
152     }
153 
154     void writeFromSelectorLoop(final SelectionKey k) {
155         AbstractNioChannel<?> ch = (AbstractNioChannel<?>) k.attachment();
156         ch.writeSuspended = false;
157         write0(ch);
158     }
159 
160     protected abstract boolean scheduleWriteIfNecessary(AbstractNioChannel<?> channel);
161 
162     protected void write0(AbstractNioChannel<?> channel) {
163         boolean open = true;
164         boolean addOpWrite = false;
165         boolean removeOpWrite = false;
166         boolean iothread = isIoThread(channel);
167 
168         long writtenBytes = 0;
169 
170         final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
171         final WritableByteChannel ch = channel.channel;
172         final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
173         final int writeSpinCount = channel.getConfig().getWriteSpinCount();
174         synchronized (channel.writeLock) {
175             channel.inWriteNowLoop = true;
176             for (;;) {
177 
178                 MessageEvent evt = channel.currentWriteEvent;
179                 SendBuffer buf = null;
180                 ChannelFuture future = null;
181                 try {
182                     if (evt == null) {
183                         if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
184                             removeOpWrite = true;
185                             channel.writeSuspended = false;
186                             break;
187                         }
188                         future = evt.getFuture();
189 
190                         channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
191                     } else {
192                         future = evt.getFuture();
193                         buf = channel.currentWriteBuffer;
194                     }
195 
196                     long localWrittenBytes = 0;
197                     for (int i = writeSpinCount; i > 0; i --) {
198                         localWrittenBytes = buf.transferTo(ch);
199                         if (localWrittenBytes != 0) {
200                             writtenBytes += localWrittenBytes;
201                             break;
202                         }
203                         if (buf.finished()) {
204                             break;
205                         }
206                     }
207 
208                     if (buf.finished()) {
209                         // Successful write - proceed to the next message.
210                         buf.release();
211                         channel.currentWriteEvent = null;
212                         channel.currentWriteBuffer = null;
213                         // Mark the event object for garbage collection.
214                         //noinspection UnusedAssignment
215                         evt = null;
216                         buf = null;
217                         future.setSuccess();
218                     } else {
219                         // Not written fully - perhaps the kernel buffer is full.
220                         addOpWrite = true;
221                         channel.writeSuspended = true;
222 
223                         if (localWrittenBytes > 0) {
224                             // Notify progress listeners if necessary.
225                             future.setProgress(
226                                     localWrittenBytes,
227                                     buf.writtenBytes(), buf.totalBytes());
228                         }
229                         break;
230                     }
231                 } catch (AsynchronousCloseException e) {
232                     // Doesn't need a user attention - ignore.
233                 } catch (Throwable t) {
234                     if (buf != null) {
235                         buf.release();
236                     }
237                     channel.currentWriteEvent = null;
238                     channel.currentWriteBuffer = null;
239                     // Mark the event object for garbage collection.
240                     //noinspection UnusedAssignment
241                     buf = null;
242                     //noinspection UnusedAssignment
243                     evt = null;
244                     if (future != null) {
245                         future.setFailure(t);
246                     }
247                     if (iothread) {
248                         fireExceptionCaught(channel, t);
249                     } else {
250                         fireExceptionCaughtLater(channel, t);
251                     }
252                     if (t instanceof IOException) {
253                         open = false;
254                         close(channel, succeededFuture(channel));
255                     }
256                 }
257             }
258             channel.inWriteNowLoop = false;
259 
260             // Initially, the following block was executed after releasing
261             // the writeLock, but there was a race condition, and it has to be
262             // executed before releasing the writeLock:
263             //
264             //     https://issues.jboss.org/browse/NETTY-410
265             //
266             if (open) {
267                 if (addOpWrite) {
268                     setOpWrite(channel);
269                 } else if (removeOpWrite) {
270                     clearOpWrite(channel);
271                 }
272             }
273         }
274         if (iothread) {
275             fireWriteComplete(channel, writtenBytes);
276         } else {
277             fireWriteCompleteLater(channel, writtenBytes);
278         }
279     }
280 
281     static boolean isIoThread(AbstractNioChannel<?> channel) {
282         return Thread.currentThread() == channel.worker.thread;
283     }
284 
285     protected void setOpWrite(AbstractNioChannel<?> channel) {
286         Selector selector = this.selector;
287         SelectionKey key = channel.channel.keyFor(selector);
288         if (key == null) {
289             return;
290         }
291         if (!key.isValid()) {
292             close(key);
293             return;
294         }
295 
296         int interestOps = channel.getRawInterestOps();
297         if ((interestOps & SelectionKey.OP_WRITE) == 0) {
298             interestOps |= SelectionKey.OP_WRITE;
299             key.interestOps(interestOps);
300             channel.setRawInterestOpsNow(interestOps);
301         }
302     }
303 
304     protected void clearOpWrite(AbstractNioChannel<?> channel) {
305         Selector selector = this.selector;
306         SelectionKey key = channel.channel.keyFor(selector);
307         if (key == null) {
308             return;
309         }
310         if (!key.isValid()) {
311             close(key);
312             return;
313         }
314 
315         int interestOps = channel.getRawInterestOps();
316         if ((interestOps & SelectionKey.OP_WRITE) != 0) {
317             interestOps &= ~SelectionKey.OP_WRITE;
318             key.interestOps(interestOps);
319             channel.setRawInterestOpsNow(interestOps);
320         }
321     }
322 
323     protected void close(AbstractNioChannel<?> channel, ChannelFuture future) {
324         boolean connected = channel.isConnected();
325         boolean bound = channel.isBound();
326         boolean iothread = isIoThread(channel);
327 
328         try {
329             channel.channel.close();
330             increaseCancelledKeys();
331 
332             if (channel.setClosed()) {
333                 future.setSuccess();
334                 if (connected) {
335                     if (iothread) {
336                         fireChannelDisconnected(channel);
337                     } else {
338                         fireChannelDisconnectedLater(channel);
339                     }
340                 }
341                 if (bound) {
342                     if (iothread) {
343                         fireChannelUnbound(channel);
344                     } else {
345                         fireChannelUnboundLater(channel);
346                     }
347                 }
348 
349                 cleanUpWriteBuffer(channel);
350                 if (iothread) {
351                     fireChannelClosed(channel);
352                 } else {
353                     fireChannelClosedLater(channel);
354                 }
355             } else {
356                 future.setSuccess();
357             }
358         } catch (Throwable t) {
359             future.setFailure(t);
360             if (iothread) {
361                 fireExceptionCaught(channel, t);
362             } else {
363                 fireExceptionCaughtLater(channel, t);
364             }
365         }
366     }
367 
368     protected static void cleanUpWriteBuffer(AbstractNioChannel<?> channel) {
369         Exception cause = null;
370         boolean fireExceptionCaught = false;
371 
372         // Clean up the stale messages in the write buffer.
373         synchronized (channel.writeLock) {
374             MessageEvent evt = channel.currentWriteEvent;
375             if (evt != null) {
376                 // Create the exception only once to avoid the excessive overhead
377                 // caused by fillStackTrace.
378                 if (channel.isOpen()) {
379                     cause = new NotYetConnectedException();
380                 } else {
381                     cause = new ClosedChannelException();
382                 }
383 
384                 ChannelFuture future = evt.getFuture();
385                 if (channel.currentWriteBuffer != null) {
386                     channel.currentWriteBuffer.release();
387                     channel.currentWriteBuffer = null;
388                 }
389                 channel.currentWriteEvent = null;
390                 // Mark the event object for garbage collection.
391                 //noinspection UnusedAssignment
392                 evt = null;
393                 future.setFailure(cause);
394                 fireExceptionCaught = true;
395             }
396 
397             Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
398             for (;;) {
399                 evt = writeBuffer.poll();
400                 if (evt == null) {
401                     break;
402                 }
403                 // Create the exception only once to avoid the excessive overhead
404                 // caused by fillStackTrace.
405                 if (cause == null) {
406                     if (channel.isOpen()) {
407                         cause = new NotYetConnectedException();
408                     } else {
409                         cause = new ClosedChannelException();
410                     }
411                     fireExceptionCaught = true;
412                 }
413                 evt.getFuture().setFailure(cause);
414             }
415         }
416 
417         if (fireExceptionCaught) {
418             if (isIoThread(channel)) {
419                 fireExceptionCaught(channel, cause);
420             } else {
421                 fireExceptionCaughtLater(channel, cause);
422             }
423         }
424     }
425 
426     void setInterestOps(final AbstractNioChannel<?> channel, final ChannelFuture future, final int interestOps) {
427         boolean iothread = isIoThread(channel);
428         if (!iothread) {
429             channel.getPipeline().execute(new Runnable() {
430                 public void run() {
431                     setInterestOps(channel, future, interestOps);
432                 }
433             });
434             return;
435         }
436 
437         boolean changed = false;
438         try {
439             Selector selector = this.selector;
440             SelectionKey key = channel.channel.keyFor(selector);
441 
442             // Override OP_WRITE flag - a user cannot change this flag.
443             int newInterestOps = interestOps & ~Channel.OP_WRITE | channel.getRawInterestOps() & Channel.OP_WRITE;
444 
445             if (key == null || selector == null) {
446                 if (channel.getRawInterestOps() != newInterestOps) {
447                     changed = true;
448                 }
449 
450                 // Not registered to the worker yet.
451                 // Set the rawInterestOps immediately; RegisterTask will pick it up.
452                 channel.setRawInterestOpsNow(newInterestOps);
453 
454                 future.setSuccess();
455                 if (changed) {
456                     if (iothread) {
457                         fireChannelInterestChanged(channel);
458                     } else {
459                         fireChannelInterestChangedLater(channel);
460                     }
461                 }
462 
463                 return;
464             }
465 
466             if (channel.getRawInterestOps() != newInterestOps) {
467                 key.interestOps(newInterestOps);
468                 if (Thread.currentThread() != thread &&
469                     wakenUp.compareAndSet(false, true)) {
470                     selector.wakeup();
471                 }
472                 channel.setRawInterestOpsNow(newInterestOps);
473             }
474 
475             future.setSuccess();
476             if (changed) {
477                 fireChannelInterestChanged(channel);
478             }
479         } catch (CancelledKeyException e) {
480             // setInterestOps() was called on a closed channel.
481             ClosedChannelException cce = new ClosedChannelException();
482             future.setFailure(cce);
483             fireExceptionCaught(channel, cce);
484         } catch (Throwable t) {
485             future.setFailure(t);
486             fireExceptionCaught(channel, t);
487         }
488     }
489 
490     /**
491      * Read is called when a Selector has been notified that the underlying channel
492      * was something to be read. The channel would previously have registered its interest
493      * in read operations.
494      *
495      * @param k The selection key which contains the Selector registration information.
496      */
497     protected abstract boolean read(SelectionKey k);
498 
499 }