1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import org.jboss.netty.channel.Channel;
19 import org.jboss.netty.channel.ChannelFuture;
20 import org.jboss.netty.channel.MessageEvent;
21 import org.jboss.netty.channel.socket.Worker;
22 import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
23 import org.jboss.netty.util.ThreadNameDeterminer;
24 import org.jboss.netty.util.ThreadRenamingRunnable;
25
26 import java.io.IOException;
27 import java.nio.channels.AsynchronousCloseException;
28 import java.nio.channels.CancelledKeyException;
29 import java.nio.channels.ClosedChannelException;
30 import java.nio.channels.NotYetConnectedException;
31 import java.nio.channels.SelectionKey;
32 import java.nio.channels.Selector;
33 import java.nio.channels.WritableByteChannel;
34 import java.util.Iterator;
35 import java.util.Queue;
36 import java.util.Set;
37 import java.util.concurrent.Executor;
38
39
40 import static org.jboss.netty.channel.Channels.*;
41
42 abstract class AbstractNioWorker extends AbstractNioSelector implements Worker {
43
44 protected final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool();
45
46 AbstractNioWorker(Executor executor) {
47 super(executor);
48 }
49
50 AbstractNioWorker(Executor executor, ThreadNameDeterminer determiner) {
51 super(executor, determiner);
52 }
53
54 public void executeInIoThread(Runnable task) {
55 executeInIoThread(task, false);
56 }
57
58
59
60
61
62
63
64
65
66
67 public void executeInIoThread(Runnable task, boolean alwaysAsync) {
68 if (!alwaysAsync && isIoThread()) {
69 task.run();
70 } else {
71 registerTask(task);
72 }
73 }
74
75 @Override
76 protected void close(SelectionKey k) {
77 AbstractNioChannel<?> ch = (AbstractNioChannel<?>) k.attachment();
78 close(ch, succeededFuture(ch));
79 }
80
81 @Override
82 protected ThreadRenamingRunnable newThreadRenamingRunnable(int id, ThreadNameDeterminer determiner) {
83 return new ThreadRenamingRunnable(this, "New I/O worker #" + id, determiner);
84 }
85
86 @Override
87 public void run() {
88 super.run();
89 sendBufferPool.releaseExternalResources();
90 }
91
92 @Override
93 protected void process(Selector selector) throws IOException {
94 Set<SelectionKey> selectedKeys = selector.selectedKeys();
95
96
97
98 if (selectedKeys.isEmpty()) {
99 return;
100 }
101 for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
102 SelectionKey k = i.next();
103 i.remove();
104 try {
105 int readyOps = k.readyOps();
106 if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
107 if (!read(k)) {
108
109 continue;
110 }
111 }
112 if ((readyOps & SelectionKey.OP_WRITE) != 0) {
113 writeFromSelectorLoop(k);
114 }
115 } catch (CancelledKeyException e) {
116 close(k);
117 }
118
119 if (cleanUpCancelledKeys()) {
120 break;
121 }
122 }
123 }
124
125 void writeFromUserCode(final AbstractNioChannel<?> channel) {
126 if (!channel.isConnected()) {
127 cleanUpWriteBuffer(channel);
128 return;
129 }
130
131 if (scheduleWriteIfNecessary(channel)) {
132 return;
133 }
134
135
136
137 if (channel.writeSuspended) {
138 return;
139 }
140
141 if (channel.inWriteNowLoop) {
142 return;
143 }
144
145 write0(channel);
146 }
147
148 void writeFromTaskLoop(AbstractNioChannel<?> ch) {
149 if (!ch.writeSuspended) {
150 write0(ch);
151 }
152 }
153
154 void writeFromSelectorLoop(final SelectionKey k) {
155 AbstractNioChannel<?> ch = (AbstractNioChannel<?>) k.attachment();
156 ch.writeSuspended = false;
157 write0(ch);
158 }
159
160 protected abstract boolean scheduleWriteIfNecessary(AbstractNioChannel<?> channel);
161
162 protected void write0(AbstractNioChannel<?> channel) {
163 boolean open = true;
164 boolean addOpWrite = false;
165 boolean removeOpWrite = false;
166 boolean iothread = isIoThread(channel);
167
168 long writtenBytes = 0;
169
170 final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
171 final WritableByteChannel ch = channel.channel;
172 final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
173 final int writeSpinCount = channel.getConfig().getWriteSpinCount();
174 synchronized (channel.writeLock) {
175 channel.inWriteNowLoop = true;
176 for (;;) {
177
178 MessageEvent evt = channel.currentWriteEvent;
179 SendBuffer buf = null;
180 ChannelFuture future = null;
181 try {
182 if (evt == null) {
183 if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
184 removeOpWrite = true;
185 channel.writeSuspended = false;
186 break;
187 }
188 future = evt.getFuture();
189
190 channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
191 } else {
192 future = evt.getFuture();
193 buf = channel.currentWriteBuffer;
194 }
195
196 long localWrittenBytes = 0;
197 for (int i = writeSpinCount; i > 0; i --) {
198 localWrittenBytes = buf.transferTo(ch);
199 if (localWrittenBytes != 0) {
200 writtenBytes += localWrittenBytes;
201 break;
202 }
203 if (buf.finished()) {
204 break;
205 }
206 }
207
208 if (buf.finished()) {
209
210 buf.release();
211 channel.currentWriteEvent = null;
212 channel.currentWriteBuffer = null;
213
214
215 evt = null;
216 buf = null;
217 future.setSuccess();
218 } else {
219
220 addOpWrite = true;
221 channel.writeSuspended = true;
222
223 if (localWrittenBytes > 0) {
224
225 future.setProgress(
226 localWrittenBytes,
227 buf.writtenBytes(), buf.totalBytes());
228 }
229 break;
230 }
231 } catch (AsynchronousCloseException e) {
232
233 } catch (Throwable t) {
234 if (buf != null) {
235 buf.release();
236 }
237 channel.currentWriteEvent = null;
238 channel.currentWriteBuffer = null;
239
240
241 buf = null;
242
243 evt = null;
244 if (future != null) {
245 future.setFailure(t);
246 }
247 if (iothread) {
248 fireExceptionCaught(channel, t);
249 } else {
250 fireExceptionCaughtLater(channel, t);
251 }
252 if (t instanceof IOException) {
253 open = false;
254 close(channel, succeededFuture(channel));
255 }
256 }
257 }
258 channel.inWriteNowLoop = false;
259
260
261
262
263
264
265
266 if (open) {
267 if (addOpWrite) {
268 setOpWrite(channel);
269 } else if (removeOpWrite) {
270 clearOpWrite(channel);
271 }
272 }
273 }
274 if (iothread) {
275 fireWriteComplete(channel, writtenBytes);
276 } else {
277 fireWriteCompleteLater(channel, writtenBytes);
278 }
279 }
280
281 static boolean isIoThread(AbstractNioChannel<?> channel) {
282 return Thread.currentThread() == channel.worker.thread;
283 }
284
285 protected void setOpWrite(AbstractNioChannel<?> channel) {
286 Selector selector = this.selector;
287 SelectionKey key = channel.channel.keyFor(selector);
288 if (key == null) {
289 return;
290 }
291 if (!key.isValid()) {
292 close(key);
293 return;
294 }
295
296 int interestOps = channel.getRawInterestOps();
297 if ((interestOps & SelectionKey.OP_WRITE) == 0) {
298 interestOps |= SelectionKey.OP_WRITE;
299 key.interestOps(interestOps);
300 channel.setRawInterestOpsNow(interestOps);
301 }
302 }
303
304 protected void clearOpWrite(AbstractNioChannel<?> channel) {
305 Selector selector = this.selector;
306 SelectionKey key = channel.channel.keyFor(selector);
307 if (key == null) {
308 return;
309 }
310 if (!key.isValid()) {
311 close(key);
312 return;
313 }
314
315 int interestOps = channel.getRawInterestOps();
316 if ((interestOps & SelectionKey.OP_WRITE) != 0) {
317 interestOps &= ~SelectionKey.OP_WRITE;
318 key.interestOps(interestOps);
319 channel.setRawInterestOpsNow(interestOps);
320 }
321 }
322
323 protected void close(AbstractNioChannel<?> channel, ChannelFuture future) {
324 boolean connected = channel.isConnected();
325 boolean bound = channel.isBound();
326 boolean iothread = isIoThread(channel);
327
328 try {
329 channel.channel.close();
330 increaseCancelledKeys();
331
332 if (channel.setClosed()) {
333 future.setSuccess();
334 if (connected) {
335 if (iothread) {
336 fireChannelDisconnected(channel);
337 } else {
338 fireChannelDisconnectedLater(channel);
339 }
340 }
341 if (bound) {
342 if (iothread) {
343 fireChannelUnbound(channel);
344 } else {
345 fireChannelUnboundLater(channel);
346 }
347 }
348
349 cleanUpWriteBuffer(channel);
350 if (iothread) {
351 fireChannelClosed(channel);
352 } else {
353 fireChannelClosedLater(channel);
354 }
355 } else {
356 future.setSuccess();
357 }
358 } catch (Throwable t) {
359 future.setFailure(t);
360 if (iothread) {
361 fireExceptionCaught(channel, t);
362 } else {
363 fireExceptionCaughtLater(channel, t);
364 }
365 }
366 }
367
368 protected static void cleanUpWriteBuffer(AbstractNioChannel<?> channel) {
369 Exception cause = null;
370 boolean fireExceptionCaught = false;
371
372
373 synchronized (channel.writeLock) {
374 MessageEvent evt = channel.currentWriteEvent;
375 if (evt != null) {
376
377
378 if (channel.isOpen()) {
379 cause = new NotYetConnectedException();
380 } else {
381 cause = new ClosedChannelException();
382 }
383
384 ChannelFuture future = evt.getFuture();
385 if (channel.currentWriteBuffer != null) {
386 channel.currentWriteBuffer.release();
387 channel.currentWriteBuffer = null;
388 }
389 channel.currentWriteEvent = null;
390
391
392 evt = null;
393 future.setFailure(cause);
394 fireExceptionCaught = true;
395 }
396
397 Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
398 for (;;) {
399 evt = writeBuffer.poll();
400 if (evt == null) {
401 break;
402 }
403
404
405 if (cause == null) {
406 if (channel.isOpen()) {
407 cause = new NotYetConnectedException();
408 } else {
409 cause = new ClosedChannelException();
410 }
411 fireExceptionCaught = true;
412 }
413 evt.getFuture().setFailure(cause);
414 }
415 }
416
417 if (fireExceptionCaught) {
418 if (isIoThread(channel)) {
419 fireExceptionCaught(channel, cause);
420 } else {
421 fireExceptionCaughtLater(channel, cause);
422 }
423 }
424 }
425
426 void setInterestOps(final AbstractNioChannel<?> channel, final ChannelFuture future, final int interestOps) {
427 boolean iothread = isIoThread(channel);
428 if (!iothread) {
429 channel.getPipeline().execute(new Runnable() {
430 public void run() {
431 setInterestOps(channel, future, interestOps);
432 }
433 });
434 return;
435 }
436
437 boolean changed = false;
438 try {
439 Selector selector = this.selector;
440 SelectionKey key = channel.channel.keyFor(selector);
441
442
443 int newInterestOps = interestOps & ~Channel.OP_WRITE | channel.getRawInterestOps() & Channel.OP_WRITE;
444
445 if (key == null || selector == null) {
446 if (channel.getRawInterestOps() != newInterestOps) {
447 changed = true;
448 }
449
450
451
452 channel.setRawInterestOpsNow(newInterestOps);
453
454 future.setSuccess();
455 if (changed) {
456 if (iothread) {
457 fireChannelInterestChanged(channel);
458 } else {
459 fireChannelInterestChangedLater(channel);
460 }
461 }
462
463 return;
464 }
465
466 if (channel.getRawInterestOps() != newInterestOps) {
467 key.interestOps(newInterestOps);
468 if (Thread.currentThread() != thread &&
469 wakenUp.compareAndSet(false, true)) {
470 selector.wakeup();
471 }
472 channel.setRawInterestOpsNow(newInterestOps);
473 }
474
475 future.setSuccess();
476 if (changed) {
477 fireChannelInterestChanged(channel);
478 }
479 } catch (CancelledKeyException e) {
480
481 ClosedChannelException cce = new ClosedChannelException();
482 future.setFailure(cce);
483 fireExceptionCaught(channel, cce);
484 } catch (Throwable t) {
485 future.setFailure(t);
486 fireExceptionCaught(channel, t);
487 }
488 }
489
490
491
492
493
494
495
496
497 protected abstract boolean read(SelectionKey k);
498
499 }