1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import org.jboss.netty.channel.ChannelEvent;
19 import org.jboss.netty.channel.ChannelFuture;
20 import org.jboss.netty.channel.ChannelFutureListener;
21 import org.jboss.netty.channel.ChannelPipeline;
22 import org.jboss.netty.channel.ChannelState;
23 import org.jboss.netty.channel.ChannelStateEvent;
24 import org.jboss.netty.channel.MessageEvent;
25 import org.jboss.netty.logging.InternalLogger;
26 import org.jboss.netty.logging.InternalLoggerFactory;
27
28 import java.net.SocketAddress;
29 import java.nio.channels.ClosedChannelException;
30
31 import static org.jboss.netty.channel.Channels.*;
32
33 class NioClientSocketPipelineSink extends AbstractNioChannelSink {
34
35 static final InternalLogger logger =
36 InternalLoggerFactory.getInstance(NioClientSocketPipelineSink.class);
37
38 private final BossPool<NioClientBoss> bossPool;
39
40 NioClientSocketPipelineSink(BossPool<NioClientBoss> bossPool) {
41 this.bossPool = bossPool;
42 }
43
44 public void eventSunk(
45 ChannelPipeline pipeline, ChannelEvent e) throws Exception {
46 if (e instanceof ChannelStateEvent) {
47 ChannelStateEvent event = (ChannelStateEvent) e;
48 NioClientSocketChannel channel =
49 (NioClientSocketChannel) event.getChannel();
50 ChannelFuture future = event.getFuture();
51 ChannelState state = event.getState();
52 Object value = event.getValue();
53
54 switch (state) {
55 case OPEN:
56 if (Boolean.FALSE.equals(value)) {
57 channel.worker.close(channel, future);
58 }
59 break;
60 case BOUND:
61 if (value != null) {
62 bind(channel, future, (SocketAddress) value);
63 } else {
64 channel.worker.close(channel, future);
65 }
66 break;
67 case CONNECTED:
68 if (value != null) {
69 connect(channel, future, (SocketAddress) value);
70 } else {
71 channel.worker.close(channel, future);
72 }
73 break;
74 case INTEREST_OPS:
75 channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
76 break;
77 }
78 } else if (e instanceof MessageEvent) {
79 MessageEvent event = (MessageEvent) e;
80 NioSocketChannel channel = (NioSocketChannel) event.getChannel();
81 boolean offered = channel.writeBufferQueue.offer(event);
82 assert offered;
83 channel.worker.writeFromUserCode(channel);
84 }
85 }
86
87 private static void bind(
88 NioClientSocketChannel channel, ChannelFuture future,
89 SocketAddress localAddress) {
90 try {
91 channel.channel.socket().bind(localAddress);
92 channel.boundManually = true;
93 channel.setBound();
94 future.setSuccess();
95 fireChannelBound(channel, channel.getLocalAddress());
96 } catch (Throwable t) {
97 future.setFailure(t);
98 fireExceptionCaught(channel, t);
99 }
100 }
101
102 private void connect(
103 final NioClientSocketChannel channel, final ChannelFuture cf,
104 SocketAddress remoteAddress) {
105 try {
106 if (channel.channel.connect(remoteAddress)) {
107 channel.worker.register(channel, cf);
108 } else {
109 channel.getCloseFuture().addListener(new ChannelFutureListener() {
110 public void operationComplete(ChannelFuture f)
111 throws Exception {
112 if (!cf.isDone()) {
113 cf.setFailure(new ClosedChannelException());
114 }
115 }
116 });
117 cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
118 channel.connectFuture = cf;
119 nextBoss().register(channel, cf);
120 }
121
122 } catch (Throwable t) {
123 cf.setFailure(t);
124 fireExceptionCaught(channel, t);
125 channel.worker.close(channel, succeededFuture(channel));
126 }
127 }
128
129 private NioClientBoss nextBoss() {
130 return bossPool.nextBoss();
131 }
132
133 }