5 #if !defined(RXCPP_RX_SUBSCRIPTION_HPP) 6 #define RXCPP_RX_SUBSCRIPTION_HPP 15 struct is_unsubscribe_function
19 static auto check(
int) -> decltype((*(CF*)
nullptr)());
21 static not_void check(...);
23 static const bool value = std::is_same<decltype(check<rxu::decay_t<F>>(0)),
void>::value;
34 static typename C::subscription_tag* check(
int);
36 static void check(...);
41 template<
class Unsubscribe>
45 unsubscribe_call_type unsubscribe_call;
51 : unsubscribe_call(o.unsubscribe_call)
55 : unsubscribe_call(std::move(o.unsubscribe_call))
59 : unsubscribe_call(std::move(s))
69 class base_subscription_state :
public std::enable_shared_from_this<base_subscription_state>
71 base_subscription_state();
74 explicit base_subscription_state(
bool initial)
75 : issubscribed(initial)
78 virtual ~base_subscription_state() {}
81 std::atomic<bool> issubscribed;
88 struct subscription_state :
public base_subscription_state
91 subscription_state(inner_t i)
92 : base_subscription_state(
true)
97 if (issubscribed.exchange(
false)) {
107 std::shared_ptr<base_subscription_state>
state;
123 :
state(std::make_shared<base_subscription_state>(false))
131 :
state(std::make_shared<subscription_state<U>>(std::move(u)))
168 return state->issubscribed;
174 auto keepAlive =
state;
175 state->unsubscribe();
193 return !(lhs == rhs);
203 ->
typename std::enable_if<!is_subscription<I>::value && !detail::is_unsubscribe_function<I>::value,
207 template<
class Unsubscribe>
209 ->
typename std::enable_if<detail::is_unsubscribe_function<Unsubscribe>::value,
214 class composite_subscription;
218 struct tag_composite_subscription_empty {};
220 class composite_subscription_inner
224 struct composite_subscription_state :
public std::enable_shared_from_this<composite_subscription_state>
226 std::set<subscription> subscriptions;
228 std::atomic<bool> issubscribed;
230 ~composite_subscription_state()
232 std::unique_lock<decltype(lock)> guard(lock);
233 subscriptions.clear();
236 composite_subscription_state()
240 composite_subscription_state(tag_composite_subscription_empty)
241 : issubscribed(false)
245 inline weak_subscription add(subscription s) {
248 }
else if (s.is_subscribed()) {
249 std::unique_lock<decltype(lock)> guard(lock);
250 subscriptions.insert(s);
255 inline void remove(weak_subscription w) {
256 if (issubscribed && !w.expired()) {
258 std::unique_lock<decltype(lock)> guard(lock);
259 subscriptions.erase(std::move(s));
263 inline void clear() {
265 std::unique_lock<decltype(lock)> guard(lock);
267 std::set<subscription> v(std::move(subscriptions));
269 std::for_each(v.begin(), v.end(),
270 [](
const subscription& s) {
275 inline void unsubscribe() {
276 if (issubscribed.exchange(
false)) {
277 std::unique_lock<decltype(lock)> guard(lock);
279 std::set<subscription> v(std::move(subscriptions));
281 std::for_each(v.begin(), v.end(),
282 [](
const subscription& s) {
289 typedef std::shared_ptr<composite_subscription_state> shared_state_type;
292 mutable shared_state_type state;
295 composite_subscription_inner()
296 : state(std::make_shared<composite_subscription_state>())
299 composite_subscription_inner(tag_composite_subscription_empty et)
300 : state(std::make_shared<composite_subscription_state>(et))
304 composite_subscription_inner(
const composite_subscription_inner& o)
311 composite_subscription_inner(composite_subscription_inner&& o)
312 : state(std::move(o.state))
319 composite_subscription_inner& operator=(composite_subscription_inner o)
321 state = std::move(o.state);
328 inline weak_subscription add(subscription s)
const {
332 return state->add(std::move(s));
334 inline void remove(weak_subscription w)
const {
338 state->remove(std::move(w));
340 inline void clear()
const {
346 inline void unsubscribe() {
350 state->unsubscribe();
354 inline composite_subscription shared_empty();
365 :
protected detail::composite_subscription_inner
368 typedef detail::composite_subscription_inner inner_type;
392 : inner_type(std::move(o))
399 inner_type::operator=(std::move(o));
405 return detail::shared_empty();
411 using inner_type::clear;
414 if (s == static_cast<const subscription&>(*
this)) {
421 auto w = inner_type::add(std::move(s));
428 ->
typename std::enable_if<detail::is_unsubscribe_function<F>::value, weak_subscription>::type {
432 inline void remove(weak_subscription w)
const {
435 inner_type::remove(w);
441 return static_cast<const subscription&
>(lhs) < static_cast<const subscription&>(rhs);
444 return static_cast<const subscription&
>(lhs) == static_cast<const subscription&>(rhs);
447 return !(lhs == rhs);
452 inline composite_subscription shared_empty() {
453 static composite_subscription shared_empty = composite_subscription(tag_composite_subscription_empty());
467 ,
value(std::make_shared<rxu::detail::maybe<T>>())
473 ,
value(std::make_shared<rxu::detail::maybe<T>>(rxu::detail::maybe<T>(std::move(t))))
475 auto localValue =
value;
484 return value.get()->get();
498 ->
typename std::enable_if<detail::is_unsubscribe_function<F>::value,
weak_subscription>::type {
513 std::shared_ptr<rxu::detail::maybe<T>>
value;
composite_subscription lifetime
Definition: rx-subscription.hpp:512
static subscription lock(weak_state_type w)
Definition: rx-subscription.hpp:181
subscription(subscription &&o)
Definition: rx-subscription.hpp:153
Definition: rx-all.hpp:26
auto add(F f) const -> typename std::enable_if< detail::is_unsubscribe_function< F >::value, weak_subscription >::type
Definition: rx-subscription.hpp:427
static_subscription(const static_subscription &o)
Definition: rx-subscription.hpp:50
bool is_subscribed() const
Definition: rx-subscription.hpp:164
Definition: rx-subscription.hpp:460
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:364
bool operator<(const subscription &lhs, const subscription &rhs)
Definition: rx-subscription.hpp:186
Definition: rx-subscription.hpp:28
auto make_subscription() -> subscription
Definition: rx-subscription.hpp:197
subscription::weak_state_type weak_subscription
Definition: rx-subscription.hpp:370
void unsubscribe() const
Definition: rx-subscription.hpp:507
friend bool operator<(const subscription &, const subscription &)
Definition: rx-subscription.hpp:186
friend bool operator==(const subscription &, const subscription &)
Definition: rx-subscription.hpp:189
weak_subscription add(subscription s) const
Definition: rx-subscription.hpp:413
tag_subscription subscription_tag
Definition: rx-subscription.hpp:29
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
subscription()
Definition: rx-subscription.hpp:122
subscription & operator=(subscription o)
Definition: rx-subscription.hpp:160
resource()
Definition: rx-subscription.hpp:465
std::shared_ptr< base_subscription_state > state
Definition: rx-subscription.hpp:107
Definition: rx-subscription.hpp:31
composite_subscription & get_subscription()
Definition: rx-subscription.hpp:486
composite_subscription(composite_subscription &&o)
Definition: rx-subscription.hpp:391
subscription(const subscription &o)
Definition: rx-subscription.hpp:146
weak_subscription add(subscription s) const
Definition: rx-subscription.hpp:493
static composite_subscription empty()
Definition: rx-subscription.hpp:404
subscription(U u, typename std::enable_if<!is_subscription< U >::value, void **>::type=nullptr)
Definition: rx-subscription.hpp:130
subscription(U u, typename std::enable_if<!std::is_same< subscription, U >::value &&is_subscription< U >::value, void **>::type=nullptr)
Definition: rx-subscription.hpp:138
auto trace_activity() -> decltype(rxcpp_trace_activity(trace_tag()))&
Definition: rx-predef.hpp:15
bool operator!=(const dynamic_grouped_observable< K, T > &lhs, const dynamic_grouped_observable< K, T > &rhs)
Definition: rx-grouped_observable.hpp:103
Definition: rx-subscription.hpp:42
composite_subscription & operator=(composite_subscription o)
Definition: rx-subscription.hpp:397
static_subscription(static_subscription &&o)
Definition: rx-subscription.hpp:54
bool operator==(const dynamic_grouped_observable< K, T > &lhs, const dynamic_grouped_observable< K, T > &rhs)
Definition: rx-grouped_observable.hpp:99
static_subscription(unsubscribe_call_type s)
Definition: rx-subscription.hpp:58
std::weak_ptr< base_subscription_state > weak_state_type
Definition: rx-subscription.hpp:84
bool is_subscribed() const
Definition: rx-subscription.hpp:164
composite_subscription(const composite_subscription &o)
Definition: rx-subscription.hpp:386
composite_subscription::weak_subscription weak_subscription
Definition: rx-subscription.hpp:463
void unsubscribe() const
Definition: rx-subscription.hpp:170
Definition: rx-subscription.hpp:29
bool is_subscribed() const
Definition: rx-subscription.hpp:490
void unsubscribe() const
Definition: rx-subscription.hpp:62
auto add(F f) const -> typename std::enable_if< detail::is_unsubscribe_function< F >::value, weak_subscription >::type
Definition: rx-subscription.hpp:497
weak_state_type get_weak()
Definition: rx-subscription.hpp:178
void unsubscribe() const
Definition: rx-subscription.hpp:170
composite_subscription(detail::tag_composite_subscription_empty et)
Definition: rx-subscription.hpp:372
composite_subscription()
Definition: rx-subscription.hpp:380
Definition: rx-subscription.hpp:67
void remove(weak_subscription w) const
Definition: rx-subscription.hpp:432
std::shared_ptr< rxu::detail::maybe< T > > value
Definition: rx-subscription.hpp:513
void clear() const
Definition: rx-subscription.hpp:504
resource(T t, composite_subscription cs=composite_subscription())
Definition: rx-subscription.hpp:471
static const bool value
Definition: rx-subscription.hpp:38