public class Environment extends Object implements Iterable<Map.Entry<String,Dispatcher>>, Closeable
Modifier and Type | Field and Description |
---|---|
static String |
DISPATCHER_GROUP
The name of the default ring buffer group dispatcher
|
static String |
MPSC
The name of the default mpsc dispatcher
|
static int |
PROCESSORS
The number of processors available to the runtime
|
static String |
SHARED
The name of the default shared dispatcher
|
static String |
THREAD_POOL
The name of the default thread pool dispatcher
|
static String |
WORK_QUEUE
The name of the default work queue dispatcher
|
Constructor and Description |
---|
Environment()
Creates a new Environment that will use a
PropertiesConfigurationReader to obtain its
initial configuration. |
Environment(ConfigurationReader configurationReader)
Creates a new Environment that will use the given
configurationReader to obtain its initial
configuration. |
Environment(Map<String,Dispatcher> dispatchers,
ConfigurationReader configurationReader)
Creates a new Environment that will contain the given
dispatchers , will use the given configurationReader to obtain additional configuration. |
Modifier and Type | Method and Description |
---|---|
Environment |
addCachedDispatchers(String name,
DispatcherSupplier dispatcherFactory)
Adds the
dispatcherFactory to the environment, storing it using the given name . |
static boolean |
alive()
Read if the context environment has been set
|
static Environment |
assign(Environment environment)
Assign an environment to the context in order to make it available statically in the application from the current
classloader.
|
Environment |
assignErrorJournal()
Assign a default error
Consumer to listen for any call to this#routeError(Throwable) . |
Environment |
assignErrorJournal(Consumer<? super Throwable> errorJournal)
Assign the error
Consumer to listen for any call to this#routeError(Throwable) . |
static Dispatcher |
cachedDispatcher()
Obtain a cached dispatcher out of
this#PROCESSORS maximum pooled. |
static DispatcherSupplier |
cachedDispatchers()
Obtain the default dispatcher supplier from the context environment.
|
static DispatcherSupplier |
cachedDispatchers(String key)
Obtain a dispatcher supplier into the context environment.
|
static DispatcherSupplier |
cachedDispatchers(String key,
DispatcherSupplier dispatcherSupplier)
Register a dispatcher supplier into the context environment.
|
void |
close() |
static DispatcherSupplier |
createDispatcherFactory(String name,
int poolsize,
int bufferSize,
Consumer<Throwable> errorHandler,
com.lmax.disruptor.dsl.ProducerType producerType,
com.lmax.disruptor.WaitStrategy waitStrategy)
Create a RingBuffer pool that will clone up to generated dispatcher and return a different one
on a round robin fashion each time
Supplier.get() is called. |
static Dispatcher |
dispatcher(String key)
Obtain a registred dispatcher.
|
static Dispatcher |
dispatcher(String key,
Dispatcher dispatcher)
Register a dispatcher into the context environment.
|
static Environment |
get()
Read the context environment.
|
Dispatcher |
getCachedDispatcher()
Returns a default cached dispatcher for this environment.
|
DispatcherSupplier |
getCachedDispatchers()
Returns the default dispatcher group for this environment.
|
DispatcherSupplier |
getCachedDispatchers(String name)
Returns the dispatcher factory with the given
name . |
Dispatcher |
getDefaultDispatcher()
Returns the default dispatcher for this environment.
|
Dispatcher |
getDispatcher(String name)
Returns the dispatcher with the given
name . |
int |
getIntProperty(String key,
int defaultValue)
Gets the property with the given
key , converting it to an integer. |
long |
getLongProperty(String key,
long defaultValue)
Gets the property with the given
key , converting it to a long. |
String |
getProperty(String key,
String defaultValue)
Gets the property with the given
key . |
Timer |
getTimer()
Get the
Environment -wide HashWheelTimer . |
static Environment |
initialize()
Create and assign a context environment bound to the current classloader.
|
static Environment |
initialize(Consumer<Throwable> errorConsumer)
Create and assign a context environment bound to the current classloader.
|
static Environment |
initializeIfEmpty()
Create and assign a context environment bound to the current classloader only if it not already set.
|
Iterator<Map.Entry<String,Dispatcher>> |
iterator() |
static DispatcherSupplier |
newCachedDispatchers(int poolsize) |
static DispatcherSupplier |
newCachedDispatchers(int poolsize,
String name) |
static Dispatcher |
newDispatcher()
Register a dispatcher into the context environment.
|
static Dispatcher |
newDispatcher(int backlog)
Register a dispatcher into the context environment.
|
static Dispatcher |
newDispatcher(int backlog,
int consumers)
Register a dispatcher into the context environment.
|
static Dispatcher |
newDispatcher(int backlog,
int consumers,
DispatcherType dispatcherType)
Register a dispatcher into the context environment.
|
static Dispatcher |
newDispatcher(String key,
int backlog)
Register a dispatcher into the context environment.
|
static Dispatcher |
newDispatcher(String key,
int backlog,
int consumers)
Register a dispatcher into the context environment.
|
static Dispatcher |
newDispatcher(String key,
int backlog,
int consumers,
DispatcherType dispatcherType)
Register a dispatcher into the context environment.
|
static Dispatcher |
newDispatcherLike(String key)
Register a dispatcher into the context environment.
|
static Dispatcher |
newDispatcherLike(String key,
String newKey)
Register a dispatcher into the context environment.
|
static DispatcherSupplier |
newFanOutCachedDispatchers(int poolsize,
String name) |
Environment |
removeCachedDispatchers(String name)
Remove the
dispatcherFactory to the environment keyed as the given name . |
Environment |
removeDispatcher(String name)
Removes the Dispatcher, stored using the given
name from the environment. |
void |
routeError(Throwable throwable)
Route any exception to the environment error journal
this#errorConsumer . |
Environment |
setDispatcher(String name,
Dispatcher dispatcher)
Adds the
dispatcher to the environment, storing it using the given name . |
static Dispatcher |
sharedDispatcher()
Obtain the default dispatcher from the current environment.
|
void |
shutdown()
Shuts down this Environment, causing all of its
Dispatchers to be shut down. |
static TailRecurseDispatcher |
tailRecurse()
Obtain a fresh tailRecurse Dispatcher.
|
static void |
terminate()
Clean and Shutdown the context environment.
|
static Timer |
timer()
Obtain the default timer from the current environment.
|
static Dispatcher |
workDispatcher()
Obtain a multi threaded dispatcher useful for scaling up slow processing.
|
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
forEach, spliterator
public static final String DISPATCHER_GROUP
public static final String SHARED
public static final String MPSC
public static final String THREAD_POOL
public static final String WORK_QUEUE
public static final int PROCESSORS
Runtime.availableProcessors()
public Environment()
PropertiesConfigurationReader
to obtain its
initial configuration. The configuration will be read from the classpath at the location META-INF/reactor/reactor-environment.properties
.public Environment(ConfigurationReader configurationReader)
configurationReader
to obtain its initial
configuration.configurationReader
- The configuration reader to use to obtain initial configurationpublic Environment(Map<String,Dispatcher> dispatchers, ConfigurationReader configurationReader)
dispatchers
, will use the given configurationReader
to obtain additional configuration.dispatchers
- The dispatchers to add include in the EnvironmentconfigurationReader
- The configuration reader to use to obtain additional configurationpublic static Environment initialize()
Environment
public static Environment initialize(Consumer<Throwable> errorConsumer)
errorConsumer
- the callback for uncaught exceptionsEnvironment
public static Environment initializeIfEmpty()
Environment
public static Environment assign(Environment environment)
environment
- The environment to assign to the current contextEnvironment
public static boolean alive()
public static Environment get() throws IllegalStateException
this#assign(Environment)
.IllegalStateException
- if there is no environment initialized.public static void terminate() throws IllegalStateException
this#assign(Environment)
.IllegalStateException
- if there is no environment initialized.public static Timer timer()
HashWheelTimer
. It is suitable for non blocking periodic work
such as eventing, memory access, lock=free code, dispatching...HashWheelTimer
public static Dispatcher sharedDispatcher()
public static Dispatcher workDispatcher()
reactor.bus
.EventBus
or streams reactor.rx.Stream
using reactor.rx.Stream#consumeOn
.public static Dispatcher cachedDispatcher()
this#PROCESSORS
maximum pooled. The dispatchers are created lazily so
it is preferrable to fetch them out of the critical path.
The Cached Dispatcher is suitable for IO work if combined with distinct reactor event buses reactor.bus
.EventBus
or streams reactor.rx.Stream
.public static Dispatcher dispatcher(String key)
reactor.bus
.EventBus
or streams reactor.rx.Stream
.key
- the dispatcher name to findpublic static TailRecurseDispatcher tailRecurse()
public static Dispatcher dispatcher(String key, Dispatcher dispatcher)
key
- the dispatcher name to use for future lookupsdispatcher
- the dispatcher to register, if null, the key will be removedpublic static Dispatcher newDispatcherLike(String key)
key
- the dispatcher configuration name to use to inherit properties frompublic static Dispatcher newDispatcherLike(String key, String newKey)
key
- the dispatcher configuration name to use to inherit properties fromnewKey
- the dispatcher name to use for future lookupspublic static Dispatcher newDispatcher()
public static Dispatcher newDispatcher(int backlog)
backlog
- the dispatcher capacitypublic static Dispatcher newDispatcher(String key, int backlog)
key
- the dispatcher name to use for future lookupsbacklog
- the dispatcher capacitypublic static Dispatcher newDispatcher(int backlog, int consumers)
newDispatcher(String, int)
backlog
- the dispatcher capacityconsumers
- the dispatcher number of consumerspublic static Dispatcher newDispatcher(String key, int backlog, int consumers)
newDispatcher(String, int)
key
- the dispatcher name to use for future lookupsbacklog
- the dispatcher capacityconsumers
- the dispatcher number of consumerspublic static Dispatcher newDispatcher(int backlog, int consumers, DispatcherType dispatcherType)
backlog
- the dispatcher capacityconsumers
- the numbers of consumersdispatcherType
- the dispatcher typepublic static Dispatcher newDispatcher(String key, int backlog, int consumers, DispatcherType dispatcherType)
key
- the dispatcher name to use for future lookupsbacklog
- the dispatcher capacityconsumers
- the numbers of consumersdispatcherType
- the dispatcher typepublic static DispatcherSupplier cachedDispatchers(String key)
Supplier.get()
.key
- the dispatcher factory name to findpublic static DispatcherSupplier cachedDispatchers()
Supplier.get()
.public static DispatcherSupplier cachedDispatchers(String key, DispatcherSupplier dispatcherSupplier)
Supplier.get()
.key
- the dispatcher name to use for future lookupsdispatcherSupplier
- the dispatcher factory to register, if null, the key will be removedpublic static DispatcherSupplier newCachedDispatchers(int poolsize)
public static DispatcherSupplier newCachedDispatchers(int poolsize, String name)
public static DispatcherSupplier newFanOutCachedDispatchers(int poolsize, String name)
public String getProperty(String key, String defaultValue)
key
. If the property does not exist defaultValue
will be
returned.key
- The property keydefaultValue
- The value to return if the property does not existpublic long getLongProperty(String key, long defaultValue)
key
, converting it to a long. If the property does not exist defaultValue
will be returned.key
- The property keydefaultValue
- The value to return if the property does not existpublic int getIntProperty(String key, int defaultValue)
key
, converting it to an integer. If the property does not exist defaultValue
will be returned.key
- The property keydefaultValue
- The value to return if the property does not existpublic Dispatcher getDefaultDispatcher()
PropertiesConfigurationReader
is
being used. This default dispatcher is specified by the value of the reactor.dispatchers.default
property.public Dispatcher getCachedDispatcher()
PropertiesConfigurationReader
is being used. This default dispatcher is specified by the value of the reactor.dispatchers.dispatcherGroup
property.public DispatcherSupplier getCachedDispatchers()
PropertiesConfigurationReader
is being used. This default dispatcher is specified by the value of the reactor.dispatchers.dispatcherGroup
property.public DispatcherSupplier getCachedDispatchers(String name)
name
.name
- The name of the dispatcher factorynull
.IllegalArgumentException
- if the dispatcher does not existpublic Dispatcher getDispatcher(String name)
name
.name
- The name of the dispatchernull
.IllegalArgumentException
- if the dispatcher does not existpublic void routeError(Throwable throwable)
this#errorConsumer
.throwable
- The error to routepublic Environment assignErrorJournal()
Consumer
to listen for any call to this#routeError(Throwable)
. The default
journal will log through SLF4J Logger onto the category "reactor-environment".public Environment assignErrorJournal(Consumer<? super Throwable> errorJournal)
Consumer
to listen for any call to this#routeError(Throwable)
.errorJournal
- the consumer to listen for any exceptionpublic Environment setDispatcher(String name, Dispatcher dispatcher)
dispatcher
to the environment, storing it using the given name
.name
- The name of the dispatcherdispatcher
- The dispatcherpublic Environment addCachedDispatchers(String name, DispatcherSupplier dispatcherFactory)
dispatcherFactory
to the environment, storing it using the given name
.name
- The name of the dispatcher factorydispatcherFactory
- The dispatcher factorypublic Environment removeCachedDispatchers(String name)
dispatcherFactory
to the environment keyed as the given name
.name
- The name of the dispatcher factory to removepublic Environment removeDispatcher(String name)
name
from the environment.name
- The name of the dispatcherpublic Timer getTimer()
Environment
-wide HashWheelTimer
.public void shutdown()
Dispatchers
to be shut down.Resource.shutdown()
public Iterator<Map.Entry<String,Dispatcher>> iterator()
iterator
in interface Iterable<Map.Entry<String,Dispatcher>>
public void close() throws IOException
close
in interface Closeable
close
in interface AutoCloseable
IOException
public static DispatcherSupplier createDispatcherFactory(String name, int poolsize, int bufferSize, Consumer<Throwable> errorHandler, com.lmax.disruptor.dsl.ProducerType producerType, com.lmax.disruptor.WaitStrategy waitStrategy)
Supplier.get()
is called.name
- poolsize
- bufferSize
- errorHandler
- producerType
- waitStrategy
- Copyright © 2016. All rights reserved.