1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package org.jboss.netty.handler.traffic;
17
18 import org.jboss.netty.channel.Channel;
19 import org.jboss.netty.channel.ChannelEvent;
20 import org.jboss.netty.channel.ChannelHandlerContext;
21 import org.jboss.netty.channel.ChannelState;
22 import org.jboss.netty.channel.ChannelStateEvent;
23 import org.jboss.netty.channel.MessageEvent;
24 import org.jboss.netty.channel.SimpleChannelHandler;
25 import org.jboss.netty.logging.InternalLogger;
26 import org.jboss.netty.logging.InternalLoggerFactory;
27 import org.jboss.netty.util.DefaultObjectSizeEstimator;
28 import org.jboss.netty.util.ExternalResourceReleasable;
29 import org.jboss.netty.util.ObjectSizeEstimator;
30 import org.jboss.netty.util.Timeout;
31 import org.jboss.netty.util.Timer;
32 import org.jboss.netty.util.TimerTask;
33
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.atomic.AtomicBoolean;
36
37 /**
38 * AbstractTrafficShapingHandler allows to limit the global bandwidth
39 * (see {@link GlobalTrafficShapingHandler}) or per session
40 * bandwidth (see {@link ChannelTrafficShapingHandler}), as traffic shaping.
41 * It allows too to implement an almost real time monitoring of the bandwidth using
42 * the monitors from {@link TrafficCounter} that will call back every checkInterval
43 * the method doAccounting of this handler.<br>
44 * <br>
45 *
46 * An {@link ObjectSizeEstimator} can be passed at construction to specify what
47 * is the size of the object to be read or write accordingly to the type of
48 * object. If not specified, it will used the {@link DefaultObjectSizeEstimator} implementation.<br><br>
49 *
50 * If you want for any particular reasons to stop the monitoring (accounting) or to change
51 * the read/write limit or the check interval, several methods allow that for you:<br>
52 * <ul>
53 * <li><tt>configure</tt> allows you to change read or write limits, or the checkInterval</li>
54 * <li><tt>getTrafficCounter</tt> allows you to have access to the TrafficCounter and so to stop
55 * or start the monitoring, to change the checkInterval directly, or to have access to its values.</li>
56 * <li></li>
57 * </ul>
58 */
59 public abstract class AbstractTrafficShapingHandler extends
60 SimpleChannelHandler implements ExternalResourceReleasable {
61 /**
62 * Internal logger
63 */
64 static InternalLogger logger = InternalLoggerFactory
65 .getInstance(AbstractTrafficShapingHandler.class);
66
67 /**
68 * Default delay between two checks: 1s
69 */
70 public static final long DEFAULT_CHECK_INTERVAL = 1000;
71
72 /**
73 * Default minimal time to wait
74 */
75 private static final long MINIMAL_WAIT = 10;
76
77 /**
78 * Traffic Counter
79 */
80 protected TrafficCounter trafficCounter;
81
82 /**
83 * ObjectSizeEstimator
84 */
85 private ObjectSizeEstimator objectSizeEstimator;
86
87 /**
88 * Timer to associated to any TrafficCounter
89 */
90 protected Timer timer;
91
92 /**
93 * used in releaseExternalResources() to cancel the timer
94 */
95 private volatile Timeout timeout;
96
97 /**
98 * Limit in B/s to apply to write
99 */
100 private long writeLimit;
101
102 /**
103 * Limit in B/s to apply to read
104 */
105 private long readLimit;
106
107 /**
108 * Delay between two performance snapshots
109 */
110 protected long checkInterval = DEFAULT_CHECK_INTERVAL; // default 1 s
111
112 /**
113 * Boolean associated with the release of this TrafficShapingHandler.
114 * It will be true only once when the releaseExternalRessources is called
115 * to prevent waiting when shutdown.
116 */
117 final AtomicBoolean release = new AtomicBoolean(false);
118
119 private void init(ObjectSizeEstimator newObjectSizeEstimator,
120 Timer newTimer, long newWriteLimit, long newReadLimit,
121 long newCheckInterval) {
122 objectSizeEstimator = newObjectSizeEstimator;
123 timer = newTimer;
124 writeLimit = newWriteLimit;
125 readLimit = newReadLimit;
126 checkInterval = newCheckInterval;
127 //logger.warn("TSH: "+writeLimit+":"+readLimit+":"+checkInterval);
128 }
129
130 /**
131 *
132 * @param newTrafficCounter the TrafficCounter to set
133 */
134 void setTrafficCounter(TrafficCounter newTrafficCounter) {
135 trafficCounter = newTrafficCounter;
136 }
137
138 /**
139 * Constructor using default {@link ObjectSizeEstimator}
140 *
141 * @param timer
142 * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
143 * @param writeLimit
144 * 0 or a limit in bytes/s
145 * @param readLimit
146 * 0 or a limit in bytes/s
147 * @param checkInterval
148 * The delay between two computations of performances for
149 * channels or 0 if no stats are to be computed
150 */
151 protected AbstractTrafficShapingHandler(Timer timer, long writeLimit,
152 long readLimit, long checkInterval) {
153 init(new DefaultObjectSizeEstimator(), timer, writeLimit, readLimit, checkInterval);
154 }
155
156 /**
157 * Constructor using the specified ObjectSizeEstimator
158 *
159 * @param objectSizeEstimator
160 * the {@link ObjectSizeEstimator} that will be used to compute
161 * the size of the message
162 * @param timer
163 * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
164 * @param writeLimit
165 * 0 or a limit in bytes/s
166 * @param readLimit
167 * 0 or a limit in bytes/s
168 * @param checkInterval
169 * The delay between two computations of performances for
170 * channels or 0 if no stats are to be computed
171 */
172 protected AbstractTrafficShapingHandler(
173 ObjectSizeEstimator objectSizeEstimator, Timer timer,
174 long writeLimit, long readLimit, long checkInterval) {
175 init(objectSizeEstimator, timer, writeLimit, readLimit, checkInterval);
176 }
177
178 /**
179 * Constructor using default {@link ObjectSizeEstimator} and using default Check Interval
180 *
181 * @param timer
182 * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
183 * @param writeLimit
184 * 0 or a limit in bytes/s
185 * @param readLimit
186 * 0 or a limit in bytes/s
187 */
188 protected AbstractTrafficShapingHandler(Timer timer, long writeLimit,
189 long readLimit) {
190 init(new DefaultObjectSizeEstimator(), timer, writeLimit, readLimit, DEFAULT_CHECK_INTERVAL);
191 }
192
193 /**
194 * Constructor using the specified ObjectSizeEstimator and using default Check Interval
195 *
196 * @param objectSizeEstimator
197 * the {@link ObjectSizeEstimator} that will be used to compute
198 * the size of the message
199 * @param timer
200 * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
201 * @param writeLimit
202 * 0 or a limit in bytes/s
203 * @param readLimit
204 * 0 or a limit in bytes/s
205 */
206 protected AbstractTrafficShapingHandler(
207 ObjectSizeEstimator objectSizeEstimator, Timer timer,
208 long writeLimit, long readLimit) {
209 init(objectSizeEstimator, timer, writeLimit, readLimit, DEFAULT_CHECK_INTERVAL);
210 }
211
212 /**
213 * Constructor using default {@link ObjectSizeEstimator} and using NO LIMIT and default Check Interval
214 *
215 * @param timer
216 * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
217 */
218 protected AbstractTrafficShapingHandler(Timer timer) {
219 init(new DefaultObjectSizeEstimator(), timer, 0, 0, DEFAULT_CHECK_INTERVAL);
220 }
221
222 /**
223 * Constructor using the specified ObjectSizeEstimator and using NO LIMIT and default Check Interval
224 *
225 * @param objectSizeEstimator
226 * the {@link ObjectSizeEstimator} that will be used to compute
227 * the size of the message
228 * @param timer
229 * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
230 */
231 protected AbstractTrafficShapingHandler(
232 ObjectSizeEstimator objectSizeEstimator, Timer timer) {
233 init(objectSizeEstimator, timer, 0, 0, DEFAULT_CHECK_INTERVAL);
234 }
235
236 /**
237 * Constructor using default {@link ObjectSizeEstimator} and using NO LIMIT
238 *
239 * @param timer
240 * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
241 * @param checkInterval
242 * The delay between two computations of performances for
243 * channels or 0 if no stats are to be computed
244 */
245 protected AbstractTrafficShapingHandler(Timer timer, long checkInterval) {
246 init(new DefaultObjectSizeEstimator(), timer, 0, 0, checkInterval);
247 }
248
249 /**
250 * Constructor using the specified ObjectSizeEstimator and using NO LIMIT
251 *
252 * @param objectSizeEstimator
253 * the {@link ObjectSizeEstimator} that will be used to compute
254 * the size of the message
255 * @param timer
256 * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
257 * @param checkInterval
258 * The delay between two computations of performances for
259 * channels or 0 if no stats are to be computed
260 */
261 protected AbstractTrafficShapingHandler(
262 ObjectSizeEstimator objectSizeEstimator, Timer timer,
263 long checkInterval) {
264 init(objectSizeEstimator, timer, 0, 0, checkInterval);
265 }
266
267 /**
268 * Change the underlying limitations and check interval.
269 */
270 public void configure(long newWriteLimit, long newReadLimit,
271 long newCheckInterval) {
272 configure(newWriteLimit, newReadLimit);
273 configure(newCheckInterval);
274 }
275
276 /**
277 * Change the underlying limitations.
278 */
279 public void configure(long newWriteLimit, long newReadLimit) {
280 writeLimit = newWriteLimit;
281 readLimit = newReadLimit;
282 if (trafficCounter != null) {
283 trafficCounter.resetAccounting(System.currentTimeMillis() + 1);
284 }
285 }
286
287 /**
288 * Change the check interval.
289 */
290 public void configure(long newCheckInterval) {
291 checkInterval = newCheckInterval;
292 if (trafficCounter != null) {
293 trafficCounter.configure(checkInterval);
294 }
295 }
296
297 /**
298 * Called each time the accounting is computed from the TrafficCounters.
299 * This method could be used for instance to implement almost real time accounting.
300 *
301 * @param counter
302 * the TrafficCounter that computes its performance
303 */
304 protected void doAccounting(TrafficCounter counter) {
305 // NOOP by default
306 }
307
308 /**
309 * Class to implement setReadable at fix time
310 */
311 private class ReopenReadTimerTask implements TimerTask {
312 final ChannelHandlerContext ctx;
313 ReopenReadTimerTask(ChannelHandlerContext ctx) {
314 this.ctx = ctx;
315 }
316 public void run(Timeout timeoutArg) throws Exception {
317 //logger.warn("Start RRTT: "+release.get());
318 if (release.get()) {
319 return;
320 }
321 /*
322 logger.warn("WAKEUP! "+
323 (ctx != null && ctx.getChannel() != null &&
324 ctx.getChannel().isConnected()));
325 */
326 if (ctx != null && ctx.getChannel() != null &&
327 ctx.getChannel().isConnected()) {
328 //logger.warn(" setReadable TRUE: ");
329 // readSuspended = false;
330 ctx.setAttachment(null);
331 ctx.getChannel().setReadable(true);
332 }
333 }
334 }
335
336 /**
337 *
338 * @return the time that should be necessary to wait to respect limit. Can
339 * be negative time
340 */
341 private static long getTimeToWait(long limit, long bytes, long lastTime,
342 long curtime) {
343 long interval = curtime - lastTime;
344 if (interval == 0) {
345 // Time is too short, so just lets continue
346 return 0;
347 }
348 return (bytes * 1000 / limit - interval) / 10 * 10;
349 }
350
351 @Override
352 public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
353 throws Exception {
354 try {
355 long curtime = System.currentTimeMillis();
356 long size = objectSizeEstimator.estimateSize(evt.getMessage());
357 if (trafficCounter != null) {
358 trafficCounter.bytesRecvFlowControl(size);
359 if (readLimit == 0) {
360 // no action
361 return;
362 }
363 // compute the number of ms to wait before reopening the channel
364 long wait = getTimeToWait(readLimit,
365 trafficCounter.getCurrentReadBytes(),
366 trafficCounter.getLastTime(), curtime);
367 if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal
368 // time in order to
369 Channel channel = ctx.getChannel();
370 // try to limit the traffic
371 if (channel != null && channel.isConnected()) {
372 // Channel version
373 if (timer == null) {
374 // Sleep since no executor
375 // logger.warn("Read sleep since no timer for "+wait+" ms for "+this);
376 if (release.get()) {
377 return;
378 }
379 Thread.sleep(wait);
380 return;
381 }
382 if (ctx.getAttachment() == null) {
383 // readSuspended = true;
384 ctx.setAttachment(Boolean.TRUE);
385 channel.setReadable(false);
386 // logger.warn("Read will wakeup after "+wait+" ms "+this);
387 TimerTask timerTask = new ReopenReadTimerTask(ctx);
388 timeout = timer.newTimeout(timerTask, wait,
389 TimeUnit.MILLISECONDS);
390 } else {
391 // should be waiting: but can occurs sometime so as
392 // a FIX
393 // logger.warn("Read sleep ok but should not be here: "+wait+" "+this);
394 if (release.get()) {
395 return;
396 }
397 Thread.sleep(wait);
398 }
399 } else {
400 // Not connected or no channel
401 // logger.warn("Read sleep "+wait+" ms for "+this);
402 if (release.get()) {
403 return;
404 }
405 Thread.sleep(wait);
406 }
407 }
408 }
409 } finally {
410 // The message is then just passed to the next handler
411 super.messageReceived(ctx, evt);
412 }
413 }
414
415 @Override
416 public void writeRequested(ChannelHandlerContext ctx, MessageEvent evt)
417 throws Exception {
418 try {
419 long curtime = System.currentTimeMillis();
420 long size = objectSizeEstimator.estimateSize(evt.getMessage());
421 if (trafficCounter != null) {
422 trafficCounter.bytesWriteFlowControl(size);
423 if (writeLimit == 0) {
424 return;
425 }
426 // compute the number of ms to wait before continue with the
427 // channel
428 long wait = getTimeToWait(writeLimit,
429 trafficCounter.getCurrentWrittenBytes(),
430 trafficCounter.getLastTime(), curtime);
431 if (wait >= MINIMAL_WAIT) {
432 // Global or Channel
433 if (release.get()) {
434 return;
435 }
436 Thread.sleep(wait);
437 }
438 }
439 } finally {
440 // The message is then just passed to the next handler
441 super.writeRequested(ctx, evt);
442 }
443 }
444 @Override
445 public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
446 throws Exception {
447 if (e instanceof ChannelStateEvent) {
448 ChannelStateEvent cse = (ChannelStateEvent) e;
449 if (cse.getState() == ChannelState.INTEREST_OPS &&
450 (((Integer) cse.getValue()).intValue() & Channel.OP_READ) != 0) {
451
452 // setReadable(true) requested
453 boolean readSuspended = ctx.getAttachment() != null;
454 if (readSuspended) {
455 // Drop the request silently if this handler has
456 // set the flag.
457 e.getFuture().setSuccess();
458 return;
459 }
460 }
461 }
462 super.handleDownstream(ctx, e);
463 }
464
465 /**
466 *
467 * @return the current TrafficCounter (if
468 * channel is still connected)
469 */
470 public TrafficCounter getTrafficCounter() {
471 return trafficCounter;
472 }
473
474 public void releaseExternalResources() {
475 if (trafficCounter != null) {
476 trafficCounter.stop();
477 }
478 release.set(true);
479 if (timeout != null) {
480 timeout.cancel();
481 }
482 }
483
484 @Override
485 public String toString() {
486 return "TrafficShaping with Write Limit: " + writeLimit +
487 " Read Limit: " + readLimit + " and Counter: " +
488 (trafficCounter != null? trafficCounter.toString() : "none");
489 }
490 }