001/* 002 * Copyright 2016-2017 UnboundID Corp. 003 * All Rights Reserved. 004 */ 005/* 006 * Copyright (C) 2016-2017 UnboundID Corp. 007 * 008 * This program is free software; you can redistribute it and/or modify 009 * it under the terms of the GNU General Public License (GPLv2 only) 010 * or the terms of the GNU Lesser General Public License (LGPLv2.1 only) 011 * as published by the Free Software Foundation. 012 * 013 * This program is distributed in the hope that it will be useful, 014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 016 * GNU General Public License for more details. 017 * 018 * You should have received a copy of the GNU General Public License 019 * along with this program; if not, see <http://www.gnu.org/licenses>. 020 */ 021package com.unboundid.ldap.listener; 022 023 024 025import java.util.List; 026import java.util.concurrent.Semaphore; 027import java.util.concurrent.TimeUnit; 028 029import com.unboundid.ldap.protocol.AbandonRequestProtocolOp; 030import com.unboundid.ldap.protocol.AddRequestProtocolOp; 031import com.unboundid.ldap.protocol.AddResponseProtocolOp; 032import com.unboundid.ldap.protocol.BindRequestProtocolOp; 033import com.unboundid.ldap.protocol.BindResponseProtocolOp; 034import com.unboundid.ldap.protocol.CompareRequestProtocolOp; 035import com.unboundid.ldap.protocol.CompareResponseProtocolOp; 036import com.unboundid.ldap.protocol.DeleteRequestProtocolOp; 037import com.unboundid.ldap.protocol.DeleteResponseProtocolOp; 038import com.unboundid.ldap.protocol.ExtendedRequestProtocolOp; 039import com.unboundid.ldap.protocol.ExtendedResponseProtocolOp; 040import com.unboundid.ldap.protocol.LDAPMessage; 041import com.unboundid.ldap.protocol.ModifyRequestProtocolOp; 042import com.unboundid.ldap.protocol.ModifyResponseProtocolOp; 043import com.unboundid.ldap.protocol.ModifyDNRequestProtocolOp; 044import com.unboundid.ldap.protocol.ModifyDNResponseProtocolOp; 045import com.unboundid.ldap.protocol.SearchRequestProtocolOp; 046import com.unboundid.ldap.protocol.SearchResultDoneProtocolOp; 047import com.unboundid.ldap.sdk.Control; 048import com.unboundid.ldap.sdk.LDAPException; 049import com.unboundid.ldap.sdk.OperationType; 050import com.unboundid.ldap.sdk.ResultCode; 051import com.unboundid.util.Debug; 052import com.unboundid.util.NotMutable; 053import com.unboundid.util.StaticUtils; 054import com.unboundid.util.ThreadSafety; 055import com.unboundid.util.ThreadSafetyLevel; 056import com.unboundid.util.Validator; 057 058import static com.unboundid.ldap.listener.ListenerMessages.*; 059 060 061 062/** 063 * This class provides an implementation of an LDAP listener request handler 064 * that can be used to limit the number of requests that may be processed 065 * concurrently. It uses one or more {@link Semaphore} instances to limit the 066 * number of requests that may be processed at any time, and provides the 067 * ability to impose limiting on a per-operation-type basis. 068 */ 069@NotMutable() 070@ThreadSafety(level=ThreadSafetyLevel.COMPLETELY_THREADSAFE) 071public final class ConcurrentRequestLimiterRequestHandler 072 extends LDAPListenerRequestHandler 073{ 074 // The downstream request handler that will be used to process the requests 075 // after any appropriate concurrent request limiting has been performed. 076 private final LDAPListenerRequestHandler downstreamRequestHandler; 077 078 // A timeout value (expressed in milliseconds) that will cause the operation 079 // to be rejected rather than processed if the associated semaphore cannot be 080 // acquired in this length of time. 081 private final long rejectTimeoutMillis; 082 083 // The semaphores that will be used for each type of operation. 084 private final Semaphore abandonSemaphore; 085 private final Semaphore addSemaphore; 086 private final Semaphore bindSemaphore; 087 private final Semaphore compareSemaphore; 088 private final Semaphore deleteSemaphore; 089 private final Semaphore extendedSemaphore; 090 private final Semaphore modifySemaphore; 091 private final Semaphore modifyDNSemaphore; 092 private final Semaphore searchSemaphore; 093 094 095 096 /** 097 * Creates a new concurrent request limiter request handler that will impose 098 * the specified limit on the number of operations that may be in progress at 099 * any time. The limit will be enforced for all types of operations except 100 * abandon and unbind operations, which will not be limited. 101 * 102 * @param downstreamRequestHandler The downstream request handler that will 103 * be used to actually process the requests 104 * after any appropriate limiting has been 105 * performed. 106 * @param maxConcurrentRequests The maximum number of requests that may 107 * be processed at any given time. This 108 * limit will be enforced for all operation 109 * types except abandon and unbind, which 110 * will not be limited. 111 * @param rejectTimeoutMillis A timeout value (expressed in 112 * milliseconds) that will cause a requested 113 * operation to be rejected rather than 114 * processed if the associate semaphore 115 * cannot be acquired in this length of 116 * time. A value of zero indicates that the 117 * operation should be rejected immediately 118 * if the maximum number of concurrent 119 * requests are already in progress. A 120 * value that is less than zero indicates 121 * that no timeout should be imposed and 122 * that requests should be forced to wait as 123 * long as necessary until they can be 124 * processed. 125 */ 126 public ConcurrentRequestLimiterRequestHandler( 127 final LDAPListenerRequestHandler downstreamRequestHandler, 128 final int maxConcurrentRequests, final long rejectTimeoutMillis) 129 { 130 this(downstreamRequestHandler, new Semaphore(maxConcurrentRequests), 131 rejectTimeoutMillis); 132 } 133 134 135 136 /** 137 * Creates a new concurrent request limiter request handler that will use the 138 * provided semaphore to limit on the number of operations that may be in 139 * progress at any time. The limit will be enforced for all types of 140 * operations except abandon and unbind operations, which will not be limited. 141 * 142 * @param downstreamRequestHandler The downstream request handler that will 143 * be used to actually process the requests 144 * after any appropriate limiting has been 145 * performed. 146 * @param semaphore The semaphore that will be used to limit 147 * the number of concurrent operations in 148 * progress, for all operation types except 149 * abandon and unbind. 150 * @param rejectTimeoutMillis A timeout value (expressed in 151 * milliseconds) that will cause a requested 152 * operation to be rejected rather than 153 * processed if the associate semaphore 154 * cannot be acquired in this length of 155 * time. A value of zero indicates that the 156 * operation should be rejected immediately 157 * if the maximum number of concurrent 158 * requests are already in progress. A 159 * value that is less than zero indicates 160 * that no timeout should be imposed and 161 * that requests should be forced to wait as 162 * long as necessary until they can be 163 * processed. 164 */ 165 public ConcurrentRequestLimiterRequestHandler( 166 final LDAPListenerRequestHandler downstreamRequestHandler, 167 final Semaphore semaphore, final long rejectTimeoutMillis) 168 { 169 this(downstreamRequestHandler, null, semaphore, semaphore, semaphore, 170 semaphore, semaphore, semaphore, semaphore, semaphore, 171 rejectTimeoutMillis); 172 } 173 174 175 176 /** 177 * Creates a new concurrent request limiter request handler that can use the 178 * provided semaphore instances to limit the number of operations in progress 179 * concurrently for each type of operation. The same semaphore instance can 180 * be provided for multiple operation types if performance for those 181 * operations should be limited in aggregate rather than individually (e.g., 182 * if you don't want the total combined number of search and modify operations 183 * in progress at any time to exceed a given threshold, then you could provide 184 * the same semaphore instance for the {@code modifySemaphore} and 185 * {@code searchSemaphore} arguments). 186 * 187 * @param downstreamRequestHandler The downstream request handler that will 188 * be used to actually process the requests 189 * after any appropriate rate limiting has 190 * been performed. It must not be 191 * {@code null}. 192 * @param abandonSemaphore The semaphore to use when processing 193 * abandon operations. It may be 194 * {@code null} if no concurrent request 195 * limiting should be performed for abandon 196 * operations. 197 * @param addSemaphore The semaphore to use when processing add 198 * operations. It may be {@code null} if no 199 * concurrent request limiting should be 200 * performed for add operations. 201 * @param bindSemaphore The semaphore to use when processing 202 * bind operations. It may be 203 * {@code null} if no concurrent request 204 * limiting should be performed for bind 205 * operations. 206 * @param compareSemaphore The semaphore to use when processing 207 * compare operations. It may be 208 * {@code null} if no concurrent request 209 * limiting should be performed for compare 210 * operations. 211 * @param deleteSemaphore The semaphore to use when processing 212 * delete operations. It may be 213 * {@code null} if no concurrent request 214 * limiting should be performed for delete 215 * operations. 216 * @param extendedSemaphore The semaphore to use when processing 217 * extended operations. It may be 218 * {@code null} if no concurrent request 219 * limiting should be performed for extended 220 * operations. 221 * @param modifySemaphore The semaphore to use when processing 222 * modify operations. It may be 223 * {@code null} if no concurrent request 224 * limiting should be performed for modify 225 * operations. 226 * @param modifyDNSemaphore The semaphore to use when processing 227 * modify DN operations. It may be 228 * {@code null} if no concurrent request 229 * limiting should be performed for modify 230 * DN operations. 231 * @param searchSemaphore The semaphore to use when processing 232 * search operations. It may be 233 * {@code null} if no concurrent request 234 * limiting should be performed for search 235 * operations. 236 * @param rejectTimeoutMillis A timeout value (expressed in 237 * milliseconds) that will cause a requested 238 * operation to be rejected rather than 239 * processed if the associate semaphore 240 * cannot be acquired in this length of 241 * time. A value of zero indicates that the 242 * operation should be rejected immediately 243 * if the maximum number of concurrent 244 * requests are already in progress. A 245 * value that is less than zero indicates 246 * that no timeout should be imposed and 247 * that requests should be forced to wait as 248 * long as necessary until they can be 249 * processed. 250 */ 251 public ConcurrentRequestLimiterRequestHandler( 252 final LDAPListenerRequestHandler downstreamRequestHandler, 253 final Semaphore abandonSemaphore, 254 final Semaphore addSemaphore, 255 final Semaphore bindSemaphore, 256 final Semaphore compareSemaphore, 257 final Semaphore deleteSemaphore, 258 final Semaphore extendedSemaphore, 259 final Semaphore modifySemaphore, 260 final Semaphore modifyDNSemaphore, 261 final Semaphore searchSemaphore, 262 final long rejectTimeoutMillis) 263 { 264 Validator.ensureNotNull(downstreamRequestHandler); 265 266 this.downstreamRequestHandler = downstreamRequestHandler; 267 this.abandonSemaphore = abandonSemaphore; 268 this.addSemaphore = addSemaphore; 269 this.bindSemaphore = bindSemaphore; 270 this.compareSemaphore = compareSemaphore; 271 this.deleteSemaphore = deleteSemaphore; 272 this.extendedSemaphore = extendedSemaphore; 273 this.modifySemaphore = modifySemaphore; 274 this.modifyDNSemaphore = modifyDNSemaphore; 275 this.searchSemaphore = searchSemaphore; 276 277 if (rejectTimeoutMillis >= 0L) 278 { 279 this.rejectTimeoutMillis = rejectTimeoutMillis; 280 } 281 else 282 { 283 this.rejectTimeoutMillis = (long) Integer.MAX_VALUE; 284 } 285 } 286 287 288 289 /** 290 * {@inheritDoc} 291 */ 292 @Override() 293 public ConcurrentRequestLimiterRequestHandler newInstance( 294 final LDAPListenerClientConnection connection) 295 throws LDAPException 296 { 297 return new ConcurrentRequestLimiterRequestHandler( 298 downstreamRequestHandler.newInstance(connection), abandonSemaphore, 299 addSemaphore, bindSemaphore, compareSemaphore, deleteSemaphore, 300 extendedSemaphore, modifySemaphore, modifyDNSemaphore, 301 searchSemaphore, rejectTimeoutMillis); 302 } 303 304 305 306 /** 307 * {@inheritDoc} 308 */ 309 @Override() 310 public void processAbandonRequest(final int messageID, 311 final AbandonRequestProtocolOp request, 312 final List<Control> controls) 313 { 314 try 315 { 316 acquirePermit(abandonSemaphore, OperationType.ABANDON); 317 } 318 catch (final LDAPException le) 319 { 320 Debug.debugException(le); 321 return; 322 } 323 324 try 325 { 326 downstreamRequestHandler.processAbandonRequest(messageID, request, 327 controls); 328 } 329 finally 330 { 331 releasePermit(abandonSemaphore); 332 } 333 } 334 335 336 337 /** 338 * {@inheritDoc} 339 */ 340 @Override() 341 public LDAPMessage processAddRequest(final int messageID, 342 final AddRequestProtocolOp request, 343 final List<Control> controls) 344 { 345 try 346 { 347 acquirePermit(addSemaphore, OperationType.ADD); 348 } 349 catch (final LDAPException le) 350 { 351 Debug.debugException(le); 352 return new LDAPMessage(messageID, 353 new AddResponseProtocolOp(le.toLDAPResult())); 354 } 355 356 try 357 { 358 return downstreamRequestHandler.processAddRequest(messageID, request, 359 controls); 360 } 361 finally 362 { 363 releasePermit(addSemaphore); 364 } 365 } 366 367 368 369 /** 370 * {@inheritDoc} 371 */ 372 @Override() 373 public LDAPMessage processBindRequest(final int messageID, 374 final BindRequestProtocolOp request, 375 final List<Control> controls) 376 { 377 try 378 { 379 acquirePermit(bindSemaphore, OperationType.BIND); 380 } 381 catch (final LDAPException le) 382 { 383 Debug.debugException(le); 384 return new LDAPMessage(messageID, 385 new BindResponseProtocolOp(le.toLDAPResult())); 386 } 387 388 try 389 { 390 return downstreamRequestHandler.processBindRequest(messageID, request, 391 controls); 392 } 393 finally 394 { 395 releasePermit(bindSemaphore); 396 } 397 } 398 399 400 401 /** 402 * {@inheritDoc} 403 */ 404 @Override() 405 public LDAPMessage processCompareRequest(final int messageID, 406 final CompareRequestProtocolOp request, 407 final List<Control> controls) 408 { 409 try 410 { 411 acquirePermit(compareSemaphore, OperationType.COMPARE); 412 } 413 catch (final LDAPException le) 414 { 415 Debug.debugException(le); 416 return new LDAPMessage(messageID, 417 new CompareResponseProtocolOp(le.toLDAPResult())); 418 } 419 420 try 421 { 422 return downstreamRequestHandler.processCompareRequest(messageID, request, 423 controls); 424 } 425 finally 426 { 427 releasePermit(compareSemaphore); 428 } 429 } 430 431 432 433 /** 434 * {@inheritDoc} 435 */ 436 @Override() 437 public LDAPMessage processDeleteRequest(final int messageID, 438 final DeleteRequestProtocolOp request, 439 final List<Control> controls) 440 { 441 try 442 { 443 acquirePermit(deleteSemaphore, OperationType.DELETE); 444 } 445 catch (final LDAPException le) 446 { 447 Debug.debugException(le); 448 return new LDAPMessage(messageID, 449 new DeleteResponseProtocolOp(le.toLDAPResult())); 450 } 451 452 try 453 { 454 return downstreamRequestHandler.processDeleteRequest(messageID, request, 455 controls); 456 } 457 finally 458 { 459 releasePermit(deleteSemaphore); 460 } 461 } 462 463 464 465 /** 466 * {@inheritDoc} 467 */ 468 @Override() 469 public LDAPMessage processExtendedRequest(final int messageID, 470 final ExtendedRequestProtocolOp request, 471 final List<Control> controls) 472 { 473 try 474 { 475 acquirePermit(extendedSemaphore, OperationType.EXTENDED); 476 } 477 catch (final LDAPException le) 478 { 479 Debug.debugException(le); 480 return new LDAPMessage(messageID, 481 new ExtendedResponseProtocolOp(le.toLDAPResult())); 482 } 483 484 try 485 { 486 return downstreamRequestHandler.processExtendedRequest(messageID, request, 487 controls); 488 } 489 finally 490 { 491 releasePermit(extendedSemaphore); 492 } 493 } 494 495 496 497 /** 498 * {@inheritDoc} 499 */ 500 @Override() 501 public LDAPMessage processModifyRequest(final int messageID, 502 final ModifyRequestProtocolOp request, 503 final List<Control> controls) 504 { 505 try 506 { 507 acquirePermit(modifySemaphore, OperationType.MODIFY); 508 } 509 catch (final LDAPException le) 510 { 511 Debug.debugException(le); 512 return new LDAPMessage(messageID, 513 new ModifyResponseProtocolOp(le.toLDAPResult())); 514 } 515 516 try 517 { 518 return downstreamRequestHandler.processModifyRequest(messageID, request, 519 controls); 520 } 521 finally 522 { 523 releasePermit(modifySemaphore); 524 } 525 } 526 527 528 529 /** 530 * {@inheritDoc} 531 */ 532 @Override() 533 public LDAPMessage processModifyDNRequest(final int messageID, 534 final ModifyDNRequestProtocolOp request, 535 final List<Control> controls) 536 { 537 try 538 { 539 acquirePermit(modifyDNSemaphore, OperationType.MODIFY_DN); 540 } 541 catch (final LDAPException le) 542 { 543 Debug.debugException(le); 544 return new LDAPMessage(messageID, 545 new ModifyDNResponseProtocolOp(le.toLDAPResult())); 546 } 547 548 try 549 { 550 return downstreamRequestHandler.processModifyDNRequest(messageID, request, 551 controls); 552 } 553 finally 554 { 555 releasePermit(modifyDNSemaphore); 556 } 557 } 558 559 560 561 /** 562 * {@inheritDoc} 563 */ 564 @Override() 565 public LDAPMessage processSearchRequest(final int messageID, 566 final SearchRequestProtocolOp request, 567 final List<Control> controls) 568 { 569 try 570 { 571 acquirePermit(searchSemaphore, OperationType.SEARCH); 572 } 573 catch (final LDAPException le) 574 { 575 Debug.debugException(le); 576 return new LDAPMessage(messageID, 577 new SearchResultDoneProtocolOp(le.toLDAPResult())); 578 } 579 580 try 581 { 582 return downstreamRequestHandler.processSearchRequest(messageID, request, 583 controls); 584 } 585 finally 586 { 587 releasePermit(searchSemaphore); 588 } 589 } 590 591 592 593 /** 594 * Acquires a permit from the provided semaphore. 595 * 596 * @param semaphore The semaphore from which to acquire a permit. It 597 * may be {@code null} if no semaphore is needed for 598 * the associated operation type. 599 * @param operationType The type of operation 600 * 601 * @throws LDAPException If it was not possible to acquire a permit from the 602 * provided semaphore. 603 */ 604 private void acquirePermit(final Semaphore semaphore, 605 final OperationType operationType) 606 throws LDAPException 607 { 608 if (semaphore == null) 609 { 610 return; 611 } 612 613 try 614 { 615 if (rejectTimeoutMillis == 0L) 616 { 617 if (! semaphore.tryAcquire()) 618 { 619 throw new LDAPException(ResultCode.BUSY, 620 ERR_CONCURRENT_LIMITER_REQUEST_HANDLER_NO_TIMEOUT.get( 621 operationType.name())); 622 } 623 } 624 else 625 { 626 if (! semaphore.tryAcquire(rejectTimeoutMillis, TimeUnit.MILLISECONDS)) 627 { 628 throw new LDAPException(ResultCode.BUSY, 629 ERR_CONCURRENT_LIMITER_REQUEST_HANDLER_TIMEOUT.get( 630 operationType.name(), rejectTimeoutMillis)); 631 } 632 } 633 } 634 catch (final LDAPException le) 635 { 636 throw le; 637 } 638 catch (final Exception e) 639 { 640 Debug.debugException(e); 641 throw new LDAPException(ResultCode.OTHER, 642 ERR_CONCURRENT_LIMITER_REQUEST_HANDLER_SEMAPHORE_EXCEPTION.get( 643 operationType.name(), StaticUtils.getExceptionMessage(e)), 644 e); 645 } 646 } 647 648 649 650 /** 651 * Releases a permit back to the provided semaphore. 652 * 653 * @param semaphore The semaphore to which the permit should be released. 654 * It may be {@code null} if no semaphore is needed for the 655 * associated operation type. 656 */ 657 private static void releasePermit(final Semaphore semaphore) 658 { 659 if (semaphore != null) 660 { 661 semaphore.release(); 662 } 663 } 664}