Class ConcurrentMergeScheduler

java.lang.Object
org.apache.lucene.index.MergeScheduler
org.apache.lucene.index.ConcurrentMergeScheduler
All Implemented Interfaces:
Closeable, AutoCloseable

public class ConcurrentMergeScheduler extends MergeScheduler
A MergeScheduler that runs each merge using a separate thread.

Specify the max number of threads that may run at once, and the maximum number of simultaneous merges with setMaxMergesAndThreads(int, int).

If the number of merges exceeds the max number of threads then the largest merges are paused until one of the smaller merges completes.

If more than getMaxMergeCount() merges are requested then this class will forcefully throttle the incoming threads by pausing them until one or more merges complete.

This class sets defaults based on Java's view of the CPU count, and it assumes a solid-state disk (or similar). If you have a spinning disk and want to maximize performance, use setDefaultMaxMergesAndThreads(boolean).

  • Field Details

    • AUTO_DETECT_MERGES_AND_THREADS

      public static final int AUTO_DETECT_MERGES_AND_THREADS
      Dynamic default for maxThreadCount and maxMergeCount, based on CPU core count. maxThreadCount is set to max(1, min(4, cpuCoreCount/2)). maxMergeCount is set to maxThreadCount + 5.
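      The default formula above can be checked with a small calculation. MergeDefaultsSketch is a hypothetical helper, not a Lucene class, and it assumes that cpuCoreCount/2 uses integer division, as Java int arithmetic would.

```java
// Hypothetical helper mirroring the AUTO_DETECT_MERGES_AND_THREADS formula
// documented above; not part of Lucene's API.
final class MergeDefaultsSketch {
    private MergeDefaultsSketch() {}

    /** maxThreadCount = max(1, min(4, cpuCoreCount / 2)), using integer division. */
    static int maxThreadCount(int cpuCoreCount) {
        return Math.max(1, Math.min(4, cpuCoreCount / 2));
    }

    /** maxMergeCount = maxThreadCount + 5. */
    static int maxMergeCount(int cpuCoreCount) {
        return maxThreadCount(cpuCoreCount) + 5;
    }
}
```

      For example, a 6-core machine would get maxThreadCount = 3 and maxMergeCount = 8, while any machine with 8 or more cores is capped at 4 threads and 9 merges.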
    • DEFAULT_CPU_CORE_COUNT_PROPERTY

      public static final String DEFAULT_CPU_CORE_COUNT_PROPERTY
      Used for testing.
    • mergeThreads

      protected final List<ConcurrentMergeScheduler.MergeThread> mergeThreads
      List of currently active ConcurrentMergeScheduler.MergeThreads.
    • maxThreadCount

      private int maxThreadCount
    • maxMergeCount

      private int maxMergeCount
    • mergeThreadCount

      protected int mergeThreadCount
      How many ConcurrentMergeScheduler.MergeThreads have kicked off (this is used to name them).
    • MIN_MERGE_MB_PER_SEC

      private static final double MIN_MERGE_MB_PER_SEC
      Floor for IO write rate limit (we will never go any lower than this)
    • MAX_MERGE_MB_PER_SEC

      private static final double MAX_MERGE_MB_PER_SEC
      Ceiling for IO write rate limit (we will never go any higher than this)
    • START_MB_PER_SEC

      private static final double START_MB_PER_SEC
      Initial value for IO write rate limit when doAutoIOThrottle is true
    • MIN_BIG_MERGE_MB

      private static final double MIN_BIG_MERGE_MB
      Merges below this size are not counted in the maxThreadCount, i.e. they can freely run in their own thread (up until maxMergeCount).
    • targetMBPerSec

      protected double targetMBPerSec
      Current IO write throttle rate, in MB/sec
    • doAutoIOThrottle

      private boolean doAutoIOThrottle
      true if we should rate-limit writes for each merge
    • forceMergeMBPerSec

      private double forceMergeMBPerSec
    • suppressExceptions

      private boolean suppressExceptions
  • Constructor Details

    • ConcurrentMergeScheduler

      public ConcurrentMergeScheduler()
      Sole constructor, with all settings set to default values.
  • Method Details

    • setMaxMergesAndThreads

      public void setMaxMergesAndThreads(int maxMergeCount, int maxThreadCount)
      Expert: directly set the maximum number of merge threads and simultaneous merges allowed.
      Parameters:
      maxMergeCount - the maximum number of simultaneous merges that are allowed. If a merge is necessary yet we already have this many threads running, the incoming thread (that is calling add/updateDocument) will block until a merge thread has completed. Note that we will only run the smallest maxThreadCount merges at a time.
      maxThreadCount - the maximum number of simultaneous merge threads that should be running at once. This must be <= maxMergeCount.
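      A minimal sketch of the parameter contract above, using a hypothetical MergeLimits class rather than Lucene's own validation. Only the rule maxThreadCount <= maxMergeCount comes from this page; rejecting values below 1 is an assumption of the sketch. Note the argument order, which matches setMaxMergesAndThreads(int, int): maxMergeCount comes first.

```java
// Hypothetical value class; not Lucene's API. Only the ordering rule
// (maxThreadCount <= maxMergeCount) comes from the documentation above;
// the >= 1 lower bounds are an assumption of this sketch.
final class MergeLimits {
    final int maxMergeCount;
    final int maxThreadCount;

    MergeLimits(int maxMergeCount, int maxThreadCount) {
        if (maxThreadCount < 1) {
            throw new IllegalArgumentException("maxThreadCount must be >= 1, got " + maxThreadCount);
        }
        if (maxMergeCount < 1) {
            throw new IllegalArgumentException("maxMergeCount must be >= 1, got " + maxMergeCount);
        }
        if (maxThreadCount > maxMergeCount) {
            throw new IllegalArgumentException("maxThreadCount (" + maxThreadCount
                + ") must be <= maxMergeCount (" + maxMergeCount + ")");
        }
        this.maxMergeCount = maxMergeCount;
        this.maxThreadCount = maxThreadCount;
    }
}
```

      For example, new MergeLimits(6, 1) is accepted, while new MergeLimits(2, 3) is rejected because it asks for more merge threads than simultaneous merges.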
    • setDefaultMaxMergesAndThreads

      public void setDefaultMaxMergesAndThreads(boolean spins)
      Sets max merges and threads to proper defaults for rotational or non-rotational storage.
      Parameters:
      spins - true to set defaults best for traditional rotational storage (spinning disks), else false (e.g. for solid-state disks)
    • setForceMergeMBPerSec

      public void setForceMergeMBPerSec(double v)
      Set the per-merge IO throttle rate for forced merges (default: Double.POSITIVE_INFINITY).
    • getForceMergeMBPerSec

      public double getForceMergeMBPerSec()
      Get the per-merge IO throttle rate for forced merges.
    • enableAutoIOThrottle

      public void enableAutoIOThrottle()
      Turn on dynamic IO throttling, to adaptively rate-limit write bytes/sec to the minimal rate necessary so that merges do not fall behind. By default this is enabled.
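      The adaptive behavior can be pictured as a feedback loop: raise the rate when merges fall behind, lower it otherwise, and clamp it between a floor and a ceiling (compare MIN_MERGE_MB_PER_SEC, MAX_MERGE_MB_PER_SEC and START_MB_PER_SEC). This is a hypothetical sketch, not Lucene's implementation: the class name, the backlog flag, and the 1.20 / 1.10 step factors are assumptions chosen for illustration. When disabled it reports Double.POSITIVE_INFINITY, matching what getIORateLimitMBPerSec() is documented to return.

```java
// Hypothetical sketch of adaptive ("auto") IO throttling; not Lucene's code.
// The floor, ceiling and starting rate are supplied by the caller; the
// 1.20 / 1.10 step factors are assumptions chosen for illustration.
final class AdaptiveThrottleSketch {
    private final double minMBPerSec;
    private final double maxMBPerSec;
    private double targetMBPerSec;
    private boolean enabled = true; // auto throttling is on by default

    AdaptiveThrottleSketch(double minMBPerSec, double maxMBPerSec, double startMBPerSec) {
        if (!(minMBPerSec > 0 && minMBPerSec <= startMBPerSec && startMBPerSec <= maxMBPerSec)) {
            throw new IllegalArgumentException("need 0 < min <= start <= max");
        }
        this.minMBPerSec = minMBPerSec;
        this.maxMBPerSec = maxMBPerSec;
        this.targetMBPerSec = startMBPerSec;
    }

    void enable() { enabled = true; }

    void disable() { enabled = false; }

    /** Called when a new merge starts; backlog == true means merges are falling behind. */
    void onMergeStart(boolean backlog) {
        if (backlog) {
            targetMBPerSec *= 1.20; // speed up so merges can catch up
        } else {
            targetMBPerSec /= 1.10; // back off toward the minimal necessary rate
        }
        // Never go below the floor or above the ceiling.
        targetMBPerSec = Math.max(minMBPerSec, Math.min(maxMBPerSec, targetMBPerSec));
    }

    /** Current per-merge limit, or +infinity when auto throttling is off. */
    double rateLimitMBPerSec() {
        return enabled ? targetMBPerSec : Double.POSITIVE_INFINITY;
    }
}
```

      Multiplicative steps let the rate grow quickly under pressure and decay gradually once merges keep up, while the clamp keeps it within sane bounds.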
    • disableAutoIOThrottle

      public void disableAutoIOThrottle()
      Turn off auto IO throttling.
    • getAutoIOThrottle

      public boolean getAutoIOThrottle()
      Returns true if auto IO throttling is currently enabled.
    • getIORateLimitMBPerSec

      public double getIORateLimitMBPerSec()
      Returns the currently set per-merge IO write rate limit if enableAutoIOThrottle() was called, else Double.POSITIVE_INFINITY.
    • getMaxThreadCount

      public int getMaxThreadCount()
      Returns maxThreadCount.
    • getMaxMergeCount

      public int getMaxMergeCount()
      Returns maxMergeCount.
    • removeMergeThread

      void removeMergeThread()
      Removes the calling thread from the active merge threads.
    • wrapForMerge

      public Directory wrapForMerge(MergePolicy.OneMerge merge, Directory in)
      Description copied from class: MergeScheduler
      Wraps the incoming Directory so that we can merge-throttle it using RateLimitedIndexOutput.
      Overrides:
      wrapForMerge in class MergeScheduler
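      As a rough java.io analogue of what a merge-throttling wrapper does, the hypothetical ThrottledOutputStream below passes writes through and sleeps whenever the caller gets ahead of a target MB/sec budget. Lucene's real mechanism (RateLimitedIndexOutput, together with the MergeRateLimiter seen in updateIOThrottle) operates on IndexOutput and differs in detail; this sketch only illustrates the technique.

```java
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;

// Hypothetical java.io analogue of rate-limited merge output; not Lucene's API.
final class ThrottledOutputStream extends FilterOutputStream {
    private final double bytesPerSec;
    private final long startNanos = System.nanoTime();
    private long bytesWritten;

    ThrottledOutputStream(OutputStream out, double mbPerSec) {
        super(out);
        if (!(mbPerSec > 0)) throw new IllegalArgumentException("mbPerSec must be > 0");
        this.bytesPerSec = mbPerSec * 1024 * 1024; // +infinity means unlimited
    }

    @Override
    public void write(int b) throws IOException {
        out.write(b);
        bytesWritten++;
        pauseIfAhead();
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len);
        bytesWritten += len;
        pauseIfAhead();
    }

    /** Sleeps until elapsed time catches up with bytesWritten / bytesPerSec. */
    private void pauseIfAhead() throws IOException {
        if (Double.isInfinite(bytesPerSec)) return;
        long targetNanos = (long) (bytesWritten / bytesPerSec * 1e9);
        long aheadNanos = targetNanos - (System.nanoTime() - startNanos);
        if (aheadNanos > 0) {
            try {
                Thread.sleep(aheadNanos / 1_000_000, (int) (aheadNanos % 1_000_000));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new InterruptedIOException("throttled write interrupted");
            }
        }
    }
}
```

      Pausing after each write, rather than rejecting writes, keeps the wrapped stream's contents unchanged; only the pace at which bytes reach it is limited.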
    • updateMergeThreads

      protected void updateMergeThreads()
      Called whenever the running merges have changed, to set merge IO limits. This method sorts the merge threads by their merge size in descending order and then pauses/unpauses threads from first to last -- that way, smaller merges are guaranteed to run before larger ones.
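      The pause policy described above can be sketched as follows. PausePolicySketch is hypothetical, not Lucene's API, and it assumes, per MIN_BIG_MERGE_MB, that merges at or below a size threshold never count against maxThreadCount. After sorting largest first, the first (bigMergeCount - maxThreadCount) merges are paused and the rest run.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch of the pause policy described above; not Lucene's code.
// Only merges larger than minBigMergeMB count against maxThreadCount.
final class PausePolicySketch {
    static final class Merge {
        final long sizeMB;
        boolean paused;

        Merge(long sizeMB) { this.sizeMB = sizeMB; }
    }

    static void updatePauses(List<Merge> active, int maxThreadCount, long minBigMergeMB) {
        List<Merge> sorted = new ArrayList<>(active);
        // Largest first, so the merges we pause are always the biggest ones.
        sorted.sort(Comparator.comparingLong((Merge m) -> m.sizeMB).reversed());
        int bigMergeCount = 0;
        for (Merge m : sorted) {
            if (m.sizeMB > minBigMergeMB) bigMergeCount++;
        }
        // Big merges form a prefix of the sorted list; pause only the excess ones.
        int toPause = Math.max(0, bigMergeCount - maxThreadCount);
        for (int i = 0; i < sorted.size(); i++) {
            sorted.get(i).paused = i < toPause;
        }
    }
}
```

      With maxThreadCount = 2, a 50 MB threshold, and merges of 10, 200, 300 and 500 MB, three merges are big, so only the 500 MB merge is paused and the small 10 MB merge runs freely.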
    • initDynamicDefaults

      private void initDynamicDefaults(Directory directory) throws IOException
      Throws:
      IOException
    • rateToString

      private static String rateToString(double mbPerSec)
    • close

      public void close()
      Description copied from class: MergeScheduler
      Close this MergeScheduler.
      Specified by:
      close in interface AutoCloseable
      Specified by:
      close in interface Closeable
      Specified by:
      close in class MergeScheduler
    • sync

      public void sync()
      Wait for any running merge threads to finish. This call is not interruptible, as it is used by close().
    • mergeThreadCount

      public int mergeThreadCount()
      Returns the number of merge threads that are alive, ignoring the calling thread if it is a merge thread. Note that this number is ≤ mergeThreads size.
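      Both mergeThreadCount() and sync() rest on common java.lang.Thread idioms. The hypothetical helpers below take the thread list as a parameter and are not Lucene methods: one counts live threads other than the caller, and the other joins each thread without letting an interrupt cut the wait short, restoring the interrupt flag at the end so the caller can still observe it.

```java
import java.util.List;

// Hypothetical helpers illustrating mergeThreadCount() and sync() as documented
// above; the thread list is supplied by the caller, not taken from Lucene.
final class MergeThreadsSketch {
    private MergeThreadsSketch() {}

    /** Counts live threads in the list, ignoring the calling thread. */
    static int aliveExcludingCaller(List<Thread> threads) {
        Thread current = Thread.currentThread();
        int count = 0;
        for (Thread t : threads) {
            if (t != current && t.isAlive()) count++;
        }
        return count;
    }

    /**
     * Joins every other thread in the list. An interrupt does not abort the wait;
     * it is remembered, and the interrupt flag is restored once all joins complete.
     */
    static void syncUninterruptibly(List<Thread> threads) {
        boolean interrupted = false;
        Thread current = Thread.currentThread();
        try {
            for (Thread t : threads) {
                if (t == current) continue; // a thread cannot wait for itself
                while (true) {
                    try {
                        t.join();
                        break;
                    } catch (InterruptedException e) {
                        interrupted = true; // keep waiting; re-assert the flag later
                    }
                }
            }
        } finally {
            if (interrupted) current.interrupt();
        }
    }
}
```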
    • initialize

      void initialize(InfoStream infoStream, Directory directory) throws IOException
      Description copied from class: MergeScheduler
      IndexWriter calls this on init.
      Overrides:
      initialize in class MergeScheduler
      Throws:
      IOException
    • merge

      public void merge(MergeScheduler.MergeSource mergeSource, MergeTrigger trigger) throws IOException
      Description copied from class: MergeScheduler
      Run the merges provided by MergeScheduler.MergeSource.getNextMerge().
      Specified by:
      merge in class MergeScheduler
      Parameters:
      mergeSource - the IndexWriter to obtain the merges from.
      trigger - the MergeTrigger that caused this merge to happen
      Throws:
      IOException
    • maybeStall

      protected boolean maybeStall(MergeScheduler.MergeSource mergeSource)
      This is invoked by merge(org.apache.lucene.index.MergeScheduler.MergeSource, org.apache.lucene.index.MergeTrigger) to possibly stall the incoming thread when there are too many merges running or pending. The default behavior is to force this thread, which is producing too many segments for merging to keep up, to wait until merges catch up. Applications that can take other less drastic measures, such as limiting how many threads are allowed to index, can do nothing here and throttle elsewhere.

      If this method wants to stall but the calling thread is a merge thread, it should return false to tell the caller not to kick off any new merges.
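      A minimal monitor-based sketch of the stalling contract above, assuming a simple running-merge counter in place of IndexWriter's pending-merge state (StallSketch and its methods are hypothetical). Indexing threads wait in short timed slices, which is what doStall() is described as doing, and a merge thread that would have to stall gets false instead. The 250 ms slice is an arbitrary choice for the sketch.

```java
// Hypothetical sketch of maybeStall()/doStall() semantics; not Lucene's code.
// The running-merge counter and the 250 ms wait slice are assumptions.
final class StallSketch {
    private final int maxMergeCount;
    private int runningMerges;

    StallSketch(int maxMergeCount) { this.maxMergeCount = maxMergeCount; }

    synchronized void mergeStarted() { runningMerges++; }

    synchronized void mergeFinished() {
        runningMerges--;
        notifyAll(); // wake any stalled indexing threads
    }

    /**
     * Returns true if the caller may kick off new merges. Indexing threads block
     * while too many merges are running; a merge thread gets false instead.
     */
    synchronized boolean maybeStall(boolean callerIsMergeThread) {
        while (runningMerges >= maxMergeCount) {
            if (callerIsMergeThread) return false;
            doStall();
        }
        return true;
    }

    /** Pauses the calling thread for a bit; it wakes early when a merge finishes. */
    private void doStall() {
        try {
            wait(250); // releases the monitor while waiting
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while stalled", e);
        }
    }
}
```

      Returning false to a merge thread avoids a merge thread waiting on merges that may themselves need merge threads to finish.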

    • doStall

      protected void doStall()
      Called from maybeStall(org.apache.lucene.index.MergeScheduler.MergeSource) to pause the calling thread for a bit.
    • doMerge

      protected void doMerge(MergeScheduler.MergeSource mergeSource, MergePolicy.OneMerge merge) throws IOException
      Throws:
      IOException
    • getMergeThread

      Create and return a new MergeThread
      Throws:
      IOException
    • runOnMergeFinished

      void runOnMergeFinished(MergeScheduler.MergeSource mergeSource)
    • handleMergeException

      protected void handleMergeException(Throwable exc)
      Called when an exception is hit in a background merge thread
    • setSuppressExceptions

      void setSuppressExceptions()
      Used for testing
    • clearSuppressExceptions

      void clearSuppressExceptions()
      Used for testing
    • toString

      public String toString()
      Overrides:
      toString in class Object
    • isBacklog

      private boolean isBacklog(long now, MergePolicy.OneMerge merge)
    • updateIOThrottle

      private void updateIOThrottle(MergePolicy.OneMerge newMerge, MergeRateLimiter rateLimiter) throws IOException
      Tunes IO throttle when a new merge starts.
      Throws:
      IOException
    • targetMBPerSecChanged

      protected void targetMBPerSecChanged()
      Subclasses can override this to tweak targetMBPerSec.
    • nsToSec

      private static double nsToSec(long ns)
    • bytesToMB

      private static double bytesToMB(long bytes)
    • getSegmentName

      private static String getSegmentName(MergePolicy.OneMerge merge)