RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-newthread.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_SCHEDULER_NEW_THREAD_HPP)
6 #define RXCPP_RX_SCHEDULER_NEW_THREAD_HPP
7 
8 #include "../rx-includes.hpp"
9 
10 namespace rxcpp {
11 
12 namespace schedulers {
13 
14 typedef std::function<std::thread(std::function<void()>)> thread_factory;
15 
17 {
18 private:
19  typedef new_thread this_type;
20  new_thread(const this_type&);
21 
22  struct new_worker : public worker_interface
23  {
24  private:
25  typedef new_worker this_type;
26 
27  typedef detail::action_queue queue_type;
28 
29  new_worker(const this_type&);
30 
31  struct new_worker_state : public std::enable_shared_from_this<new_worker_state>
32  {
33  typedef detail::schedulable_queue<
34  typename clock_type::time_point> queue_item_time;
35 
36  typedef queue_item_time::item_type item_type;
37 
38  virtual ~new_worker_state()
39  {
40  }
41 
42  explicit new_worker_state(composite_subscription cs)
43  : lifetime(cs)
44  {
45  }
46 
47  composite_subscription lifetime;
48  mutable std::mutex lock;
49  mutable std::condition_variable wake;
50  mutable queue_item_time q;
51  std::thread worker;
52  recursion r;
53  };
54 
55  std::shared_ptr<new_worker_state> state;
56 
57  public:
58  virtual ~new_worker()
59  {
60  }
61 
62  explicit new_worker(std::shared_ptr<new_worker_state> ws)
63  : state(ws)
64  {
65  }
66 
67  new_worker(composite_subscription cs, thread_factory& tf)
68  : state(std::make_shared<new_worker_state>(cs))
69  {
70  auto keepAlive = state;
71 
72  state->lifetime.add([keepAlive](){
73  std::unique_lock<std::mutex> guard(keepAlive->lock);
74  auto expired = std::move(keepAlive->q);
75  if (!keepAlive->q.empty()) std::terminate();
76  keepAlive->wake.notify_one();
77 
78  if (keepAlive->worker.joinable() && keepAlive->worker.get_id() != std::this_thread::get_id()) {
79  guard.unlock();
80  keepAlive->worker.join();
81  }
82  else {
83  keepAlive->worker.detach();
84  }
85  });
86 
87  state->worker = tf([keepAlive](){
88 
89  // take ownership
90  queue_type::ensure(std::make_shared<new_worker>(keepAlive));
91  // release ownership
93  queue_type::destroy();
94  });
95 
96  for(;;) {
97  std::unique_lock<std::mutex> guard(keepAlive->lock);
98  if (keepAlive->q.empty()) {
99  keepAlive->wake.wait(guard, [keepAlive](){
100  return !keepAlive->lifetime.is_subscribed() || !keepAlive->q.empty();
101  });
102  }
103  if (!keepAlive->lifetime.is_subscribed()) {
104  break;
105  }
106  auto& peek = keepAlive->q.top();
107  if (!peek.what.is_subscribed()) {
108  keepAlive->q.pop();
109  continue;
110  }
111  if (clock_type::now() < peek.when) {
112  keepAlive->wake.wait_until(guard, peek.when);
113  continue;
114  }
115  auto what = peek.what;
116  keepAlive->q.pop();
117  keepAlive->r.reset(keepAlive->q.empty());
118  guard.unlock();
119  what(keepAlive->r.get_recurse());
120  }
121  });
122  }
123 
124  virtual clock_type::time_point now() const {
125  return clock_type::now();
126  }
127 
128  virtual void schedule(const schedulable& scbl) const {
129  schedule(now(), scbl);
130  }
131 
132  virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
133  if (scbl.is_subscribed()) {
134  std::unique_lock<std::mutex> guard(state->lock);
135  state->q.push(new_worker_state::item_type(when, scbl));
136  state->r.reset(false);
137  }
138  state->wake.notify_one();
139  }
140  };
141 
142  mutable thread_factory factory;
143 
144 public:
146  : factory([](std::function<void()> start){
147  return std::thread(std::move(start));
148  })
149  {
150  }
152  : factory(tf)
153  {
154  }
155  virtual ~new_thread()
156  {
157  }
158 
159  virtual clock_type::time_point now() const {
160  return clock_type::now();
161  }
162 
164  return worker(cs, std::make_shared<new_worker>(cs, factory));
165  }
166 };
167 
169  static scheduler instance = make_scheduler<new_thread>();
170  return instance;
171 }
173  return make_scheduler<new_thread>(tf);
174 }
175 
176 }
177 
178 }
179 
180 #endif
Definition: rx-scheduler.hpp:163
Definition: rx-newthread.hpp:16
Definition: rx-all.hpp:26
virtual clock_type::time_point now() const
Definition: rx-newthread.hpp:159
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:364
new_thread()
Definition: rx-newthread.hpp:145
std::function< std::thread(std::function< void()>)> thread_factory
Definition: rx-newthread.hpp:14
allows functions to be called at specified times and possibly in other contexts.
Definition: rx-scheduler.hpp:383
virtual ~new_thread()
Definition: rx-newthread.hpp:155
bool is_subscribed() const
Definition: rx-scheduler.hpp:592
scheduler make_new_thread()
Definition: rx-newthread.hpp:168
new_thread(thread_factory tf)
Definition: rx-newthread.hpp:151
recursion is used by the scheduler to signal to each action whether tail recursion is allowed...
Definition: rx-scheduler.hpp:95
Definition: rx-scheduler.hpp:353
virtual worker create_worker(composite_subscription cs) const
Definition: rx-newthread.hpp:163
#define RXCPP_UNWIND_AUTO(Function)
Definition: rx-util.hpp:875
std::string what(std::exception_ptr ep)
Definition: rx-util.hpp:523
Definition: rx-scheduler.hpp:426
Definition: rx-scheduler.hpp:200