001/*
002 * Copyright 2016-2017 UnboundID Corp.
003 * All Rights Reserved.
004 */
005/*
006 * Copyright (C) 2016-2017 UnboundID Corp.
007 *
008 * This program is free software; you can redistribute it and/or modify
009 * it under the terms of the GNU General Public License (GPLv2 only)
010 * or the terms of the GNU Lesser General Public License (LGPLv2.1 only)
011 * as published by the Free Software Foundation.
012 *
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Public License for more details.
017 *
018 * You should have received a copy of the GNU General Public License
019 * along with this program; if not, see <http://www.gnu.org/licenses>.
020 */
021package com.unboundid.ldap.listener;
022
023
024
025import java.util.List;
026import java.util.concurrent.Semaphore;
027import java.util.concurrent.TimeUnit;
028
029import com.unboundid.ldap.protocol.AbandonRequestProtocolOp;
030import com.unboundid.ldap.protocol.AddRequestProtocolOp;
031import com.unboundid.ldap.protocol.AddResponseProtocolOp;
032import com.unboundid.ldap.protocol.BindRequestProtocolOp;
033import com.unboundid.ldap.protocol.BindResponseProtocolOp;
034import com.unboundid.ldap.protocol.CompareRequestProtocolOp;
035import com.unboundid.ldap.protocol.CompareResponseProtocolOp;
036import com.unboundid.ldap.protocol.DeleteRequestProtocolOp;
037import com.unboundid.ldap.protocol.DeleteResponseProtocolOp;
038import com.unboundid.ldap.protocol.ExtendedRequestProtocolOp;
039import com.unboundid.ldap.protocol.ExtendedResponseProtocolOp;
040import com.unboundid.ldap.protocol.LDAPMessage;
041import com.unboundid.ldap.protocol.ModifyRequestProtocolOp;
042import com.unboundid.ldap.protocol.ModifyResponseProtocolOp;
043import com.unboundid.ldap.protocol.ModifyDNRequestProtocolOp;
044import com.unboundid.ldap.protocol.ModifyDNResponseProtocolOp;
045import com.unboundid.ldap.protocol.SearchRequestProtocolOp;
046import com.unboundid.ldap.protocol.SearchResultDoneProtocolOp;
047import com.unboundid.ldap.sdk.Control;
048import com.unboundid.ldap.sdk.LDAPException;
049import com.unboundid.ldap.sdk.OperationType;
050import com.unboundid.ldap.sdk.ResultCode;
051import com.unboundid.util.Debug;
052import com.unboundid.util.NotMutable;
053import com.unboundid.util.StaticUtils;
054import com.unboundid.util.ThreadSafety;
055import com.unboundid.util.ThreadSafetyLevel;
056import com.unboundid.util.Validator;
057
058import static com.unboundid.ldap.listener.ListenerMessages.*;
059
060
061
062/**
063 * This class provides an implementation of an LDAP listener request handler
064 * that can be used to limit the number of requests that may be processed
065 * concurrently.  It uses one or more {@link Semaphore} instances to limit the
066 * number of requests that may be processed at any time, and provides the
067 * ability to impose limiting on a per-operation-type basis.
068 */
069@NotMutable()
070@ThreadSafety(level=ThreadSafetyLevel.COMPLETELY_THREADSAFE)
071public final class ConcurrentRequestLimiterRequestHandler
072       extends LDAPListenerRequestHandler
073{
074  // The downstream request handler that will be used to process the requests
075  // after any appropriate concurrent request limiting has been performed.
076  private final LDAPListenerRequestHandler downstreamRequestHandler;
077
078  // A timeout value (expressed in milliseconds) that will cause the operation
079  // to be rejected rather than processed if the associated semaphore cannot be
080  // acquired in this length of time.
081  private final long rejectTimeoutMillis;
082
083  // The semaphores that will be used for each type of operation.
084  private final Semaphore abandonSemaphore;
085  private final Semaphore addSemaphore;
086  private final Semaphore bindSemaphore;
087  private final Semaphore compareSemaphore;
088  private final Semaphore deleteSemaphore;
089  private final Semaphore extendedSemaphore;
090  private final Semaphore modifySemaphore;
091  private final Semaphore modifyDNSemaphore;
092  private final Semaphore searchSemaphore;
093
094
095
096  /**
097   * Creates a new concurrent request limiter request handler that will impose
098   * the specified limit on the number of operations that may be in progress at
099   * any time.  The limit will be enforced for all types of operations except
100   * abandon and unbind operations, which will not be limited.
101   *
102   * @param  downstreamRequestHandler  The downstream request handler that will
103   *                                   be used to actually process the requests
104   *                                   after any appropriate limiting has been
105   *                                   performed.
106   * @param  maxConcurrentRequests     The maximum number of requests that may
107   *                                   be processed at any given time.  This
108   *                                   limit will be enforced for all operation
109   *                                   types except abandon and unbind, which
110   *                                   will not be limited.
111   * @param  rejectTimeoutMillis       A timeout value (expressed in
112   *                                   milliseconds) that will cause a requested
113   *                                   operation to be rejected rather than
114   *                                   processed if the associate semaphore
115   *                                   cannot be acquired in this length of
116   *                                   time.  A value of zero indicates that the
117   *                                   operation should be rejected immediately
118   *                                   if the maximum number of concurrent
119   *                                   requests are already in progress.  A
120   *                                   value that is less than zero indicates
121   *                                   that no timeout should be imposed and
122   *                                   that requests should be forced to wait as
123   *                                   long as necessary until they can be
124   *                                   processed.
125   */
126  public ConcurrentRequestLimiterRequestHandler(
127              final LDAPListenerRequestHandler downstreamRequestHandler,
128              final int maxConcurrentRequests, final long rejectTimeoutMillis)
129  {
130    this(downstreamRequestHandler, new Semaphore(maxConcurrentRequests),
131         rejectTimeoutMillis);
132  }
133
134
135
136  /**
137   * Creates a new concurrent request limiter request handler that will use the
138   * provided semaphore to limit on the number of operations that may be in
139   * progress at any time.  The limit will be enforced for all types of
140   * operations except abandon and unbind operations, which will not be limited.
141   *
142   * @param  downstreamRequestHandler  The downstream request handler that will
143   *                                   be used to actually process the requests
144   *                                   after any appropriate limiting has been
145   *                                   performed.
146   * @param  semaphore                 The semaphore that will be used to limit
147   *                                   the number of concurrent operations in
148   *                                   progress, for all operation types except
149   *                                   abandon and unbind.
150   * @param  rejectTimeoutMillis       A timeout value (expressed in
151   *                                   milliseconds) that will cause a requested
152   *                                   operation to be rejected rather than
153   *                                   processed if the associate semaphore
154   *                                   cannot be acquired in this length of
155   *                                   time.  A value of zero indicates that the
156   *                                   operation should be rejected immediately
157   *                                   if the maximum number of concurrent
158   *                                   requests are already in progress.  A
159   *                                   value that is less than zero indicates
160   *                                   that no timeout should be imposed and
161   *                                   that requests should be forced to wait as
162   *                                   long as necessary until they can be
163   *                                   processed.
164   */
165  public ConcurrentRequestLimiterRequestHandler(
166              final LDAPListenerRequestHandler downstreamRequestHandler,
167              final Semaphore semaphore, final long rejectTimeoutMillis)
168  {
169    this(downstreamRequestHandler, null, semaphore, semaphore, semaphore,
170         semaphore, semaphore, semaphore, semaphore, semaphore,
171         rejectTimeoutMillis);
172  }
173
174
175
176  /**
177   * Creates a new concurrent request limiter request handler that can use the
178   * provided semaphore instances to limit the number of operations in progress
179   * concurrently for each type of operation.  The same semaphore instance can
180   * be provided for multiple operation types if performance for those
181   * operations should be limited in aggregate rather than individually (e.g.,
182   * if you don't want the total combined number of search and modify operations
183   * in progress at any time to exceed a given threshold, then you could provide
184   * the same semaphore instance for the {@code modifySemaphore} and
185   * {@code searchSemaphore} arguments).
186   *
187   * @param  downstreamRequestHandler  The downstream request handler that will
188   *                                   be used to actually process the requests
189   *                                   after any appropriate rate limiting has
190   *                                   been performed.  It must not be
191   *                                   {@code null}.
192   * @param  abandonSemaphore          The semaphore to use when processing
193   *                                   abandon operations.  It may be
194   *                                   {@code null} if no concurrent request
195   *                                   limiting should be performed for abandon
196   *                                   operations.
197   * @param  addSemaphore              The semaphore to use when processing add
198   *                                   operations.  It may be {@code null} if no
199   *                                   concurrent request limiting should be
200   *                                   performed for add operations.
201   * @param  bindSemaphore             The semaphore to use when processing
202   *                                   bind operations.  It may be
203   *                                   {@code null} if no concurrent request
204   *                                   limiting should be performed for bind
205   *                                   operations.
206   * @param  compareSemaphore          The semaphore to use when processing
207   *                                   compare operations.  It may be
208   *                                   {@code null} if no concurrent request
209   *                                   limiting should be performed for compare
210   *                                   operations.
211   * @param  deleteSemaphore           The semaphore to use when processing
212   *                                   delete operations.  It may be
213   *                                   {@code null} if no concurrent request
214   *                                   limiting should be performed for delete
215   *                                   operations.
216   * @param  extendedSemaphore         The semaphore to use when processing
217   *                                   extended operations.  It may be
218   *                                   {@code null} if no concurrent request
219   *                                   limiting should be performed for extended
220   *                                   operations.
221   * @param  modifySemaphore           The semaphore to use when processing
222   *                                   modify operations.  It may be
223   *                                   {@code null} if no concurrent request
224   *                                   limiting should be performed for modify
225   *                                   operations.
226   * @param  modifyDNSemaphore         The semaphore to use when processing
227   *                                   modify DN operations.  It may be
228   *                                   {@code null} if no concurrent request
229   *                                   limiting should be performed for modify
230   *                                   DN operations.
231   * @param  searchSemaphore           The semaphore to use when processing
232   *                                   search operations.  It may be
233   *                                   {@code null} if no concurrent request
234   *                                   limiting should be performed for search
235   *                                   operations.
236   * @param  rejectTimeoutMillis       A timeout value (expressed in
237   *                                   milliseconds) that will cause a requested
238   *                                   operation to be rejected rather than
239   *                                   processed if the associate semaphore
240   *                                   cannot be acquired in this length of
241   *                                   time.  A value of zero indicates that the
242   *                                   operation should be rejected immediately
243   *                                   if the maximum number of concurrent
244   *                                   requests are already in progress.  A
245   *                                   value that is less than zero indicates
246   *                                   that no timeout should be imposed and
247   *                                   that requests should be forced to wait as
248   *                                   long as necessary until they can be
249   *                                   processed.
250   */
251  public ConcurrentRequestLimiterRequestHandler(
252              final LDAPListenerRequestHandler downstreamRequestHandler,
253              final Semaphore abandonSemaphore,
254              final Semaphore addSemaphore,
255              final Semaphore bindSemaphore,
256              final Semaphore compareSemaphore,
257              final Semaphore deleteSemaphore,
258              final Semaphore extendedSemaphore,
259              final Semaphore modifySemaphore,
260              final Semaphore modifyDNSemaphore,
261              final Semaphore searchSemaphore,
262              final long rejectTimeoutMillis)
263  {
264    Validator.ensureNotNull(downstreamRequestHandler);
265
266    this.downstreamRequestHandler = downstreamRequestHandler;
267    this.abandonSemaphore         = abandonSemaphore;
268    this.addSemaphore             = addSemaphore;
269    this.bindSemaphore            = bindSemaphore;
270    this.compareSemaphore         = compareSemaphore;
271    this.deleteSemaphore          = deleteSemaphore;
272    this.extendedSemaphore        = extendedSemaphore;
273    this.modifySemaphore          = modifySemaphore;
274    this.modifyDNSemaphore        = modifyDNSemaphore;
275    this.searchSemaphore          = searchSemaphore;
276
277    if (rejectTimeoutMillis >= 0L)
278    {
279      this.rejectTimeoutMillis = rejectTimeoutMillis;
280    }
281    else
282    {
283      this.rejectTimeoutMillis = (long) Integer.MAX_VALUE;
284    }
285  }
286
287
288
289  /**
290   * {@inheritDoc}
291   */
292  @Override()
293  public ConcurrentRequestLimiterRequestHandler newInstance(
294              final LDAPListenerClientConnection connection)
295         throws LDAPException
296  {
297    return new ConcurrentRequestLimiterRequestHandler(
298         downstreamRequestHandler.newInstance(connection), abandonSemaphore,
299         addSemaphore, bindSemaphore, compareSemaphore, deleteSemaphore,
300         extendedSemaphore, modifySemaphore, modifyDNSemaphore,
301         searchSemaphore, rejectTimeoutMillis);
302  }
303
304
305
306  /**
307   * {@inheritDoc}
308   */
309  @Override()
310  public void processAbandonRequest(final int messageID,
311                                    final AbandonRequestProtocolOp request,
312                                    final List<Control> controls)
313  {
314    try
315    {
316      acquirePermit(abandonSemaphore, OperationType.ABANDON);
317    }
318    catch (final LDAPException le)
319    {
320      Debug.debugException(le);
321      return;
322    }
323
324    try
325    {
326      downstreamRequestHandler.processAbandonRequest(messageID, request,
327           controls);
328    }
329    finally
330    {
331      releasePermit(abandonSemaphore);
332    }
333  }
334
335
336
337  /**
338   * {@inheritDoc}
339   */
340  @Override()
341  public LDAPMessage processAddRequest(final int messageID,
342                                       final AddRequestProtocolOp request,
343                                       final List<Control> controls)
344  {
345    try
346    {
347      acquirePermit(addSemaphore, OperationType.ADD);
348    }
349    catch (final LDAPException le)
350    {
351      Debug.debugException(le);
352      return new LDAPMessage(messageID,
353           new AddResponseProtocolOp(le.toLDAPResult()));
354    }
355
356    try
357    {
358      return downstreamRequestHandler.processAddRequest(messageID, request,
359           controls);
360    }
361    finally
362    {
363      releasePermit(addSemaphore);
364    }
365  }
366
367
368
369  /**
370   * {@inheritDoc}
371   */
372  @Override()
373  public LDAPMessage processBindRequest(final int messageID,
374                                        final BindRequestProtocolOp request,
375                                        final List<Control> controls)
376  {
377    try
378    {
379      acquirePermit(bindSemaphore, OperationType.BIND);
380    }
381    catch (final LDAPException le)
382    {
383      Debug.debugException(le);
384      return new LDAPMessage(messageID,
385           new BindResponseProtocolOp(le.toLDAPResult()));
386    }
387
388    try
389    {
390      return downstreamRequestHandler.processBindRequest(messageID, request,
391           controls);
392    }
393    finally
394    {
395      releasePermit(bindSemaphore);
396    }
397  }
398
399
400
401  /**
402   * {@inheritDoc}
403   */
404  @Override()
405  public LDAPMessage processCompareRequest(final int messageID,
406                          final CompareRequestProtocolOp request,
407                          final List<Control> controls)
408  {
409    try
410    {
411      acquirePermit(compareSemaphore, OperationType.COMPARE);
412    }
413    catch (final LDAPException le)
414    {
415      Debug.debugException(le);
416      return new LDAPMessage(messageID,
417           new CompareResponseProtocolOp(le.toLDAPResult()));
418    }
419
420    try
421    {
422      return downstreamRequestHandler.processCompareRequest(messageID, request,
423           controls);
424    }
425    finally
426    {
427      releasePermit(compareSemaphore);
428    }
429  }
430
431
432
433  /**
434   * {@inheritDoc}
435   */
436  @Override()
437  public LDAPMessage processDeleteRequest(final int messageID,
438                                          final DeleteRequestProtocolOp request,
439                                          final List<Control> controls)
440  {
441    try
442    {
443      acquirePermit(deleteSemaphore, OperationType.DELETE);
444    }
445    catch (final LDAPException le)
446    {
447      Debug.debugException(le);
448      return new LDAPMessage(messageID,
449           new DeleteResponseProtocolOp(le.toLDAPResult()));
450    }
451
452    try
453    {
454      return downstreamRequestHandler.processDeleteRequest(messageID, request,
455           controls);
456    }
457    finally
458    {
459      releasePermit(deleteSemaphore);
460    }
461  }
462
463
464
465  /**
466   * {@inheritDoc}
467   */
468  @Override()
469  public LDAPMessage processExtendedRequest(final int messageID,
470                          final ExtendedRequestProtocolOp request,
471                          final List<Control> controls)
472  {
473    try
474    {
475      acquirePermit(extendedSemaphore, OperationType.EXTENDED);
476    }
477    catch (final LDAPException le)
478    {
479      Debug.debugException(le);
480      return new LDAPMessage(messageID,
481           new ExtendedResponseProtocolOp(le.toLDAPResult()));
482    }
483
484    try
485    {
486      return downstreamRequestHandler.processExtendedRequest(messageID, request,
487           controls);
488    }
489    finally
490    {
491      releasePermit(extendedSemaphore);
492    }
493  }
494
495
496
497  /**
498   * {@inheritDoc}
499   */
500  @Override()
501  public LDAPMessage processModifyRequest(final int messageID,
502                                          final ModifyRequestProtocolOp request,
503                                          final List<Control> controls)
504  {
505    try
506    {
507      acquirePermit(modifySemaphore, OperationType.MODIFY);
508    }
509    catch (final LDAPException le)
510    {
511      Debug.debugException(le);
512      return new LDAPMessage(messageID,
513           new ModifyResponseProtocolOp(le.toLDAPResult()));
514    }
515
516    try
517    {
518      return downstreamRequestHandler.processModifyRequest(messageID, request,
519           controls);
520    }
521    finally
522    {
523      releasePermit(modifySemaphore);
524    }
525  }
526
527
528
529  /**
530   * {@inheritDoc}
531   */
532  @Override()
533  public LDAPMessage processModifyDNRequest(final int messageID,
534                          final ModifyDNRequestProtocolOp request,
535                          final List<Control> controls)
536  {
537    try
538    {
539      acquirePermit(modifyDNSemaphore, OperationType.MODIFY_DN);
540    }
541    catch (final LDAPException le)
542    {
543      Debug.debugException(le);
544      return new LDAPMessage(messageID,
545           new ModifyDNResponseProtocolOp(le.toLDAPResult()));
546    }
547
548    try
549    {
550      return downstreamRequestHandler.processModifyDNRequest(messageID, request,
551           controls);
552    }
553    finally
554    {
555      releasePermit(modifyDNSemaphore);
556    }
557  }
558
559
560
561  /**
562   * {@inheritDoc}
563   */
564  @Override()
565  public LDAPMessage processSearchRequest(final int messageID,
566                                          final SearchRequestProtocolOp request,
567                                          final List<Control> controls)
568  {
569    try
570    {
571      acquirePermit(searchSemaphore, OperationType.SEARCH);
572    }
573    catch (final LDAPException le)
574    {
575      Debug.debugException(le);
576      return new LDAPMessage(messageID,
577           new SearchResultDoneProtocolOp(le.toLDAPResult()));
578    }
579
580    try
581    {
582      return downstreamRequestHandler.processSearchRequest(messageID, request,
583           controls);
584    }
585    finally
586    {
587      releasePermit(searchSemaphore);
588    }
589  }
590
591
592
593  /**
594   * Acquires a permit from the provided semaphore.
595   *
596   * @param  semaphore      The semaphore from which to acquire a permit.  It
597   *                        may be {@code null} if no semaphore is needed for
598   *                        the associated operation type.
599   * @param  operationType  The type of operation
600   *
601   * @throws  LDAPException  If it was not possible to acquire a permit from the
602   *                         provided semaphore.
603   */
604  private void acquirePermit(final Semaphore semaphore,
605                             final OperationType operationType)
606          throws LDAPException
607  {
608    if (semaphore == null)
609    {
610      return;
611    }
612
613    try
614    {
615      if (rejectTimeoutMillis == 0L)
616      {
617        if (! semaphore.tryAcquire())
618        {
619          throw new LDAPException(ResultCode.BUSY,
620               ERR_CONCURRENT_LIMITER_REQUEST_HANDLER_NO_TIMEOUT.get(
621                    operationType.name()));
622        }
623      }
624      else
625      {
626        if (! semaphore.tryAcquire(rejectTimeoutMillis, TimeUnit.MILLISECONDS))
627        {
628          throw new LDAPException(ResultCode.BUSY,
629               ERR_CONCURRENT_LIMITER_REQUEST_HANDLER_TIMEOUT.get(
630                    operationType.name(), rejectTimeoutMillis));
631        }
632      }
633    }
634    catch (final LDAPException le)
635    {
636      throw le;
637    }
638    catch (final Exception e)
639    {
640      Debug.debugException(e);
641      throw new LDAPException(ResultCode.OTHER,
642           ERR_CONCURRENT_LIMITER_REQUEST_HANDLER_SEMAPHORE_EXCEPTION.get(
643                operationType.name(), StaticUtils.getExceptionMessage(e)),
644           e);
645    }
646  }
647
648
649
650  /**
651   * Releases a permit back to the provided semaphore.
652   *
653   * @param  semaphore  The semaphore to which the permit should be released.
654   *                    It may be {@code null} if no semaphore is needed for the
655   *                    associated operation type.
656   */
657  private static void releasePermit(final Semaphore semaphore)
658  {
659    if (semaphore != null)
660    {
661      semaphore.release();
662    }
663  }
664}