I
- The input this#onNext(Object)
signalO
- The output type to listen for with this#subscribe(org.reactivestreams.Subscriber)
public abstract class Action<I,O> extends Stream<O> implements org.reactivestreams.Processor<I,O>, Consumer<I>, Recyclable, Control
Publisher
and in particular
to a Stream
. Stream is usually the place where actions are created.
An Action is also a data producer, and therefore implements Processor
.
An imperative programming equivalent of an action is a method or function. The main difference is that it also
reacts on various Subscriber
signals and produce an output data for
any downstream subscription.
The implementation specifics of an Action lies in two core features:
- Its signal scheduler on Dispatcher
- Its smart capacity awareness to prevent Dispatcher
overflow
Up to a maximum capacity defined with this#capacity(long)
will be allowed to be dispatched by requesting
the tracked remaining slots to the upstream Subscription
. This maximum in-flight data
is a value to tune accordingly with the system and the requirements. An Action will bypass this feature anytime it is
not the root of stream processing chain e.g.:
stream.filter(..).map(..) :
In that Stream, filter is a FilterAction and has no upstream action, only the publisher it is attached to. The FilterAction will decide to be capacity aware and will track demand. The MapAction will however behave like a firehose and will not track the demand, passing any request upstream.
Implementing an Action is highly recommended to work with Stream without dealing with tracking issues and other threading matters. Usually an implementation will override any doXXXXX method where 'do' is an hint that logic will safely be dispatched to avoid race-conditions.
Modifier and Type | Field and Description |
---|---|
protected long |
capacity |
protected PushSubscription<O> |
downstreamSubscription |
static int |
NO_CAPACITY |
static int |
RESERVED_SLOTS
onComplete, onError, request, onSubscribe are dispatched events, therefore up to capacity + 4 events can be
in-flight
stacking into a Dispatcher.
|
protected PushSubscription<I> |
upstreamSubscription
The upstream request tracker to avoid dispatcher overrun, based on the current
this#capacity |
Modifier and Type | Method and Description |
---|---|
void |
accept(I i)
Execute the logic of the action, accepting the given parameter.
|
protected boolean |
addSubscription(PushSubscription<O> subscription) |
protected void |
broadcastComplete()
Send a complete event to all the attached
Subscriber ONLY IF the underlying state is READY. |
protected void |
broadcastError(Throwable throwable)
Send an error to all the attached
Subscriber . |
protected void |
broadcastNext(O ev)
Send an element of parameterized type {link O} to all the attached
Subscriber . |
void |
cancel()
Stop consuming signals from upstream.
|
boolean |
cancelSubscription(PushSubscription<O> subscription)
--------------------------------------------------------------------------------------------------------
INTERNALS
--------------------------------------------------------------------------------------------------------
|
Action<I,O> |
capacity(long elements)
--------------------------------------------------------------------------------------------------------
ACTION MODIFIERS
--------------------------------------------------------------------------------------------------------
|
static void |
checkRequest(long n) |
<E> CompositeAction<E,O> |
combine()
Combine the most ancient upstream action to act as the
Subscriber input component and
the current stream to act as the Publisher . |
<E> Action<I,O> |
control(Stream<E> controlStream,
Consumer<Tuple2<Action<I,O>,? super E>> controller)
Consume a Stream to allow for dynamic
Action update. |
protected PushSubscription<O> |
createSubscription(org.reactivestreams.Subscriber<? super O> subscriber,
boolean reactivePull) |
protected PushSubscription<O> |
createSubscription(org.reactivestreams.Subscriber<? super O> subscriber,
CompletableQueue<O> queue) |
protected PushSubscription<I> |
createTrackingSubscription(org.reactivestreams.Subscription subscription) |
StreamUtils.StreamVisitor |
debug()
Print a debugged form of the root action relative to this one.
|
protected void |
doComplete() |
protected void |
doError(Throwable ev) |
protected abstract void |
doNext(I ev) |
protected void |
doOnSubscribe(org.reactivestreams.Subscription subscription) |
protected void |
doShutdown() |
protected void |
doStart() |
PushSubscription<O> |
downstreamSubscription()
Get the current action child subscription
|
static long |
evaluateCapacity(long n) |
<P extends org.reactivestreams.Publisher<?>> |
findOldestUpstream(Class<P> clazz)
Utility to find the most ancient subscribed Action.
|
long |
getCapacity()
--------------------------------------------------------------------------------------------------------
ACTION STATE
--------------------------------------------------------------------------------------------------------
|
PushSubscription<I> |
getSubscription()
Get the current upstream subscription if any
|
boolean |
isPublishing()
Check if the current stream is emitting any signal.
|
void |
onComplete() |
void |
onError(Throwable cause) |
void |
onNext(I ev) |
Stream<O> |
onOverflowBuffer(Supplier<? extends CompletableQueue<O>> queueSupplier)
Attach a No-Op Action that only serves the purpose of buffering incoming values if not enough demand is signaled
downstream.
|
void |
onSubscribe(org.reactivestreams.Subscription subscription) |
void |
recycle()
Free any internal resources and reset the state of the object to enable reuse.
|
void |
requestAll()
Usually requests Long.MAX_VALUE, which instructs a stream to never end until completed or cancelled.
|
void |
requestMore(long n)
Request the next n elements from the source
|
protected void |
requestUpstream(long capacity,
boolean terminated,
long elements) |
void |
subscribe(org.reactivestreams.Subscriber<? super O> subscriber)
--------------------------------------------------------------------------------------------------------
ACTION SIGNAL HANDLING
--------------------------------------------------------------------------------------------------------
|
protected void |
subscribeWithSubscription(org.reactivestreams.Subscriber<? super O> subscriber,
PushSubscription<O> subscription)
Subscribe a given subscriber and pairs it with a given subscription instead of letting the Stream pick it
automatically.
|
Consumer<?> |
toBroadcastCompleteConsumer()
Create a consumer that broadcast complete signal from any accepted value.
|
Consumer<Throwable> |
toBroadcastErrorConsumer()
Create a consumer that broadcast error signal from any accepted value.
|
Consumer<O> |
toBroadcastNextConsumer()
Create a consumer that broadcast next signal from accepted values.
|
String |
toString() |
adaptiveConsume, adaptiveConsumeOn, after, batchConsume, batchConsumeOn, broadcast, broadcastOn, broadcastTo, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, cache, cast, concatMap, concatWith, consume, consume, consume, consume, consume, consumeLater, consumeOn, consumeOn, consumeOn, count, count, decode, defaultIfEmpty, dematerialize, dispatchOn, dispatchOn, dispatchOn, distinct, distinct, distinctUntilChanged, distinctUntilChanged, elapsed, elementAt, elementAtOrDefault, encode, env, exists, fanIn, filter, filter, finallyDo, flatMap, getDispatcher, getEnvironment, getTimer, groupBy, ignoreError, ignoreError, isReactivePull, join, joinWith, keepAlive, last, lift, log, log, map, materialize, merge, mergeWith, nest, next, observe, observeCancel, observeComplete, observeError, observeStart, observeSubscribe, onErrorResumeNext, onErrorResumeNext, onErrorReturn, onErrorReturn, onOverflowBuffer, onOverflowDrop, partition, partition, process, recover, reduce, reduce, repeat, repeat, repeatWhen, requestWhen, retry, retry, retry, retry, retryWhen, sample, sample, sample, sample, sample, sample, sampleFirst, sampleFirst, sampleFirst, sampleFirst, sampleFirst, sampleFirst, scan, scan, skip, skip, skip, skipWhile, skipWhile, sort, sort, sort, sort, split, split, startWith, startWith, startWith, subscribe, subscribeOn, subscribeOn, subscribeOn, switchMap, take, take, take, takeWhile, tap, throttle, throttle, timeout, timeout, timeout, timeout, timestamp, toBlockingQueue, toBlockingQueue, toList, toList, unbounded, when, window, window, window, window, window, window, window, window, window, window, window, zip, zipWith, zipWith
public static final int RESERVED_SLOTS
public static final int NO_CAPACITY
protected PushSubscription<I> upstreamSubscription
this#capacity
protected PushSubscription<O> downstreamSubscription
protected long capacity
public static void checkRequest(long n)
public static long evaluateCapacity(long n)
public void subscribe(org.reactivestreams.Subscriber<? super O> subscriber)
subscribe
in interface org.reactivestreams.Publisher<O>
public void onSubscribe(org.reactivestreams.Subscription subscription)
onSubscribe
in interface org.reactivestreams.Subscriber<I>
protected final void doStart()
public final void accept(I i)
Consumer
public void onComplete()
onComplete
in interface org.reactivestreams.Subscriber<I>
public void onError(Throwable cause)
onError
in interface org.reactivestreams.Subscriber<I>
public Action<I,O> capacity(long elements)
protected void broadcastNext(O ev)
Subscriber
.
A Stream must be in READY state to dispatch signals and will fail fast otherwise (IllegalStateException).ev
- the data to forwardprotected void broadcastError(Throwable throwable)
Subscriber
.
A Stream must be in READY state to dispatch signals and will fail fast otherwise (IllegalStateException).throwable
- the error to forwardprotected void broadcastComplete()
Subscriber
ONLY IF the underlying state is READY.
Unlike broadcastNext(Object)
and broadcastError(Throwable)
it will simply ignore the signal.public boolean isPublishing()
Control
isPublishing
in interface Control
public void cancel()
Control
PushSubscription.terminated
flag.public void requestAll()
Control
requestAll
in interface Control
public StreamUtils.StreamVisitor debug()
debug
in interface Control
StreamUtils.StreamVisitor
a Debug container for the current sourcepublic final <E> Action<I,O> control(Stream<E> controlStream, Consumer<Tuple2<Action<I,O>,? super E>> controller)
Action
update. Everytime
the receives a next signal, the current Action and the input data will be published as a
Tuple2
to the attached .
This is particulary useful to dynamically adapt the Stream
instance : capacity(), pause(), resume()...
controlStream
- The consumed stream, each signal will trigger the passed controllercontroller
- The consumer accepting a pair of Stream and user-provided signal typeStream
instancepublic final Stream<O> onOverflowBuffer(Supplier<? extends CompletableQueue<O>> queueSupplier)
Stream
onOverflowBuffer
in class Stream<O>
queueSupplier
- A completable queue Supplier
to provide support for overflowpublic final <E> CompositeAction<E,O> combine()
Stream
Subscriber
input component and
the current stream to act as the Publisher
.
Useful to share and ship a full stream whilst hiding the staging actions in the middle.
Default behavior, e.g. a single stream, will raise an IllegalStateException
as there would not
be any Subscriber (Input) side to combine. combine()
is the usual reference
implementation used.
public final Consumer<?> toBroadcastCompleteConsumer()
Consumer
ready to forward complete signal to this streampublic final Consumer<O> toBroadcastNextConsumer()
Consumer
ready to forward values to this streampublic final Consumer<Throwable> toBroadcastErrorConsumer()
Consumer
ready to forward error to this streampublic <P extends org.reactivestreams.Publisher<?>> P findOldestUpstream(Class<P> clazz)
public final long getCapacity()
getCapacity
in interface NonBlocking
getCapacity
in class Stream<O>
public PushSubscription<I> getSubscription()
Subscription
public final PushSubscription<O> downstreamSubscription()
downstreamSubscription
in class Stream<O>
PushSubscription
public boolean cancelSubscription(PushSubscription<O> subscription)
cancelSubscription
in class Stream<O>
PushSubscription
protected PushSubscription<O> createSubscription(org.reactivestreams.Subscriber<? super O> subscriber, boolean reactivePull)
protected PushSubscription<O> createSubscription(org.reactivestreams.Subscriber<? super O> subscriber, CompletableQueue<O> queue)
protected void requestUpstream(long capacity, boolean terminated, long elements)
protected PushSubscription<I> createTrackingSubscription(org.reactivestreams.Subscription subscription)
protected void doOnSubscribe(org.reactivestreams.Subscription subscription)
protected void doComplete()
protected abstract void doNext(I ev)
protected void doError(Throwable ev)
public void requestMore(long n)
Control
requestMore
in interface Control
n
- the number of elements to requestprotected void subscribeWithSubscription(org.reactivestreams.Subscriber<? super O> subscriber, PushSubscription<O> subscription)
This is mainly useful for libraries implementors, usually this#lift(reactor.fn.Supplier)
and
this#subscribe(org.reactivestreams.Subscriber)
are just fine.
subscriber
- subscription
- protected boolean addSubscription(PushSubscription<O> subscription)
protected void doShutdown()
public void recycle()
Recyclable
recycle
in interface Recyclable
Copyright © 2016. All rights reserved.