Type Parameters:
T - the published item type
public class SubmissionPublisher<T> extends java.lang.Object implements Flow.Publisher<T>, java.lang.AutoCloseable
A Flow.Publisher that asynchronously issues submitted (non-null) items to current subscribers until it is closed. Each current subscriber receives newly submitted items in the same order unless drops or exceptions are encountered. Using a SubmissionPublisher allows item generators to act as compliant reactive-streams Publishers relying on drop handling and/or blocking for flow control.
A SubmissionPublisher uses the Executor supplied in its constructor for delivery to subscribers. The best choice of Executor depends on expected usage. If the generator(s) of submitted items run in separate threads, and the number of subscribers can be estimated, consider using an Executors.newFixedThreadPool(int). Otherwise consider using the default, normally the ForkJoinPool.commonPool().
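As a rough illustration of the executor choice described above, the following sketch supplies a fixed thread pool and checks which thread performs delivery. The class name FixedPoolDelivery, the pool size of 2, and the thread name "delivery-worker" are assumptions made for this example only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicReference;

final class FixedPoolDelivery {
    /** Publishes one item and returns the name of the thread that delivered it. */
    static String deliveryThreadName() {
        // Hypothetical sizing: a small pool for a single, known subscriber.
        ExecutorService pool =
            Executors.newFixedThreadPool(2, r -> new Thread(r, "delivery-worker"));
        AtomicReference<String> seenOn = new AtomicReference<>();
        SubmissionPublisher<String> publisher =
            new SubmissionPublisher<>(pool, Flow.defaultBufferSize());
        try {
            // Subscribe first; items submitted with no subscribers are not retained.
            CompletableFuture<Void> done =
                publisher.consume(item -> seenOn.set(Thread.currentThread().getName()));
            publisher.submit("hello");
            publisher.close();   // issues onComplete, which completes the future
            done.join();         // wait until delivery has finished
        } finally {
            pool.shutdown();     // the publisher does not shut down a supplied Executor
        }
        return seenOn.get();
    }

    public static void main(String[] args) {
        System.out.println(deliveryThreadName()); // prints "delivery-worker"
    }
}
```

Because the pool is supplied by the caller, shutting it down is also the caller's job; closing the publisher alone leaves the pool's threads alive.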
Buffering allows producers and consumers to transiently operate
at different rates. Each subscriber uses an independent buffer.
Buffers are created upon first use and expanded as needed up to the
given maximum. (The enforced capacity may be rounded up to the
nearest power of two and/or bounded by the largest value supported
by this implementation.) Invocations of request
do not directly result in
buffer expansion, but risk saturation if unfilled requests exceed
the maximum capacity. The default value of Flow.defaultBufferSize()
may provide a useful starting point for
choosing a capacity based on expected rates, resources, and usages.
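To see how a requested maximum relates to the enforced one, a small sketch such as the following can query getMaxBufferCapacity() after construction. The class name BufferCapacityDemo and the requested value of 100 are illustrative assumptions; the exact rounding is implementation-dependent, so no particular result is asserted here beyond what the text above states.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;

final class BufferCapacityDemo {
    /** Returns the capacity the publisher reports for a requested maximum. */
    static int enforcedCapacity(int requested) {
        SubmissionPublisher<Integer> publisher =
            new SubmissionPublisher<>(ForkJoinPool.commonPool(), requested);
        try {
            return publisher.getMaxBufferCapacity();
        } finally {
            publisher.close();
        }
    }

    public static void main(String[] args) {
        System.out.println("default size: " + Flow.defaultBufferSize());
        // May be larger than 100 if the implementation rounds up to a power of two.
        System.out.println("enforced for 100: " + enforcedCapacity(100));
    }
}
```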
Publication methods support different policies about what to do
when buffers are saturated. Method submit
blocks until resources are available. This is simplest, but least
responsive. The offer
methods may drop items (either
immediately or with bounded timeout), but provide an opportunity to
interpose a handler and then retry.
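The difference between the two policies can be sketched with a subscriber that never requests anything, so that its buffer can only fill up. In this situation submit would block indefinitely, whereas offer reports drops through its return value. The names OfferDropDemo and StalledSubscriber, and the capacity of 4, are assumptions for illustration.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;

final class OfferDropDemo {
    /** A subscriber that never calls request(), so demand stays at zero. */
    static final class StalledSubscriber implements Flow.Subscriber<Integer> {
        public void onSubscribe(Flow.Subscription subscription) { }
        public void onNext(Integer item) { }
        public void onError(Throwable error) { }
        public void onComplete() { }
    }

    /** Offers count items to a stalled subscriber and returns how many were dropped. */
    static int countDrops(int count, int maxBufferCapacity) {
        SubmissionPublisher<Integer> publisher =
            new SubmissionPublisher<>(ForkJoinPool.commonPool(), maxBufferCapacity);
        publisher.subscribe(new StalledSubscriber());
        int drops = 0;
        for (int i = 0; i < count; i++) {
            // A null handler means: drop immediately, with no retry.
            int status = publisher.offer(i, null);
            if (status < 0) {
                drops += -status;   // a negative status is the (negated) number of drops
            }
        }
        publisher.close();
        return drops;
    }

    public static void main(String[] args) {
        System.out.println("dropped: " + countDrops(100, 4));
    }
}
```

A real producer would usually pass a handler instead of null, for example to log the drop or to decide whether a single retry is worthwhile.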
If any Subscriber method throws an exception, its subscription is cancelled. If a handler is supplied as a constructor argument, it is invoked before cancellation upon an exception in method onNext, but exceptions in methods onSubscribe, onError and onComplete are not recorded or handled before cancellation. If the supplied Executor throws RejectedExecutionException (or any other RuntimeException or Error) when attempting to execute a task, or a drop handler throws an exception when processing a dropped item, then the exception is rethrown. In these cases, not all subscribers will have been issued the published item. It is usually good practice to closeExceptionally in these cases.
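The onNext handler described above might be wired up as in the following sketch, where a deliberately failing subscriber triggers the handler. The names OnNextHandlerDemo and FailingSubscriber, the exception message, and the five-second wait are assumptions for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class OnNextHandlerDemo {
    /** A subscriber whose onNext always fails (for illustration only). */
    static final class FailingSubscriber implements Flow.Subscriber<String> {
        public void onSubscribe(Flow.Subscription subscription) { subscription.request(1); }
        public void onNext(String item) {
            throw new IllegalArgumentException("cannot handle " + item);
        }
        public void onError(Throwable error) { }
        public void onComplete() { }
    }

    /** Returns the message of the exception seen by the handler, or null if none was seen. */
    static String handledMessage() {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        CountDownLatch handled = new CountDownLatch(1);
        SubmissionPublisher<String> publisher = new SubmissionPublisher<String>(
            ForkJoinPool.commonPool(), Flow.defaultBufferSize(),
            (subscriber, error) -> {      // invoked before the subscription is cancelled
                seen.set(error);
                handled.countDown();
            });
        publisher.subscribe(new FailingSubscriber());
        try {
            publisher.submit("x");
            handled.await(5, TimeUnit.SECONDS);   // the handler runs on a delivery thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            publisher.close();
        }
        Throwable error = seen.get();
        return error == null ? null : error.getMessage();
    }

    public static void main(String[] args) {
        System.out.println(handledMessage()); // prints "cannot handle x"
    }
}
```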
Method consume(Consumer)
simplifies support for a
common case in which the only action of a subscriber is to request
and process all items using a supplied function.
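A minimal sketch of consume, assuming a hypothetical class name ConsumeDemo: the returned CompletableFuture is used to wait until every submitted item has been processed.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

final class ConsumeDemo {
    /** Publishes 1, 2, 3 and returns the items in the order the consumer saw them. */
    static List<Integer> publishAndCollect() {
        List<Integer> received = new CopyOnWriteArrayList<>();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            // Subscribe before publishing; earlier items would not be delivered.
            CompletableFuture<Void> done = publisher.consume(received::add);
            publisher.submit(1);
            publisher.submit(2);
            publisher.submit(3);
            publisher.close();   // onComplete completes the future normally
            done.join();         // returns once all three items have been consumed
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(publishAndCollect()); // prints [1, 2, 3]
    }
}
```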
This class may also serve as a convenient base for subclasses that generate items, and use the methods in this class to publish them. For example here is a class that periodically publishes the items generated from a supplier. (In practice you might add methods to independently start and stop generation, to share Executors among publishers, and so on, or use a SubmissionPublisher as a component rather than a superclass.)
class PeriodicPublisher<T> extends SubmissionPublisher<T> {
  final ScheduledFuture<?> periodicTask;
  final ScheduledExecutorService scheduler;
  PeriodicPublisher(Executor executor, int maxBufferCapacity,
                    Supplier<? extends T> supplier,
                    long period, TimeUnit unit) {
    super(executor, maxBufferCapacity);
    scheduler = new ScheduledThreadPoolExecutor(1);
    periodicTask = scheduler.scheduleAtFixedRate(
      () -> submit(supplier.get()), 0, period, unit);
  }
  public void close() {
    periodicTask.cancel(false);
    scheduler.shutdown();
    super.close();
  }
}
Here is an example of a Flow.Processor implementation. It uses single-step requests to its publisher for simplicity of illustration. A more adaptive version could monitor flow using the lag estimate returned from submit, along with other utility methods.
class TransformProcessor<S,T> extends SubmissionPublisher<T>
  implements Flow.Processor<S,T> {
  final Function<? super S, ? extends T> function;
  Flow.Subscription subscription;
  TransformProcessor(Executor executor, int maxBufferCapacity,
                     Function<? super S, ? extends T> function) {
    super(executor, maxBufferCapacity);
    this.function = function;
  }
  public void onSubscribe(Flow.Subscription subscription) {
    (this.subscription = subscription).request(1);
  }
  public void onNext(S item) {
    subscription.request(1);
    submit(function.apply(item));
  }
  public void onError(Throwable ex) { closeExceptionally(ex); }
  public void onComplete() { close(); }
}
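One possible way to wire such a processor between a source publisher and a consumer is sketched below. The TransformProcessor class is repeated so that the sketch compiles on its own; the wrapper class TransformPipelineDemo, the variable names, and the "item-" prefix are assumptions for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;

final class TransformPipelineDemo {
    /** The processor from the example above, repeated for self-containment. */
    static final class TransformProcessor<S, T> extends SubmissionPublisher<T>
            implements Flow.Processor<S, T> {
        final Function<? super S, ? extends T> function;
        Flow.Subscription subscription;
        TransformProcessor(Executor executor, int maxBufferCapacity,
                           Function<? super S, ? extends T> function) {
            super(executor, maxBufferCapacity);
            this.function = function;
        }
        public void onSubscribe(Flow.Subscription subscription) {
            (this.subscription = subscription).request(1);
        }
        public void onNext(S item) {
            subscription.request(1);
            submit(function.apply(item));
        }
        public void onError(Throwable ex) { closeExceptionally(ex); }
        public void onComplete() { close(); }
    }

    /** Sends 1, 2, 3 through the processor and returns what the final consumer saw. */
    static List<String> run() {
        List<String> out = new CopyOnWriteArrayList<>();
        SubmissionPublisher<Integer> source = new SubmissionPublisher<>();
        // Reuse the source's executor and capacity for the processor stage.
        TransformProcessor<Integer, String> labeler = new TransformProcessor<Integer, String>(
            source.getExecutor(), source.getMaxBufferCapacity(), i -> "item-" + i);
        CompletableFuture<Void> done = labeler.consume(out::add); // attach downstream first
        source.subscribe(labeler);                                // then connect upstream
        for (int i = 1; i <= 3; i++) {
            source.submit(i);
        }
        source.close();   // onComplete propagates: the processor closes, then the future completes
        done.join();
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [item-1, item-2, item-3]
    }
}
```

Attaching the downstream consumer before connecting the processor to its source ensures that no transformed item is published while the processor still has no subscribers.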
Modifier and Type | Class and Description |
---|---|
private static class | SubmissionPublisher.BufferedSubscription<T>: A bounded (ring) buffer with integrated control to start a consumer task whenever items are available. |
private static class | SubmissionPublisher.ConsumerSubscriber<T>: Subscriber for method consume |
(package private) static class | SubmissionPublisher.ConsumerTask<T>: A task for consuming buffer items and signals, created and executed whenever they become available. |
private static class | SubmissionPublisher.ThreadPerTaskExecutor: Fallback if ForkJoinPool.commonPool() cannot support parallelism |
Modifier and Type | Field and Description |
---|---|
private static java.util.concurrent.Executor | ASYNC_POOL: Default executor -- ForkJoinPool.commonPool() unless it cannot support parallelism. |
(package private) static int | BUFFER_CAPACITY_LIMIT: The largest possible power of two array size. |
(package private) SubmissionPublisher.BufferedSubscription<T> | clients: Clients (BufferedSubscriptions) are maintained in a linked list (via their "next" fields). |
(package private) boolean | closed: Run status, updated only within locks |
(package private) java.lang.Throwable | closedException: If non-null, the exception in closeExceptionally |
(package private) java.util.concurrent.Executor | executor |
(package private) int | maxBufferCapacity |
(package private) java.util.function.BiConsumer<? super Flow.Subscriber<? super T>,? super java.lang.Throwable> | onNextHandler |
Constructor and Description |
---|
SubmissionPublisher(): Creates a new SubmissionPublisher using the ForkJoinPool.commonPool() for async delivery to subscribers (unless it does not support a parallelism level of at least two, in which case, a new Thread is created to run each task), with maximum buffer capacity of Flow.defaultBufferSize(), and no handler for Subscriber exceptions in method onNext. |
SubmissionPublisher(java.util.concurrent.Executor executor, int maxBufferCapacity): Creates a new SubmissionPublisher using the given Executor for async delivery to subscribers, with the given maximum buffer size for each subscriber, and no handler for Subscriber exceptions in method onNext. |
SubmissionPublisher(java.util.concurrent.Executor executor, int maxBufferCapacity, java.util.function.BiConsumer<? super Flow.Subscriber<? super T>,? super java.lang.Throwable> handler): Creates a new SubmissionPublisher using the given Executor for async delivery to subscribers, with the given maximum buffer size for each subscriber, and, if non-null, the given handler invoked when any Subscriber throws an exception in method onNext. |
Modifier and Type | Method and Description |
---|---|
void | close(): Unless already closed, issues onComplete signals to current subscribers, and disallows subsequent attempts to publish. |
void | closeExceptionally(java.lang.Throwable error): Unless already closed, issues onError signals to current subscribers with the given error, and disallows subsequent attempts to publish. |
java.util.concurrent.CompletableFuture<java.lang.Void> | consume(java.util.function.Consumer<? super T> consumer): Processes all published items using the given Consumer function. |
(package private) int | doOffer(long nanos, T item, java.util.function.BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop): Common implementation for both forms of offer |
int | estimateMaximumLag(): Returns an estimate of the maximum number of items produced but not yet consumed among all current subscribers. |
long | estimateMinimumDemand(): Returns an estimate of the minimum number of items requested (via request) but not yet produced, among all current subscribers. |
java.lang.Throwable | getClosedException(): Returns the exception associated with closeExceptionally, or null if not closed or if closed normally. |
java.util.concurrent.Executor | getExecutor(): Returns the Executor used for asynchronous delivery. |
int | getMaxBufferCapacity(): Returns the maximum per-subscriber buffer capacity. |
int | getNumberOfSubscribers(): Returns the number of current subscribers. |
java.util.List<Flow.Subscriber<? super T>> | getSubscribers(): Returns a list of current subscribers for monitoring and tracking purposes, not for invoking Flow.Subscriber methods on the subscribers. |
boolean | hasSubscribers(): Returns true if this publisher has any subscribers. |
boolean | isClosed(): Returns true if this publisher is not accepting submissions. |
boolean | isSubscribed(Flow.Subscriber<? super T> subscriber): Returns true if the given Subscriber is currently subscribed. |
int | offer(T item, java.util.function.BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop): Publishes the given item, if possible, to each current subscriber by asynchronously invoking its onNext method. |
int | offer(T item, long timeout, java.util.concurrent.TimeUnit unit, java.util.function.BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop): Publishes the given item, if possible, to each current subscriber by asynchronously invoking its onNext method, blocking while resources for any subscription are unavailable, up to the specified timeout or until the caller thread is interrupted, at which point the given handler (if non-null) is invoked, and if it returns true, retried once. |
(package private) static int | roundCapacity(int cap): Round capacity to power of 2, at most limit. |
int | submit(T item): Publishes the given item to each current subscriber by asynchronously invoking its onNext method, blocking uninterruptibly while resources for any subscriber are unavailable. |
void | subscribe(Flow.Subscriber<? super T> subscriber): Adds the given Subscriber unless already subscribed. |
static final int BUFFER_CAPACITY_LIMIT
private static final java.util.concurrent.Executor ASYNC_POOL
SubmissionPublisher.BufferedSubscription<T> clients
volatile boolean closed
volatile java.lang.Throwable closedException
final java.util.concurrent.Executor executor
final java.util.function.BiConsumer<? super Flow.Subscriber<? super T>,? super java.lang.Throwable> onNextHandler
final int maxBufferCapacity
public SubmissionPublisher(java.util.concurrent.Executor executor, int maxBufferCapacity, java.util.function.BiConsumer<? super Flow.Subscriber<? super T>,? super java.lang.Throwable> handler)
Creates a new SubmissionPublisher using the given Executor for async delivery to subscribers, with the given maximum buffer size for each subscriber, and, if non-null, the given handler invoked when any Subscriber throws an exception in method onNext.
Parameters:
executor - the executor to use for async delivery, supporting creation of at least one independent thread
maxBufferCapacity - the maximum capacity for each subscriber's buffer (the enforced capacity may be rounded up to the nearest power of two and/or bounded by the largest value supported by this implementation; method getMaxBufferCapacity() returns the actual value)
handler - if non-null, procedure to invoke upon exception thrown in method onNext
Throws:
java.lang.NullPointerException - if executor is null
java.lang.IllegalArgumentException - if maxBufferCapacity is not positive
public SubmissionPublisher(java.util.concurrent.Executor executor, int maxBufferCapacity)
Creates a new SubmissionPublisher using the given Executor for async delivery to subscribers, with the given maximum buffer size for each subscriber, and no handler for Subscriber exceptions in method onNext.
Parameters:
executor - the executor to use for async delivery, supporting creation of at least one independent thread
maxBufferCapacity - the maximum capacity for each subscriber's buffer (the enforced capacity may be rounded up to the nearest power of two and/or bounded by the largest value supported by this implementation; method getMaxBufferCapacity() returns the actual value)
Throws:
java.lang.NullPointerException - if executor is null
java.lang.IllegalArgumentException - if maxBufferCapacity is not positive
public SubmissionPublisher()
Creates a new SubmissionPublisher using the ForkJoinPool.commonPool() for async delivery to subscribers (unless it does not support a parallelism level of at least two, in which case, a new Thread is created to run each task), with maximum buffer capacity of Flow.defaultBufferSize(), and no handler for Subscriber exceptions in method onNext.
static final int roundCapacity(int cap)
Round capacity to power of 2, at most limit.
public void subscribe(Flow.Subscriber<? super T> subscriber)
Adds the given Subscriber unless already subscribed. If already subscribed, the Subscriber's onError method is invoked on the existing subscription with an IllegalStateException. Otherwise, upon success, the Subscriber's onSubscribe method is invoked asynchronously with a new Flow.Subscription. If onSubscribe throws an exception, the subscription is cancelled. Otherwise, if this SubmissionPublisher was closed exceptionally, then the subscriber's onError method is invoked with the corresponding exception, or if closed without exception, the subscriber's onComplete method is invoked. Subscribers may enable receiving items by invoking the request method of the new Subscription, and may unsubscribe by invoking its cancel method.
Specified by:
subscribe in interface Flow.Publisher<T>
Parameters:
subscriber - the subscriber
Throws:
java.lang.NullPointerException - if subscriber is null
public int submit(T item)
Publishes the given item to each current subscriber by asynchronously invoking its onNext method, blocking uninterruptibly while resources for any subscriber are unavailable. This method returns an estimate of the maximum lag (number of items submitted but not yet consumed) among all current subscribers. This value is at least one (accounting for this submitted item) if there are any subscribers, else zero.
If the Executor for this publisher throws a RejectedExecutionException (or any other RuntimeException or Error) when attempting to asynchronously notify subscribers, then this exception is rethrown, in which case not all subscribers will have been issued this item.
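The return value of submit can be observed with a sketch like the following, which submits once with no subscribers and once with a single consumer. The class name SubmitLagDemo and the item strings are assumptions for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.SubmissionPublisher;

final class SubmitLagDemo {
    /** Returns { lag reported with no subscribers, lag reported with one subscriber }. */
    static int[] lagEstimates() {
        int[] lags = new int[2];
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            // With no subscribers there is nobody to lag behind.
            lags[0] = publisher.submit("nobody is listening");
            CompletableFuture<Void> done = publisher.consume(item -> { });
            // With a subscriber, the estimate accounts for at least this item.
            lags[1] = publisher.submit("one subscriber");
            publisher.close();
            done.join();
        }
        return lags;
    }

    public static void main(String[] args) {
        int[] lags = lagEstimates();
        System.out.println("no subscribers: " + lags[0] + ", one subscriber: " + lags[1]);
    }
}
```

A producer could compare this estimate against a threshold of its own choosing and slow down when subscribers fall behind.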
Parameters:
item - the (non-null) item to publish
Throws:
java.lang.IllegalStateException - if closed
java.lang.NullPointerException - if item is null
java.util.concurrent.RejectedExecutionException - if thrown by Executor
public int offer(T item, java.util.function.BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)
Publishes the given item, if possible, to each current subscriber by asynchronously invoking its onNext method. The item may be dropped by one or more subscribers if resource limits are exceeded, in which case the given handler (if non-null) is invoked, and if it returns true, retried once. Other calls to methods in this class by other threads are blocked while the handler is invoked. Unless recovery is assured, options are usually limited to logging the error and/or issuing an onError signal to the subscriber.
This method returns a status indicator: If negative, it represents the (negative) number of drops (failed attempts to issue the item to a subscriber). Otherwise it is an estimate of the maximum lag (number of items submitted but not yet consumed) among all current subscribers. This value is at least one (accounting for this submitted item) if there are any subscribers, else zero.
If the Executor for this publisher throws a RejectedExecutionException (or any other RuntimeException or Error) when attempting to asynchronously notify subscribers, or the drop handler throws an exception when processing a dropped item, then this exception is rethrown.
Parameters:
item - the (non-null) item to publish
onDrop - if non-null, the handler invoked upon a drop to a subscriber, with arguments of the subscriber and item; if it returns true, an offer is re-attempted (once)
Throws:
java.lang.IllegalStateException - if closed
java.lang.NullPointerException - if item is null
java.util.concurrent.RejectedExecutionException - if thrown by Executor
public int offer(T item, long timeout, java.util.concurrent.TimeUnit unit, java.util.function.BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)
Publishes the given item, if possible, to each current subscriber by asynchronously invoking its onNext method, blocking while resources for any subscription are unavailable, up to the specified timeout or until the caller thread is interrupted, at which point the given handler (if non-null) is invoked, and if it returns true, retried once. (The drop handler may distinguish timeouts from interrupts by checking whether the current thread is interrupted.) Other calls to methods in this class by other threads are blocked while the handler is invoked. Unless recovery is assured, options are usually limited to logging the error and/or issuing an onError signal to the subscriber.
This method returns a status indicator: If negative, it represents the (negative) number of drops (failed attempts to issue the item to a subscriber). Otherwise it is an estimate of the maximum lag (number of items submitted but not yet consumed) among all current subscribers. This value is at least one (accounting for this submitted item) if there are any subscribers, else zero.
If the Executor for this publisher throws a RejectedExecutionException (or any other RuntimeException or Error) when attempting to asynchronously notify subscribers, or the drop handler throws an exception when processing a dropped item, then this exception is rethrown.
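The timed form might be used as in this sketch, where the drop handler distinguishes a timeout from an interrupt by checking the current thread's interrupt status, as suggested above. The names TimedOfferDemo and StalledSubscriber, the capacity of 2, and the 10 millisecond timeout are assumptions for illustration.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class TimedOfferDemo {
    /** A subscriber that never requests items, so its buffer stays full once filled. */
    static final class StalledSubscriber implements Flow.Subscriber<Integer> {
        public void onSubscribe(Flow.Subscription subscription) { }
        public void onNext(Integer item) { }
        public void onError(Throwable error) { }
        public void onComplete() { }
    }

    /** Returns { drops after a timeout, drops after an interrupt }. */
    static int[] offerWithTimeout(int count) {
        AtomicInteger timeouts = new AtomicInteger();
        AtomicInteger interrupts = new AtomicInteger();
        SubmissionPublisher<Integer> publisher =
            new SubmissionPublisher<>(ForkJoinPool.commonPool(), 2);
        publisher.subscribe(new StalledSubscriber());
        for (int i = 0; i < count; i++) {
            publisher.offer(i, 10, TimeUnit.MILLISECONDS, (subscriber, item) -> {
                // The handler runs in the offering thread, so its interrupt
                // status tells timeouts and interrupts apart.
                if (Thread.currentThread().isInterrupted()) {
                    interrupts.incrementAndGet();
                } else {
                    timeouts.incrementAndGet();
                }
                return false;   // give up rather than retry
            });
        }
        publisher.close();
        return new int[] { timeouts.get(), interrupts.get() };
    }

    public static void main(String[] args) {
        int[] result = offerWithTimeout(6);
        System.out.println("timeouts: " + result[0] + ", interrupts: " + result[1]);
    }
}
```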
Parameters:
item - the (non-null) item to publish
timeout - how long to wait for resources for any subscriber before giving up, in units of unit
unit - a TimeUnit determining how to interpret the timeout parameter
onDrop - if non-null, the handler invoked upon a drop to a subscriber, with arguments of the subscriber and item; if it returns true, an offer is re-attempted (once)
Throws:
java.lang.IllegalStateException - if closed
java.lang.NullPointerException - if item is null
java.util.concurrent.RejectedExecutionException - if thrown by Executor
final int doOffer(long nanos, T item, java.util.function.BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)
Common implementation for both forms of offer
public void close()
Unless already closed, issues onComplete signals to current subscribers, and disallows subsequent attempts to publish. Upon return, this method does NOT guarantee that all subscribers have yet completed.
Specified by:
close in interface java.lang.AutoCloseable
public void closeExceptionally(java.lang.Throwable error)
Unless already closed, issues onError signals to current subscribers with the given error, and disallows subsequent attempts to publish. Future subscribers also receive the given error. Upon return, this method does NOT guarantee that all subscribers have yet completed.
Parameters:
error - the onError argument sent to subscribers
Throws:
java.lang.NullPointerException - if error is null
public boolean isClosed()
Returns true if this publisher is not accepting submissions.
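public java.lang.Throwable getClosedException()
Returns the exception associated with closeExceptionally, or null if not closed or if closed normally.
public boolean hasSubscribers()
Returns true if this publisher has any subscribers.
public int getNumberOfSubscribers()
Returns the number of current subscribers.
public java.util.concurrent.Executor getExecutor()
Returns the Executor used for asynchronous delivery.
public int getMaxBufferCapacity()
Returns the maximum per-subscriber buffer capacity.
public java.util.List<Flow.Subscriber<? super T>> getSubscribers()
Returns a list of current subscribers for monitoring and tracking purposes, not for invoking Flow.Subscriber methods on the subscribers.
public boolean isSubscribed(Flow.Subscriber<? super T> subscriber)
Returns true if the given Subscriber is currently subscribed.
Parameters:
subscriber - the subscriber
Throws:
java.lang.NullPointerException - if subscriber is null
public long estimateMinimumDemand()
Returns an estimate of the minimum number of items requested (via request) but not yet produced, among all current subscribers.
public int estimateMaximumLag()
Returns an estimate of the maximum number of items produced but not yet consumed among all current subscribers.
public java.util.concurrent.CompletableFuture<java.lang.Void> consume(java.util.function.Consumer<? super T> consumer)
Processes all published items using the given Consumer function. Returns a CompletableFuture that is completed normally when this publisher signals onComplete, or completed exceptionally upon any error, or an exception is thrown by the Consumer, or the returned CompletableFuture is cancelled, in which case no further items are processed.
Parameters:
consumer - the function applied to each onNext item
Throws:
java.lang.NullPointerException - if consumer is null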