1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel;
17
18 import org.jboss.netty.logging.InternalLogger;
19 import org.jboss.netty.logging.InternalLoggerFactory;
20
21 import java.util.ArrayList;
22 import java.util.HashMap;
23 import java.util.LinkedHashMap;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.NoSuchElementException;
27 import java.util.concurrent.RejectedExecutionException;
28
29
30
31
32
33
34 public class DefaultChannelPipeline implements ChannelPipeline {
35
36 static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);
37 static final ChannelSink discardingSink = new DiscardingChannelSink();
38
39 private volatile Channel channel;
40 private volatile ChannelSink sink;
41 private volatile DefaultChannelHandlerContext head;
42 private volatile DefaultChannelHandlerContext tail;
43 private final Map<String, DefaultChannelHandlerContext> name2ctx =
44 new HashMap<String, DefaultChannelHandlerContext>(4);
45
46 public Channel getChannel() {
47 return channel;
48 }
49
50 public ChannelSink getSink() {
51 ChannelSink sink = this.sink;
52 if (sink == null) {
53 return discardingSink;
54 }
55 return sink;
56 }
57
58 public void attach(Channel channel, ChannelSink sink) {
59 if (channel == null) {
60 throw new NullPointerException("channel");
61 }
62 if (sink == null) {
63 throw new NullPointerException("sink");
64 }
65 if (this.channel != null || this.sink != null) {
66 throw new IllegalStateException("attached already");
67 }
68 this.channel = channel;
69 this.sink = sink;
70 }
71
72 public boolean isAttached() {
73 return sink != null;
74 }
75
76 public synchronized void addFirst(String name, ChannelHandler handler) {
77 if (name2ctx.isEmpty()) {
78 init(name, handler);
79 } else {
80 checkDuplicateName(name);
81 DefaultChannelHandlerContext oldHead = head;
82 DefaultChannelHandlerContext newHead = new DefaultChannelHandlerContext(null, oldHead, name, handler);
83
84 callBeforeAdd(newHead);
85
86 oldHead.prev = newHead;
87 head = newHead;
88 name2ctx.put(name, newHead);
89
90 callAfterAdd(newHead);
91 }
92 }
93
94 public synchronized void addLast(String name, ChannelHandler handler) {
95 if (name2ctx.isEmpty()) {
96 init(name, handler);
97 } else {
98 checkDuplicateName(name);
99 DefaultChannelHandlerContext oldTail = tail;
100 DefaultChannelHandlerContext newTail = new DefaultChannelHandlerContext(oldTail, null, name, handler);
101
102 callBeforeAdd(newTail);
103
104 oldTail.next = newTail;
105 tail = newTail;
106 name2ctx.put(name, newTail);
107
108 callAfterAdd(newTail);
109 }
110 }
111
112 public synchronized void addBefore(String baseName, String name, ChannelHandler handler) {
113 DefaultChannelHandlerContext ctx = getContextOrDie(baseName);
114 if (ctx == head) {
115 addFirst(name, handler);
116 } else {
117 checkDuplicateName(name);
118 DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(ctx.prev, ctx, name, handler);
119
120 callBeforeAdd(newCtx);
121
122 ctx.prev.next = newCtx;
123 ctx.prev = newCtx;
124 name2ctx.put(name, newCtx);
125
126 callAfterAdd(newCtx);
127 }
128 }
129
130 public synchronized void addAfter(String baseName, String name, ChannelHandler handler) {
131 DefaultChannelHandlerContext ctx = getContextOrDie(baseName);
132 if (ctx == tail) {
133 addLast(name, handler);
134 } else {
135 checkDuplicateName(name);
136 DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(ctx, ctx.next, name, handler);
137
138 callBeforeAdd(newCtx);
139
140 ctx.next.prev = newCtx;
141 ctx.next = newCtx;
142 name2ctx.put(name, newCtx);
143
144 callAfterAdd(newCtx);
145 }
146 }
147
148 public synchronized void remove(ChannelHandler handler) {
149 remove(getContextOrDie(handler));
150 }
151
152 public synchronized ChannelHandler remove(String name) {
153 return remove(getContextOrDie(name)).getHandler();
154 }
155
156 @SuppressWarnings("unchecked")
157 public synchronized <T extends ChannelHandler> T remove(Class<T> handlerType) {
158 return (T) remove(getContextOrDie(handlerType)).getHandler();
159 }
160
161 private DefaultChannelHandlerContext remove(DefaultChannelHandlerContext ctx) {
162 if (head == tail) {
163 head = tail = null;
164 name2ctx.clear();
165 } else if (ctx == head) {
166 removeFirst();
167 } else if (ctx == tail) {
168 removeLast();
169 } else {
170 callBeforeRemove(ctx);
171
172 DefaultChannelHandlerContext prev = ctx.prev;
173 DefaultChannelHandlerContext next = ctx.next;
174 prev.next = next;
175 next.prev = prev;
176 name2ctx.remove(ctx.getName());
177
178 callAfterRemove(ctx);
179 }
180 return ctx;
181 }
182
183 public synchronized ChannelHandler removeFirst() {
184 if (name2ctx.isEmpty()) {
185 throw new NoSuchElementException();
186 }
187
188 DefaultChannelHandlerContext oldHead = head;
189 if (oldHead == null) {
190 throw new NoSuchElementException();
191 }
192
193 callBeforeRemove(oldHead);
194
195 if (oldHead.next == null) {
196 head = tail = null;
197 name2ctx.clear();
198 } else {
199 oldHead.next.prev = null;
200 head = oldHead.next;
201 name2ctx.remove(oldHead.getName());
202 }
203
204 callAfterRemove(oldHead);
205
206 return oldHead.getHandler();
207 }
208
209 public synchronized ChannelHandler removeLast() {
210 if (name2ctx.isEmpty()) {
211 throw new NoSuchElementException();
212 }
213
214 DefaultChannelHandlerContext oldTail = tail;
215 if (oldTail == null) {
216 throw new NoSuchElementException();
217 }
218
219 callBeforeRemove(oldTail);
220
221 if (oldTail.prev == null) {
222 head = tail = null;
223 name2ctx.clear();
224 } else {
225 oldTail.prev.next = null;
226 tail = oldTail.prev;
227 name2ctx.remove(oldTail.getName());
228 }
229
230 callBeforeRemove(oldTail);
231
232 return oldTail.getHandler();
233 }
234
235 public synchronized void replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) {
236 replace(getContextOrDie(oldHandler), newName, newHandler);
237 }
238
239 public synchronized ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) {
240 return replace(getContextOrDie(oldName), newName, newHandler);
241 }
242
243 @SuppressWarnings("unchecked")
244 public synchronized <T extends ChannelHandler> T replace(
245 Class<T> oldHandlerType, String newName, ChannelHandler newHandler) {
246 return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler);
247 }
248
249 private ChannelHandler replace(DefaultChannelHandlerContext ctx, String newName, ChannelHandler newHandler) {
250 if (ctx == head) {
251 removeFirst();
252 addFirst(newName, newHandler);
253 } else if (ctx == tail) {
254 removeLast();
255 addLast(newName, newHandler);
256 } else {
257 boolean sameName = ctx.getName().equals(newName);
258 if (!sameName) {
259 checkDuplicateName(newName);
260 }
261
262 DefaultChannelHandlerContext prev = ctx.prev;
263 DefaultChannelHandlerContext next = ctx.next;
264 DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(prev, next, newName, newHandler);
265
266 callBeforeRemove(ctx);
267 callBeforeAdd(newCtx);
268
269 prev.next = newCtx;
270 next.prev = newCtx;
271
272 if (!sameName) {
273 name2ctx.remove(ctx.getName());
274 }
275 name2ctx.put(newName, newCtx);
276
277 ChannelHandlerLifeCycleException removeException = null;
278 ChannelHandlerLifeCycleException addException = null;
279 boolean removed = false;
280 try {
281 callAfterRemove(ctx);
282 removed = true;
283 } catch (ChannelHandlerLifeCycleException e) {
284 removeException = e;
285 }
286
287 boolean added = false;
288 try {
289 callAfterAdd(newCtx);
290 added = true;
291 } catch (ChannelHandlerLifeCycleException e) {
292 addException = e;
293 }
294
295 if (!removed && !added) {
296 logger.warn(removeException.getMessage(), removeException);
297 logger.warn(addException.getMessage(), addException);
298 throw new ChannelHandlerLifeCycleException(
299 "Both " + ctx.getHandler().getClass().getName() +
300 ".afterRemove() and " + newCtx.getHandler().getClass().getName() +
301 ".afterAdd() failed; see logs.");
302 } else if (!removed) {
303 throw removeException;
304 } else if (!added) {
305 throw addException;
306 }
307 }
308
309 return ctx.getHandler();
310 }
311
312 private static void callBeforeAdd(ChannelHandlerContext ctx) {
313 if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
314 return;
315 }
316
317 LifeCycleAwareChannelHandler h =
318 (LifeCycleAwareChannelHandler) ctx.getHandler();
319
320 try {
321 h.beforeAdd(ctx);
322 } catch (Throwable t) {
323 throw new ChannelHandlerLifeCycleException(
324 h.getClass().getName() +
325 ".beforeAdd() has thrown an exception; not adding.", t);
326 }
327 }
328
329 private void callAfterAdd(ChannelHandlerContext ctx) {
330 if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
331 return;
332 }
333
334 LifeCycleAwareChannelHandler h =
335 (LifeCycleAwareChannelHandler) ctx.getHandler();
336
337 try {
338 h.afterAdd(ctx);
339 } catch (Throwable t) {
340 boolean removed = false;
341 try {
342 remove((DefaultChannelHandlerContext) ctx);
343 removed = true;
344 } catch (Throwable t2) {
345 if (logger.isWarnEnabled()) {
346 logger.warn("Failed to remove a handler: " + ctx.getName(), t2);
347 }
348 }
349
350 if (removed) {
351 throw new ChannelHandlerLifeCycleException(
352 h.getClass().getName() +
353 ".afterAdd() has thrown an exception; removed.", t);
354 } else {
355 throw new ChannelHandlerLifeCycleException(
356 h.getClass().getName() +
357 ".afterAdd() has thrown an exception; also failed to remove.", t);
358 }
359 }
360 }
361
362 private static void callBeforeRemove(ChannelHandlerContext ctx) {
363 if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
364 return;
365 }
366
367 LifeCycleAwareChannelHandler h =
368 (LifeCycleAwareChannelHandler) ctx.getHandler();
369
370 try {
371 h.beforeRemove(ctx);
372 } catch (Throwable t) {
373 throw new ChannelHandlerLifeCycleException(
374 h.getClass().getName() +
375 ".beforeRemove() has thrown an exception; not removing.", t);
376 }
377 }
378
379 private static void callAfterRemove(ChannelHandlerContext ctx) {
380 if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
381 return;
382 }
383
384 LifeCycleAwareChannelHandler h =
385 (LifeCycleAwareChannelHandler) ctx.getHandler();
386
387 try {
388 h.afterRemove(ctx);
389 } catch (Throwable t) {
390 throw new ChannelHandlerLifeCycleException(
391 h.getClass().getName() +
392 ".afterRemove() has thrown an exception.", t);
393 }
394 }
395
396 public synchronized ChannelHandler getFirst() {
397 DefaultChannelHandlerContext head = this.head;
398 if (head == null) {
399 return null;
400 }
401 return head.getHandler();
402 }
403
404 public synchronized ChannelHandler getLast() {
405 DefaultChannelHandlerContext tail = this.tail;
406 if (tail == null) {
407 return null;
408 }
409 return tail.getHandler();
410 }
411
412 public synchronized ChannelHandler get(String name) {
413 DefaultChannelHandlerContext ctx = name2ctx.get(name);
414 if (ctx == null) {
415 return null;
416 } else {
417 return ctx.getHandler();
418 }
419 }
420
421 public synchronized <T extends ChannelHandler> T get(Class<T> handlerType) {
422 ChannelHandlerContext ctx = getContext(handlerType);
423 if (ctx == null) {
424 return null;
425 } else {
426 @SuppressWarnings("unchecked")
427 T handler = (T) ctx.getHandler();
428 return handler;
429 }
430 }
431
432 public synchronized ChannelHandlerContext getContext(String name) {
433 if (name == null) {
434 throw new NullPointerException("name");
435 }
436 return name2ctx.get(name);
437 }
438
439 public synchronized ChannelHandlerContext getContext(ChannelHandler handler) {
440 if (handler == null) {
441 throw new NullPointerException("handler");
442 }
443 if (name2ctx.isEmpty()) {
444 return null;
445 }
446 DefaultChannelHandlerContext ctx = head;
447 for (;;) {
448 if (ctx.getHandler() == handler) {
449 return ctx;
450 }
451
452 ctx = ctx.next;
453 if (ctx == null) {
454 break;
455 }
456 }
457 return null;
458 }
459
460 public synchronized ChannelHandlerContext getContext(
461 Class<? extends ChannelHandler> handlerType) {
462 if (handlerType == null) {
463 throw new NullPointerException("handlerType");
464 }
465
466 if (name2ctx.isEmpty()) {
467 return null;
468 }
469 DefaultChannelHandlerContext ctx = head;
470 for (;;) {
471 if (handlerType.isAssignableFrom(ctx.getHandler().getClass())) {
472 return ctx;
473 }
474
475 ctx = ctx.next;
476 if (ctx == null) {
477 break;
478 }
479 }
480 return null;
481 }
482
483 public List<String> getNames() {
484 List<String> list = new ArrayList<String>();
485 if (name2ctx.isEmpty()) {
486 return list;
487 }
488
489 DefaultChannelHandlerContext ctx = head;
490 for (;;) {
491 list.add(ctx.getName());
492 ctx = ctx.next;
493 if (ctx == null) {
494 break;
495 }
496 }
497 return list;
498 }
499
500 public Map<String, ChannelHandler> toMap() {
501 Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
502 if (name2ctx.isEmpty()) {
503 return map;
504 }
505
506 DefaultChannelHandlerContext ctx = head;
507 for (;;) {
508 map.put(ctx.getName(), ctx.getHandler());
509 ctx = ctx.next;
510 if (ctx == null) {
511 break;
512 }
513 }
514 return map;
515 }
516
517
518
519
520 @Override
521 public String toString() {
522 StringBuilder buf = new StringBuilder();
523 buf.append(getClass().getSimpleName());
524 buf.append('{');
525 DefaultChannelHandlerContext ctx = head;
526 if (ctx != null) {
527 for (;;) {
528 buf.append('(');
529 buf.append(ctx.getName());
530 buf.append(" = ");
531 buf.append(ctx.getHandler().getClass().getName());
532 buf.append(')');
533 ctx = ctx.next;
534 if (ctx == null) {
535 break;
536 }
537 buf.append(", ");
538 }
539 }
540 buf.append('}');
541 return buf.toString();
542 }
543
544 public void sendUpstream(ChannelEvent e) {
545 DefaultChannelHandlerContext head = getActualUpstreamContext(this.head);
546 if (head == null) {
547 if (logger.isWarnEnabled()) {
548 logger.warn(
549 "The pipeline contains no upstream handlers; discarding: " + e);
550 }
551
552 return;
553 }
554
555 sendUpstream(head, e);
556 }
557
558 void sendUpstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
559 try {
560 ((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);
561 } catch (Throwable t) {
562 notifyHandlerException(e, t);
563 }
564 }
565
566 public void sendDownstream(ChannelEvent e) {
567 DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
568 if (tail == null) {
569 try {
570 getSink().eventSunk(this, e);
571 return;
572 } catch (Throwable t) {
573 notifyHandlerException(e, t);
574 return;
575 }
576 }
577
578 sendDownstream(tail, e);
579 }
580
581 void sendDownstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
582 if (e instanceof UpstreamMessageEvent) {
583 throw new IllegalArgumentException("cannot send an upstream event to downstream");
584 }
585
586 try {
587 ((ChannelDownstreamHandler) ctx.getHandler()).handleDownstream(ctx, e);
588 } catch (Throwable t) {
589
590
591
592
593
594 e.getFuture().setFailure(t);
595 notifyHandlerException(e, t);
596 }
597 }
598
599 private DefaultChannelHandlerContext getActualUpstreamContext(DefaultChannelHandlerContext ctx) {
600 if (ctx == null) {
601 return null;
602 }
603
604 DefaultChannelHandlerContext realCtx = ctx;
605 while (!realCtx.canHandleUpstream()) {
606 realCtx = realCtx.next;
607 if (realCtx == null) {
608 return null;
609 }
610 }
611
612 return realCtx;
613 }
614
615 private DefaultChannelHandlerContext getActualDownstreamContext(DefaultChannelHandlerContext ctx) {
616 if (ctx == null) {
617 return null;
618 }
619
620 DefaultChannelHandlerContext realCtx = ctx;
621 while (!realCtx.canHandleDownstream()) {
622 realCtx = realCtx.prev;
623 if (realCtx == null) {
624 return null;
625 }
626 }
627
628 return realCtx;
629 }
630
631 public ChannelFuture execute(Runnable task) {
632 return getSink().execute(this, task);
633 }
634
635 protected void notifyHandlerException(ChannelEvent e, Throwable t) {
636 if (e instanceof ExceptionEvent) {
637 if (logger.isWarnEnabled()) {
638 logger.warn(
639 "An exception was thrown by a user handler " +
640 "while handling an exception event (" + e + ')', t);
641 }
642
643 return;
644 }
645
646 ChannelPipelineException pe;
647 if (t instanceof ChannelPipelineException) {
648 pe = (ChannelPipelineException) t;
649 } else {
650 pe = new ChannelPipelineException(t);
651 }
652
653 try {
654 sink.exceptionCaught(this, e, pe);
655 } catch (Exception e1) {
656 if (logger.isWarnEnabled()) {
657 logger.warn("An exception was thrown by an exception handler.", e1);
658 }
659 }
660 }
661
662 private void init(String name, ChannelHandler handler) {
663 DefaultChannelHandlerContext ctx = new DefaultChannelHandlerContext(null, null, name, handler);
664 callBeforeAdd(ctx);
665 head = tail = ctx;
666 name2ctx.clear();
667 name2ctx.put(name, ctx);
668 callAfterAdd(ctx);
669 }
670
671 private void checkDuplicateName(String name) {
672 if (name2ctx.containsKey(name)) {
673 throw new IllegalArgumentException("Duplicate handler name: " + name);
674 }
675 }
676
677 private DefaultChannelHandlerContext getContextOrDie(String name) {
678 DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) getContext(name);
679 if (ctx == null) {
680 throw new NoSuchElementException(name);
681 } else {
682 return ctx;
683 }
684 }
685
686 private DefaultChannelHandlerContext getContextOrDie(ChannelHandler handler) {
687 DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) getContext(handler);
688 if (ctx == null) {
689 throw new NoSuchElementException(handler.getClass().getName());
690 } else {
691 return ctx;
692 }
693 }
694
695 private DefaultChannelHandlerContext getContextOrDie(Class<? extends ChannelHandler> handlerType) {
696 DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) getContext(handlerType);
697 if (ctx == null) {
698 throw new NoSuchElementException(handlerType.getName());
699 } else {
700 return ctx;
701 }
702 }
703
704 private final class DefaultChannelHandlerContext implements ChannelHandlerContext {
705 volatile DefaultChannelHandlerContext next;
706 volatile DefaultChannelHandlerContext prev;
707 private final String name;
708 private final ChannelHandler handler;
709 private final boolean canHandleUpstream;
710 private final boolean canHandleDownstream;
711 private volatile Object attachment;
712
713 DefaultChannelHandlerContext(
714 DefaultChannelHandlerContext prev, DefaultChannelHandlerContext next,
715 String name, ChannelHandler handler) {
716
717 if (name == null) {
718 throw new NullPointerException("name");
719 }
720 if (handler == null) {
721 throw new NullPointerException("handler");
722 }
723 canHandleUpstream = handler instanceof ChannelUpstreamHandler;
724 canHandleDownstream = handler instanceof ChannelDownstreamHandler;
725
726 if (!canHandleUpstream && !canHandleDownstream) {
727 throw new IllegalArgumentException(
728 "handler must be either " +
729 ChannelUpstreamHandler.class.getName() + " or " +
730 ChannelDownstreamHandler.class.getName() + '.');
731 }
732
733 this.prev = prev;
734 this.next = next;
735 this.name = name;
736 this.handler = handler;
737 }
738
739 public Channel getChannel() {
740 return getPipeline().getChannel();
741 }
742
743 public ChannelPipeline getPipeline() {
744 return DefaultChannelPipeline.this;
745 }
746
747 public boolean canHandleDownstream() {
748 return canHandleDownstream;
749 }
750
751 public boolean canHandleUpstream() {
752 return canHandleUpstream;
753 }
754
755 public ChannelHandler getHandler() {
756 return handler;
757 }
758
759 public String getName() {
760 return name;
761 }
762
763 public Object getAttachment() {
764 return attachment;
765 }
766
767 public void setAttachment(Object attachment) {
768 this.attachment = attachment;
769 }
770
771 public void sendDownstream(ChannelEvent e) {
772 DefaultChannelHandlerContext prev = getActualDownstreamContext(this.prev);
773 if (prev == null) {
774 try {
775 getSink().eventSunk(DefaultChannelPipeline.this, e);
776 } catch (Throwable t) {
777 notifyHandlerException(e, t);
778 }
779 } else {
780 DefaultChannelPipeline.this.sendDownstream(prev, e);
781 }
782 }
783
784 public void sendUpstream(ChannelEvent e) {
785 DefaultChannelHandlerContext next = getActualUpstreamContext(this.next);
786 if (next != null) {
787 DefaultChannelPipeline.this.sendUpstream(next, e);
788 }
789 }
790 }
791
792 private static final class DiscardingChannelSink implements ChannelSink {
793 DiscardingChannelSink() {
794 }
795
796 public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) {
797 if (logger.isWarnEnabled()) {
798 logger.warn("Not attached yet; discarding: " + e);
799 }
800 }
801
802 public void exceptionCaught(ChannelPipeline pipeline,
803 ChannelEvent e, ChannelPipelineException cause) throws Exception {
804 throw cause;
805 }
806
807 public ChannelFuture execute(ChannelPipeline pipeline, Runnable task) {
808 if (logger.isWarnEnabled()) {
809 logger.warn("Not attached yet; rejecting: " + task);
810 }
811 return Channels.failedFuture(pipeline.getChannel(), new RejectedExecutionException("Not attached yet"));
812 }
813 }
814 }