View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import org.jboss.netty.channel.Channel;
19  import org.jboss.netty.channel.ChannelFuture;
20  import org.jboss.netty.util.ThreadNameDeterminer;
21  import org.jboss.netty.util.ThreadRenamingRunnable;
22  import org.jboss.netty.util.Timeout;
23  import org.jboss.netty.util.Timer;
24  import org.jboss.netty.util.TimerTask;
25  
26  import java.io.IOException;
27  import java.net.ConnectException;
28  import java.nio.channels.ClosedChannelException;
29  import java.nio.channels.SelectionKey;
30  import java.nio.channels.Selector;
31  import java.util.Iterator;
32  import java.util.Set;
33  import java.util.concurrent.Executor;
34  import java.util.concurrent.TimeUnit;
35  
36  import static org.jboss.netty.channel.Channels.*;
37  
38  /**
39   * {@link Boss} implementation that handles the  connection attempts of clients
40   */
41  public final class NioClientBoss extends AbstractNioSelector implements Boss {
42  
43      private final TimerTask wakeupTask = new TimerTask() {
44          public void run(Timeout timeout) throws Exception {
45              // This is needed to prevent a possible race that can lead to a NPE
46              // when the selector is closed before this is run
47              //
48              // See https://github.com/netty/netty/issues/685
49              Selector selector = NioClientBoss.this.selector;
50  
51              if (selector != null) {
52                  if (wakenUp.compareAndSet(false, true)) {
53                      selector.wakeup();
54                  }
55              }
56          }
57      };
58  
59      private final Timer timer;
60  
61      NioClientBoss(Executor bossExecutor, Timer timer, ThreadNameDeterminer determiner) {
62          super(bossExecutor, determiner);
63          this.timer = timer;
64      }
65  
66      @Override
67      protected ThreadRenamingRunnable newThreadRenamingRunnable(int id, ThreadNameDeterminer determiner) {
68          return new ThreadRenamingRunnable(this, "New I/O boss #" + id, determiner);
69      }
70  
71      @Override
72      protected Runnable createRegisterTask(Channel channel, ChannelFuture future) {
73          return new RegisterTask(this, (NioClientSocketChannel) channel);
74      }
75  
76      @Override
77      protected void process(Selector selector) {
78          processSelectedKeys(selector.selectedKeys());
79  
80          // Handle connection timeout every 10 milliseconds approximately.
81          long currentTimeNanos = System.nanoTime();
82          processConnectTimeout(selector.keys(), currentTimeNanos);
83      }
84  
85      private void processSelectedKeys(Set<SelectionKey> selectedKeys) {
86  
87          // check if the set is empty and if so just return to not create garbage by
88          // creating a new Iterator every time even if there is nothing to process.
89          // See https://github.com/netty/netty/issues/597
90          if (selectedKeys.isEmpty()) {
91              return;
92          }
93          for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
94              SelectionKey k = i.next();
95              i.remove();
96  
97              if (!k.isValid()) {
98                  close(k);
99                  continue;
100             }
101 
102             try {
103                 if (k.isConnectable()) {
104                     connect(k);
105                 }
106             } catch (Throwable t) {
107                 NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
108                 ch.connectFuture.setFailure(t);
109                 fireExceptionCaught(ch, t);
110                 k.cancel(); // Some JDK implementations run into an infinite loop without this.
111                 ch.worker.close(ch, succeededFuture(ch));
112             }
113         }
114     }
115 
116     private static void processConnectTimeout(Set<SelectionKey> keys, long currentTimeNanos) {
117         ConnectException cause = null;
118         for (SelectionKey k: keys) {
119             if (!k.isValid()) {
120                 // Comment the close call again as it gave us major problems
121                 // with ClosedChannelExceptions.
122                 //
123                 // See:
124                 // * https://github.com/netty/netty/issues/142
125                 // * https://github.com/netty/netty/issues/138
126                 //
127                 // close(k);
128                 continue;
129             }
130 
131             NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
132             if (ch.connectDeadlineNanos > 0 &&
133                     currentTimeNanos >= ch.connectDeadlineNanos) {
134 
135                 if (cause == null) {
136                     cause = new ConnectException("connection timed out");
137                 }
138 
139                 ch.connectFuture.setFailure(cause);
140                 fireExceptionCaught(ch, cause);
141                 ch.worker.close(ch, succeededFuture(ch));
142             }
143         }
144     }
145 
146     private static void connect(SelectionKey k) throws IOException {
147         NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
148         if (ch.channel.finishConnect()) {
149             k.cancel();
150             if (ch.timoutTimer != null) {
151                 ch.timoutTimer.cancel();
152             }
153             ch.worker.register(ch, ch.connectFuture);
154         }
155     }
156 
157     @Override
158     protected void close(SelectionKey k) {
159         NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
160         ch.worker.close(ch, succeededFuture(ch));
161     }
162 
163     private final class RegisterTask implements Runnable {
164         private final NioClientBoss boss;
165         private final NioClientSocketChannel channel;
166 
167         RegisterTask(NioClientBoss boss, NioClientSocketChannel channel) {
168             this.boss = boss;
169             this.channel = channel;
170         }
171 
172         public void run() {
173             int timeout = channel.getConfig().getConnectTimeoutMillis();
174             if (timeout > 0) {
175                 if (!channel.isConnected()) {
176                     channel.timoutTimer = timer.newTimeout(wakeupTask,
177                             timeout, TimeUnit.MILLISECONDS);
178                 }
179             }
180             try {
181                 channel.channel.register(
182                         boss.selector, SelectionKey.OP_CONNECT, channel);
183             } catch (ClosedChannelException e) {
184                 channel.worker.close(channel, succeededFuture(channel));
185             }
186 
187             int connectTimeout = channel.getConfig().getConnectTimeoutMillis();
188             if (connectTimeout > 0) {
189                 channel.connectDeadlineNanos = System.nanoTime() + connectTimeout * 1000000L;
190             }
191         }
192     }
193 }