001/*
002 * Copyright 2009-2017 UnboundID Corp.
003 * All Rights Reserved.
004 */
005/*
006 * Copyright (C) 2009-2017 UnboundID Corp.
007 *
008 * This program is free software; you can redistribute it and/or modify
009 * it under the terms of the GNU General Public License (GPLv2 only)
010 * or the terms of the GNU Lesser General Public License (LGPLv2.1 only)
011 * as published by the Free Software Foundation.
012 *
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Public License for more details.
017 *
018 * You should have received a copy of the GNU General Public License
019 * along with this program; if not, see <http://www.gnu.org/licenses>.
020 */
021package com.unboundid.util;
022
023
024
025import java.io.Serializable;
026import java.util.ArrayList;
027import java.util.Collections;
028import java.util.List;
029import java.util.logging.Level;
030
031import static com.unboundid.util.Debug.*;
032
033
034
035/**
036 * Instances of this class are used to ensure that certain actions are performed
037 * at a fixed rate per interval (e.g. 10000 search operations per second).
038 * <p>
039 * Once a class is constructed with the duration of an interval and the target
040 * per interval, the {@link #await} method only releases callers at the
041 * specified number of times per interval.  This class is most useful when
042 * the target number per interval exceeds the limits of other approaches
043 * such as {@code java.util.Timer} or
044 * {@code java.util.concurrent.ScheduledThreadPoolExecutor}.  For instance,
045 * this does a good job of ensuring that something happens about 10000 times
046 * per second, but it's overkill to ensure something happens five times per
047 * hour.  This does come at a cost.  In the worst case, a single thread is
048 * tied up in a loop doing a small amount of computation followed by a
049 * Thread.yield().  Calling Thread.sleep() is not possible because many
050 * platforms sleep for a minimum of 10ms, and all platforms require sleeping
051 * for at least 1ms.
052 * <p>
053 * Testing has shown that this class is accurate for a "no-op"
054 * action up to two million per second, which vastly exceeds its
055 * typical use in tools such as {@code searchrate} and {@code modrate}.  This
056 * class is designed to be called by multiple threads, however, it does not
057 * make any fairness guarantee between threads; a single-thread might be
058 * released from the {@link #await} method many times before another thread
059 * that is blocked in that method.
060 * <p>
061 * This class attempts to smooth out the target per interval throughout each
062 * interval.  At a given ratio, R between 0 and 1, through the interval, the
063 * expected number of actions to have been performed in the interval at that
064 * time is R times the target per interval.  That is, 10% of the way through
065 * the interval, approximately 10% of the actions have been performed, and
066 * 80% of the way through the interval, 80% of the actions have been performed.
067 */
068@ThreadSafety(level=ThreadSafetyLevel.COMPLETELY_THREADSAFE)
069public final class FixedRateBarrier
070       implements Serializable
071{
072  /**
073   * The serial version UID for this serializable class.
074   */
075  private static final long serialVersionUID = -3490156685189909611L;
076
077  /**
078   * The minimum number of milliseconds that Thread.sleep() can handle
079   * accurately.  This varies from platform to platform, so we measure it
080   * once in the static initializer below.  When using a low rate (such as
081   * 100 per second), we can often sleep between iterations instead of having
082   * to spin calling Thread.yield().
083   */
084  private static final long minSleepMillis;
085
086  static
087  {
088    // Calibrate the minimum number of milliseconds that we can reliably
089    // sleep on this system.  We take several measurements and take the median,
090    // which keeps us from choosing an outlier.
091    //
092    // It varies from system to system.  Testing on three systems, yielded
093    // three different measurements Solaris x86 (10 ms), RedHat Linux (2 ms),
094    // Windows 7 (1 ms).
095
096    final List<Long> minSleepMillisMeasurements = new ArrayList<Long>();
097
098    for (int i = 0; i < 11; i++)
099    {
100      final long timeBefore = System.currentTimeMillis();
101      try
102      {
103        Thread.sleep(1);
104      }
105      catch (InterruptedException e)
106      {
107        debugException(e);
108      }
109      final long sleepMillis = System.currentTimeMillis() - timeBefore;
110      minSleepMillisMeasurements.add(sleepMillis);
111    }
112
113    Collections.sort(minSleepMillisMeasurements);
114    final long medianSleepMillis = minSleepMillisMeasurements.get(
115            minSleepMillisMeasurements.size()/2);
116
117    minSleepMillis = Math.max(medianSleepMillis, 1);
118
119    final String message = "Calibrated FixedRateBarrier to use " +
120          "minSleepMillis=" + minSleepMillis + ".  " +
121          "Minimum sleep measurements = " + minSleepMillisMeasurements;
122    debug(Level.INFO, DebugType.OTHER, message);
123  }
124
125
126  // This tracks when this class is shut down.  Calls to await() after
127  // shutdownRequested() is called, will return immediately with a value of
128  // true.
129  private volatile boolean shutdownRequested = false;
130
131
132  //
133  // The following class variables are guarded by synchronized(this).
134  //
135
136  // The duration of the target interval in nano-seconds.
137  private long intervalDurationNanos;
138
139  // This tracks the number of milliseconds between each iteration if they
140  // were evenly spaced.
141  //
142  // If intervalDurationMs=1000 and perInterval=100, then this is 100.
143  // If intervalDurationMs=1000 and perInterval=10000, then this is .1.
144  private double millisBetweenIterations;
145
146  // The target number of times to release a thread per interval.
147  private int perInterval;
148
149  // A count of the number of times that await has returned within the current
150  // interval.
151  private long countInThisInterval;
152
153  // The start of this interval in terms of System.nanoTime().
154  private long intervalStartNanos;
155
156  // The end of this interval in terms of System.nanoTime().
157  private long intervalEndNanos;
158
159
160
161  /**
162   * Constructs a new FixedRateBarrier, which is active until
163   * {@link #shutdownRequested} is called.
164   *
165   * @param  intervalDurationMs  The duration of the interval in milliseconds.
166   * @param  perInterval  The target number of times that {@link #await} should
167   *                      return per interval.
168   */
169  public FixedRateBarrier(final long intervalDurationMs, final int perInterval)
170  {
171    setRate(intervalDurationMs, perInterval);
172  }
173
174
175
176  /**
177   * Updates the rates associated with this FixedRateBarrier.  The new rate
178   * will be in effect when this method returns.
179   *
180   * @param  intervalDurationMs  The duration of the interval in milliseconds.
181   * @param  perInterval  The target number of times that {@link #await} should
182   *                      return per interval.
183   */
184  public synchronized void setRate(final long intervalDurationMs,
185                                   final int perInterval)
186  {
187    Validator.ensureTrue(intervalDurationMs > 0,
188         "FixedRateBarrier.intervalDurationMs must be at least 1.");
189    Validator.ensureTrue(perInterval > 0,
190         "FixedRateBarrier.perInterval must be at least 1.");
191
192    this.perInterval = perInterval;
193
194    intervalDurationNanos = 1000L * 1000L * intervalDurationMs;
195
196    millisBetweenIterations = (double)intervalDurationMs/(double)perInterval;
197
198    // Reset the intervals and all of the counters.
199    countInThisInterval = 0;
200    intervalStartNanos = 0;
201    intervalEndNanos = 0;
202  }
203
204
205
206  /**
207   * This method waits until it is time for the next 'action' to be performed
208   * based on the specified interval duration and target per interval.  This
209   * method can be called by multiple threads simultaneously.  This method
210   * returns immediately if shutdown has been requested.
211   *
212   * @return  {@code true} if shutdown has been requested and {@code} false
213   *          otherwise.
214   */
215  public synchronized boolean await()
216  {
217    // Loop forever until we are requested to shutdown or it is time to perform
218    // the next 'action' in which case we break from the loop.
219    while (!shutdownRequested)
220    {
221      final long now = System.nanoTime();
222
223      if ((intervalStartNanos == 0) ||   // Handles the first time we're called.
224          (now < intervalStartNanos))    // Handles a change in the clock.
225      {
226        intervalStartNanos = now;
227        intervalEndNanos = intervalStartNanos + intervalDurationNanos;
228      }
229      else if (now >= intervalEndNanos)  // End of an interval.
230      {
231        countInThisInterval = 0;
232
233        if (now < (intervalEndNanos + intervalDurationNanos))
234        {
235          // If we have already passed the end of the next interval, then we
236          // don't try to catch up.  Instead we just reset the start of the
237          // next interval to now.  This could happen if the system clock
238          // was set to the future, we're running in a debugger, or we have
239          // very short intervals and are unable to keep up.
240          intervalStartNanos = now;
241        }
242        else
243        {
244          // Usually we're some small fraction into the next interval, so
245          // we set the start of the current interval to the end of the
246          // previous one.
247          intervalStartNanos = intervalEndNanos;
248        }
249        intervalEndNanos = intervalStartNanos + intervalDurationNanos;
250      }
251
252      final long intervalRemaining = intervalEndNanos - now;
253      if (intervalRemaining <= 0)
254      {
255        // This shouldn't happen, but we're careful not to divide by 0.
256        continue;
257      }
258
259      final double intervalFractionRemaining =
260           (double) intervalRemaining / intervalDurationNanos;
261
262      final double expectedRemaining = intervalFractionRemaining * perInterval;
263      final long actualRemaining = perInterval - countInThisInterval;
264
265      if (actualRemaining >= expectedRemaining)
266      {
267        // We are on schedule or behind schedule so let the next 'action'
268        // happen.
269        countInThisInterval++;
270        break;
271      }
272      else
273      {
274        // If we can sleep until it's time to leave this barrier, then do
275        // so to keep from spinning on a CPU doing Thread.yield().
276
277        final double gapIterations = expectedRemaining - actualRemaining;
278        final long remainingMillis =
279             (long) Math.floor(millisBetweenIterations * gapIterations);
280
281        if (remainingMillis >= minSleepMillis)
282        {
283          // Cap how long we sleep so that we can respond to a change in the
284          // rate without too much delay.
285          final long waitTime = Math.min(remainingMillis, 10);
286          try
287          {
288            // We need to wait here instead of Thread.sleep so that we don't
289            // block setRate.
290            this.wait(waitTime);
291          }
292          catch (InterruptedException e)
293          {
294            debugException(e);
295            Thread.currentThread().interrupt();
296            return shutdownRequested;
297          }
298        }
299        else
300        {
301          // We're ahead of schedule so yield to other threads, and then try
302          // again.  Note: this is the most costly part of the algorithm because
303          // we have to busy wait due to the lack of sleeping for very small
304          // amounts of time.
305          Thread.yield();
306        }
307      }
308    }
309
310    return shutdownRequested;
311  }
312
313
314
315  /**
316   * Retrieves information about the current target rate for this barrier.  The
317   * value returned will include a {@code Long} that specifies the duration of
318   * the current interval in milliseconds and an {@code Integer} that specifies
319   * the number of times that the {@link #await} method should return per
320   * interval.
321   *
322   * @return  Information about hte current target rate for this barrier.
323   */
324  public synchronized ObjectPair<Long,Integer> getTargetRate()
325  {
326    return new ObjectPair<Long,Integer>(
327         (intervalDurationNanos / (1000L * 1000L)),
328         perInterval);
329  }
330
331
332
333  /**
334   * Shuts down this barrier.  Future calls to await() will return immediately.
335   */
336  public void shutdownRequested()
337  {
338    shutdownRequested = true;
339  }
340
341
342
343  /**
344   * Returns {@code true} if shutdown has been requested.
345   *
346   * @return  {@code true} if shutdown has been requested and {@code false}
347   *          otherwise.
348   */
349  public boolean isShutdownRequested()
350  {
351    return shutdownRequested;
352  }
353}