RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
rx-observable.hpp
Go to the documentation of this file.
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_OBSERVABLE_HPP)
6 #define RXCPP_RX_OBSERVABLE_HPP
7 
8 #include "rx-includes.hpp"
9 
10 #ifdef __GNUG__
11 #define EXPLICIT_THIS this->
12 #else
13 #define EXPLICIT_THIS
14 #endif
15 
16 namespace rxcpp {
17 
18 namespace detail {
19 
20 template<class Subscriber, class T>
21 struct has_on_subscribe_for
22 {
23  struct not_void {};
24  template<class CS, class CT>
25  static auto check(int) -> decltype((*(CT*)nullptr).on_subscribe(*(CS*)nullptr));
26  template<class CS, class CT>
27  static not_void check(...);
28 
29  typedef decltype(check<rxu::decay_t<Subscriber>, T>(0)) detail_result;
30  static const bool value = std::is_same<detail_result, void>::value;
31 };
32 
33 }
34 
35 template<class T>
37  : public rxs::source_base<T>
38 {
39  struct state_type
40  : public std::enable_shared_from_this<state_type>
41  {
42  typedef std::function<void(subscriber<T>)> onsubscribe_type;
43 
44  onsubscribe_type on_subscribe;
45  };
46  std::shared_ptr<state_type> state;
47 
48  template<class U>
49  friend bool operator==(const dynamic_observable<U>&, const dynamic_observable<U>&);
50 
51  template<class SO>
52  void construct(SO&& source, rxs::tag_source&&) {
53  rxu::decay_t<SO> so = std::forward<SO>(source);
54  state->on_subscribe = [so](subscriber<T> o) mutable {
55  so.on_subscribe(std::move(o));
56  };
57  }
58 
59  struct tag_function {};
60  template<class F>
61  void construct(F&& f, tag_function&&) {
62  state->on_subscribe = std::forward<F>(f);
63  }
64 
65 public:
66 
68 
70  {
71  }
72 
73  template<class SOF>
74  explicit dynamic_observable(SOF&& sof, typename std::enable_if<!is_dynamic_observable<SOF>::value, void**>::type = 0)
75  : state(std::make_shared<state_type>())
76  {
77  construct(std::forward<SOF>(sof),
78  typename std::conditional<rxs::is_source<SOF>::value || rxo::is_operator<SOF>::value, rxs::tag_source, tag_function>::type());
79  }
80 
81  void on_subscribe(subscriber<T> o) const {
82  state->on_subscribe(std::move(o));
83  }
84 
85  template<class Subscriber>
86  typename std::enable_if<is_subscriber<Subscriber>::value, void>::type
87  on_subscribe(Subscriber o) const {
88  state->on_subscribe(o.as_dynamic());
89  }
90 };
91 
92 template<class T>
93 inline bool operator==(const dynamic_observable<T>& lhs, const dynamic_observable<T>& rhs) {
94  return lhs.state == rhs.state;
95 }
96 template<class T>
97 inline bool operator!=(const dynamic_observable<T>& lhs, const dynamic_observable<T>& rhs) {
98  return !(lhs == rhs);
99 }
100 
101 template<class T, class Source>
103  return observable<T>(dynamic_observable<T>(std::forward<Source>(s)));
104 }
105 
106 namespace detail {
107 template<bool Selector, class Default, class SO>
108 struct resolve_observable;
109 
110 template<class Default, class SO>
111 struct resolve_observable<true, Default, SO>
112 {
113  typedef typename SO::type type;
114  typedef typename type::value_type value_type;
115  static const bool value = true;
116  typedef observable<value_type, type> observable_type;
117  template<class... AN>
118  static observable_type make(const Default&, AN&&... an) {
119  return observable_type(type(std::forward<AN>(an)...));
120  }
121 };
122 template<class Default, class SO>
123 struct resolve_observable<false, Default, SO>
124 {
125  static const bool value = false;
126  typedef Default observable_type;
127  template<class... AN>
128  static observable_type make(const observable_type& that, const AN&...) {
129  return that;
130  }
131 };
132 template<class SO>
133 struct resolve_observable<true, void, SO>
134 {
135  typedef typename SO::type type;
136  typedef typename type::value_type value_type;
137  static const bool value = true;
138  typedef observable<value_type, type> observable_type;
139  template<class... AN>
140  static observable_type make(AN&&... an) {
141  return observable_type(type(std::forward<AN>(an)...));
142  }
143 };
144 template<class SO>
145 struct resolve_observable<false, void, SO>
146 {
147  static const bool value = false;
148  typedef void observable_type;
149  template<class... AN>
150  static observable_type make(const AN&...) {
151  }
152 };
153 
154 }
155 
156 template<class Selector, class Default, template<class... TN> class SO, class... AN>
158  : public detail::resolve_observable<Selector::value, Default, rxu::defer_type<SO, AN...>>
159 {
160 };
161 
168 template<class T, class Observable>
170 {
171  template<class Obsvbl, class... ArgN>
172  static auto blocking_subscribe(const Obsvbl& source, bool do_rethrow, ArgN&&... an)
173  -> void {
174  std::mutex lock;
175  std::condition_variable wake;
176  std::exception_ptr error;
177 
178  struct tracking
179  {
180  ~tracking()
181  {
182  if (!disposed || !wakened) std::terminate();
183  }
184  tracking()
185  {
186  disposed = false;
187  wakened = false;
188  false_wakes = 0;
189  true_wakes = 0;
190  }
191  std::atomic_bool disposed;
192  std::atomic_bool wakened;
193  std::atomic_int false_wakes;
194  std::atomic_int true_wakes;
195  };
196  auto track = std::make_shared<tracking>();
197 
198  auto dest = make_subscriber<T>(std::forward<ArgN>(an)...);
199 
200  // keep any error to rethrow at the end.
201  auto scbr = make_subscriber<T>(
202  dest,
203  [&](T t){dest.on_next(t);},
204  [&](std::exception_ptr e){
205  if (do_rethrow) {
206  error = e;
207  } else {
208  dest.on_error(e);
209  }
210  },
211  [&](){dest.on_completed();}
212  );
213 
214  auto cs = scbr.get_subscription();
215  cs.add(
216  [&, track](){
217  // OSX geting invalid x86 op if notify_one is after the disposed = true
218  // presumably because the condition_variable may already have been awakened
219  // and is now sitting in a while loop on disposed
220  wake.notify_one();
221  track->disposed = true;
222  });
223 
224  std::unique_lock<std::mutex> guard(lock);
225  source.subscribe(std::move(scbr));
226 
227  wake.wait(guard,
228  [&, track](){
229  // this is really not good.
230  // false wakeups were never followed by true wakeups so..
231 
232  // anyways this gets triggered before disposed is set now so wait.
233  while (!track->disposed) {
234  ++track->false_wakes;
235  }
236  ++track->true_wakes;
237  return true;
238  });
239  track->wakened = true;
240  if (!track->disposed || !track->wakened) std::terminate();
241 
242  if (error) {std::rethrow_exception(error);}
243  }
244 
245 public:
249  {
250  }
252 
268  template<class... ArgN>
269  auto subscribe(ArgN&&... an) const
270  -> void {
271  return blocking_subscribe(source, false, std::forward<ArgN>(an)...);
272  }
273 
293  template<class... ArgN>
294  auto subscribe_with_rethrow(ArgN&&... an) const
295  -> void {
296  return blocking_subscribe(source, true, std::forward<ArgN>(an)...);
297  }
298 
314  template<class... AN>
315  auto first(AN**...) -> delayed_type_t<T, AN...> const {
316  rxu::maybe<T> result;
319  cs,
320  [&](T v){result.reset(v); cs.unsubscribe();});
321  if (result.empty())
322  throw rxcpp::empty_error("first() requires a stream with at least one value");
323  return result.get();
324  static_assert(sizeof...(AN) == 0, "first() was passed too many arguments.");
325  }
326 
342  template<class... AN>
343  auto last(AN**...) -> delayed_type_t<T, AN...> const {
344  rxu::maybe<T> result;
346  [&](T v){result.reset(v);});
347  if (result.empty())
348  throw rxcpp::empty_error("last() requires a stream with at least one value");
349  return result.get();
350  static_assert(sizeof...(AN) == 0, "last() was passed too many arguments.");
351  }
352 
365  int count() const {
366  int result = 0;
367  source.count().as_blocking().subscribe_with_rethrow(
368  [&](int v){result = v;});
369  return result;
370  }
371 
389  T sum() const {
390  return source.sum().as_blocking().last();
391  }
392 
410  double average() const {
411  return source.average().as_blocking().last();
412  }
413 
431  T max() const {
432  return source.max().as_blocking().last();
433  }
434 
452  T min() const {
453  return source.min().as_blocking().last();
454  }
455 };
456 
457 namespace detail {
458 
459 template<class SourceOperator, class Subscriber>
460 struct safe_subscriber
461 {
462  safe_subscriber(SourceOperator& so, Subscriber& o) : so(std::addressof(so)), o(std::addressof(o)) {}
463 
464  void subscribe() {
465  try {
466  so->on_subscribe(*o);
467  }
468  catch(...) {
469  if (!o->is_subscribed()) {
470  throw;
471  }
472  o->on_error(std::current_exception());
473  o->unsubscribe();
474  }
475  }
476 
477  void operator()(const rxsc::schedulable&) {
478  subscribe();
479  }
480 
481  SourceOperator* so;
482  Subscriber* o;
483 };
484 
485 }
486 
487 template<>
488 class observable<void, void>;
489 
509 template<class T, class SourceOperator>
511  : public observable_base<T>
512 {
513  static_assert(std::is_same<T, typename SourceOperator::value_type>::value, "SourceOperator::value_type must be the same as T in observable<T, SourceOperator>");
514 
516 
517 public:
520 
521 private:
522 
523  template<class U, class SO>
524  friend class observable;
525 
526  template<class U, class SO>
527  friend bool operator==(const observable<U, SO>&, const observable<U, SO>&);
528 
529  template<class Subscriber>
530  auto detail_subscribe(Subscriber o) const
532 
533  typedef rxu::decay_t<Subscriber> subscriber_type;
534 
535  static_assert(is_subscriber<subscriber_type>::value, "subscribe must be passed a subscriber");
536  static_assert(std::is_same<typename source_operator_type::value_type, T>::value && std::is_convertible<T*, typename subscriber_type::value_type*>::value, "the value types in the sequence must match or be convertible");
537  static_assert(detail::has_on_subscribe_for<subscriber_type, source_operator_type>::value, "inner must have on_subscribe method that accepts this subscriber ");
538 
539  trace_activity().subscribe_enter(*this, o);
540 
541  if (!o.is_subscribed()) {
542  trace_activity().subscribe_return(*this);
543  return o.get_subscription();
544  }
545 
546  detail::safe_subscriber<source_operator_type, subscriber_type> subscriber(source_operator, o);
547 
548  // make sure to let current_thread take ownership of the thread as early as possible.
549  if (rxsc::current_thread::is_schedule_required()) {
550  const auto& sc = rxsc::make_current_thread();
551  sc.create_worker(o.get_subscription()).schedule(subscriber);
552  } else {
553  // current_thread already owns this thread.
554  subscriber.subscribe();
555  }
556 
557  trace_activity().subscribe_return(*this);
558  return o.get_subscription();
559  }
560 
561 public:
562  typedef T value_type;
563 
564  static_assert(rxo::is_operator<source_operator_type>::value || rxs::is_source<source_operator_type>::value, "observable must wrap an operator or source");
565 
567  {
568  }
569 
571  {
572  }
573 
574  explicit observable(const source_operator_type& o)
575  : source_operator(o)
576  {
577  }
579  : source_operator(std::move(o))
580  {
581  }
582 
584  template<class SO>
587  {}
589  template<class SO>
591  : source_operator(std::move(o.source_operator))
592  {}
593 
594 #if 0
595  template<class I>
596  void on_subscribe(observer<T, I> o) const {
597  source_operator.on_subscribe(o);
598  }
599 #endif
600 
603  template<class... AN>
605  return *this;
606  static_assert(sizeof...(AN) == 0, "as_dynamic() was passed too many arguments.");
607  }
608 
611  template<class... AN>
613  return blocking_observable<T, this_type>(*this);
614  static_assert(sizeof...(AN) == 0, "as_blocking() was passed too many arguments.");
615  }
616 
618 
624  template<class OperatorFactory>
625  auto op(OperatorFactory&& of) const
626  -> decltype(of(*(const this_type*)nullptr)) {
627  return of(*this);
628  static_assert(is_operator_factory_for<this_type, OperatorFactory>::value, "Function passed for op() must have the signature Result(SourceObservable)");
629  }
630 
633  template<class ResultType, class Operator>
634  auto lift(Operator&& op) const
635  -> observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>> {
636  return observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>(
637  rxo::detail::lift_operator<ResultType, source_operator_type, Operator>(source_operator, std::forward<Operator>(op)));
638  static_assert(detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value, "Function passed for lift() must have the signature subscriber<...>(subscriber<T, ...>)");
639  }
640 
646  template<class ResultType, class Operator>
647  auto lift_if(Operator&& op) const
648  -> typename std::enable_if<detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value,
649  observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>>::type {
650  return observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>(
651  rxo::detail::lift_operator<ResultType, source_operator_type, Operator>(source_operator, std::forward<Operator>(op)));
652  }
658  template<class ResultType, class Operator>
659  auto lift_if(Operator&&) const
660  -> typename std::enable_if<!detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value,
661  decltype(rxs::from<ResultType>())>::type {
662  return rxs::from<ResultType>();
663  }
665 
668  template<class... ArgN>
669  auto subscribe(ArgN&&... an) const
671  return detail_subscribe(make_subscriber<T>(std::forward<ArgN>(an)...));
672  }
673 
676  template<class... AN>
677  auto all(AN&&... an) const
679  -> decltype(observable_member(all_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
681  {
682  return observable_member(all_tag{}, *this, std::forward<AN>(an)...);
683  }
684 
687  template<class... AN>
688  auto is_empty(AN&&... an) const
690  -> decltype(observable_member(is_empty_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
692  {
693  return observable_member(is_empty_tag{}, *this, std::forward<AN>(an)...);
694  }
695 
698  template<class... AN>
699  auto any(AN&&... an) const
701  -> decltype(observable_member(any_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
703  {
704  return observable_member(any_tag{}, *this, std::forward<AN>(an)...);
705  }
706 
709  template<class... AN>
710  auto exists(AN&&... an) const
712  -> decltype(observable_member(exists_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
714  {
715  return observable_member(exists_tag{}, *this, std::forward<AN>(an)...);
716  }
717 
720  template<class... AN>
721  auto contains(AN&&... an) const
723  -> decltype(observable_member(contains_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
725  {
726  return observable_member(contains_tag{}, *this, std::forward<AN>(an)...);
727  }
728 
731  template<class... AN>
732  auto filter(AN&&... an) const
734  -> decltype(observable_member(filter_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
736  {
737  return observable_member(filter_tag{}, *this, std::forward<AN>(an)...);
738  }
739 
742  template<class... AN>
743  auto switch_if_empty(AN&&... an) const
745  -> decltype(observable_member(switch_if_empty_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
747  {
748  return observable_member(switch_if_empty_tag{}, *this, std::forward<AN>(an)...);
749  }
750 
753  template<class... AN>
754  auto default_if_empty(AN&&... an) const
756  -> decltype(observable_member(default_if_empty_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
758  {
759  return observable_member(default_if_empty_tag{}, *this, std::forward<AN>(an)...);
760  }
761 
764  template<class... AN>
765  auto sequence_equal(AN... an) const
767  -> decltype(observable_member(sequence_equal_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
769  {
770  return observable_member(sequence_equal_tag{}, *this, std::forward<AN>(an)...);
771  }
772 
775  template<class... AN>
776  auto tap(AN&&... an) const
778  -> decltype(observable_member(tap_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
780  {
781  return observable_member(tap_tag{}, *this, std::forward<AN>(an)...);
782  }
783 
786  template<class... AN>
787  auto time_interval(AN&&... an) const
789  -> decltype(observable_member(time_interval_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
791  {
792  return observable_member(time_interval_tag{}, *this, std::forward<AN>(an)...);
793  }
794 
797  template<class... AN>
798  auto timeout(AN&&... an) const
800  -> decltype(observable_member(timeout_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
802  {
803  return observable_member(timeout_tag{}, *this, std::forward<AN>(an)...);
804  }
805 
808  template<class... AN>
809  auto timestamp(AN&&... an) const
811  -> decltype(observable_member(timestamp_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
813  {
814  return observable_member(timestamp_tag{}, *this, std::forward<AN>(an)...);
815  }
816 
819  template<class... AN>
820  auto finally(AN&&... an) const
822  -> decltype(observable_member(finally_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
824  {
825  return observable_member(finally_tag{}, *this, std::forward<AN>(an)...);
826  }
827 
830  template<class... AN>
831  auto on_error_resume_next(AN&&... an) const
833  -> decltype(observable_member(on_error_resume_next_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
835  {
836  return observable_member(on_error_resume_next_tag{}, *this, std::forward<AN>(an)...);
837  }
838 
841  template<class... AN>
842  auto switch_on_error(AN&&... an) const
844  -> decltype(observable_member(on_error_resume_next_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
846  {
847  return observable_member(on_error_resume_next_tag{}, *this, std::forward<AN>(an)...);
848  }
849 
852  template<class... AN>
853  auto map(AN&&... an) const
855  -> decltype(observable_member(map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
857  {
858  return observable_member(map_tag{}, *this, std::forward<AN>(an)...);
859  }
860 
863  template<class... AN>
864  auto transform(AN&&... an) const
866  -> decltype(observable_member(map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
868  {
869  return observable_member(map_tag{}, *this, std::forward<AN>(an)...);
870  }
871 
874  template<class... AN>
875  auto debounce(AN&&... an) const
877  -> decltype(observable_member(debounce_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
879  {
880  return observable_member(debounce_tag{}, *this, std::forward<AN>(an)...);
881  }
882 
885  template<class... AN>
886  auto delay(AN&&... an) const
888  -> decltype(observable_member(delay_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
890  {
891  return observable_member(delay_tag{}, *this, std::forward<AN>(an)...);
892  }
893 
896  template<class... AN>
897  auto distinct(AN&&... an) const
899  -> decltype(observable_member(distinct_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
901  {
902  return observable_member(distinct_tag{}, *this, std::forward<AN>(an)...);
903  }
904 
907  template<class... AN>
908  auto distinct_until_changed(AN&&... an) const
910  -> decltype(observable_member(distinct_until_changed_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
912  {
913  return observable_member(distinct_until_changed_tag{}, *this, std::forward<AN>(an)...);
914  }
915 
918  template<class... AN>
919  auto element_at(AN&&... an) const
921  -> decltype(observable_member(element_at_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
923  {
924  return observable_member(element_at_tag{}, *this, std::forward<AN>(an)...);
925  }
926 
929  template<class... AN>
930  auto window(AN&&... an) const
932  -> decltype(observable_member(window_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
934  {
935  return observable_member(window_tag{}, *this, std::forward<AN>(an)...);
936  }
937 
940  template<class... AN>
941  auto window_with_time(AN&&... an) const
943  -> decltype(observable_member(window_with_time_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
945  {
946  return observable_member(window_with_time_tag{}, *this, std::forward<AN>(an)...);
947  }
948 
951  template<class... AN>
952  auto window_with_time_or_count(AN&&... an) const
954  -> decltype(observable_member(window_with_time_or_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
956  {
957  return observable_member(window_with_time_or_count_tag{}, *this, std::forward<AN>(an)...);
958  }
959 
962  template<class... AN>
963  auto window_toggle(AN&&... an) const
965  -> decltype(observable_member(window_toggle_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
967  {
968  return observable_member(window_toggle_tag{}, *this, std::forward<AN>(an)...);
969  }
970 
973  template<class... AN>
974  auto buffer(AN&&... an) const
976  -> decltype(observable_member(buffer_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
978  {
979  return observable_member(buffer_count_tag{}, *this, std::forward<AN>(an)...);
980  }
981 
984  template<class... AN>
985  auto buffer_with_time(AN&&... an) const
987  -> decltype(observable_member(buffer_with_time_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
989  {
990  return observable_member(buffer_with_time_tag{}, *this, std::forward<AN>(an)...);
991  }
992 
995  template<class... AN>
996  auto buffer_with_time_or_count(AN&&... an) const
998  -> decltype(observable_member(buffer_with_time_or_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1000  {
1001  return observable_member(buffer_with_time_or_count_tag{}, *this, std::forward<AN>(an)...);
1002  }
1003 
1006  template<class... AN>
1007  auto switch_on_next(AN&&... an) const
1009  -> decltype(observable_member(switch_on_next_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1011  {
1012  return observable_member(switch_on_next_tag{}, *this, std::forward<AN>(an)...);
1013  }
1014 
1017  template<class... AN>
1018  auto merge(AN... an) const
1020  -> decltype(observable_member(merge_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1022  {
1023  return observable_member(merge_tag{}, *this, std::forward<AN>(an)...);
1024  }
1025 
1028  template<class... AN>
1029  auto merge_delay_error(AN... an) const
1031  -> decltype(observable_member(merge_delay_error_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1033  {
1034  return observable_member(merge_delay_error_tag{}, *this, std::forward<AN>(an)...);
1035  }
1036 
1039  template<class... AN>
1040  auto amb(AN... an) const
1042  -> decltype(observable_member(amb_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1044  {
1045  return observable_member(amb_tag{}, *this, std::forward<AN>(an)...);
1046  }
1047 
1050  template<class... AN>
1051  auto flat_map(AN&&... an) const
1053  -> decltype(observable_member(flat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1055  {
1056  return observable_member(flat_map_tag{}, *this, std::forward<AN>(an)...);
1057  }
1058 
1061  template<class... AN>
1062  auto merge_transform(AN&&... an) const
1064  -> decltype(observable_member(flat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1066  {
1067  return observable_member(flat_map_tag{}, *this, std::forward<AN>(an)...);
1068  }
1069 
1072  template<class... AN>
1073  auto concat(AN... an) const
1075  -> decltype(observable_member(concat_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1077  {
1078  return observable_member(concat_tag{}, *this, std::forward<AN>(an)...);
1079  }
1080 
1083  template<class... AN>
1084  auto concat_map(AN&&... an) const
1086  -> decltype(observable_member(concat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1088  {
1089  return observable_member(concat_map_tag{}, *this, std::forward<AN>(an)...);
1090  }
1091 
1094  template<class... AN>
1095  auto concat_transform(AN&&... an) const
1097  -> decltype(observable_member(concat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1099  {
1100  return observable_member(concat_map_tag{}, *this, std::forward<AN>(an)...);
1101  }
1102 
1105  template<class... AN>
1106  auto with_latest_from(AN... an) const
1108  -> decltype(observable_member(with_latest_from_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1110  {
1111  return observable_member(with_latest_from_tag{}, *this, std::forward<AN>(an)...);
1112  }
1113 
1114 
1117  template<class... AN>
1118  auto combine_latest(AN... an) const
1120  -> decltype(observable_member(combine_latest_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1122  {
1123  return observable_member(combine_latest_tag{}, *this, std::forward<AN>(an)...);
1124  }
1125 
1128  template<class... AN>
1129  auto zip(AN&&... an) const
1131  -> decltype(observable_member(zip_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1133  {
1134  return observable_member(zip_tag{}, *this, std::forward<AN>(an)...);
1135  }
1136 
1139  template<class... AN>
1140  inline auto group_by(AN&&... an) const
1142  -> decltype(observable_member(group_by_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1144  {
1145  return observable_member(group_by_tag{}, *this, std::forward<AN>(an)...);
1146  }
1147 
1150  template<class... AN>
1151  auto ignore_elements(AN&&... an) const
1153  -> decltype(observable_member(ignore_elements_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1155  {
1156  return observable_member(ignore_elements_tag{}, *this, std::forward<AN>(an)...);
1157  }
1158 
1161  template<class... AN>
1162  auto multicast(AN&&... an) const
1164  -> decltype(observable_member(multicast_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1166  {
1167  return observable_member(multicast_tag{}, *this, std::forward<AN>(an)...);
1168  }
1169 
1172  template<class... AN>
1173  auto publish(AN&&... an) const
1175  -> decltype(observable_member(publish_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1177  {
1178  return observable_member(publish_tag{}, *this, std::forward<AN>(an)...);
1179  }
1180 
1183  template<class... AN>
1184  auto publish_synchronized(AN&&... an) const
1186  -> decltype(observable_member(publish_synchronized_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1188  {
1189  return observable_member(publish_synchronized_tag{}, *this, std::forward<AN>(an)...);
1190  }
1191 
1194  template<class... AN>
1195  auto replay(AN&&... an) const
1197  -> decltype(observable_member(replay_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1199  {
1200  return observable_member(replay_tag{}, *this, std::forward<AN>(an)...);
1201  }
1202 
1205  template<class... AN>
1206  auto subscribe_on(AN&&... an) const
1208  -> decltype(observable_member(subscribe_on_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1210  {
1211  return observable_member(subscribe_on_tag{}, *this, std::forward<AN>(an)...);
1212  }
1213 
1216  template<class... AN>
1217  auto observe_on(AN&&... an) const
1219  -> decltype(observable_member(observe_on_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1221  {
1222  return observable_member(observe_on_tag{}, *this, std::forward<AN>(an)...);
1223  }
1224 
1227  template<class... AN>
1228  auto reduce(AN&&... an) const
1230  -> decltype(observable_member(reduce_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1232  {
1233  return observable_member(reduce_tag{}, *this, std::forward<AN>(an)...);
1234  }
1235 
1238  template<class... AN>
1239  auto accumulate(AN&&... an) const
1241  -> decltype(observable_member(reduce_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1243  {
1244  return observable_member(reduce_tag{}, *this, std::forward<AN>(an)...);
1245  }
1246 
1249  template<class... AN>
1250  auto first(AN**...) const
1254  {
1256  static_assert(sizeof...(AN) == 0, "first() was passed too many arguments.");
1257  }
1258 
1261  template<class... AN>
1262  auto last(AN**...) const
1266  {
1268  static_assert(sizeof...(AN) == 0, "last() was passed too many arguments.");
1269  }
1270 
1273  template<class... AN>
1274  auto count(AN**...) const
1278  {
1280  static_assert(sizeof...(AN) == 0, "count() was passed too many arguments.");
1281  }
1282 
1285  template<class... AN>
1286  auto sum(AN**...) const
1290  {
1292  static_assert(sizeof...(AN) == 0, "sum() was passed too many arguments.");
1293  }
1294 
1297  template<class... AN>
1298  auto average(AN**...) const
1302  {
1304  static_assert(sizeof...(AN) == 0, "average() was passed too many arguments.");
1305  }
1306 
1309  template<class... AN>
1310  auto max(AN**...) const
1314  {
1316  static_assert(sizeof...(AN) == 0, "max() was passed too many arguments.");
1317  }
1318 
1321  template<class... AN>
1322  auto min(AN**...) const
1326  {
1328  static_assert(sizeof...(AN) == 0, "min() was passed too many arguments.");
1329  }
1330 
1333  template<class... AN>
1334  auto scan(AN... an) const
1336  -> decltype(observable_member(scan_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1338  {
1339  return observable_member(scan_tag{}, *this, std::forward<AN>(an)...);
1340  }
1341 
1344  template<class... AN>
1345  auto sample_with_time(AN&&... an) const
1347  -> decltype(observable_member(sample_with_time_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1349  {
1350  return observable_member(sample_with_time_tag{}, *this, std::forward<AN>(an)...);
1351  }
1352 
1355  template<class... AN>
1356  auto skip(AN... an) const
1358  -> decltype(observable_member(skip_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1360  {
1361  return observable_member(skip_tag{}, *this, std::forward<AN>(an)...);
1362  }
1363 
1366  template<class... AN>
1367  auto skip_while(AN... an) const
1369  -> decltype(observable_member(skip_while_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1371  {
1372  return observable_member(skip_while_tag{}, *this, std::forward<AN>(an)...);
1373  }
1374 
1377  template<class... AN>
1378  auto skip_last(AN... an) const
1380  -> decltype(observable_member(skip_last_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1382  {
1383  return observable_member(skip_last_tag{}, *this, std::forward<AN>(an)...);
1384  }
1385 
1388  template<class... AN>
1389  auto skip_until(AN... an) const
1391  -> decltype(observable_member(skip_until_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1393  {
1394  return observable_member(skip_until_tag{}, *this, std::forward<AN>(an)...);
1395  }
1396 
1399  template<class... AN>
1400  auto take(AN... an) const
1402  -> decltype(observable_member(take_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1404  {
1405  return observable_member(take_tag{}, *this, std::forward<AN>(an)...);
1406  }
1407 
1410  template<class... AN>
1411  auto take_last(AN&&... an) const
1413  -> decltype(observable_member(take_last_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1415  {
1416  return observable_member(take_last_tag{}, *this, std::forward<AN>(an)...);
1417  }
1418 
1421  template<class... AN>
1422  auto take_until(AN&&... an) const
1424  -> decltype(observable_member(take_until_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1426  {
1427  return observable_member(take_until_tag{}, *this, std::forward<AN>(an)...);
1428  }
1429 
1432  template<class... AN>
1433  auto take_while(AN&&... an) const
1435  -> decltype(observable_member(take_while_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1437  {
1438  return observable_member(take_while_tag{}, *this, std::forward<AN>(an)...);
1439  }
1440 
1443  template<class... AN>
1444  auto repeat(AN... an) const
1446  -> decltype(observable_member(repeat_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1448  {
1449  return observable_member(repeat_tag{}, *this, std::forward<AN>(an)...);
1450  }
1451 
1454  template<class... AN>
1455  auto retry(AN... an) const
1457  -> decltype(observable_member(retry_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1459  {
1460  return observable_member(retry_tag{}, *(this_type*)this, std::forward<AN>(an)...);
1461  }
1462 
1465  template<class... AN>
1466  auto start_with(AN... an) const
1468  -> decltype(observable_member(start_with_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1470  {
1471  return observable_member(start_with_tag{}, *this, std::forward<AN>(an)...);
1472  }
1473 
1476  template<class... AN>
1477  auto pairwise(AN... an) const
1479  -> decltype(observable_member(pairwise_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1481  {
1482  return observable_member(pairwise_tag{}, *this, std::forward<AN>(an)...);
1483  }
1484 };
1485 
1486 template<class T, class SourceOperator>
1488  return lhs.source_operator == rhs.source_operator;
1489 }
1490 template<class T, class SourceOperator>
1492  return !(lhs == rhs);
1493 }
1494 
1577 template<>
1578 class observable<void, void>
1579 {
1580  ~observable();
1581 public:
1584  template<class T, class OnSubscribe>
1585  static auto create(OnSubscribe os)
1586  -> decltype(rxs::create<T>(std::move(os))) {
1587  return rxs::create<T>(std::move(os));
1588  }
1589 
1592  template<class T>
1593  static auto range(T first = 0, T last = std::numeric_limits<T>::max(), std::ptrdiff_t step = 1)
1594  -> decltype(rxs::range<T>(first, last, step, identity_current_thread())) {
1595  return rxs::range<T>(first, last, step, identity_current_thread());
1596  }
1599  template<class T, class Coordination>
1600  static auto range(T first, T last, std::ptrdiff_t step, Coordination cn)
1601  -> decltype(rxs::range<T>(first, last, step, std::move(cn))) {
1602  return rxs::range<T>(first, last, step, std::move(cn));
1603  }
1606  template<class T, class Coordination>
1607  static auto range(T first, T last, Coordination cn)
1608  -> decltype(rxs::range<T>(first, last, std::move(cn))) {
1609  return rxs::range<T>(first, last, std::move(cn));
1610  }
1613  template<class T, class Coordination>
1614  static auto range(T first, Coordination cn)
1615  -> decltype(rxs::range<T>(first, std::move(cn))) {
1616  return rxs::range<T>(first, std::move(cn));
1617  }
1618 
1621  template<class T>
1622  static auto never()
1623  -> decltype(rxs::never<T>()) {
1624  return rxs::never<T>();
1625  }
1626 
1629  template<class ObservableFactory>
1630  static auto defer(ObservableFactory of)
1631  -> decltype(rxs::defer(std::move(of))) {
1632  return rxs::defer(std::move(of));
1633  }
1634 
1637  template<class... AN>
1638  static auto interval(rxsc::scheduler::clock_type::duration period, AN**...)
1639  -> decltype(rxs::interval(period)) {
1640  return rxs::interval(period);
1641  static_assert(sizeof...(AN) == 0, "interval(period) was passed too many arguments.");
1642  }
1645  template<class Coordination>
1646  static auto interval(rxsc::scheduler::clock_type::duration period, Coordination cn)
1647  -> decltype(rxs::interval(period, std::move(cn))) {
1648  return rxs::interval(period, std::move(cn));
1649  }
1652  template<class... AN>
1653  static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, AN**...)
1654  -> decltype(rxs::interval(initial, period)) {
1655  return rxs::interval(initial, period);
1656  static_assert(sizeof...(AN) == 0, "interval(initial, period) was passed too many arguments.");
1657  }
1660  template<class Coordination>
1661  static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, Coordination cn)
1662  -> decltype(rxs::interval(initial, period, std::move(cn))) {
1663  return rxs::interval(initial, period, std::move(cn));
1664  }
1665 
1668  template<class... AN>
1669  static auto timer(rxsc::scheduler::clock_type::time_point at, AN**...)
1670  -> decltype(rxs::timer(at)) {
1671  return rxs::timer(at);
1672  static_assert(sizeof...(AN) == 0, "timer(at) was passed too many arguments.");
1673  }
1676  template<class... AN>
1677  static auto timer(rxsc::scheduler::clock_type::duration after, AN**...)
1678  -> decltype(rxs::timer(after)) {
1679  return rxs::timer(after);
1680  static_assert(sizeof...(AN) == 0, "timer(after) was passed too many arguments.");
1681  }
1684  template<class Coordination>
1685  static auto timer(rxsc::scheduler::clock_type::time_point when, Coordination cn)
1686  -> decltype(rxs::timer(when, std::move(cn))) {
1687  return rxs::timer(when, std::move(cn));
1688  }
1691  template<class Coordination>
1692  static auto timer(rxsc::scheduler::clock_type::duration when, Coordination cn)
1693  -> decltype(rxs::timer(when, std::move(cn))) {
1694  return rxs::timer(when, std::move(cn));
1695  }
1696 
1699  template<class Collection>
1700  static auto iterate(Collection c)
1701  -> decltype(rxs::iterate(std::move(c), identity_current_thread())) {
1702  return rxs::iterate(std::move(c), identity_current_thread());
1703  }
1706  template<class Collection, class Coordination>
1707  static auto iterate(Collection c, Coordination cn)
1708  -> decltype(rxs::iterate(std::move(c), std::move(cn))) {
1709  return rxs::iterate(std::move(c), std::move(cn));
1710  }
1711 
1714  template<class T>
1715  static auto from()
1716  -> decltype( rxs::from<T>()) {
1717  return rxs::from<T>();
1718  }
1721  template<class T, class Coordination>
1722  static auto from(Coordination cn)
1723  -> typename std::enable_if<is_coordination<Coordination>::value,
1724  decltype( rxs::from<T>(std::move(cn)))>::type {
1725  return rxs::from<T>(std::move(cn));
1726  }
1729  template<class Value0, class... ValueN>
1730  static auto from(Value0 v0, ValueN... vn)
1731  -> typename std::enable_if<!is_coordination<Value0>::value,
1732  decltype( rxs::from(v0, vn...))>::type {
1733  return rxs::from(v0, vn...);
1734  }
1737  template<class Coordination, class Value0, class... ValueN>
1738  static auto from(Coordination cn, Value0 v0, ValueN... vn)
1739  -> typename std::enable_if<is_coordination<Coordination>::value,
1740  decltype( rxs::from(std::move(cn), v0, vn...))>::type {
1741  return rxs::from(std::move(cn), v0, vn...);
1742  }
1743 
1746  template<class T>
1747  static auto just(T v)
1748  -> decltype(rxs::just(std::move(v))) {
1749  return rxs::just(std::move(v));
1750  }
1753  template<class T, class Coordination>
1754  static auto just(T v, Coordination cn)
1755  -> decltype(rxs::just(std::move(v), std::move(cn))) {
1756  return rxs::just(std::move(v), std::move(cn));
1757  }
1758 
1761  template<class Observable, class Value0, class... ValueN>
1762  static auto start_with(Observable o, Value0 v0, ValueN... vn)
1763  -> decltype(rxs::start_with(std::move(o), std::move(v0), std::move(vn)...)) {
1764  return rxs::start_with(std::move(o), std::move(v0), std::move(vn)...);
1765  }
1766 
1769  template<class T>
1770  static auto empty()
1771  -> decltype(from<T>()) {
1772  return from<T>();
1773  }
1776  template<class T, class Coordination>
1777  static auto empty(Coordination cn)
1778  -> decltype(from<T>(std::move(cn))) {
1779  return from<T>(std::move(cn));
1780  }
1781 
1784  template<class T, class Exception>
1785  static auto error(Exception&& e)
1786  -> decltype(rxs::error<T>(std::forward<Exception>(e))) {
1787  return rxs::error<T>(std::forward<Exception>(e));
1788  }
1791  template<class T, class Exception, class Coordination>
1792  static auto error(Exception&& e, Coordination cn)
1793  -> decltype(rxs::error<T>(std::forward<Exception>(e), std::move(cn))) {
1794  return rxs::error<T>(std::forward<Exception>(e), std::move(cn));
1795  }
1796 
1799  template<class ResourceFactory, class ObservableFactory>
1800  static auto scope(ResourceFactory rf, ObservableFactory of)
1801  -> decltype(rxs::scope(std::move(rf), std::move(of))) {
1802  return rxs::scope(std::move(rf), std::move(of));
1803  }
1804 };
1805 
1806 }
1807 
1808 //
1809 // support range() >> filter() >> subscribe() syntax
1810 // '>>' is spelled 'stream'
1811 //
1812 template<class T, class SourceOperator, class OperatorFactory>
1813 auto operator >> (const rxcpp::observable<T, SourceOperator>& source, OperatorFactory&& of)
1814  -> decltype(source.op(std::forward<OperatorFactory>(of))) {
1815  return source.op(std::forward<OperatorFactory>(of));
1816 }
1817 
1818 //
1819 // support range() | filter() | subscribe() syntax
1820 // '|' is spelled 'pipe'
1821 //
1822 template<class T, class SourceOperator, class OperatorFactory>
1823 auto operator | (const rxcpp::observable<T, SourceOperator>& source, OperatorFactory&& of)
1824  -> decltype(source.op(std::forward<OperatorFactory>(of))) {
1825  return source.op(std::forward<OperatorFactory>(of));
1826 }
1827 
1828 #endif
auto distinct_until_changed(AN &&... an) const
For each item from this observable, filter out consequentially repeated values and emit only changes ...
Definition: rx-observable.hpp:908
Definition: rx-operators.hpp:126
auto delay(AN &&... an) const
Return an observable that emits each item emitted by the source observable after the specified delay...
Definition: rx-observable.hpp:886
auto skip_last(AN... an) const
Make new observable with skipped last count items from this observable.
Definition: rx-observable.hpp:1378
Definition: rx-operators.hpp:366
auto with_latest_from(AN... an) const
For each item from the first observable select the latest value from all the observables to emit from...
Definition: rx-observable.hpp:1106
auto time_interval(AN &&... an) const
Returns an observable that emits indications of the amount of time lapsed between consecutive emissio...
Definition: rx-observable.hpp:787
auto concat_transform(AN &&... an) const
For each item from this observable use the CollectionSelector to produce an observable and subscribe ...
Definition: rx-observable.hpp:1095
Definition: rx-operators.hpp:248
void on_subscribe(subscriber< T > o) const
Definition: rx-observable.hpp:81
auto count(AN **...) const
For each item from this observable reduce it by incrementing a count.
Definition: rx-observable.hpp:1274
Definition: rx-operators.hpp:380
auto min(AN **...) const
For each item from this observable reduce it by taking the min value of the previous items...
Definition: rx-observable.hpp:1322
Definition: rx-operators.hpp:143
auto defer(ObservableFactory of) -> observable< rxu::value_type_t< detail::defer_traits< ObservableFactory >>, detail::defer< ObservableFactory >>
Returns an observable that calls the specified observable factory to create an observable for each ne...
Definition: rx-defer.hpp:73
auto merge_transform(AN &&... an) const
For each item from this observable use the CollectionSelector to produce an observable and subscribe ...
Definition: rx-observable.hpp:1062
auto retry(AN... an) const
Retry this observable for the given number of times.
Definition: rx-observable.hpp:1455
Definition: rx-observable.hpp:157
auto skip_until(AN... an) const
Make new observable with items skipped until on_next occurs on the trigger observable or until the sp...
Definition: rx-observable.hpp:1389
Definition: rx-operators.hpp:387
Definition: rx-operators.hpp:458
auto operator|(const rxcpp::observable< T, SourceOperator > &source, OperatorFactory &&of) -> decltype(source.op(std::forward< OperatorFactory >(of)))
Definition: rx-observable.hpp:1823
static auto create(OnSubscribe os) -> decltype(rxs::create< T >(std::move(os)))
Returns an observable that executes the specified function when a subscriber subscribes to it...
Definition: rx-observable.hpp:1585
Definition: rx-all.hpp:26
auto skip(AN... an) const
Make new observable with skipped first count items from this observable.
Definition: rx-observable.hpp:1356
Definition: rx-predef.hpp:302
static auto range(T first, T last, Coordination cn) -> decltype(rxs::range< T >(first, last, std::move(cn)))
Returns an observable that sends values in the range first-last by adding step to the previous value...
Definition: rx-observable.hpp:1607
auto element_at(AN &&... an) const
Pulls an item located at a specified index location in the sequence of items and emits that item as i...
Definition: rx-observable.hpp:919
observable(const observable< T, SO > &o)
implicit conversion between observables of the same value_type
Definition: rx-observable.hpp:585
Definition: rx-operators.hpp:329
auto any(AN &&... an) const
Returns an Observable that emits true if any item emitted by the source Observable satisfies a specif...
Definition: rx-observable.hpp:699
auto reduce(AN &&... an) const
For each item from this observable use Accumulator to combine items, when completed use ResultSelecto...
Definition: rx-observable.hpp:1228
source_operator_type source_operator
Definition: rx-observable.hpp:519
static auto from(Coordination cn) -> typename std::enable_if< is_coordination< Coordination >::value, decltype(rxs::from< T >(std::move(cn)))>::type
Definition: rx-observable.hpp:1722
dynamic_observable(SOF &&sof, typename std::enable_if<!is_dynamic_observable< SOF >::value, void **>::type=0)
Definition: rx-observable.hpp:74
auto timeout(AN &&... an) const
Return an observable that terminates with timeout_error if a particular timespan has passed without e...
Definition: rx-observable.hpp:798
T max() const
Definition: rx-observable.hpp:431
auto take_while(AN &&... an) const
For the first items fulfilling the predicate from this observable emit them from the new observable t...
Definition: rx-observable.hpp:1433
auto interval(Duration period) -> typename std::enable_if< detail::defer_interval< Duration, identity_one_worker >::value, typename detail::defer_interval< Duration, identity_one_worker >::observable_type >::type
Returns an observable that emits a sequential integer every specified time interval, on the specified scheduler.
Definition: rx-interval.hpp:113
dynamic_observable()
Definition: rx-observable.hpp:69
Definition: rx-operators.hpp:444
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:364
Definition: rx-operators.hpp:157
auto average(AN **...) const
For each item from this observable reduce it by adding to the previous values and then dividing by th...
Definition: rx-observable.hpp:1298
blocking_observable< T, this_type > as_blocking(AN **...) const
Definition: rx-observable.hpp:612
Definition: rx-operators.hpp:282
auto operator>>(const rxcpp::observable< T, SourceOperator > &source, OperatorFactory &&of) -> decltype(source.op(std::forward< OperatorFactory >(of)))
Definition: rx-observable.hpp:1813
auto observable_member(Tag, AN &&... an) -> decltype(Overload::member(std::forward< AN >(an)...))
Definition: rx-operators.hpp:63
rxu::decay_t< Observable > observable_type
Definition: rx-observable.hpp:246
auto window(AN &&... an) const
Return an observable that emits connected, non-overlapping windows, each containing at most count ite...
Definition: rx-observable.hpp:930
auto publish_synchronized(AN &&... an) const
Turn a cold observable hot and allow connections to the source to be independent of subscriptions...
Definition: rx-observable.hpp:1184
auto subscribe(ArgN &&... an) const -> composite_subscription
Subscribe will cause the source observable to emit values to the provided subscriber.
Definition: rx-observable.hpp:669
Definition: rx-operators.hpp:289
Definition: rx-operators.hpp:296
auto error(E e) -> decltype(detail::make_error< T >(typename std::conditional< std::is_same< std::exception_ptr, rxu::decay_t< E >>::value, detail::throw_ptr_tag, detail::throw_instance_tag >::type(), std::move(e), identity_immediate()))
Returns an observable that sends no items to observer and immediately generates an error...
Definition: rx-error.hpp:114
Definition: rx-operators.hpp:352
auto just(Value0 v0) -> typename std::enable_if<!is_coordination< Value0 >::value, decltype(iterate(*(std::array< Value0, 1 > *) nullptr, identity_immediate()))>::type
Definition: rx-iterate.hpp:267
auto combine_latest(AN... an) const
For each item from all of the observables select a value to emit from the new observable that is retu...
Definition: rx-observable.hpp:1118
static auto iterate(Collection c, Coordination cn) -> decltype(rxs::iterate(std::move(c), std::move(cn)))
Returns an observable that sends each value in the collection, on the specified scheduler.
Definition: rx-observable.hpp:1707
Definition: rx-operators.hpp:421
observable(const source_operator_type &o)
Definition: rx-observable.hpp:574
Definition: rx-operators.hpp:500
Definition: rx-operators.hpp:394
Definition: rx-operators.hpp:472
Definition: rx-operators.hpp:331
Definition: rx-operators.hpp:234
auto AN
Definition: rx-finally.hpp:105
auto max() -> operator_factory< max_tag >
For each item from this observable reduce it by taking the max value of the previous items...
Definition: rx-reduce.hpp:496
auto iterate(Collection c) -> observable< rxu::value_type_t< detail::iterate_traits< Collection >>, detail::iterate< Collection, identity_one_worker >>
Returns an observable that sends each value in the collection, on the specified scheduler.
Definition: rx-iterate.hpp:160
Definition: rx-operators.hpp:255
static auto iterate(Collection c) -> decltype(rxs::iterate(std::move(c), identity_current_thread()))
Returns an observable that sends each value in the collection, on the specified scheduler.
Definition: rx-observable.hpp:1700
Definition: rx-operators.hpp:199
Definition: rx-predef.hpp:156
T value_type
Definition: rx-observable.hpp:562
auto tap(AN &&... an) const
inspect calls to on_next, on_error and on_completed.
Definition: rx-observable.hpp:776
auto subscribe(ArgN &&... an) const -> void
Definition: rx-observable.hpp:269
auto zip(AN &&... an) const
Bring by one item from all given observables and select a value to emit from the new observable that ...
Definition: rx-observable.hpp:1129
Definition: rx-operators.hpp:521
auto scan(AN... an) const
For each item from this observable use Accumulator to combine items into a value that will be emitted...
Definition: rx-observable.hpp:1334
auto subscribe_with_rethrow(ArgN &&... an) const -> void
Definition: rx-observable.hpp:294
Definition: rx-operators.hpp:345
auto start_with(AN... an) const
Start with the supplied values, then concatenate this observable.
Definition: rx-observable.hpp:1466
rxu::decay_t< SourceOperator > source_operator_type
Definition: rx-observable.hpp:518
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
static auto interval(rxsc::scheduler::clock_type::duration period, AN **...) -> decltype(rxs::interval(period))
Returns an observable that emits a sequential integer every specified time interval, on the specified scheduler.
Definition: rx-observable.hpp:1638
auto exists(AN &&... an) const
Returns an Observable that emits true if any item emitted by the source Observable satisfies a specif...
Definition: rx-observable.hpp:710
auto first(AN **...) const
For each item from this observable reduce it by sending only the first item.
Definition: rx-observable.hpp:1250
auto max(AN **...) const
For each item from this observable reduce it by taking the max value of the previous items...
Definition: rx-observable.hpp:1310
a source of values whose methods block until all values have been emitted. subscribe or use one of th...
Definition: rx-observable.hpp:169
auto switch_if_empty(AN &&... an) const
If the source Observable terminates without emitting any items, emits items from a backup Observable...
Definition: rx-observable.hpp:743
auto filter(AN &&... an) const
For each item from this observable use Predicate to select which items to emit from the new observabl...
Definition: rx-observable.hpp:732
~blocking_observable()
Definition: rx-observable.hpp:248
static auto just(T v) -> decltype(rxs::just(std::move(v)))
Definition: rx-observable.hpp:1747
static auto from(Value0 v0, ValueN... vn) -> typename std::enable_if<!is_coordination< Value0 >::value, decltype(rxs::from(v0, vn...))>::type
Definition: rx-observable.hpp:1730
Definition: rx-operators.hpp:373
static auto defer(ObservableFactory of) -> decltype(rxs::defer(std::move(of)))
Returns an observable that calls the specified observable factory to create an observable for each ne...
Definition: rx-observable.hpp:1630
auto contains(AN &&... an) const
Returns an Observable that emits true if the source Observable emitted a specified item...
Definition: rx-observable.hpp:721
Definition: rx-operators.hpp:316
static auto start_with(Observable o, Value0 v0, ValueN... vn) -> decltype(rxs::start_with(std::move(o), std::move(v0), std::move(vn)...))
Definition: rx-observable.hpp:1762
Definition: rx-sources.hpp:15
Definition: rx-operators.hpp:451
Definition: rx-operators.hpp:227
linq_driver< iter_cursor< typename util::container_traits< TContainer >::iterator > > from(TContainer &c)
Definition: linq.hpp:556
Definition: rx-util.hpp:404
auto debounce(AN &&... an) const
Return an observable that emits an item if a particular timespan has passed without emitting another ...
Definition: rx-observable.hpp:875
auto multicast(AN &&... an) const
Definition: rx-observable.hpp:1162
Definition: rx-observable.hpp:36
auto group_by(AN &&... an) const
Return an observable that emits grouped_observables, each of which corresponds to a unique key value ...
Definition: rx-observable.hpp:1140
static auto timer(rxsc::scheduler::clock_type::time_point at, AN **...) -> decltype(rxs::timer(at))
Returns an observable that emits an integer at the specified time point.
Definition: rx-observable.hpp:1669
auto take_last(AN &&... an) const
Emit only the final t items emitted by the source Observable.
Definition: rx-observable.hpp:1411
observable< T > make_observable_dynamic(Source &&s)
Definition: rx-observable.hpp:102
auto sum(AN **...) const
For each item from this observable reduce it by adding to the previous items.
Definition: rx-observable.hpp:1286
static auto timer(rxsc::scheduler::clock_type::time_point when, Coordination cn) -> decltype(rxs::timer(when, std::move(cn)))
Returns an observable that emits an integer at the specified time point.
Definition: rx-observable.hpp:1685
~observable()
Definition: rx-observable.hpp:566
auto map(AN &&... an) const
For each item from this observable use Selector to produce an item to emit from the new observable th...
Definition: rx-observable.hpp:853
auto sample_with_time(AN &&... an) const
Return an Observable that emits the most recent items emitted by the source Observable within periodi...
Definition: rx-observable.hpp:1345
Definition: rx-operators.hpp:241
Definition: rx-operators.hpp:117
Definition: rx-operators.hpp:213
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
Definition: rx-operators.hpp:430
auto pairwise(AN... an) const
Take values pairwise from this observable.
Definition: rx-observable.hpp:1477
auto replay(AN &&... an) const
1) replay(optional Coordination, optional CompositeSubscription) Turn a cold observable hot...
Definition: rx-observable.hpp:1195
Definition: rx-sources.hpp:23
Definition: rx-operators.hpp:38
Definition: rx-operators.hpp:268
auto sequence_equal(AN... an) const
Determine whether two Observables emit the same sequence of items.
Definition: rx-observable.hpp:765
auto last(AN **...) const
For each item from this observable reduce it by sending only the last item.
Definition: rx-observable.hpp:1262
auto start_with(AN &&... an) -> operator_factory< start_with_tag, AN... >
Start with the supplied values, then concatenate this observable.
Definition: rx-start_with.hpp:53
auto trace_activity() -> decltype(rxcpp_trace_activity(trace_tag()))&
Definition: rx-predef.hpp:15
bool operator!=(const dynamic_grouped_observable< K, T > &lhs, const dynamic_grouped_observable< K, T > &rhs)
Definition: rx-grouped_observable.hpp:103
Definition: rx-operators.hpp:129
Definition: rx-operators.hpp:437
auto merge_delay_error(AN... an) const
For each given observable subscribe. For each item emitted from all of the given observables, deliver from the new observable that is returned. The first error to occure is hold off until all of the given non-error-emitting observables have finished their emission.
Definition: rx-observable.hpp:1029
Definition: rx-operators.hpp:359
std::enable_if< is_subscriber< Subscriber >::value, void >::type on_subscribe(Subscriber o) const
Definition: rx-observable.hpp:87
static auto from() -> decltype(rxs::from< T >())
Definition: rx-observable.hpp:1715
Definition: rx-operators.hpp:479
Definition: rx-operators.hpp:136
auto repeat(AN... an) const
Repeat this observable for the given number of times or infinitely.
Definition: rx-observable.hpp:1444
auto take(AN... an) const
For the first count items from this observable emit them from the new observable that is returned...
Definition: rx-observable.hpp:1400
Definition: rx-operators.hpp:206
Definition: rx-operators.hpp:192
auto subscribe_on(AN &&... an) const
Subscription and unsubscription are queued and delivered using the scheduler from the supplied coordi...
Definition: rx-observable.hpp:1206
tag_dynamic_observable dynamic_observable_tag
Definition: rx-observable.hpp:67
Definition: rx-operators.hpp:103
auto distinct(AN &&... an) const
For each item from this observable, filter out repeated values and emit only items that have not alre...
Definition: rx-observable.hpp:897
auto skip_while(AN... an) const
Make new observable with skipped first count items from this observable.
Definition: rx-observable.hpp:1367
static auto error(Exception &&e, Coordination cn) -> decltype(rxs::error< T >(std::forward< Exception >(e), std::move(cn)))
Returns an observable that sends no items to observer and immediately generates an error...
Definition: rx-observable.hpp:1792
static auto range(T first=0, T last=std::numeric_limits< T >::max(), std::ptrdiff_t step=1) -> decltype(rxs::range< T >(first, last, step, identity_current_thread()))
Returns an observable that sends values in the range first-last by adding step to the previous value...
Definition: rx-observable.hpp:1593
static auto empty() -> decltype(from< T >())
Returns an observable that sends no items to observer and immediately completes, on the specified sch...
Definition: rx-observable.hpp:1770
Definition: rx-operators.hpp:486
Definition: rx-operators.hpp:110
Definition: rx-operators.hpp:57
auto merge(AN... an) const
For each given observable subscribe. For each item emitted from all of the given observables, deliver from the new observable that is returned.
Definition: rx-observable.hpp:1018
bool operator==(const dynamic_grouped_observable< K, T > &lhs, const dynamic_grouped_observable< K, T > &rhs)
Definition: rx-grouped_observable.hpp:99
Definition: rx-operators.hpp:220
Definition: rx-predef.hpp:270
auto buffer(AN &&... an) const
Return an observable that emits connected, non-overlapping buffer, each containing at most count item...
Definition: rx-observable.hpp:974
Definition: rx-operators.hpp:185
auto lift(Operator &&op) -> detail::lift_factory< ResultType, Operator >
Definition: rx-lift.hpp:101
static auto range(T first, Coordination cn) -> decltype(rxs::range< T >(first, std::move(cn)))
Returns an observable that sends values in the range first-last by adding step to the previous value...
Definition: rx-observable.hpp:1614
static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, AN **...) -> decltype(rxs::interval(initial, period))
Returns an observable that emits a sequential integer every specified time interval, on the specified scheduler.
Definition: rx-observable.hpp:1653
static auto error(Exception &&e) -> decltype(rxs::error< T >(std::forward< Exception >(e)))
Returns an observable that sends no items to observer and immediately generates an error...
Definition: rx-observable.hpp:1785
static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, Coordination cn) -> decltype(rxs::interval(initial, period, std::move(cn)))
Returns an observable that emits a sequential integer every specified time interval, on the specified scheduler.
Definition: rx-observable.hpp:1661
observable_type source
Definition: rx-observable.hpp:247
auto buffer_with_time_or_count(AN &&... an) const
Return an observable that emits connected, non-overlapping buffers of items from the source observabl...
Definition: rx-observable.hpp:996
Definition: rx-operators.hpp:465
Definition: rx-operators.hpp:261
Definition: rx-operators.hpp:275
const scheduler & make_current_thread()
Definition: rx-currentthread.hpp:263
auto switch_on_error(AN &&... an) const
If an error occurs, take the result from the Selector and subscribe to that instead.
Definition: rx-observable.hpp:842
Definition: rx-operators.hpp:493
auto switch_on_next(AN &&... an) const
Return observable that emits the items emitted by the observable most recently emitted by the source ...
Definition: rx-observable.hpp:1007
consumes values from an observable using State that may implement on_next, on_error and on_completed ...
Definition: rx-observer.hpp:179
auto flat_map(AN &&... an) const
For each item from this observable use the CollectionSelector to produce an observable and subscribe ...
Definition: rx-observable.hpp:1051
observable< T > as_dynamic(AN **...) const
Definition: rx-observable.hpp:604
auto window_with_time(AN &&... an) const
Return an observable that emits observables every period time interval and collects items from this o...
Definition: rx-observable.hpp:941
auto all(AN &&... an) const
Returns an Observable that emits true if every item emitted by the source Observable satisfies a spec...
Definition: rx-observable.hpp:677
auto amb(AN... an) const
For each item from only the first of the given observables deliver from the new observable that is re...
Definition: rx-observable.hpp:1040
Definition: rx-operators.hpp:150
auto window_with_time_or_count(AN &&... an) const
Return an observable that emits connected, non-overlapping windows of items from the source observabl...
Definition: rx-observable.hpp:952
static auto timer(rxsc::scheduler::clock_type::duration when, Coordination cn) -> decltype(rxs::timer(when, std::move(cn)))
Returns an observable that emits an integer at the specified time point.
Definition: rx-observable.hpp:1692
auto take_until(AN &&... an) const
For each item from this observable until on_next occurs on the trigger observable or until the specif...
Definition: rx-observable.hpp:1422
auto transform(AN &&... an) const
For each item from this observable use Selector to produce an item to emit from the new observable th...
Definition: rx-observable.hpp:864
Definition: rx-operators.hpp:338
int count() const
Definition: rx-observable.hpp:365
auto scope(ResourceFactory rf, ObservableFactory of) -> observable< rxu::value_type_t< detail::scope_traits< ResourceFactory, ObservableFactory >>, detail::scope< ResourceFactory, ObservableFactory >>
Returns an observable that makes an observable by the specified observable factory using the resource...
Definition: rx-scope.hpp:114
void unsubscribe() const
Definition: rx-subscription.hpp:170
auto ignore_elements(AN &&... an) const
Do not emit any items from the source Observable, but allow termination notification (either onError ...
Definition: rx-observable.hpp:1151
static auto from(Coordination cn, Value0 v0, ValueN... vn) -> typename std::enable_if< is_coordination< Coordination >::value, decltype(rxs::from(std::move(cn), v0, vn...))>::type
Definition: rx-observable.hpp:1738
blocking_observable(observable_type s)
Definition: rx-observable.hpp:251
T sum() const
Definition: rx-observable.hpp:389
Definition: rx-operators.hpp:164
auto timestamp(AN &&... an) const
Returns an observable that attaches a timestamp to each item emitted by the source observable indicat...
Definition: rx-observable.hpp:809
Definition: rx-operators.hpp:323
Definition: rx-operators.hpp:507
auto is_empty(AN &&... an) const
Returns an Observable that emits true if the source Observable is empty, otherwise false...
Definition: rx-observable.hpp:688
T min() const
Definition: rx-observable.hpp:452
auto buffer_with_time(AN &&... an) const
Return an observable that emits buffers every period time interval and collects items from this obser...
Definition: rx-observable.hpp:985
auto subscribe(ArgN &&... an) -> detail::subscribe_factory< decltype(make_subscriber< T >(std::forward< ArgN >(an)...))>
Subscribe will cause the source observable to emit values to the provided subscriber.
Definition: rx-subscribe.hpp:87
Definition: rx-predef.hpp:128
static auto scope(ResourceFactory rf, ObservableFactory of) -> decltype(rxs::scope(std::move(rf), std::move(of)))
Returns an observable that makes an observable by the specified observable factory using the resource...
Definition: rx-observable.hpp:1800
Definition: rx-operators.hpp:423
auto default_if_empty(AN &&... an) const
If the source Observable terminates without emitting any items, emits a default item and completes...
Definition: rx-observable.hpp:754
binds an observer that consumes values with a composite_subscription that controls lifetime...
Definition: rx-subscriber.hpp:25
Definition: rx-operators.hpp:119
static auto interval(rxsc::scheduler::clock_type::duration period, Coordination cn) -> decltype(rxs::interval(period, std::move(cn)))
Returns an observable that emits a sequential integer every specified time interval, on the specified scheduler.
Definition: rx-observable.hpp:1646
static auto range(T first, T last, std::ptrdiff_t step, Coordination cn) -> decltype(rxs::range< T >(first, last, step, std::move(cn)))
Returns an observable that sends values in the range first-last by adding step to the previous value...
Definition: rx-observable.hpp:1600
double average() const
Definition: rx-observable.hpp:410
auto first(AN **...) -> delayed_type_t< T, AN... > const
Definition: rx-observable.hpp:315
Definition: rx-operators.hpp:178
static auto just(T v, Coordination cn) -> decltype(rxs::just(std::move(v), std::move(cn)))
Definition: rx-observable.hpp:1754
rxu::value_type_t< delayed_type< T, AN... > > delayed_type_t
Definition: rx-operators.hpp:60
auto observe_on(AN &&... an) const
All values are queued and delivered using the scheduler from the supplied coordination.
Definition: rx-observable.hpp:1217
auto accumulate(AN &&... an) const
For each item from this observable use Accumulator to combine items, when completed use ResultSelecto...
Definition: rx-observable.hpp:1239
observable(observable< T, SO > &&o)
implicit conversion between observables of the same value_type
Definition: rx-observable.hpp:590
Definition: rx-predef.hpp:115
friend bool operator==(const observable< U, SO > &, const observable< U, SO > &)
Definition: rx-sources.hpp:17
auto window_toggle(AN &&... an) const
Return an observable that emits observables every period time interval and collects items from this o...
Definition: rx-observable.hpp:963
auto on_error_resume_next(AN &&... an) const
If an error occurs, take the result from the Selector and subscribe to that instead.
Definition: rx-observable.hpp:831
observable()
Definition: rx-observable.hpp:570
auto last(AN **...) -> delayed_type_t< T, AN... > const
Definition: rx-observable.hpp:343
static auto never() -> decltype(rxs::never< T >())
Returns an observable that never sends any items or notifications to observer.
Definition: rx-observable.hpp:1622
auto concat(AN... an) const
For each item from this observable subscribe to one at a time, in the order received. For each item from all of the given observables deliver from the new observable that is returned.
Definition: rx-observable.hpp:1073
Definition: rx-predef.hpp:126
auto concat_map(AN &&... an) const
For each item from this observable use the CollectionSelector to produce an observable and subscribe ...
Definition: rx-observable.hpp:1084
Definition: rx-operators.hpp:127
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
friend bool operator==(const dynamic_observable< U > &, const dynamic_observable< U > &)
auto publish(AN &&... an) const
Turn a cold observable hot and allow connections to the source to be independent of subscriptions...
Definition: rx-observable.hpp:1173
static auto empty(Coordination cn) -> decltype(from< T >(std::move(cn)))
Returns an observable that sends no items to observer and immediately completes, on the specified sch...
Definition: rx-observable.hpp:1777
Definition: rx-operators.hpp:415
Definition: rx-operators.hpp:408
auto timer(TimePointOrDuration when) -> typename std::enable_if< detail::defer_timer< TimePointOrDuration, identity_one_worker >::value, typename detail::defer_timer< TimePointOrDuration, identity_one_worker >::observable_type >::type
Returns an observable that emits an integer at the specified time point.
Definition: rx-timer.hpp:114
observable(source_operator_type &&o)
Definition: rx-observable.hpp:578
Definition: rx-operators.hpp:401
static auto timer(rxsc::scheduler::clock_type::duration after, AN **...) -> decltype(rxs::timer(after))
Returns an observable that emits an integer at the specified time point.
Definition: rx-observable.hpp:1677
Definition: rx-operators.hpp:514