5 #if !defined(RXCPP_RX_OBSERVABLE_HPP) 6 #define RXCPP_RX_OBSERVABLE_HPP 11 #define EXPLICIT_THIS this-> 20 template<
class Subscriber,
class T>
21 struct has_on_subscribe_for
24 template<
class CS,
class CT>
25 static auto check(
int) -> decltype((*(CT*)
nullptr).on_subscribe(*(CS*)
nullptr));
26 template<
class CS,
class CT>
27 static not_void check(...);
29 typedef decltype(check<rxu::decay_t<Subscriber>, T>(0)) detail_result;
30 static const bool value = std::is_same<detail_result, void>::value;
40 :
public std::enable_shared_from_this<state_type>
42 typedef std::function<void(subscriber<T>)> onsubscribe_type;
46 std::shared_ptr<state_type> state;
55 so.on_subscribe(std::move(o));
59 struct tag_function {};
61 void construct(F&& f, tag_function&&) {
62 state->on_subscribe = std::forward<F>(f);
75 : state(std::make_shared<state_type>())
77 construct(std::forward<SOF>(sof),
82 state->on_subscribe(std::move(o));
85 template<
class Subscriber>
86 typename std::enable_if<is_subscriber<Subscriber>::value,
void>::type
88 state->on_subscribe(o.as_dynamic());
94 return lhs.state == rhs.state;
101 template<
class T,
class Source>
107 template<
bool Selector,
class Default,
class SO>
108 struct resolve_observable;
110 template<
class Default,
class SO>
111 struct resolve_observable<true, Default, SO>
113 typedef typename SO::type type;
114 typedef typename type::value_type value_type;
115 static const bool value =
true;
116 typedef observable<value_type, type> observable_type;
117 template<
class...
AN>
118 static observable_type make(
const Default&,
AN&&... an) {
119 return observable_type(type(std::forward<AN>(an)...));
122 template<
class Default,
class SO>
123 struct resolve_observable<false, Default, SO>
125 static const bool value =
false;
126 typedef Default observable_type;
127 template<
class...
AN>
128 static observable_type make(
const observable_type& that,
const AN&...) {
133 struct resolve_observable<true, void, SO>
135 typedef typename SO::type type;
136 typedef typename type::value_type value_type;
137 static const bool value =
true;
138 typedef observable<value_type, type> observable_type;
139 template<
class...
AN>
140 static observable_type make(
AN&&... an) {
141 return observable_type(type(std::forward<AN>(an)...));
145 struct resolve_observable<false, void, SO>
147 static const bool value =
false;
148 typedef void observable_type;
149 template<
class...
AN>
150 static observable_type make(
const AN&...) {
156 template<
class Selector,
class Default,
template<
class... TN>
class SO, class...
AN>
158 :
public detail::resolve_observable<Selector::value, Default, rxu::defer_type<SO, AN...>>
168 template<
class T,
class Observable>
171 template<
class Obsvbl,
class... ArgN>
172 static auto blocking_subscribe(
const Obsvbl&
source,
bool do_rethrow, ArgN&&... an)
175 std::condition_variable wake;
176 std::exception_ptr
error;
182 if (!disposed || !wakened) std::terminate();
191 std::atomic_bool disposed;
192 std::atomic_bool wakened;
193 std::atomic_int false_wakes;
194 std::atomic_int true_wakes;
196 auto track = std::make_shared<tracking>();
198 auto dest = make_subscriber<T>(std::forward<ArgN>(an)...);
201 auto scbr = make_subscriber<T>(
203 [&](T t){dest.on_next(t);},
204 [&](std::exception_ptr e){
211 [&](){dest.on_completed();}
214 auto cs = scbr.get_subscription();
221 track->disposed =
true;
224 std::unique_lock<std::mutex> guard(lock);
225 source.subscribe(std::move(scbr));
233 while (!track->disposed) {
234 ++track->false_wakes;
239 track->wakened =
true;
240 if (!track->disposed || !track->wakened) std::terminate();
268 template<
class... ArgN>
271 return blocking_subscribe(
source,
false, std::forward<ArgN>(an)...);
293 template<
class... ArgN>
296 return blocking_subscribe(
source,
true, std::forward<ArgN>(an)...);
314 template<
class...
AN>
316 rxu::maybe<T> result;
324 static_assert(
sizeof...(
AN) == 0,
"first() was passed too many arguments.");
342 template<
class...
AN>
344 rxu::maybe<T> result;
346 [&](T v){result.reset(v);});
350 static_assert(
sizeof...(
AN) == 0,
"last() was passed too many arguments.");
367 source.count().as_blocking().subscribe_with_rethrow(
368 [&](
int v){result = v;});
390 return source.sum().as_blocking().last();
411 return source.average().as_blocking().last();
432 return source.max().as_blocking().last();
453 return source.min().as_blocking().last();
459 template<
class SourceOperator,
class Subscriber>
460 struct safe_subscriber
462 safe_subscriber(SourceOperator& so, Subscriber& o) : so(std::addressof(so)), o(std::addressof(o)) {}
466 so->on_subscribe(*o);
469 if (!o->is_subscribed()) {
472 o->on_error(std::current_exception());
477 void operator()(
const rxsc::schedulable&) {
488 class observable<void, void>;
509 template<
class T,
class SourceOperator>
513 static_assert(std::is_same<T, typename SourceOperator::value_type>::value,
"SourceOperator::value_type must be the same as T in observable<T, SourceOperator>");
523 template<
class U,
class SO>
526 template<
class U,
class SO>
529 template<
class Subscriber>
530 auto detail_subscribe(Subscriber o)
const 536 static_assert(std::is_same<typename source_operator_type::value_type, T>::value && std::is_convertible<T*, typename subscriber_type::value_type*>::value,
"the value types in the sequence must match or be convertible");
537 static_assert(detail::has_on_subscribe_for<subscriber_type, source_operator_type>::value,
"inner must have on_subscribe method that accepts this subscriber ");
541 if (!o.is_subscribed()) {
543 return o.get_subscription();
549 if (rxsc::current_thread::is_schedule_required()) {
551 sc.create_worker(o.get_subscription()).schedule(
subscriber);
554 subscriber.subscribe();
558 return o.get_subscription();
603 template<
class...
AN>
606 static_assert(
sizeof...(
AN) == 0,
"as_dynamic() was passed too many arguments.");
611 template<
class...
AN>
614 static_assert(
sizeof...(
AN) == 0,
"as_blocking() was passed too many arguments.");
624 template<
class OperatorFactory>
625 auto op(OperatorFactory&& of)
const 626 -> decltype(of(*(
const this_type*)
nullptr)) {
633 template<
class ResultType,
class Operator>
634 auto lift(Operator&& op)
const 635 -> observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>> {
636 return observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>(
637 rxo::detail::lift_operator<ResultType, source_operator_type, Operator>(
source_operator, std::forward<Operator>(op)));
638 static_assert(detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value,
"Function passed for lift() must have the signature subscriber<...>(subscriber<T, ...>)");
646 template<
class ResultType,
class Operator>
647 auto lift_if(Operator&& op)
const 648 ->
typename std::enable_if<detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value,
649 observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>>::type {
650 return observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>(
651 rxo::detail::lift_operator<ResultType, source_operator_type, Operator>(
source_operator, std::forward<Operator>(op)));
658 template<
class ResultType,
class Operator>
659 auto lift_if(Operator&&) const
660 -> typename std::enable_if<!detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value,
661 decltype(rxs::
from<ResultType>())>::type {
662 return rxs::from<ResultType>();
668 template<
class... ArgN>
671 return detail_subscribe(make_subscriber<T>(std::forward<ArgN>(an)...));
676 template<
class...
AN>
687 template<
class...
AN>
698 template<
class...
AN>
709 template<
class...
AN>
720 template<
class...
AN>
731 template<
class...
AN>
742 template<
class...
AN>
753 template<
class...
AN>
764 template<
class...
AN>
775 template<
class...
AN>
786 template<
class...
AN>
797 template<
class...
AN>
808 template<
class...
AN>
819 template<
class...
AN>
820 auto finally(
AN&&... an)
const 830 template<
class...
AN>
841 template<
class...
AN>
852 template<
class...
AN>
863 template<
class...
AN>
874 template<
class...
AN>
885 template<
class...
AN>
896 template<
class...
AN>
907 template<
class...
AN>
918 template<
class...
AN>
929 template<
class...
AN>
940 template<
class...
AN>
951 template<
class...
AN>
962 template<
class...
AN>
973 template<
class...
AN>
984 template<
class...
AN>
995 template<
class...
AN>
1006 template<
class...
AN>
1017 template<
class...
AN>
1028 template<
class...
AN>
1039 template<
class...
AN>
1050 template<
class...
AN>
1061 template<
class...
AN>
1072 template<
class...
AN>
1083 template<
class...
AN>
1094 template<
class...
AN>
1105 template<
class...
AN>
1117 template<
class...
AN>
1128 template<
class...
AN>
1139 template<
class...
AN>
1150 template<
class...
AN>
1161 template<
class...
AN>
1172 template<
class...
AN>
1183 template<
class...
AN>
1194 template<
class...
AN>
1205 template<
class...
AN>
1216 template<
class...
AN>
1227 template<
class...
AN>
1238 template<
class...
AN>
1249 template<
class...
AN>
1256 static_assert(
sizeof...(
AN) == 0,
"first() was passed too many arguments.");
1261 template<
class...
AN>
1268 static_assert(
sizeof...(
AN) == 0,
"last() was passed too many arguments.");
1273 template<
class...
AN>
1280 static_assert(
sizeof...(
AN) == 0,
"count() was passed too many arguments.");
1285 template<
class...
AN>
1292 static_assert(
sizeof...(
AN) == 0,
"sum() was passed too many arguments.");
1297 template<
class...
AN>
1304 static_assert(
sizeof...(
AN) == 0,
"average() was passed too many arguments.");
1309 template<
class...
AN>
1316 static_assert(
sizeof...(
AN) == 0,
"max() was passed too many arguments.");
1321 template<
class...
AN>
1328 static_assert(
sizeof...(
AN) == 0,
"min() was passed too many arguments.");
1333 template<
class...
AN>
1344 template<
class...
AN>
1355 template<
class...
AN>
1366 template<
class...
AN>
1377 template<
class...
AN>
1388 template<
class...
AN>
1399 template<
class...
AN>
1410 template<
class...
AN>
1421 template<
class...
AN>
1432 template<
class...
AN>
1443 template<
class...
AN>
1454 template<
class...
AN>
1465 template<
class...
AN>
1476 template<
class...
AN>
1486 template<
class T,
class SourceOperator>
1490 template<
class T,
class SourceOperator>
1492 return !(lhs == rhs);
1584 template<
class T,
class OnSubscribe>
1586 -> decltype(rxs::create<T>(std::move(os))) {
1587 return rxs::create<T>(std::move(os));
1599 template<
class T,
class Coordination>
1601 -> decltype(rxs::range<T>(
first,
last, step, std::move(cn))) {
1602 return rxs::range<T>(
first,
last, step, std::move(cn));
1606 template<
class T,
class Coordination>
1608 -> decltype(rxs::range<T>(
first,
last, std::move(cn))) {
1609 return rxs::range<T>(
first,
last, std::move(cn));
1613 template<
class T,
class Coordination>
1615 -> decltype(rxs::range<T>(
first, std::move(cn))) {
1616 return rxs::range<T>(
first, std::move(cn));
1623 -> decltype(rxs::never<T>()) {
1624 return rxs::never<T>();
1629 template<
class ObservableFactory>
1637 template<
class...
AN>
1638 static auto interval(rxsc::scheduler::clock_type::duration period,
AN**...)
1641 static_assert(
sizeof...(
AN) == 0,
"interval(period) was passed too many arguments.");
1645 template<
class Coordination>
1646 static auto interval(rxsc::scheduler::clock_type::duration period, Coordination cn)
1652 template<
class...
AN>
1653 static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period,
AN**...)
1656 static_assert(
sizeof...(
AN) == 0,
"interval(initial, period) was passed too many arguments.");
1660 template<
class Coordination>
1661 static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, Coordination cn)
1662 -> decltype(
rxs::interval(initial, period, std::move(cn))) {
1668 template<
class...
AN>
1669 static auto timer(rxsc::scheduler::clock_type::time_point at,
AN**...)
1672 static_assert(
sizeof...(
AN) == 0,
"timer(at) was passed too many arguments.");
1676 template<
class...
AN>
1677 static auto timer(rxsc::scheduler::clock_type::duration after,
AN**...)
1680 static_assert(
sizeof...(
AN) == 0,
"timer(after) was passed too many arguments.");
1684 template<
class Coordination>
1685 static auto timer(rxsc::scheduler::clock_type::time_point when, Coordination cn)
1686 -> decltype(
rxs::timer(when, std::move(cn))) {
1691 template<
class Coordination>
1692 static auto timer(rxsc::scheduler::clock_type::duration when, Coordination cn)
1693 -> decltype(
rxs::timer(when, std::move(cn))) {
1699 template<
class Collection>
1706 template<
class Collection,
class Coordination>
1707 static auto iterate(Collection c, Coordination cn)
1708 -> decltype(
rxs::iterate(std::move(c), std::move(cn))) {
1716 -> decltype( rxs::from<T>()) {
1717 return rxs::from<T>();
1721 template<
class T,
class Coordination>
1723 ->
typename std::enable_if<is_coordination<Coordination>::value,
1724 decltype( rxs::from<T>(std::move(cn)))>::type {
1725 return rxs::from<T>(std::move(cn));
1729 template<
class Value0,
class... ValueN>
1730 static auto from(Value0 v0, ValueN... vn)
1731 ->
typename std::enable_if<!is_coordination<Value0>::value,
1732 decltype(
rxs::from(v0, vn...))>::type {
1737 template<
class Coordination,
class Value0,
class... ValueN>
1738 static auto from(Coordination cn, Value0 v0, ValueN... vn)
1739 ->
typename std::enable_if<is_coordination<Coordination>::value,
1740 decltype(
rxs::from(std::move(cn), v0, vn...))>::type {
1741 return rxs::from(std::move(cn), v0, vn...);
1753 template<
class T,
class Coordination>
1754 static auto just(T v, Coordination cn)
1755 -> decltype(
rxs::just(std::move(v), std::move(cn))) {
1756 return rxs::just(std::move(v), std::move(cn));
1761 template<
class Observable,
class Value0,
class... ValueN>
1763 -> decltype(
rxs::start_with(std::move(o), std::move(v0), std::move(vn)...)) {
1764 return rxs::start_with(std::move(o), std::move(v0), std::move(vn)...);
1771 -> decltype(from<T>()) {
1776 template<
class T,
class Coordination>
1778 -> decltype(from<T>(std::move(cn))) {
1779 return from<T>(std::move(cn));
1784 template<
class T,
class Exception>
1786 -> decltype(rxs::error<T>(std::forward<Exception>(e))) {
1787 return rxs::error<T>(std::forward<Exception>(e));
1791 template<
class T,
class Exception,
class Coordination>
1792 static auto error(Exception&& e, Coordination cn)
1793 -> decltype(rxs::error<T>(std::forward<Exception>(e), std::move(cn))) {
1794 return rxs::error<T>(std::forward<Exception>(e), std::move(cn));
1799 template<
class ResourceFactory,
class ObservableFactory>
1800 static auto scope(ResourceFactory rf, ObservableFactory of)
1801 -> decltype(
rxs::scope(std::move(rf), std::move(of))) {
1802 return rxs::scope(std::move(rf), std::move(of));
1812 template<
class T,
class SourceOperator,
class OperatorFactory>
1814 -> decltype(source.op(std::forward<OperatorFactory>(of))) {
1815 return source.op(std::forward<OperatorFactory>(of));
1822 template<
class T,
class SourceOperator,
class OperatorFactory>
1824 -> decltype(source.op(std::forward<OperatorFactory>(of))) {
1825 return source.op(std::forward<OperatorFactory>(of));
auto distinct_until_changed(AN &&... an) const
For each item from this observable, filter out consecutively repeated values and emit only changes ...
Definition: rx-observable.hpp:908
Definition: rx-operators.hpp:126
auto delay(AN &&... an) const
Return an observable that emits each item emitted by the source observable after the specified delay...
Definition: rx-observable.hpp:886
auto skip_last(AN... an) const
Make new observable with skipped last count items from this observable.
Definition: rx-observable.hpp:1378
Definition: rx-operators.hpp:366
auto with_latest_from(AN... an) const
For each item from the first observable select the latest value from all the observables to emit from...
Definition: rx-observable.hpp:1106
auto time_interval(AN &&... an) const
Returns an observable that emits indications of the amount of time lapsed between consecutive emissio...
Definition: rx-observable.hpp:787
auto concat_transform(AN &&... an) const
For each item from this observable use the CollectionSelector to produce an observable and subscribe ...
Definition: rx-observable.hpp:1095
Definition: rx-operators.hpp:248
void on_subscribe(subscriber< T > o) const
Definition: rx-observable.hpp:81
auto count(AN **...) const
For each item from this observable reduce it by incrementing a count.
Definition: rx-observable.hpp:1274
Definition: rx-operators.hpp:380
auto min(AN **...) const
For each item from this observable reduce it by taking the min value of the previous items...
Definition: rx-observable.hpp:1322
Definition: rx-operators.hpp:143
auto defer(ObservableFactory of) -> observable< rxu::value_type_t< detail::defer_traits< ObservableFactory >>, detail::defer< ObservableFactory >>
Returns an observable that calls the specified observable factory to create an observable for each ne...
Definition: rx-defer.hpp:73
auto merge_transform(AN &&... an) const
For each item from this observable use the CollectionSelector to produce an observable and subscribe ...
Definition: rx-observable.hpp:1062
auto retry(AN... an) const
Retry this observable for the given number of times.
Definition: rx-observable.hpp:1455
Definition: rx-observable.hpp:157
auto skip_until(AN... an) const
Make new observable with items skipped until on_next occurs on the trigger observable or until the sp...
Definition: rx-observable.hpp:1389
Definition: rx-operators.hpp:387
Definition: rx-operators.hpp:458
auto operator|(const rxcpp::observable< T, SourceOperator > &source, OperatorFactory &&of) -> decltype(source.op(std::forward< OperatorFactory >(of)))
Definition: rx-observable.hpp:1823
static auto create(OnSubscribe os) -> decltype(rxs::create< T >(std::move(os)))
Returns an observable that executes the specified function when a subscriber subscribes to it...
Definition: rx-observable.hpp:1585
Definition: rx-all.hpp:26
auto skip(AN... an) const
Make new observable with skipped first count items from this observable.
Definition: rx-observable.hpp:1356
Definition: rx-predef.hpp:302
static auto range(T first, T last, Coordination cn) -> decltype(rxs::range< T >(first, last, std::move(cn)))
Returns an observable that sends values in the range first-last by adding step to the previous value...
Definition: rx-observable.hpp:1607
auto element_at(AN &&... an) const
Pulls an item located at a specified index location in the sequence of items and emits that item as i...
Definition: rx-observable.hpp:919
observable(const observable< T, SO > &o)
implicit conversion between observables of the same value_type
Definition: rx-observable.hpp:585
Definition: rx-operators.hpp:329
auto any(AN &&... an) const
Returns an Observable that emits true if any item emitted by the source Observable satisfies a specif...
Definition: rx-observable.hpp:699
auto reduce(AN &&... an) const
For each item from this observable use Accumulator to combine items, when completed use ResultSelecto...
Definition: rx-observable.hpp:1228
source_operator_type source_operator
Definition: rx-observable.hpp:519
static auto from(Coordination cn) -> typename std::enable_if< is_coordination< Coordination >::value, decltype(rxs::from< T >(std::move(cn)))>::type
Definition: rx-observable.hpp:1722
dynamic_observable(SOF &&sof, typename std::enable_if<!is_dynamic_observable< SOF >::value, void **>::type=0)
Definition: rx-observable.hpp:74
auto timeout(AN &&... an) const
Return an observable that terminates with timeout_error if a particular timespan has passed without e...
Definition: rx-observable.hpp:798
T max() const
Definition: rx-observable.hpp:431
auto take_while(AN &&... an) const
For the first items fulfilling the predicate from this observable emit them from the new observable t...
Definition: rx-observable.hpp:1433
auto interval(Duration period) -> typename std::enable_if< detail::defer_interval< Duration, identity_one_worker >::value, typename detail::defer_interval< Duration, identity_one_worker >::observable_type >::type
Returns an observable that emits a sequential integer every specified time interval, on the specified scheduler.
Definition: rx-interval.hpp:113
dynamic_observable()
Definition: rx-observable.hpp:69
Definition: rx-operators.hpp:444
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:364
Definition: rx-operators.hpp:157
auto average(AN **...) const
For each item from this observable reduce it by adding to the previous values and then dividing by th...
Definition: rx-observable.hpp:1298
blocking_observable< T, this_type > as_blocking(AN **...) const
Definition: rx-observable.hpp:612
Definition: rx-operators.hpp:282
auto operator>>(const rxcpp::observable< T, SourceOperator > &source, OperatorFactory &&of) -> decltype(source.op(std::forward< OperatorFactory >(of)))
Definition: rx-observable.hpp:1813
auto observable_member(Tag, AN &&... an) -> decltype(Overload::member(std::forward< AN >(an)...))
Definition: rx-operators.hpp:63
rxu::decay_t< Observable > observable_type
Definition: rx-observable.hpp:246
auto window(AN &&... an) const
Return an observable that emits connected, non-overlapping windows, each containing at most count ite...
Definition: rx-observable.hpp:930
auto publish_synchronized(AN &&... an) const
Turn a cold observable hot and allow connections to the source to be independent of subscriptions...
Definition: rx-observable.hpp:1184
auto subscribe(ArgN &&... an) const -> composite_subscription
Subscribe will cause the source observable to emit values to the provided subscriber.
Definition: rx-observable.hpp:669
Definition: rx-operators.hpp:289
Definition: rx-operators.hpp:296
auto error(E e) -> decltype(detail::make_error< T >(typename std::conditional< std::is_same< std::exception_ptr, rxu::decay_t< E >>::value, detail::throw_ptr_tag, detail::throw_instance_tag >::type(), std::move(e), identity_immediate()))
Returns an observable that sends no items to observer and immediately generates an error...
Definition: rx-error.hpp:114
Definition: rx-operators.hpp:352
auto just(Value0 v0) -> typename std::enable_if<!is_coordination< Value0 >::value, decltype(iterate(*(std::array< Value0, 1 > *) nullptr, identity_immediate()))>::type
Definition: rx-iterate.hpp:267
auto combine_latest(AN... an) const
For each item from all of the observables select a value to emit from the new observable that is retu...
Definition: rx-observable.hpp:1118
static auto iterate(Collection c, Coordination cn) -> decltype(rxs::iterate(std::move(c), std::move(cn)))
Returns an observable that sends each value in the collection, on the specified scheduler.
Definition: rx-observable.hpp:1707
Definition: rx-operators.hpp:421
observable(const source_operator_type &o)
Definition: rx-observable.hpp:574
Definition: rx-operators.hpp:500
Definition: rx-operators.hpp:394
Definition: rx-operators.hpp:472
Definition: rx-operators.hpp:331
Definition: rx-operators.hpp:234
auto AN
Definition: rx-finally.hpp:105
auto max() -> operator_factory< max_tag >
For each item from this observable reduce it by taking the max value of the previous items...
Definition: rx-reduce.hpp:496
auto iterate(Collection c) -> observable< rxu::value_type_t< detail::iterate_traits< Collection >>, detail::iterate< Collection, identity_one_worker >>
Returns an observable that sends each value in the collection, on the specified scheduler.
Definition: rx-iterate.hpp:160
Definition: rx-operators.hpp:255
static auto iterate(Collection c) -> decltype(rxs::iterate(std::move(c), identity_current_thread()))
Returns an observable that sends each value in the collection, on the specified scheduler.
Definition: rx-observable.hpp:1700
Definition: rx-operators.hpp:199
Definition: rx-predef.hpp:156
T value_type
Definition: rx-observable.hpp:562
auto tap(AN &&... an) const
inspect calls to on_next, on_error and on_completed.
Definition: rx-observable.hpp:776
auto subscribe(ArgN &&... an) const -> void
Definition: rx-observable.hpp:269
auto zip(AN &&... an) const
Bring by one item from all given observables and select a value to emit from the new observable that ...
Definition: rx-observable.hpp:1129
Definition: rx-operators.hpp:521
auto scan(AN... an) const
For each item from this observable use Accumulator to combine items into a value that will be emitted...
Definition: rx-observable.hpp:1334
auto subscribe_with_rethrow(ArgN &&... an) const -> void
Definition: rx-observable.hpp:294
Definition: rx-operators.hpp:345
auto start_with(AN... an) const
Start with the supplied values, then concatenate this observable.
Definition: rx-observable.hpp:1466
rxu::decay_t< SourceOperator > source_operator_type
Definition: rx-observable.hpp:518
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
static auto interval(rxsc::scheduler::clock_type::duration period, AN **...) -> decltype(rxs::interval(period))
Returns an observable that emits a sequential integer every specified time interval, on the specified scheduler.
Definition: rx-observable.hpp:1638
auto exists(AN &&... an) const
Returns an Observable that emits true if any item emitted by the source Observable satisfies a specif...
Definition: rx-observable.hpp:710
auto first(AN **...) const
For each item from this observable reduce it by sending only the first item.
Definition: rx-observable.hpp:1250
auto max(AN **...) const
For each item from this observable reduce it by taking the max value of the previous items...
Definition: rx-observable.hpp:1310
a source of values whose methods block until all values have been emitted. subscribe or use one of th...
Definition: rx-observable.hpp:169
auto switch_if_empty(AN &&... an) const
If the source Observable terminates without emitting any items, emits items from a backup Observable...
Definition: rx-observable.hpp:743
auto filter(AN &&... an) const
For each item from this observable use Predicate to select which items to emit from the new observabl...
Definition: rx-observable.hpp:732
~blocking_observable()
Definition: rx-observable.hpp:248
static auto just(T v) -> decltype(rxs::just(std::move(v)))
Definition: rx-observable.hpp:1747
static auto from(Value0 v0, ValueN... vn) -> typename std::enable_if<!is_coordination< Value0 >::value, decltype(rxs::from(v0, vn...))>::type
Definition: rx-observable.hpp:1730
Definition: rx-operators.hpp:373
static auto defer(ObservableFactory of) -> decltype(rxs::defer(std::move(of)))
Returns an observable that calls the specified observable factory to create an observable for each ne...
Definition: rx-observable.hpp:1630
auto contains(AN &&... an) const
Returns an Observable that emits true if the source Observable emitted a specified item...
Definition: rx-observable.hpp:721
Definition: rx-operators.hpp:316
static auto start_with(Observable o, Value0 v0, ValueN... vn) -> decltype(rxs::start_with(std::move(o), std::move(v0), std::move(vn)...))
Definition: rx-observable.hpp:1762
Definition: rx-sources.hpp:15
Definition: rx-operators.hpp:451
Definition: rx-operators.hpp:227
linq_driver< iter_cursor< typename util::container_traits< TContainer >::iterator > > from(TContainer &c)
Definition: linq.hpp:556
Definition: rx-util.hpp:404
auto debounce(AN &&... an) const
Return an observable that emits an item if a particular timespan has passed without emitting another ...
Definition: rx-observable.hpp:875
auto multicast(AN &&... an) const
Definition: rx-observable.hpp:1162
Definition: rx-observable.hpp:36
auto group_by(AN &&... an) const
Return an observable that emits grouped_observables, each of which corresponds to a unique key value ...
Definition: rx-observable.hpp:1140
static auto timer(rxsc::scheduler::clock_type::time_point at, AN **...) -> decltype(rxs::timer(at))
Returns an observable that emits an integer at the specified time point.
Definition: rx-observable.hpp:1669
auto take_last(AN &&... an) const
Emit only the final t items emitted by the source Observable.
Definition: rx-observable.hpp:1411
observable< T > make_observable_dynamic(Source &&s)
Definition: rx-observable.hpp:102
auto sum(AN **...) const
For each item from this observable reduce it by adding to the previous items.
Definition: rx-observable.hpp:1286
static auto timer(rxsc::scheduler::clock_type::time_point when, Coordination cn) -> decltype(rxs::timer(when, std::move(cn)))
Returns an observable that emits an integer at the specified time point.
Definition: rx-observable.hpp:1685
~observable()
Definition: rx-observable.hpp:566
auto map(AN &&... an) const
For each item from this observable use Selector to produce an item to emit from the new observable th...
Definition: rx-observable.hpp:853
auto sample_with_time(AN &&... an) const
Return an Observable that emits the most recent items emitted by the source Observable within periodi...
Definition: rx-observable.hpp:1345
Definition: rx-operators.hpp:241
Definition: rx-operators.hpp:117
Definition: rx-operators.hpp:213
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
Definition: rx-operators.hpp:430
auto pairwise(AN... an) const
Take values pairwise from this observable.
Definition: rx-observable.hpp:1477
auto replay(AN &&... an) const
1) replay(optional Coordination, optional CompositeSubscription) Turn a cold observable hot...
Definition: rx-observable.hpp:1195
Definition: rx-sources.hpp:23
Definition: rx-operators.hpp:38
Definition: rx-operators.hpp:268
auto sequence_equal(AN... an) const
Determine whether two Observables emit the same sequence of items.
Definition: rx-observable.hpp:765
auto last(AN **...) const
For each item from this observable reduce it by sending only the last item.
Definition: rx-observable.hpp:1262
auto start_with(AN &&... an) -> operator_factory< start_with_tag, AN... >
Start with the supplied values, then concatenate this observable.
Definition: rx-start_with.hpp:53
auto trace_activity() -> decltype(rxcpp_trace_activity(trace_tag()))&
Definition: rx-predef.hpp:15
bool operator!=(const dynamic_grouped_observable< K, T > &lhs, const dynamic_grouped_observable< K, T > &rhs)
Definition: rx-grouped_observable.hpp:103
Definition: rx-operators.hpp:129
Definition: rx-operators.hpp:437
auto merge_delay_error(AN... an) const
For each given observable subscribe. For each item emitted from all of the given observables, deliver from the new observable that is returned. The first error to occur is held off until all of the given non-error-emitting observables have finished their emission.
Definition: rx-observable.hpp:1029
Definition: rx-operators.hpp:359
std::enable_if< is_subscriber< Subscriber >::value, void >::type on_subscribe(Subscriber o) const
Definition: rx-observable.hpp:87
static auto from() -> decltype(rxs::from< T >())
Definition: rx-observable.hpp:1715
Definition: rx-operators.hpp:479
Definition: rx-operators.hpp:136
auto repeat(AN... an) const
Repeat this observable for the given number of times or infinitely.
Definition: rx-observable.hpp:1444
auto take(AN... an) const
For the first count items from this observable emit them from the new observable that is returned...
Definition: rx-observable.hpp:1400
Definition: rx-operators.hpp:206
Definition: rx-operators.hpp:192
auto subscribe_on(AN &&... an) const
Subscription and unsubscription are queued and delivered using the scheduler from the supplied coordi...
Definition: rx-observable.hpp:1206
tag_dynamic_observable dynamic_observable_tag
Definition: rx-observable.hpp:67
Definition: rx-operators.hpp:103
auto distinct(AN &&... an) const
For each item from this observable, filter out repeated values and emit only items that have not alre...
Definition: rx-observable.hpp:897
auto skip_while(AN... an) const
Make a new observable that skips the first count items from this observable.
Definition: rx-observable.hpp:1367
static auto error(Exception &&e, Coordination cn) -> decltype(rxs::error< T >(std::forward< Exception >(e), std::move(cn)))
Returns an observable that sends no items to observer and immediately generates an error...
Definition: rx-observable.hpp:1792
static auto range(T first=0, T last=std::numeric_limits< T >::max(), std::ptrdiff_t step=1) -> decltype(rxs::range< T >(first, last, step, identity_current_thread()))
Returns an observable that sends values in the range first-last by adding step to the previous value...
Definition: rx-observable.hpp:1593
static auto empty() -> decltype(from< T >())
Returns an observable that sends no items to observer and immediately completes, on the specified sch...
Definition: rx-observable.hpp:1770
Definition: rx-operators.hpp:486
Definition: rx-operators.hpp:110
Definition: rx-operators.hpp:57
auto merge(AN... an) const
For each given observable subscribe. For each item emitted from all of the given observables, deliver from the new observable that is returned.
Definition: rx-observable.hpp:1018
bool operator==(const dynamic_grouped_observable< K, T > &lhs, const dynamic_grouped_observable< K, T > &rhs)
Definition: rx-grouped_observable.hpp:99
Definition: rx-operators.hpp:220
Definition: rx-predef.hpp:270
auto buffer(AN &&... an) const
Return an observable that emits connected, non-overlapping buffer, each containing at most count item...
Definition: rx-observable.hpp:974
Definition: rx-operators.hpp:185
auto lift(Operator &&op) -> detail::lift_factory< ResultType, Operator >
Definition: rx-lift.hpp:101
static auto range(T first, Coordination cn) -> decltype(rxs::range< T >(first, std::move(cn)))
Returns an observable that sends values in the range first-last by adding step to the previous value...
Definition: rx-observable.hpp:1614
static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, AN **...) -> decltype(rxs::interval(initial, period))
Returns an observable that emits a sequential integer every specified time interval, on the specified scheduler.
Definition: rx-observable.hpp:1653
static auto error(Exception &&e) -> decltype(rxs::error< T >(std::forward< Exception >(e)))
Returns an observable that sends no items to observer and immediately generates an error...
Definition: rx-observable.hpp:1785
static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, Coordination cn) -> decltype(rxs::interval(initial, period, std::move(cn)))
Returns an observable that emits a sequential integer every specified time interval, on the specified scheduler.
Definition: rx-observable.hpp:1661
observable_type source
Definition: rx-observable.hpp:247
auto buffer_with_time_or_count(AN &&... an) const
Return an observable that emits connected, non-overlapping buffers of items from the source observabl...
Definition: rx-observable.hpp:996
Definition: rx-operators.hpp:465
Definition: rx-operators.hpp:261
Definition: rx-operators.hpp:275
const scheduler & make_current_thread()
Definition: rx-currentthread.hpp:263
auto switch_on_error(AN &&... an) const
If an error occurs, take the result from the Selector and subscribe to that instead.
Definition: rx-observable.hpp:842
Definition: rx-operators.hpp:493
auto switch_on_next(AN &&... an) const
Return observable that emits the items emitted by the observable most recently emitted by the source ...
Definition: rx-observable.hpp:1007
consumes values from an observable using State that may implement on_next, on_error and on_completed ...
Definition: rx-observer.hpp:179
auto flat_map(AN &&... an) const
For each item from this observable use the CollectionSelector to produce an observable and subscribe ...
Definition: rx-observable.hpp:1051
observable< T > as_dynamic(AN **...) const
Definition: rx-observable.hpp:604
auto window_with_time(AN &&... an) const
Return an observable that emits observables every period time interval and collects items from this o...
Definition: rx-observable.hpp:941
auto all(AN &&... an) const
Returns an Observable that emits true if every item emitted by the source Observable satisfies a spec...
Definition: rx-observable.hpp:677
auto amb(AN... an) const
For each item from only the first of the given observables deliver from the new observable that is re...
Definition: rx-observable.hpp:1040
Definition: rx-operators.hpp:150
auto window_with_time_or_count(AN &&... an) const
Return an observable that emits connected, non-overlapping windows of items from the source observabl...
Definition: rx-observable.hpp:952
static auto timer(rxsc::scheduler::clock_type::duration when, Coordination cn) -> decltype(rxs::timer(when, std::move(cn)))
Returns an observable that emits an integer at the specified time point.
Definition: rx-observable.hpp:1692
auto take_until(AN &&... an) const
For each item from this observable until on_next occurs on the trigger observable or until the specif...
Definition: rx-observable.hpp:1422
auto transform(AN &&... an) const
For each item from this observable use Selector to produce an item to emit from the new observable th...
Definition: rx-observable.hpp:864
Definition: rx-operators.hpp:338
int count() const
Definition: rx-observable.hpp:365
auto scope(ResourceFactory rf, ObservableFactory of) -> observable< rxu::value_type_t< detail::scope_traits< ResourceFactory, ObservableFactory >>, detail::scope< ResourceFactory, ObservableFactory >>
Returns an observable that makes an observable by the specified observable factory using the resource...
Definition: rx-scope.hpp:114
void unsubscribe() const
Definition: rx-subscription.hpp:170
auto ignore_elements(AN &&... an) const
Do not emit any items from the source Observable, but allow termination notification (either onError ...
Definition: rx-observable.hpp:1151
static auto from(Coordination cn, Value0 v0, ValueN... vn) -> typename std::enable_if< is_coordination< Coordination >::value, decltype(rxs::from(std::move(cn), v0, vn...))>::type
Definition: rx-observable.hpp:1738
blocking_observable(observable_type s)
Definition: rx-observable.hpp:251
T sum() const
Definition: rx-observable.hpp:389
Definition: rx-operators.hpp:164
auto timestamp(AN &&... an) const
Returns an observable that attaches a timestamp to each item emitted by the source observable indicat...
Definition: rx-observable.hpp:809
Definition: rx-operators.hpp:323
Definition: rx-operators.hpp:507
auto is_empty(AN &&... an) const
Returns an Observable that emits true if the source Observable is empty, otherwise false...
Definition: rx-observable.hpp:688
T min() const
Definition: rx-observable.hpp:452
auto buffer_with_time(AN &&... an) const
Return an observable that emits buffers every period time interval and collects items from this obser...
Definition: rx-observable.hpp:985
auto subscribe(ArgN &&... an) -> detail::subscribe_factory< decltype(make_subscriber< T >(std::forward< ArgN >(an)...))>
Subscribe will cause the source observable to emit values to the provided subscriber.
Definition: rx-subscribe.hpp:87
Definition: rx-predef.hpp:128
static auto scope(ResourceFactory rf, ObservableFactory of) -> decltype(rxs::scope(std::move(rf), std::move(of)))
Returns an observable that makes an observable by the specified observable factory using the resource...
Definition: rx-observable.hpp:1800
Definition: rx-operators.hpp:423
auto default_if_empty(AN &&... an) const
If the source Observable terminates without emitting any items, emits a default item and completes...
Definition: rx-observable.hpp:754
binds an observer that consumes values with a composite_subscription that controls lifetime...
Definition: rx-subscriber.hpp:25
Definition: rx-operators.hpp:119
static auto interval(rxsc::scheduler::clock_type::duration period, Coordination cn) -> decltype(rxs::interval(period, std::move(cn)))
Returns an observable that emits a sequential integer every specified time interval, on the specified scheduler.
Definition: rx-observable.hpp:1646
static auto range(T first, T last, std::ptrdiff_t step, Coordination cn) -> decltype(rxs::range< T >(first, last, step, std::move(cn)))
Returns an observable that sends values in the range first-last by adding step to the previous value...
Definition: rx-observable.hpp:1600
double average() const
Definition: rx-observable.hpp:410
auto first(AN **...) -> delayed_type_t< T, AN... > const
Definition: rx-observable.hpp:315
Definition: rx-operators.hpp:178
static auto just(T v, Coordination cn) -> decltype(rxs::just(std::move(v), std::move(cn)))
Definition: rx-observable.hpp:1754
rxu::value_type_t< delayed_type< T, AN... > > delayed_type_t
Definition: rx-operators.hpp:60
auto observe_on(AN &&... an) const
All values are queued and delivered using the scheduler from the supplied coordination.
Definition: rx-observable.hpp:1217
auto accumulate(AN &&... an) const
For each item from this observable use Accumulator to combine items, when completed use ResultSelecto...
Definition: rx-observable.hpp:1239
observable(observable< T, SO > &&o)
implicit conversion between observables of the same value_type
Definition: rx-observable.hpp:590
Definition: rx-predef.hpp:115
friend bool operator==(const observable< U, SO > &, const observable< U, SO > &)
Definition: rx-sources.hpp:17
auto window_toggle(AN &&... an) const
Return an observable that emits observables every period time interval and collects items from this o...
Definition: rx-observable.hpp:963
auto on_error_resume_next(AN &&... an) const
If an error occurs, take the result from the Selector and subscribe to that instead.
Definition: rx-observable.hpp:831
observable()
Definition: rx-observable.hpp:570
auto last(AN **...) -> delayed_type_t< T, AN... > const
Definition: rx-observable.hpp:343
static auto never() -> decltype(rxs::never< T >())
Returns an observable that never sends any items or notifications to observer.
Definition: rx-observable.hpp:1622
auto concat(AN... an) const
For each item from this observable subscribe to one at a time, in the order received. For each item from all of the given observables deliver from the new observable that is returned.
Definition: rx-observable.hpp:1073
Definition: rx-predef.hpp:126
auto concat_map(AN &&... an) const
For each item from this observable use the CollectionSelector to produce an observable and subscribe ...
Definition: rx-observable.hpp:1084
Definition: rx-operators.hpp:127
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
friend bool operator==(const dynamic_observable< U > &, const dynamic_observable< U > &)
auto publish(AN &&... an) const
Turn a cold observable hot and allow connections to the source to be independent of subscriptions...
Definition: rx-observable.hpp:1173
static auto empty(Coordination cn) -> decltype(from< T >(std::move(cn)))
Returns an observable that sends no items to observer and immediately completes, on the specified sch...
Definition: rx-observable.hpp:1777
Definition: rx-operators.hpp:415
Definition: rx-operators.hpp:408
auto timer(TimePointOrDuration when) -> typename std::enable_if< detail::defer_timer< TimePointOrDuration, identity_one_worker >::value, typename detail::defer_timer< TimePointOrDuration, identity_one_worker >::observable_type >::type
Returns an observable that emits an integer at the specified time point.
Definition: rx-timer.hpp:114
observable(source_operator_type &&o)
Definition: rx-observable.hpp:578
Definition: rx-operators.hpp:401
static auto timer(rxsc::scheduler::clock_type::duration after, AN **...) -> decltype(rxs::timer(after))
Returns an observable that emits an integer at the specified time point.
Definition: rx-observable.hpp:1677
Definition: rx-operators.hpp:514