View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import java.nio.channels.Selector;
19  import java.util.concurrent.Executor;
20  import java.util.concurrent.Executors;
21  import java.util.concurrent.RejectedExecutionException;
22  
23  import org.jboss.netty.channel.Channel;
24  import org.jboss.netty.channel.ChannelPipeline;
25  import org.jboss.netty.channel.group.ChannelGroup;
26  import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
27  import org.jboss.netty.channel.socket.SocketChannel;
28  import org.jboss.netty.util.ExternalResourceReleasable;
29  import org.jboss.netty.util.HashedWheelTimer;
30  import org.jboss.netty.util.Timer;
31  
32  /**
33   * A {@link ClientSocketChannelFactory} which creates a client-side NIO-based
34   * {@link SocketChannel}.  It utilizes the non-blocking I/O mode which was
35   * introduced with NIO to serve a large number of concurrent connections
36   * efficiently.
37   *
38   * <h3>How threads work</h3>
39   * <p>
40   * There are two types of threads in a {@link NioClientSocketChannelFactory};
41   * one is boss thread and the other is worker thread.
42   *
43   * <h4>Boss thread</h4>
44   * <p>
45   * One {@link NioClientSocketChannelFactory} has one boss thread.  It makes
46   * a connection attempt on request.  Once a connection attempt succeeds,
47   * the boss thread passes the connected {@link Channel} to one of the worker
48   * threads that the {@link NioClientSocketChannelFactory} manages.
49   *
50   * <h4>Worker threads</h4>
51   * <p>
52   * One {@link NioClientSocketChannelFactory} can have one or more worker
53   * threads.  A worker thread performs non-blocking read and write for one or
54   * more {@link Channel}s in a non-blocking mode.
55   *
56   * <h3>Life cycle of threads and graceful shutdown</h3>
57   * <p>
58   * All threads are acquired from the {@link Executor}s which were specified
59   * when a {@link NioClientSocketChannelFactory} was created.  A boss thread is
60   * acquired from the {@code bossExecutor}, and worker threads are acquired from
61   * the {@code workerExecutor}.  Therefore, you should make sure the specified
62   * {@link Executor}s are able to lend the sufficient number of threads.
63   * It is the best bet to specify {@linkplain Executors#newCachedThreadPool() a cached thread pool}.
64   * <p>
65   * Both boss and worker threads are acquired lazily, and then released when
66   * there's nothing left to process.  All the related resources such as
67   * {@link Selector} are also released when the boss and worker threads are
68   * released.  Therefore, to shut down a service gracefully, you should do the
69   * following:
70   *
71   * <ol>
72   * <li>close all channels created by the factory usually using
73   *     {@link ChannelGroup#close()}, and</li>
74   * <li>call {@link #releaseExternalResources()}.</li>
75   * </ol>
76   *
77   * Please make sure not to shut down the executor until all channels are
78   * closed.  Otherwise, you will end up with a {@link RejectedExecutionException}
79   * and the related resources might not be released properly.
80   *
81   * @apiviz.landmark
82   */
83  public class NioClientSocketChannelFactory implements ClientSocketChannelFactory {
84  
85      private static final int DEFAULT_BOSS_COUNT = 1;
86  
87      private final BossPool<NioClientBoss> bossPool;
88      private final WorkerPool<NioWorker> workerPool;
89      private final NioClientSocketPipelineSink sink;
90      private boolean releasePools;
91  
92      /**
93       * Creates a new {@link NioClientSocketChannelFactory} which uses {@link Executors#newCachedThreadPool()}
94       * for the worker and boss executors.
95       *
96       * See {@link #NioClientSocketChannelFactory(Executor, Executor)}
97       */
98      public NioClientSocketChannelFactory() {
99          this(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
100         releasePools = true;
101     }
102 
103     /**
104      * Creates a new instance.  Calling this constructor is same with calling
105      * {@link #NioClientSocketChannelFactory(Executor, Executor, int, int)} with
106      * 1 and (2 * the number of available processors in the machine) for
107      * <tt>bossCount</tt> and <tt>workerCount</tt> respectively.  The number of
108      * available processors is obtained by {@link Runtime#availableProcessors()}.
109      *
110      * @param bossExecutor
111      *        the {@link Executor} which will execute the boss thread
112      * @param workerExecutor
113      *        the {@link Executor} which will execute the worker threads
114      */
115     public NioClientSocketChannelFactory(
116             Executor bossExecutor, Executor workerExecutor) {
117         this(bossExecutor, workerExecutor, DEFAULT_BOSS_COUNT, SelectorUtil.DEFAULT_IO_THREADS);
118     }
119 
120     /**
121      * Creates a new instance.  Calling this constructor is same with calling
122      * {@link #NioClientSocketChannelFactory(Executor, Executor, int, int)} with
123      * 1 as <tt>bossCount</tt>.
124      *
125      * @param bossExecutor
126      *        the {@link Executor} which will execute the boss thread
127      * @param workerExecutor
128      *        the {@link Executor} which will execute the worker threads
129      * @param workerCount
130      *        the maximum number of I/O worker threads
131      */
132     public NioClientSocketChannelFactory(
133             Executor bossExecutor, Executor workerExecutor, int workerCount) {
134         this(bossExecutor, workerExecutor, DEFAULT_BOSS_COUNT, workerCount);
135     }
136 
137     /**
138      * Creates a new instance.
139      *
140      * @param bossExecutor
141      *        the {@link Executor} which will execute the boss thread
142      * @param workerExecutor
143      *        the {@link Executor} which will execute the worker threads
144      * @param bossCount
145      *        the maximum number of boss threads
146      * @param workerCount
147      *        the maximum number of I/O worker threads
148      */
149     public NioClientSocketChannelFactory(
150             Executor bossExecutor, Executor workerExecutor,
151             int bossCount, int workerCount) {
152         this(bossExecutor, bossCount, new NioWorkerPool(workerExecutor, workerCount));
153     }
154 
155     /**
156      * Creates a new instance.
157      *
158      * @param bossExecutor
159      *        the {@link Executor} which will execute the boss thread
160      * @param bossCount
161      *        the maximum number of boss threads
162      * @param workerPool
163      *        the {@link WorkerPool} to use to do the IO
164      */
165     public NioClientSocketChannelFactory(
166             Executor bossExecutor, int bossCount,
167             WorkerPool<NioWorker> workerPool) {
168         this(bossExecutor, bossCount, workerPool, new HashedWheelTimer());
169     }
170 
171     /**
172      * Creates a new instance.
173      *
174      * @param bossExecutor
175      *        the {@link Executor} which will execute the boss thread
176      * @param bossCount
177      *        the maximum number of boss threads
178      * @param workerPool
179      *        the {@link WorkerPool} to use to do the IO
180      * @param timer
181      *        the {@link Timer} to use to handle the connection timeouts
182      */
183     public NioClientSocketChannelFactory(
184             Executor bossExecutor, int bossCount,
185             WorkerPool<NioWorker> workerPool, Timer timer) {
186         this(new NioClientBossPool(bossExecutor, bossCount, timer, null), workerPool);
187     }
188 
189     /**
190      * Creates a new instance.
191      *
192      * @param bossPool
193      *        the {@link BossPool} to use to handle the connects
194      * @param workerPool
195      *        the {@link WorkerPool} to use to do the IO
196      */
197     public NioClientSocketChannelFactory(
198             BossPool<NioClientBoss> bossPool,
199             WorkerPool<NioWorker> workerPool) {
200 
201         if (bossPool == null) {
202             throw new NullPointerException("bossPool");
203         }
204         if (workerPool == null) {
205             throw new NullPointerException("workerPool");
206         }
207         this.bossPool = bossPool;
208         this.workerPool = workerPool;
209         sink = new NioClientSocketPipelineSink(bossPool);
210     }
211 
212     public SocketChannel newChannel(ChannelPipeline pipeline) {
213         return new NioClientSocketChannel(this, pipeline, sink, workerPool.nextWorker());
214     }
215 
216     public void shutdown() {
217         bossPool.shutdown();
218         workerPool.shutdown();
219         if (releasePools) {
220             releasePools();
221         }
222     }
223 
224     public void releaseExternalResources() {
225         shutdown();
226         releasePools();
227     }
228 
229     private void releasePools() {
230         if (bossPool instanceof ExternalResourceReleasable) {
231             ((ExternalResourceReleasable) bossPool).releaseExternalResources();
232         }
233         if (workerPool instanceof ExternalResourceReleasable) {
234             ((ExternalResourceReleasable) workerPool).releaseExternalResources();
235         }
236     }
237 }