001/*
002 * Copyright 2011-2017 UnboundID Corp.
003 * All Rights Reserved.
004 */
005/*
006 * Copyright (C) 2011-2017 UnboundID Corp.
007 *
008 * This program is free software; you can redistribute it and/or modify
009 * it under the terms of the GNU General Public License (GPLv2 only)
010 * or the terms of the GNU Lesser General Public License (LGPLv2.1 only)
011 * as published by the Free Software Foundation.
012 *
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Public License for more details.
017 *
018 * You should have received a copy of the GNU General Public License
019 * along with this program; if not, see <http://www.gnu.org/licenses>.
020 */
021package com.unboundid.util;
022
023
024
025import java.io.File;
026import java.io.FileInputStream;
027import java.io.InputStream;
028import java.io.IOException;
029import java.util.ArrayList;
030import java.util.Collection;
031import java.util.Iterator;
032
033import static com.unboundid.util.UtilityMessages.*;
034
035
036
037/**
038 * This class provides an input stream implementation that can aggregate
039 * multiple input streams.  When reading data from this input stream, it will
040 * read from the first input stream until the end of it is reached, at point it
041 * will close it and start reading from the next one, and so on until all input
042 * streams have been exhausted.  Closing the aggregate input stream will cause
043 * all remaining input streams to be closed.
044 */
045@ThreadSafety(level=ThreadSafetyLevel.NOT_THREADSAFE)
046public final class AggregateInputStream
047       extends InputStream
048{
049  // The currently-active input stream.
050  private volatile InputStream activeInputStream;
051
052  // The iterator that will be used to access the input streams.
053  private final Iterator<InputStream> streamIterator;
054
055
056
057  /**
058   * Creates a new aggregate input stream that will use the provided set of
059   * input streams.
060   *
061   * @param  inputStreams  The input streams to be used by this aggregate input
062   *                       stream.  It must not be {@code null}.
063   */
064  public AggregateInputStream(final InputStream... inputStreams)
065  {
066    this(StaticUtils.toList(inputStreams));
067  }
068
069
070
071  /**
072   * Creates a new aggregate input stream that will use the provided set of
073   * input streams.
074   *
075   * @param  inputStreams  The input streams to be used by this aggregate input
076   *                       stream.  It must not be {@code null}.
077   */
078  public AggregateInputStream(
079              final Collection<? extends InputStream> inputStreams)
080  {
081    Validator.ensureNotNull(inputStreams);
082
083    final ArrayList<InputStream> streamList =
084         new ArrayList<InputStream>(inputStreams);
085    streamIterator = streamList.iterator();
086    activeInputStream = null;
087  }
088
089
090
091  /**
092   * Creates a new aggregate input stream that will read data from the specified
093   * files.
094   *
095   * @param  files  The set of files to be read by this aggregate input stream.
096   *                It must not be {@code null}.
097   *
098   * @throws  IOException  If a problem is encountered while attempting to
099   *                       create input streams for the provided files.
100   */
101  public AggregateInputStream(final File... files)
102         throws IOException
103  {
104    Validator.ensureNotNull(files);
105
106    final ArrayList<InputStream> streamList =
107         new ArrayList<InputStream>(files.length);
108
109    IOException ioException = null;
110    for (final File f : files)
111    {
112      try
113      {
114        streamList.add(new FileInputStream(f));
115      }
116      catch (final IOException ioe)
117      {
118        Debug.debugException(ioe);
119        ioException = ioe;
120        break;
121      }
122    }
123
124    if (ioException != null)
125    {
126      for (final InputStream s : streamList)
127      {
128        if (s != null)
129        {
130          try
131          {
132            s.close();
133          }
134          catch (final Exception e)
135          {
136            Debug.debugException(e);
137          }
138        }
139      }
140
141      throw ioException;
142    }
143
144    streamIterator = streamList.iterator();
145    activeInputStream = null;
146  }
147
148
149
150  /**
151   * Reads the next byte of data from the current active input stream, switching
152   * to the next input stream in the set if appropriate.
153   *
154   * @return  The next byte of data that was read, or -1 if all streams have
155   *          been exhausted.
156   *
157   * @throws  IOException  If a problem is encountered while attempting to read
158   *                       data from an input stream.
159   */
160  @Override()
161  public int read()
162         throws IOException
163  {
164    while (true)
165    {
166      if (activeInputStream == null)
167      {
168        if (streamIterator.hasNext())
169        {
170          activeInputStream = streamIterator.next();
171          continue;
172        }
173        else
174        {
175          return -1;
176        }
177      }
178
179      final int byteRead = activeInputStream.read();
180      if (byteRead < 0)
181      {
182        activeInputStream.close();
183        activeInputStream = null;
184      }
185      else
186      {
187        return byteRead;
188      }
189    }
190  }
191
192
193
194  /**
195   * Reads data from the current active input stream into the provided array,
196   * switching to the next input stream in the set if appropriate.
197   *
198   * @param  b  The array into which the data read should be placed, starting
199   *            with an index of zero.  It must not be {@code null}.
200   *
201   * @return  The number of bytes read into the array, or -1 if all streams have
202   *          been exhausted.
203   *
204   * @throws  IOException  If a problem is encountered while attempting to read
205   *                       data from an input stream.
206   */
207  @Override()
208  public int read(final byte[] b)
209         throws IOException
210  {
211    return read(b, 0, b.length);
212  }
213
214
215
216  /**
217   * Reads data from the current active input stream into the provided array,
218   * switching to the next input stream in the set if appropriate.
219   *
220   * @param  b    The array into which the data read should be placed.  It must
221   *              not be {@code null}.
222   * @param  off  The position in the array at which to start writing data.
223   * @param  len  The maximum number of bytes that may be read.
224   *
225   * @return  The number of bytes read into the array, or -1 if all streams have
226   *          been exhausted.
227   *
228   * @throws  IOException  If a problem is encountered while attempting to read
229   *                       data from an input stream.
230   */
231  @Override()
232  public int read(final byte[] b, final int off, final int len)
233         throws IOException
234  {
235    while (true)
236    {
237      if (activeInputStream == null)
238      {
239        if (streamIterator.hasNext())
240        {
241          activeInputStream = streamIterator.next();
242          continue;
243        }
244        else
245        {
246          return -1;
247        }
248      }
249
250      final int bytesRead = activeInputStream.read(b, off, len);
251      if (bytesRead < 0)
252      {
253        activeInputStream.close();
254        activeInputStream = null;
255      }
256      else
257      {
258        return bytesRead;
259      }
260    }
261  }
262
263
264
265  /**
266   * Attempts to skip and discard up to the specified number of bytes from the
267   * input stream.
268   *
269   * @param  n  The number of bytes to attempt to skip.
270   *
271   * @return  The number of bytes actually skipped.
272   *
273   * @throws  IOException  If a problem is encountered while attempting to skip
274   *                       data from the input stream.
275   */
276  @Override()
277  public long skip(final long n)
278         throws IOException
279  {
280    if (activeInputStream == null)
281    {
282      if (streamIterator.hasNext())
283      {
284        activeInputStream = streamIterator.next();
285        return activeInputStream.skip(n);
286      }
287      else
288      {
289        return 0L;
290      }
291    }
292    else
293    {
294      return activeInputStream.skip(n);
295    }
296  }
297
298
299
300  /**
301   * Retrieves an estimate of the number of bytes that can be read without
302   * blocking.
303   *
304   * @return  An estimate of the number of bytes that can be read without
305   *          blocking.
306   *
307   * @throws  IOException  If a problem is encountered while attempting to make
308   *                       the determination.
309   */
310  @Override()
311  public int available()
312         throws IOException
313  {
314    if (activeInputStream == null)
315    {
316      if (streamIterator.hasNext())
317      {
318        activeInputStream = streamIterator.next();
319        return activeInputStream.available();
320      }
321      else
322      {
323        return 0;
324      }
325    }
326    else
327    {
328      return activeInputStream.available();
329    }
330  }
331
332
333
334  /**
335   * Indicates whether this input stream supports the use of the {@code mark}
336   * and {@code reset} methods.  This implementation does not support that
337   * capability.
338   *
339   * @return  {@code false} to indicate that this input stream implementation
340   *          does not support the use of {@code mark} and {@code reset}.
341   */
342  @Override()
343  public boolean markSupported()
344  {
345    return false;
346  }
347
348
349
350  /**
351   * Marks the current position in the input stream.  This input stream does not
352   * support this functionality, so no action will be taken.
353   *
354   * @param  readLimit  The maximum number of bytes that the caller may wish to
355   *                    read before being able to reset the stream.
356   */
357  @Override()
358  public void mark(final int readLimit)
359  {
360    // No implementation is required.
361  }
362
363
364
365  /**
366   * Attempts to reset the position of this input stream to the mark location.
367   * This implementation does not support {@code mark} and {@code reset}
368   * functionality, so this method will always throw an exception.
369   *
370   * @throws  IOException  To indicate that reset is not supported.
371   */
372  @Override()
373  public void reset()
374         throws IOException
375  {
376    throw new IOException(ERR_AGGREGATE_INPUT_STREAM_MARK_NOT_SUPPORTED.get());
377  }
378
379
380
381  /**
382   * Closes this input stream.  All associated input streams will be closed.
383   *
384   * @throws  IOException  If an exception was encountered while attempting to
385   *                       close any of the associated streams.  Note that even
386   *                       if an exception is encountered, an attempt will be
387   *                       made to close all streams.
388   */
389  @Override()
390  public void close()
391         throws IOException
392  {
393    IOException firstException = null;
394
395    if (activeInputStream != null)
396    {
397      try
398      {
399        activeInputStream.close();
400      }
401      catch (final IOException ioe)
402      {
403        Debug.debugException(ioe);
404        firstException = ioe;
405      }
406      activeInputStream = null;
407    }
408
409    while (streamIterator.hasNext())
410    {
411      final InputStream s = streamIterator.next();
412      try
413      {
414        s.close();
415      }
416      catch (final IOException ioe)
417      {
418        Debug.debugException(ioe);
419        if (firstException == null)
420        {
421          firstException = ioe;
422        }
423      }
424    }
425
426    if (firstException != null)
427    {
428      throw firstException;
429    }
430  }
431}