5 #if !defined(RXCPP_RX_SCHEDULER_NEW_THREAD_HPP) 6 #define RXCPP_RX_SCHEDULER_NEW_THREAD_HPP 8 #include "../rx-includes.hpp" 12 namespace schedulers {
27 typedef detail::action_queue queue_type;
31 struct new_worker_state :
public std::enable_shared_from_this<new_worker_state>
33 typedef detail::schedulable_queue<
34 typename clock_type::time_point> queue_item_time;
36 typedef queue_item_time::item_type item_type;
38 virtual ~new_worker_state()
48 mutable std::mutex lock;
49 mutable std::condition_variable wake;
50 mutable queue_item_time q;
55 std::shared_ptr<new_worker_state> state;
62 explicit new_worker(std::shared_ptr<new_worker_state> ws)
68 : state(std::make_shared<new_worker_state>(cs))
70 auto keepAlive = state;
72 state->lifetime.add([keepAlive](){
73 std::unique_lock<std::mutex> guard(keepAlive->lock);
74 auto expired = std::move(keepAlive->q);
75 if (!keepAlive->q.empty()) std::terminate();
76 keepAlive->wake.notify_one();
78 if (keepAlive->worker.joinable() && keepAlive->worker.get_id() != std::this_thread::get_id()) {
80 keepAlive->worker.join();
83 keepAlive->worker.detach();
87 state->worker = tf([keepAlive](){
90 queue_type::ensure(std::make_shared<new_worker>(keepAlive));
93 queue_type::destroy();
97 std::unique_lock<std::mutex> guard(keepAlive->lock);
98 if (keepAlive->q.empty()) {
99 keepAlive->wake.wait(guard, [keepAlive](){
100 return !keepAlive->lifetime.is_subscribed() || !keepAlive->q.empty();
103 if (!keepAlive->lifetime.is_subscribed()) {
106 auto& peek = keepAlive->q.top();
107 if (!peek.what.is_subscribed()) {
111 if (clock_type::now() < peek.when) {
112 keepAlive->wake.wait_until(guard, peek.when);
115 auto what = peek.what;
117 keepAlive->r.reset(keepAlive->q.empty());
119 what(keepAlive->r.get_recurse());
124 virtual clock_type::time_point
now()
const {
125 return clock_type::now();
128 virtual void schedule(
const schedulable& scbl)
const {
129 schedule(
now(), scbl);
132 virtual void schedule(clock_type::time_point when,
const schedulable& scbl)
const {
134 std::unique_lock<std::mutex> guard(state->lock);
135 state->q.push(new_worker_state::item_type(when, scbl));
136 state->r.reset(
false);
138 state->wake.notify_one();
146 : factory([](std::function<void()> start){
147 return std::thread(std::move(start));
159 virtual clock_type::time_point
now()
const {
160 return clock_type::now();
164 return worker(cs, std::make_shared<new_worker>(cs, factory));
169 static scheduler instance = make_scheduler<new_thread>();
173 return make_scheduler<new_thread>(tf);
Definition: rx-scheduler.hpp:163
Definition: rx-newthread.hpp:16
Definition: rx-all.hpp:26
virtual clock_type::time_point now() const
Definition: rx-newthread.hpp:159
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:364
new_thread()
Definition: rx-newthread.hpp:145
std::function< std::thread(std::function< void()>)> thread_factory
Definition: rx-newthread.hpp:14
allows functions to be called at specified times and possibly in other contexts.
Definition: rx-scheduler.hpp:383
virtual ~new_thread()
Definition: rx-newthread.hpp:155
bool is_subscribed() const
Definition: rx-scheduler.hpp:592
scheduler make_new_thread()
Definition: rx-newthread.hpp:168
new_thread(thread_factory tf)
Definition: rx-newthread.hpp:151
recursion is used by the scheduler to signal to each action whether tail recursion is allowed...
Definition: rx-scheduler.hpp:95
Definition: rx-scheduler.hpp:353
virtual worker create_worker(composite_subscription cs) const
Definition: rx-newthread.hpp:163
#define RXCPP_UNWIND_AUTO(Function)
Definition: rx-util.hpp:875
std::string what(std::exception_ptr ep)
Definition: rx-util.hpp:523
Definition: rx-scheduler.hpp:426
Definition: rx-scheduler.hpp:200