001/*
002 * Copyright 2009-2017 UnboundID Corp.
003 * All Rights Reserved.
004 */
005/*
006 * Copyright (C) 2009-2017 UnboundID Corp.
007 *
008 * This program is free software; you can redistribute it and/or modify
009 * it under the terms of the GNU General Public License (GPLv2 only)
010 * or the terms of the GNU Lesser General Public License (LGPLv2.1 only)
011 * as published by the Free Software Foundation.
012 *
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Public License for more details.
017 *
018 * You should have received a copy of the GNU General Public License
019 * along with this program; if not, see <http://www.gnu.org/licenses>.
020 */
021package com.unboundid.ldap.sdk;
022
023
024
025import java.util.concurrent.LinkedBlockingQueue;
026import java.util.concurrent.TimeUnit;
027import java.util.concurrent.atomic.AtomicBoolean;
028import java.util.concurrent.atomic.AtomicReference;
029
030import com.unboundid.util.InternalUseOnly;
031import com.unboundid.util.ThreadSafety;
032import com.unboundid.util.ThreadSafetyLevel;
033
034import static com.unboundid.ldap.sdk.LDAPMessages.*;
035import static com.unboundid.util.Debug.*;
036import static com.unboundid.util.Validator.*;
037
038
039
040/**
041 * This class provides an {@link EntrySource} that will read entries matching a
042 * given set of search criteria from an LDAP directory server.  It may
043 * optionally close the associated connection after all entries have been read.
044 * <BR><BR>
045 * This implementation processes the search asynchronously, which provides two
046 * benefits:
047 * <UL>
048 *   <LI>It makes it easier to provide a throttling mechanism to prevent the
049 *       entries from piling up and causing the client to run out of memory if
050 *       the server returns them faster than the client can process them.  If
051 *       this occurs, then the client will queue up a small number of entries
052 *       but will then push back against the server to block it from sending
053 *       additional entries until the client can catch up.  In this case, no
054 *       entries should be lost, although some servers may impose limits on how
055 *       long a search may be active or other forms of constraints.</LI>
056 *   <LI>It makes it possible to abandon the search if the entry source is no
057 *       longer needed (as signified by calling the {@link #close} method) and
058 *       the caller intends to stop iterating through the results.</LI>
059 * </UL>
060 * <H2>Example</H2>
061 * The following example demonstrates the process that may be used for iterating
062 * across all entries containing the {@code person} object class using the LDAP
063 * entry source API:
064 * <PRE>
065 * SearchRequest searchRequest = new SearchRequest("dc=example,dc=com",
066 *      SearchScope.SUB, Filter.createEqualityFilter("objectClass", "person"));
067 * LDAPEntrySource entrySource = new LDAPEntrySource(connection,
068 *      searchRequest, false);
069 *
070 * int entriesRead = 0;
071 * int referencesRead = 0;
072 * int exceptionsCaught = 0;
073 * try
074 * {
075 *   while (true)
076 *   {
077 *     try
078 *     {
079 *       Entry entry = entrySource.nextEntry();
080 *       if (entry == null)
081 *       {
082 *         // There are no more entries to be read.
083 *         break;
084 *       }
085 *       else
086 *       {
087 *         // Do something with the entry here.
088 *         entriesRead++;
089 *       }
090 *     }
091 *     catch (SearchResultReferenceEntrySourceException e)
092 *     {
093 *       // The directory server returned a search result reference.
094 *       SearchResultReference searchReference = e.getSearchReference();
095 *       referencesRead++;
096 *     }
097 *     catch (EntrySourceException e)
098 *     {
099 *       // Some kind of problem was encountered (e.g., the connection is no
100 *       // longer valid).  See if we can continue reading entries.
101 *       exceptionsCaught++;
102 *       if (! e.mayContinueReading())
103 *       {
104 *         break;
105 *       }
106 *     }
107 *   }
108 * }
109 * finally
110 * {
111 *   entrySource.close();
112 * }
113 * </PRE>
114 */
115@ThreadSafety(level=ThreadSafetyLevel.NOT_THREADSAFE)
116public final class LDAPEntrySource
117       extends EntrySource
118       implements AsyncSearchResultListener
119{
120  /**
121   * The bogus entry that will be used to signify the end of the results.
122   */
123  private static final String END_OF_RESULTS = "END OF RESULTS";
124
125
126
127  /**
128   * The serial version UID for this serializable class.
129   */
130  private static final long serialVersionUID = 1080386705549149135L;
131
132
133
134  // The request ID associated with the asynchronous search.
135  private final AsyncRequestID asyncRequestID;
136
137  // Indicates whether this entry source has been closed.
138  private final AtomicBoolean closed;
139
140  // The search result for the search operation.
141  private final AtomicReference<SearchResult> searchResult;
142
143  // Indicates whether to close the connection when this entry source is closed.
144  private final boolean closeConnection;
145
146  // The connection that will be used to read the entries.
147  private final LDAPConnection connection;
148
149  // The queue from which entries will be read.
150  private final LinkedBlockingQueue<Object> queue;
151
152
153
154  /**
155   * Creates a new LDAP entry source with the provided information.
156   *
157   * @param  connection       The connection to the directory server from which
158   *                          the entries will be read.  It must not be
159   *                          {@code null}.
160   * @param  searchRequest    The search request that will be used to identify
161   *                          which entries should be returned.  It must not be
162   *                          {@code null}, and it must not be configured with a
163   *                          {@link SearchResultListener}.
164   * @param  closeConnection  Indicates whether the provided connection should
165   *                          be closed whenever all of the entries have been
166   *                          read, or if the {@link #close} method is called.
167   *
168   * @throws  LDAPException  If there is a problem with the provided search
169   *                         request or when trying to communicate with the
170   *                         directory server over the provided connection.
171   */
172  public LDAPEntrySource(final LDAPConnection connection,
173                         final SearchRequest searchRequest,
174                         final boolean closeConnection)
175         throws LDAPException
176  {
177    this(connection, searchRequest, closeConnection, 100);
178  }
179
180
181
182  /**
183   * Creates a new LDAP entry source with the provided information.
184   *
185   * @param  connection       The connection to the directory server from which
186   *                          the entries will be read.  It must not be
187   *                          {@code null}.
188   * @param  searchRequest    The search request that will be used to identify
189   *                          which entries should be returned.  It must not be
190   *                          {@code null}, and it must not be configured with a
191   *                          {@link SearchResultListener}.
192   * @param  closeConnection  Indicates whether the provided connection should
193   *                          be closed whenever all of the entries have been
194   *                          read, or if the {@link #close} method is called.
195   * @param  queueSize        The size of the internal queue used to hold search
196   *                          result entries until they can be consumed by the
197   *                          {@link #nextEntry} method.  The value must be
198   *                          greater than zero.
199   *
200   * @throws  LDAPException  If there is a problem with the provided search
201   *                         request or when trying to communicate with the
202   *                         directory server over the provided connection.
203   */
204  public LDAPEntrySource(final LDAPConnection connection,
205                         final SearchRequest searchRequest,
206                         final boolean closeConnection,
207                         final int queueSize)
208         throws LDAPException
209  {
210    ensureNotNull(connection, searchRequest);
211    ensureTrue(queueSize > 0,
212               "LDAPEntrySource.queueSize must be greater than 0.");
213
214    this.connection      = connection;
215    this.closeConnection = closeConnection;
216
217    if (searchRequest.getSearchResultListener() != null)
218    {
219      throw new LDAPException(ResultCode.PARAM_ERROR,
220                              ERR_LDAP_ENTRY_SOURCE_REQUEST_HAS_LISTENER.get());
221    }
222
223    closed       = new AtomicBoolean(false);
224    searchResult = new AtomicReference<SearchResult>();
225    queue        = new LinkedBlockingQueue<Object>(queueSize);
226
227    final SearchRequest r = new SearchRequest(this, searchRequest.getControls(),
228         searchRequest.getBaseDN(), searchRequest.getScope(),
229         searchRequest.getDereferencePolicy(), searchRequest.getSizeLimit(),
230         searchRequest.getTimeLimitSeconds(), searchRequest.typesOnly(),
231         searchRequest.getFilter(), searchRequest.getAttributes());
232    asyncRequestID = connection.asyncSearch(r);
233  }
234
235
236
237  /**
238   * {@inheritDoc}
239   */
240  @Override()
241  public Entry nextEntry()
242         throws EntrySourceException
243  {
244    while (true)
245    {
246      if (closed.get() && queue.isEmpty())
247      {
248        return null;
249      }
250
251      final Object o;
252      try
253      {
254        o = queue.poll(10L, TimeUnit.MILLISECONDS);
255      }
256      catch (InterruptedException ie)
257      {
258        debugException(ie);
259        Thread.currentThread().interrupt();
260        throw new EntrySourceException(true,
261             ERR_LDAP_ENTRY_SOURCE_NEXT_ENTRY_INTERRUPTED.get(), ie);
262      }
263
264      if (o != null)
265      {
266        if (o == END_OF_RESULTS)
267        {
268          return null;
269        }
270        else if (o instanceof Entry)
271        {
272          return (Entry) o;
273        }
274        else
275        {
276          throw (EntrySourceException) o;
277        }
278      }
279    }
280  }
281
282
283
284  /**
285   * {@inheritDoc}
286   */
287  @Override()
288  public void close()
289  {
290    closeInternal(true);
291  }
292
293
294
295  /**
296   * Closes this LDAP entry source.
297   *
298   * @param  abandon  Indicates whether to attempt to abandon the search.
299   */
300  private void closeInternal(final boolean abandon)
301  {
302    addToQueue(END_OF_RESULTS);
303
304    if (closed.compareAndSet(false, true))
305    {
306      if (abandon)
307      {
308        try
309        {
310          connection.abandon(asyncRequestID);
311        }
312        catch (Exception e)
313        {
314          debugException(e);
315        }
316      }
317
318      if (closeConnection)
319      {
320        connection.close();
321      }
322    }
323  }
324
325
326
327  /**
328   * Retrieves the search result for the search operation, if available.  It
329   * will not be available until the search has completed (as indicated by a
330   * {@code null} return value from the {@link #nextEntry} method).
331   *
332   * @return  The search result for the search operation, or {@code null} if it
333   *          is not available (e.g., because the search has not yet completed).
334   */
335  public SearchResult getSearchResult()
336  {
337    return searchResult.get();
338  }
339
340
341
342  /**
343   * {@inheritDoc}  This is intended for internal use only and should not be
344   * called by anything outside of the LDAP SDK itself.
345   */
346  @InternalUseOnly()
347  public void searchEntryReturned(final SearchResultEntry searchEntry)
348  {
349    addToQueue(searchEntry);
350  }
351
352
353
354  /**
355   * {@inheritDoc}  This is intended for internal use only and should not be
356   * called by anything outside of the LDAP SDK itself.
357   */
358  @InternalUseOnly()
359  public void searchReferenceReturned(
360                   final SearchResultReference searchReference)
361  {
362    addToQueue(new SearchResultReferenceEntrySourceException(searchReference));
363  }
364
365
366
367  /**
368   * {@inheritDoc}  This is intended for internal use only and should not be
369   * called by anything outside of the LDAP SDK itself.
370   */
371  @InternalUseOnly()
372  public void searchResultReceived(final AsyncRequestID requestID,
373                                   final SearchResult searchResult)
374  {
375    this.searchResult.set(searchResult);
376
377    if (! searchResult.getResultCode().equals(ResultCode.SUCCESS))
378    {
379      addToQueue(new EntrySourceException(false,
380           new LDAPSearchException(searchResult)));
381    }
382
383    closeInternal(false);
384  }
385
386
387
388  /**
389   * Adds the provided object to the queue, waiting as long as needed until it
390   * has been added.
391   *
392   * @param  o  The object to be added.  It must not be {@code null}.
393   */
394  private void addToQueue(final Object o)
395  {
396    while (true)
397    {
398      if (closed.get())
399      {
400        return;
401      }
402
403      try
404      {
405        if (queue.offer(o, 100L, TimeUnit.MILLISECONDS))
406        {
407          return;
408        }
409      }
410      catch (InterruptedException ie)
411      {
412        debugException(ie);
413      }
414    }
415  }
416}