/*
 * Copyright 2009-2017 UnboundID Corp.
 * All Rights Reserved.
 */
/*
 * Copyright (C) 2009-2017 UnboundID Corp.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License (GPLv2 only)
 * or the terms of the GNU Lesser General Public License (LGPLv2.1 only)
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, see <http://www.gnu.org/licenses>.
 */
package com.unboundid.ldap.sdk;



import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import com.unboundid.util.InternalUseOnly;
import com.unboundid.util.ThreadSafety;
import com.unboundid.util.ThreadSafetyLevel;

import static com.unboundid.ldap.sdk.LDAPMessages.*;
import static com.unboundid.util.Debug.*;
import static com.unboundid.util.Validator.*;



/**
 * This class provides an {@link EntrySource} that will read entries matching a
 * given set of search criteria from an LDAP directory server.  It may
 * optionally close the associated connection after all entries have been read.
 * <BR><BR>
 * This implementation processes the search asynchronously, which provides two
 * benefits:
 * <UL>
 *   <LI>It makes it easier to provide a throttling mechanism to prevent the
 *       entries from piling up and causing the client to run out of memory if
 *       the server returns them faster than the client can process them.  If
 *       this occurs, then the client will queue up a small number of entries
 *       but will then push back against the server to block it from sending
 *       additional entries until the client can catch up.  In this case, no
 *       entries should be lost, although some servers may impose limits on how
 *       long a search may be active or other forms of constraints.</LI>
 *   <LI>It makes it possible to abandon the search if the entry source is no
 *       longer needed (as signified by calling the {@link #close} method) and
 *       the caller intends to stop iterating through the results.</LI>
 * </UL>
 * <H2>Example</H2>
 * The following example demonstrates the process that may be used for iterating
 * across all entries containing the {@code person} object class using the LDAP
 * entry source API:
 * <PRE>
 * SearchRequest searchRequest = new SearchRequest("dc=example,dc=com",
 *      SearchScope.SUB, Filter.createEqualityFilter("objectClass", "person"));
 * LDAPEntrySource entrySource = new LDAPEntrySource(connection,
 *      searchRequest, false);
 *
 * int entriesRead = 0;
 * int referencesRead = 0;
 * int exceptionsCaught = 0;
 * try
 * {
 *   while (true)
 *   {
 *     try
 *     {
 *       Entry entry = entrySource.nextEntry();
 *       if (entry == null)
 *       {
 *         // There are no more entries to be read.
 *         break;
 *       }
 *       else
 *       {
 *         // Do something with the entry here.
 *         entriesRead++;
 *       }
 *     }
 *     catch (SearchResultReferenceEntrySourceException e)
 *     {
 *       // The directory server returned a search result reference.
 *       SearchResultReference searchReference = e.getSearchReference();
 *       referencesRead++;
 *     }
 *     catch (EntrySourceException e)
 *     {
 *       // Some kind of problem was encountered (e.g., the connection is no
 *       // longer valid).  See if we can continue reading entries.
 *       exceptionsCaught++;
 *       if (! e.mayContinueReading())
 *       {
 *         break;
 *       }
 *     }
 *   }
 * }
 * finally
 * {
 *   entrySource.close();
 * }
 * </PRE>
 */
@ThreadSafety(level=ThreadSafetyLevel.NOT_THREADSAFE)
public final class LDAPEntrySource
       extends EntrySource
       implements AsyncSearchResultListener
{
  /**
   * The bogus entry that will be used to signify the end of the results.  It is
   * recognized in the queue by reference identity rather than by value.
   */
  private static final String END_OF_RESULTS = "END OF RESULTS";



  /**
   * The serial version UID for this serializable class.
   */
  private static final long serialVersionUID = 1080386705549149135L;



  // The request ID associated with the asynchronous search.
  private final AsyncRequestID asyncRequestID;

  // Indicates whether this entry source has been closed.
  private final AtomicBoolean closed;

  // The search result for the search operation.
  private final AtomicReference<SearchResult> searchResult;

  // Indicates whether to close the connection when this entry source is closed.
  private final boolean closeConnection;

  // The connection that will be used to read the entries.
  private final LDAPConnection connection;

  // The queue from which entries will be read.  In addition to entries, it may
  // hold EntrySourceException instances and the END_OF_RESULTS marker.
  private final LinkedBlockingQueue<Object> queue;



  /**
   * Creates a new LDAP entry source with the provided information.  The
   * internal queue will be able to hold up to 100 elements.
   *
   * @param  connection       The connection to the directory server from which
   *                          the entries will be read.  It must not be
   *                          {@code null}.
   * @param  searchRequest    The search request that will be used to identify
   *                          which entries should be returned.  It must not be
   *                          {@code null}, and it must not be configured with a
   *                          {@link SearchResultListener}.
   * @param  closeConnection  Indicates whether the provided connection should
   *                          be closed whenever all of the entries have been
   *                          read, or if the {@link #close} method is called.
   *
   * @throws  LDAPException  If there is a problem with the provided search
   *                         request or when trying to communicate with the
   *                         directory server over the provided connection.
   */
  public LDAPEntrySource(final LDAPConnection connection,
                         final SearchRequest searchRequest,
                         final boolean closeConnection)
         throws LDAPException
  {
    this(connection, searchRequest, closeConnection, 100);
  }



  /**
   * Creates a new LDAP entry source with the provided information.
   *
   * @param  connection       The connection to the directory server from which
   *                          the entries will be read.  It must not be
   *                          {@code null}.
   * @param  searchRequest    The search request that will be used to identify
   *                          which entries should be returned.  It must not be
   *                          {@code null}, and it must not be configured with a
   *                          {@link SearchResultListener}.
   * @param  closeConnection  Indicates whether the provided connection should
   *                          be closed whenever all of the entries have been
   *                          read, or if the {@link #close} method is called.
   * @param  queueSize        The size of the internal queue used to hold search
   *                          result entries until they can be consumed by the
   *                          {@link #nextEntry} method.  The value must be
   *                          greater than zero.
   *
   * @throws  LDAPException  If there is a problem with the provided search
   *                         request or when trying to communicate with the
   *                         directory server over the provided connection.
   */
  public LDAPEntrySource(final LDAPConnection connection,
                         final SearchRequest searchRequest,
                         final boolean closeConnection,
                         final int queueSize)
         throws LDAPException
  {
    ensureNotNull(connection, searchRequest);
    ensureTrue(queueSize > 0,
               "LDAPEntrySource.queueSize must be greater than 0.");

    this.connection      = connection;
    this.closeConnection = closeConnection;

    if (searchRequest.getSearchResultListener() != null)
    {
      throw new LDAPException(ResultCode.PARAM_ERROR,
                              ERR_LDAP_ENTRY_SOURCE_REQUEST_HAS_LISTENER.get());
    }

    // All state touched by the listener callbacks must be in place before the
    // search is issued, since this object is registered as the listener.
    closed       = new AtomicBoolean(false);
    searchResult = new AtomicReference<SearchResult>();
    queue        = new LinkedBlockingQueue<Object>(queueSize);

    // Build a copy of the caller's request that uses this entry source as its
    // search result listener.
    final SearchRequest r = new SearchRequest(this, searchRequest.getControls(),
         searchRequest.getBaseDN(), searchRequest.getScope(),
         searchRequest.getDereferencePolicy(), searchRequest.getSizeLimit(),
         searchRequest.getTimeLimitSeconds(), searchRequest.typesOnly(),
         searchRequest.getFilter(), searchRequest.getAttributes());
    asyncRequestID = connection.asyncSearch(r);
  }



  /**
   * {@inheritDoc}
   */
  @Override()
  public Entry nextEntry()
         throws EntrySourceException
  {
    while (true)
    {
      // Once the source is closed, anything still queued is handed out, and
      // then the end of the results is reported.
      if (closed.get() && queue.isEmpty())
      {
        return null;
      }

      // Poll with a short timeout rather than blocking indefinitely so that
      // the closed flag is re-examined regularly.
      final Object o;
      try
      {
        o = queue.poll(10L, TimeUnit.MILLISECONDS);
      }
      catch (InterruptedException ie)
      {
        debugException(ie);
        Thread.currentThread().interrupt();
        throw new EntrySourceException(true,
             ERR_LDAP_ENTRY_SOURCE_NEXT_ENTRY_INTERRUPTED.get(), ie);
      }

      if (o != null)
      {
        // The identity comparison is intentional:  END_OF_RESULTS is a marker
        // object, not a value to be compared by content.
        if (o == END_OF_RESULTS)
        {
          return null;
        }
        else if (o instanceof Entry)
        {
          return (Entry) o;
        }
        else
        {
          // The only other objects placed in the queue are entry source
          // exceptions (search references and non-success results).
          throw (EntrySourceException) o;
        }
      }
    }
  }



  /**
   * {@inheritDoc}
   */
  @Override()
  public void close()
  {
    closeInternal(true);
  }



  /**
   * Closes this LDAP entry source.
   *
   * @param  abandon  Indicates whether to attempt to abandon the search.  This
   *                  is {@code true} when the close was requested through the
   *                  {@link #close} method and {@code false} when it results
   *                  from the search having completed.
   */
  private void closeInternal(final boolean abandon)
  {
    if (abandon)
    {
      // This close was requested through close(), typically by the same thread
      // that consumes the queue.  If the queue is full, then a blocking insert
      // could never succeed because nothing would be draining it, so only make
      // a non-blocking attempt to add the marker.  It is fine if that attempt
      // fails, because nextEntry also reports the end of the results once the
      // closed flag is set and the queue is empty.
      if (! closed.get())
      {
        queue.offer(END_OF_RESULTS);
      }
    }
    else
    {
      // The search has completed.  Wait for room in the queue so that the
      // marker follows everything that was queued ahead of it.
      addToQueue(END_OF_RESULTS);
    }

    // Only the first close performs the abandon and connection cleanup.
    if (closed.compareAndSet(false, true))
    {
      if (abandon)
      {
        try
        {
          connection.abandon(asyncRequestID);
        }
        catch (Exception e)
        {
          // Abandoning is best-effort, so a failure is only recorded.
          debugException(e);
        }
      }

      if (closeConnection)
      {
        connection.close();
      }
    }
  }



  /**
   * Retrieves the search result for the search operation, if available.  It
   * will not be available until the search has completed (as indicated by a
   * {@code null} return value from the {@link #nextEntry} method).
   *
   * @return  The search result for the search operation, or {@code null} if it
   *          is not available (e.g., because the search has not yet completed).
   */
  public SearchResult getSearchResult()
  {
    return searchResult.get();
  }



  /**
   * {@inheritDoc}  This is intended for internal use only and should not be
   * called by anything outside of the LDAP SDK itself.
   */
  @InternalUseOnly()
  public void searchEntryReturned(final SearchResultEntry searchEntry)
  {
    addToQueue(searchEntry);
  }



  /**
   * {@inheritDoc}  This is intended for internal use only and should not be
   * called by anything outside of the LDAP SDK itself.
   */
  @InternalUseOnly()
  public void searchReferenceReturned(
                   final SearchResultReference searchReference)
  {
    // References are delivered to the consumer as an exception thrown from
    // nextEntry.
    addToQueue(new SearchResultReferenceEntrySourceException(searchReference));
  }



  /**
   * {@inheritDoc}  This is intended for internal use only and should not be
   * called by anything outside of the LDAP SDK itself.
   */
  @InternalUseOnly()
  public void searchResultReceived(final AsyncRequestID requestID,
                                   final SearchResult searchResult)
  {
    this.searchResult.set(searchResult);

    // A non-success result is surfaced to the consumer as an exception that
    // indicates reading may not continue.
    if (! searchResult.getResultCode().equals(ResultCode.SUCCESS))
    {
      addToQueue(new EntrySourceException(false,
           new LDAPSearchException(searchResult)));
    }

    // The search is already complete, so there is nothing to abandon.
    closeInternal(false);
  }



  /**
   * Adds the provided object to the queue, waiting as long as needed until it
   * has been added.  If this entry source has been closed (either before this
   * method is called or while it is waiting for space in the queue), then the
   * object will be discarded.  If the calling thread is interrupted while
   * waiting, then the attempt will continue and the thread's interrupt status
   * will be restored before this method returns.
   *
   * @param  o  The object to be added.  It must not be {@code null}.
   */
  private void addToQueue(final Object o)
  {
    boolean interrupted = false;
    try
    {
      while (true)
      {
        if (closed.get())
        {
          return;
        }

        try
        {
          // Use a bounded wait so the closed flag is re-checked periodically.
          if (queue.offer(o, 100L, TimeUnit.MILLISECONDS))
          {
            return;
          }
        }
        catch (InterruptedException ie)
        {
          debugException(ie);

          // Do not re-interrupt here, since that would cause the next timed
          // offer to fail immediately and turn this loop into a busy spin.
          interrupted = true;
        }
      }
    }
    finally
    {
      if (interrupted)
      {
        Thread.currentThread().interrupt();
      }
    }
  }
}