View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.traffic;
17  
18  import org.jboss.netty.channel.Channel;
19  import org.jboss.netty.channel.ChannelEvent;
20  import org.jboss.netty.channel.ChannelHandlerContext;
21  import org.jboss.netty.channel.ChannelState;
22  import org.jboss.netty.channel.ChannelStateEvent;
23  import org.jboss.netty.channel.MessageEvent;
24  import org.jboss.netty.channel.SimpleChannelHandler;
25  import org.jboss.netty.logging.InternalLogger;
26  import org.jboss.netty.logging.InternalLoggerFactory;
27  import org.jboss.netty.util.DefaultObjectSizeEstimator;
28  import org.jboss.netty.util.ExternalResourceReleasable;
29  import org.jboss.netty.util.ObjectSizeEstimator;
30  import org.jboss.netty.util.Timeout;
31  import org.jboss.netty.util.Timer;
32  import org.jboss.netty.util.TimerTask;
33  
34  import java.util.concurrent.TimeUnit;
35  import java.util.concurrent.atomic.AtomicBoolean;
36  
37  /**
38   * AbstractTrafficShapingHandler allows to limit the global bandwidth
39   * (see {@link GlobalTrafficShapingHandler}) or per session
40   * bandwidth (see {@link ChannelTrafficShapingHandler}), as traffic shaping.
41   * It allows too to implement an almost real time monitoring of the bandwidth using
42   * the monitors from {@link TrafficCounter} that will call back every checkInterval
43   * the method doAccounting of this handler.<br>
44   * <br>
45   *
46   * An {@link ObjectSizeEstimator} can be passed at construction to specify what
47   * is the size of the object to be read or write accordingly to the type of
48   * object. If not specified, it will used the {@link DefaultObjectSizeEstimator} implementation.<br><br>
49   *
50   * If you want for any particular reasons to stop the monitoring (accounting) or to change
51   * the read/write limit or the check interval, several methods allow that for you:<br>
52   * <ul>
53   * <li><tt>configure</tt> allows you to change read or write limits, or the checkInterval</li>
54   * <li><tt>getTrafficCounter</tt> allows you to have access to the TrafficCounter and so to stop
55   * or start the monitoring, to change the checkInterval directly, or to have access to its values.</li>
56   * <li></li>
57   * </ul>
58   */
59  public abstract class AbstractTrafficShapingHandler extends
60          SimpleChannelHandler implements ExternalResourceReleasable {
61      /**
62       * Internal logger
63       */
64      static InternalLogger logger = InternalLoggerFactory
65              .getInstance(AbstractTrafficShapingHandler.class);
66  
67      /**
68       * Default delay between two checks: 1s
69       */
70      public static final long DEFAULT_CHECK_INTERVAL = 1000;
71  
72      /**
73       * Default minimal time to wait
74       */
75      private static final long MINIMAL_WAIT = 10;
76  
77      /**
78       * Traffic Counter
79       */
80      protected TrafficCounter trafficCounter;
81  
82      /**
83       * ObjectSizeEstimator
84       */
85      private ObjectSizeEstimator objectSizeEstimator;
86  
87      /**
88       * Timer to associated to any TrafficCounter
89       */
90      protected Timer timer;
91  
92      /**
93       * used in releaseExternalResources() to cancel the timer
94       */
95      private volatile Timeout timeout;
96  
97      /**
98       * Limit in B/s to apply to write
99       */
100     private long writeLimit;
101 
102     /**
103      * Limit in B/s to apply to read
104      */
105     private long readLimit;
106 
107     /**
108      * Delay between two performance snapshots
109      */
110     protected long checkInterval = DEFAULT_CHECK_INTERVAL; // default 1 s
111 
112     /**
113      * Boolean associated with the release of this TrafficShapingHandler.
114      * It will be true only once when the releaseExternalRessources is called
115      * to prevent waiting when shutdown.
116      */
117     final AtomicBoolean release = new AtomicBoolean(false);
118 
119      private void init(ObjectSizeEstimator newObjectSizeEstimator,
120              Timer newTimer, long newWriteLimit, long newReadLimit,
121              long newCheckInterval) {
122          objectSizeEstimator = newObjectSizeEstimator;
123          timer = newTimer;
124          writeLimit = newWriteLimit;
125          readLimit = newReadLimit;
126          checkInterval = newCheckInterval;
127          //logger.warn("TSH: "+writeLimit+":"+readLimit+":"+checkInterval);
128      }
129 
130     /**
131      *
132      * @param newTrafficCounter the TrafficCounter to set
133      */
134     void setTrafficCounter(TrafficCounter newTrafficCounter) {
135         trafficCounter = newTrafficCounter;
136     }
137 
138     /**
139      * Constructor using default {@link ObjectSizeEstimator}
140      *
141      * @param timer
142      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
143      * @param writeLimit
144      *          0 or a limit in bytes/s
145      * @param readLimit
146      *          0 or a limit in bytes/s
147      * @param checkInterval
148      *          The delay between two computations of performances for
149      *            channels or 0 if no stats are to be computed
150      */
151     protected AbstractTrafficShapingHandler(Timer timer, long writeLimit,
152                                             long readLimit, long checkInterval) {
153         init(new DefaultObjectSizeEstimator(), timer, writeLimit, readLimit, checkInterval);
154     }
155 
156     /**
157      * Constructor using the specified ObjectSizeEstimator
158      *
159      * @param objectSizeEstimator
160      *            the {@link ObjectSizeEstimator} that will be used to compute
161      *            the size of the message
162      * @param timer
163      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
164      * @param writeLimit
165      *          0 or a limit in bytes/s
166      * @param readLimit
167      *          0 or a limit in bytes/s
168      * @param checkInterval
169      *          The delay between two computations of performances for
170      *            channels or 0 if no stats are to be computed
171      */
172     protected AbstractTrafficShapingHandler(
173             ObjectSizeEstimator objectSizeEstimator, Timer timer,
174             long writeLimit, long readLimit, long checkInterval) {
175         init(objectSizeEstimator, timer, writeLimit, readLimit, checkInterval);
176     }
177 
178     /**
179      * Constructor using default {@link ObjectSizeEstimator} and using default Check Interval
180      *
181      * @param timer
182      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
183      * @param writeLimit
184      *          0 or a limit in bytes/s
185      * @param readLimit
186      *          0 or a limit in bytes/s
187      */
188     protected AbstractTrafficShapingHandler(Timer timer, long writeLimit,
189                                             long readLimit) {
190         init(new DefaultObjectSizeEstimator(), timer, writeLimit, readLimit, DEFAULT_CHECK_INTERVAL);
191     }
192 
193     /**
194      * Constructor using the specified ObjectSizeEstimator and using default Check Interval
195      *
196      * @param objectSizeEstimator
197      *            the {@link ObjectSizeEstimator} that will be used to compute
198      *            the size of the message
199      * @param timer
200      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
201      * @param writeLimit
202      *          0 or a limit in bytes/s
203      * @param readLimit
204      *          0 or a limit in bytes/s
205      */
206     protected AbstractTrafficShapingHandler(
207             ObjectSizeEstimator objectSizeEstimator, Timer timer,
208             long writeLimit, long readLimit) {
209         init(objectSizeEstimator, timer, writeLimit, readLimit, DEFAULT_CHECK_INTERVAL);
210     }
211 
212     /**
213      * Constructor using default {@link ObjectSizeEstimator} and using NO LIMIT and default Check Interval
214      *
215      * @param timer
216      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
217      */
218     protected AbstractTrafficShapingHandler(Timer timer) {
219         init(new DefaultObjectSizeEstimator(), timer, 0, 0, DEFAULT_CHECK_INTERVAL);
220     }
221 
222     /**
223      * Constructor using the specified ObjectSizeEstimator and using NO LIMIT and default Check Interval
224      *
225      * @param objectSizeEstimator
226      *            the {@link ObjectSizeEstimator} that will be used to compute
227      *            the size of the message
228      * @param timer
229      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
230      */
231     protected AbstractTrafficShapingHandler(
232             ObjectSizeEstimator objectSizeEstimator, Timer timer) {
233         init(objectSizeEstimator, timer, 0, 0, DEFAULT_CHECK_INTERVAL);
234     }
235 
236     /**
237      * Constructor using default {@link ObjectSizeEstimator} and using NO LIMIT
238      *
239      * @param timer
240      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
241      * @param checkInterval
242      *          The delay between two computations of performances for
243      *            channels or 0 if no stats are to be computed
244      */
245     protected AbstractTrafficShapingHandler(Timer timer, long checkInterval) {
246         init(new DefaultObjectSizeEstimator(), timer, 0, 0, checkInterval);
247     }
248 
249     /**
250      * Constructor using the specified ObjectSizeEstimator and using NO LIMIT
251      *
252      * @param objectSizeEstimator
253      *            the {@link ObjectSizeEstimator} that will be used to compute
254      *            the size of the message
255      * @param timer
256      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
257      * @param checkInterval
258      *          The delay between two computations of performances for
259      *            channels or 0 if no stats are to be computed
260      */
261     protected AbstractTrafficShapingHandler(
262             ObjectSizeEstimator objectSizeEstimator, Timer timer,
263             long checkInterval) {
264         init(objectSizeEstimator, timer, 0, 0, checkInterval);
265     }
266 
267     /**
268      * Change the underlying limitations and check interval.
269      */
270     public void configure(long newWriteLimit, long newReadLimit,
271             long newCheckInterval) {
272         configure(newWriteLimit, newReadLimit);
273         configure(newCheckInterval);
274     }
275 
276     /**
277      * Change the underlying limitations.
278      */
279     public void configure(long newWriteLimit, long newReadLimit) {
280         writeLimit = newWriteLimit;
281         readLimit = newReadLimit;
282         if (trafficCounter != null) {
283             trafficCounter.resetAccounting(System.currentTimeMillis() + 1);
284         }
285     }
286 
287     /**
288      * Change the check interval.
289      */
290     public void configure(long newCheckInterval) {
291         checkInterval = newCheckInterval;
292         if (trafficCounter != null) {
293             trafficCounter.configure(checkInterval);
294         }
295     }
296 
297     /**
298      * Called each time the accounting is computed from the TrafficCounters.
299      * This method could be used for instance to implement almost real time accounting.
300      *
301      * @param counter
302      *            the TrafficCounter that computes its performance
303      */
304     protected void doAccounting(TrafficCounter counter) {
305         // NOOP by default
306     }
307 
308     /**
309      * Class to implement setReadable at fix time
310      */
311     private class ReopenReadTimerTask implements TimerTask {
312         final ChannelHandlerContext ctx;
313         ReopenReadTimerTask(ChannelHandlerContext ctx) {
314             this.ctx = ctx;
315         }
316         public void run(Timeout timeoutArg) throws Exception {
317             //logger.warn("Start RRTT: "+release.get());
318             if (release.get()) {
319                 return;
320             }
321             /*
322             logger.warn("WAKEUP! "+
323                     (ctx != null && ctx.getChannel() != null &&
324                             ctx.getChannel().isConnected()));
325              */
326             if (ctx != null && ctx.getChannel() != null &&
327                     ctx.getChannel().isConnected()) {
328                 //logger.warn(" setReadable TRUE: ");
329                 // readSuspended = false;
330                 ctx.setAttachment(null);
331                 ctx.getChannel().setReadable(true);
332             }
333         }
334     }
335 
336     /**
337     *
338     * @return the time that should be necessary to wait to respect limit. Can
339     *         be negative time
340     */
341     private static long getTimeToWait(long limit, long bytes, long lastTime,
342             long curtime) {
343         long interval = curtime - lastTime;
344         if (interval == 0) {
345             // Time is too short, so just lets continue
346             return 0;
347         }
348         return (bytes * 1000 / limit - interval) / 10 * 10;
349     }
350 
351     @Override
352     public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
353             throws Exception {
354         try {
355             long curtime = System.currentTimeMillis();
356             long size = objectSizeEstimator.estimateSize(evt.getMessage());
357             if (trafficCounter != null) {
358                 trafficCounter.bytesRecvFlowControl(size);
359                 if (readLimit == 0) {
360                     // no action
361                     return;
362                 }
363                 // compute the number of ms to wait before reopening the channel
364                 long wait = getTimeToWait(readLimit,
365                         trafficCounter.getCurrentReadBytes(),
366                         trafficCounter.getLastTime(), curtime);
367                 if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal
368                                             // time in order to
369                     Channel channel = ctx.getChannel();
370                     // try to limit the traffic
371                     if (channel != null && channel.isConnected()) {
372                         // Channel version
373                         if (timer == null) {
374                             // Sleep since no executor
375                             // logger.warn("Read sleep since no timer for "+wait+" ms for "+this);
376                             if (release.get()) {
377                                 return;
378                             }
379                             Thread.sleep(wait);
380                             return;
381                         }
382                         if (ctx.getAttachment() == null) {
383                             // readSuspended = true;
384                             ctx.setAttachment(Boolean.TRUE);
385                             channel.setReadable(false);
386                             // logger.warn("Read will wakeup after "+wait+" ms "+this);
387                             TimerTask timerTask = new ReopenReadTimerTask(ctx);
388                             timeout = timer.newTimeout(timerTask, wait,
389                                     TimeUnit.MILLISECONDS);
390                         } else {
391                             // should be waiting: but can occurs sometime so as
392                             // a FIX
393                             // logger.warn("Read sleep ok but should not be here: "+wait+" "+this);
394                             if (release.get()) {
395                                 return;
396                             }
397                             Thread.sleep(wait);
398                         }
399                     } else {
400                         // Not connected or no channel
401                         // logger.warn("Read sleep "+wait+" ms for "+this);
402                         if (release.get()) {
403                             return;
404                         }
405                         Thread.sleep(wait);
406                     }
407                 }
408             }
409         } finally {
410             // The message is then just passed to the next handler
411             super.messageReceived(ctx, evt);
412         }
413     }
414 
415     @Override
416     public void writeRequested(ChannelHandlerContext ctx, MessageEvent evt)
417             throws Exception {
418         try {
419             long curtime = System.currentTimeMillis();
420             long size = objectSizeEstimator.estimateSize(evt.getMessage());
421             if (trafficCounter != null) {
422                 trafficCounter.bytesWriteFlowControl(size);
423                 if (writeLimit == 0) {
424                     return;
425                 }
426                 // compute the number of ms to wait before continue with the
427                 // channel
428                 long wait = getTimeToWait(writeLimit,
429                         trafficCounter.getCurrentWrittenBytes(),
430                         trafficCounter.getLastTime(), curtime);
431                 if (wait >= MINIMAL_WAIT) {
432                     // Global or Channel
433                     if (release.get()) {
434                         return;
435                     }
436                     Thread.sleep(wait);
437                 }
438             }
439         } finally {
440             // The message is then just passed to the next handler
441             super.writeRequested(ctx, evt);
442         }
443     }
444     @Override
445     public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
446             throws Exception {
447         if (e instanceof ChannelStateEvent) {
448             ChannelStateEvent cse = (ChannelStateEvent) e;
449             if (cse.getState() == ChannelState.INTEREST_OPS &&
450                     (((Integer) cse.getValue()).intValue() & Channel.OP_READ) != 0) {
451 
452                 // setReadable(true) requested
453                 boolean readSuspended = ctx.getAttachment() != null;
454                 if (readSuspended) {
455                     // Drop the request silently if this handler has
456                     // set the flag.
457                     e.getFuture().setSuccess();
458                     return;
459                 }
460             }
461         }
462         super.handleDownstream(ctx, e);
463     }
464 
465     /**
466      *
467      * @return the current TrafficCounter (if
468      *         channel is still connected)
469      */
470     public TrafficCounter getTrafficCounter() {
471         return trafficCounter;
472     }
473 
474     public void releaseExternalResources() {
475         if (trafficCounter != null) {
476             trafficCounter.stop();
477         }
478         release.set(true);
479         if (timeout != null) {
480             timeout.cancel();
481         }
482     }
483 
484     @Override
485     public String toString() {
486         return "TrafficShaping with Write Limit: " + writeLimit +
487                 " Read Limit: " + readLimit + " and Counter: " +
488                 (trafficCounter != null? trafficCounter.toString() : "none");
489     }
490 }