View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import org.jboss.netty.channel.ChannelEvent;
19  import org.jboss.netty.channel.ChannelFuture;
20  import org.jboss.netty.channel.ChannelFutureListener;
21  import org.jboss.netty.channel.ChannelPipeline;
22  import org.jboss.netty.channel.ChannelState;
23  import org.jboss.netty.channel.ChannelStateEvent;
24  import org.jboss.netty.channel.MessageEvent;
25  import org.jboss.netty.logging.InternalLogger;
26  import org.jboss.netty.logging.InternalLoggerFactory;
27  
28  import java.net.SocketAddress;
29  import java.nio.channels.ClosedChannelException;
30  
31  import static org.jboss.netty.channel.Channels.*;
32  
33  class NioClientSocketPipelineSink extends AbstractNioChannelSink {
34  
35      static final InternalLogger logger =
36          InternalLoggerFactory.getInstance(NioClientSocketPipelineSink.class);
37  
38      private final BossPool<NioClientBoss> bossPool;
39  
40      NioClientSocketPipelineSink(BossPool<NioClientBoss> bossPool) {
41          this.bossPool = bossPool;
42      }
43  
44      public void eventSunk(
45              ChannelPipeline pipeline, ChannelEvent e) throws Exception {
46          if (e instanceof ChannelStateEvent) {
47              ChannelStateEvent event = (ChannelStateEvent) e;
48              NioClientSocketChannel channel =
49                  (NioClientSocketChannel) event.getChannel();
50              ChannelFuture future = event.getFuture();
51              ChannelState state = event.getState();
52              Object value = event.getValue();
53  
54              switch (state) {
55              case OPEN:
56                  if (Boolean.FALSE.equals(value)) {
57                      channel.worker.close(channel, future);
58                  }
59                  break;
60              case BOUND:
61                  if (value != null) {
62                      bind(channel, future, (SocketAddress) value);
63                  } else {
64                      channel.worker.close(channel, future);
65                  }
66                  break;
67              case CONNECTED:
68                  if (value != null) {
69                      connect(channel, future, (SocketAddress) value);
70                  } else {
71                      channel.worker.close(channel, future);
72                  }
73                  break;
74              case INTEREST_OPS:
75                  channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
76                  break;
77              }
78          } else if (e instanceof MessageEvent) {
79              MessageEvent event = (MessageEvent) e;
80              NioSocketChannel channel = (NioSocketChannel) event.getChannel();
81              boolean offered = channel.writeBufferQueue.offer(event);
82              assert offered;
83              channel.worker.writeFromUserCode(channel);
84          }
85      }
86  
87      private static void bind(
88              NioClientSocketChannel channel, ChannelFuture future,
89              SocketAddress localAddress) {
90          try {
91              channel.channel.socket().bind(localAddress);
92              channel.boundManually = true;
93              channel.setBound();
94              future.setSuccess();
95              fireChannelBound(channel, channel.getLocalAddress());
96          } catch (Throwable t) {
97              future.setFailure(t);
98              fireExceptionCaught(channel, t);
99          }
100     }
101 
102     private void connect(
103             final NioClientSocketChannel channel, final ChannelFuture cf,
104             SocketAddress remoteAddress) {
105         try {
106             if (channel.channel.connect(remoteAddress)) {
107                 channel.worker.register(channel, cf);
108             } else {
109                 channel.getCloseFuture().addListener(new ChannelFutureListener() {
110                     public void operationComplete(ChannelFuture f)
111                             throws Exception {
112                         if (!cf.isDone()) {
113                             cf.setFailure(new ClosedChannelException());
114                         }
115                     }
116                 });
117                 cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
118                 channel.connectFuture = cf;
119                 nextBoss().register(channel, cf);
120             }
121 
122         } catch (Throwable t) {
123             cf.setFailure(t);
124             fireExceptionCaught(channel, t);
125             channel.worker.close(channel, succeededFuture(channel));
126         }
127     }
128 
129     private NioClientBoss nextBoss() {
130         return bossPool.nextBoss();
131     }
132 
133 }