1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import org.jboss.netty.channel.Channel;
19 import org.jboss.netty.channel.ChannelFuture;
20 import org.jboss.netty.util.ThreadNameDeterminer;
21 import org.jboss.netty.util.ThreadRenamingRunnable;
22 import org.jboss.netty.util.Timeout;
23 import org.jboss.netty.util.Timer;
24 import org.jboss.netty.util.TimerTask;
25
26 import java.io.IOException;
27 import java.net.ConnectException;
28 import java.nio.channels.ClosedChannelException;
29 import java.nio.channels.SelectionKey;
30 import java.nio.channels.Selector;
31 import java.util.Iterator;
32 import java.util.Set;
33 import java.util.concurrent.Executor;
34 import java.util.concurrent.TimeUnit;
35
36 import static org.jboss.netty.channel.Channels.*;
37
38
39
40
41 public final class NioClientBoss extends AbstractNioSelector implements Boss {
42
43 private final TimerTask wakeupTask = new TimerTask() {
44 public void run(Timeout timeout) throws Exception {
45
46
47
48
49 Selector selector = NioClientBoss.this.selector;
50
51 if (selector != null) {
52 if (wakenUp.compareAndSet(false, true)) {
53 selector.wakeup();
54 }
55 }
56 }
57 };
58
59 private final Timer timer;
60
61 NioClientBoss(Executor bossExecutor, Timer timer, ThreadNameDeterminer determiner) {
62 super(bossExecutor, determiner);
63 this.timer = timer;
64 }
65
66 @Override
67 protected ThreadRenamingRunnable newThreadRenamingRunnable(int id, ThreadNameDeterminer determiner) {
68 return new ThreadRenamingRunnable(this, "New I/O boss #" + id, determiner);
69 }
70
71 @Override
72 protected Runnable createRegisterTask(Channel channel, ChannelFuture future) {
73 return new RegisterTask(this, (NioClientSocketChannel) channel);
74 }
75
76 @Override
77 protected void process(Selector selector) {
78 processSelectedKeys(selector.selectedKeys());
79
80
81 long currentTimeNanos = System.nanoTime();
82 processConnectTimeout(selector.keys(), currentTimeNanos);
83 }
84
85 private void processSelectedKeys(Set<SelectionKey> selectedKeys) {
86
87
88
89
90 if (selectedKeys.isEmpty()) {
91 return;
92 }
93 for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
94 SelectionKey k = i.next();
95 i.remove();
96
97 if (!k.isValid()) {
98 close(k);
99 continue;
100 }
101
102 try {
103 if (k.isConnectable()) {
104 connect(k);
105 }
106 } catch (Throwable t) {
107 NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
108 ch.connectFuture.setFailure(t);
109 fireExceptionCaught(ch, t);
110 k.cancel();
111 ch.worker.close(ch, succeededFuture(ch));
112 }
113 }
114 }
115
116 private static void processConnectTimeout(Set<SelectionKey> keys, long currentTimeNanos) {
117 ConnectException cause = null;
118 for (SelectionKey k: keys) {
119 if (!k.isValid()) {
120
121
122
123
124
125
126
127
128 continue;
129 }
130
131 NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
132 if (ch.connectDeadlineNanos > 0 &&
133 currentTimeNanos >= ch.connectDeadlineNanos) {
134
135 if (cause == null) {
136 cause = new ConnectException("connection timed out");
137 }
138
139 ch.connectFuture.setFailure(cause);
140 fireExceptionCaught(ch, cause);
141 ch.worker.close(ch, succeededFuture(ch));
142 }
143 }
144 }
145
146 private static void connect(SelectionKey k) throws IOException {
147 NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
148 if (ch.channel.finishConnect()) {
149 k.cancel();
150 if (ch.timoutTimer != null) {
151 ch.timoutTimer.cancel();
152 }
153 ch.worker.register(ch, ch.connectFuture);
154 }
155 }
156
157 @Override
158 protected void close(SelectionKey k) {
159 NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
160 ch.worker.close(ch, succeededFuture(ch));
161 }
162
163 private final class RegisterTask implements Runnable {
164 private final NioClientBoss boss;
165 private final NioClientSocketChannel channel;
166
167 RegisterTask(NioClientBoss boss, NioClientSocketChannel channel) {
168 this.boss = boss;
169 this.channel = channel;
170 }
171
172 public void run() {
173 int timeout = channel.getConfig().getConnectTimeoutMillis();
174 if (timeout > 0) {
175 if (!channel.isConnected()) {
176 channel.timoutTimer = timer.newTimeout(wakeupTask,
177 timeout, TimeUnit.MILLISECONDS);
178 }
179 }
180 try {
181 channel.channel.register(
182 boss.selector, SelectionKey.OP_CONNECT, channel);
183 } catch (ClosedChannelException e) {
184 channel.worker.close(channel, succeededFuture(channel));
185 }
186
187 int connectTimeout = channel.getConfig().getConnectTimeoutMillis();
188 if (connectTimeout > 0) {
189 channel.connectDeadlineNanos = System.nanoTime() + connectTimeout * 1000000L;
190 }
191 }
192 }
193 }