Roc Toolkit internal modules
Roc Toolkit: real-time audio streaming
control_task_queue.h
/*
 * Copyright (c) 2020 Roc Streaming authors
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 */

//! @file roc_ctl/control_task_queue.h
//! @brief Control task queue.

#ifndef ROC_CTL_CONTROL_TASK_QUEUE_H_
#define ROC_CTL_CONTROL_TASK_QUEUE_H_

#include "roc_core/atomic.h"
#include "roc_core/list.h"
#include "roc_core/mpsc_queue.h"
#include "roc_core/mutex.h"
#include "roc_core/thread.h"
#include "roc_core/time.h"
#include "roc_core/timer.h"

namespace roc {
namespace ctl {

//! Control task queue.
//!
//! This class implements a thread-safe task queue, allowing lock-free scheduling
//! of tasks for immediate or delayed execution on the background thread, as well
//! as lock-free task cancellation and re-scheduling (changing deadline).
//!
//! It also allows tasks to be paused and resumed. Task resuming is lock-free too.
//!
//! Note that these operations are lock-free only if core::Timer::try_set_deadline()
//! is itself lock-free, which is the case on modern platforms.
//!
//! In the current implementation, priority is given to fast scheduling and cancellation
//! over strict observance of scheduling deadlines. In other words, during
//! contention or peak load, scheduling and cancellation will always be fast, but task
//! execution may be delayed.
//!
//! This design was considered acceptable because the actual users of the control task
//! queue are more sensitive to delays than the tasks they schedule. The task queue is
//! used by network and pipeline threads, which should never block and which use the
//! task queue to schedule low-priority delayed work.
//!
//! The implementation uses three queues internally:
//!
//!  - ready_queue_ - a lock-free queue of tasks of four kinds:
//!    - tasks to be resumed after pause ((flags_ & FlagResumed) != 0)
//!    - tasks to be executed as soon as possible (renewed_deadline_ == 0)
//!    - tasks to be re-scheduled with another deadline (renewed_deadline_ > 0)
//!    - tasks to be cancelled (renewed_deadline_ < 0)
//!
//!  - sleeping_queue_ - a sorted queue of tasks with non-zero deadline, scheduled for
//!    execution in the future; the task at the head has the smallest (nearest) deadline;
//!
//!  - pause_queue_ - an unsorted queue to keep track of all currently paused tasks.
//!
//! task_mutex_ should be acquired to process tasks and/or to access sleeping_queue_
//! and pause_queue_, as well as non-atomic task fields.
//!
//! wakeup_timer_ (core::Timer) is used to set or wait for the next wakeup time of the
//! background thread. This time is set to zero when ready_queue_ is non-empty; otherwise
//! it is set to the deadline of the first task in sleeping_queue_ if that queue is
//! non-empty, and to infinity (-1) if it is empty. The timer allows the deadline to be
//! updated concurrently from any thread.
//!
//! When a task is scheduled, re-scheduled, or cancelled, there are two ways to
//! complete the operation:
//!
//!  - If the event loop thread is sleeping and task_mutex_ is free, we can acquire
//!    the mutex and complete the operation in-place by manipulating sleeping_queue_
//!    under the mutex, without bothering the event loop thread. This can be done only
//!    if we're changing task scheduling and not going to execute it right now.
//!
//!  - Otherwise, we push the task to ready_queue_ (which has lock-free push), set
//!    the timer wakeup time to zero (to ensure that the event loop thread won't go to
//!    sleep), and return, leaving the completion of the operation to the event loop
//!    thread. The event loop thread will fetch the task from ready_queue_ soon and
//!    complete the operation by manipulating sleeping_queue_.
//!
//! The current task state is defined by its atomic field "state_". Various task queue
//! operations move the task from one state to another. The move is always performed
//! using atomic CAS or exchange to handle concurrent lock-free updates correctly.
//!
//! There is also a "flags_" field that provides additional information about the task
//! that is preserved across transitions between states; for example, that the task is
//! being resumed.
//!
//! Here are some example flows of the task states:
//! @code
//!    schedule():
//!      StateCompleted -> StateReady
//!        -> StateProcessing -> StateCompleting -> StateCompleted
//!
//!    schedule_at():
//!      StateCompleted -> StateReady
//!        -> StateSleeping
//!        -> StateProcessing -> StateCompleting -> StateCompleted
//!
//!    resume():
//!      StateSleeping -> StateReady
//!        -> StateProcessing -> StateCompleting -> StateCompleted
//!
//!    async_cancel():
//!      StateSleeping -> StateReady
//!        -> StateCancelling -> StateCompleting -> StateCompleted
//! @endcode
//!
//! The meaning of the states is the following:
//!  - StateReady: task is added to the ready queue for execution or renewal,
//!                or possibly is currently being renewed in-place
//!  - StateSleeping: task renewal is complete and the task was put into the sleeping
//!                   queue to wait for its deadline, or into the pause queue to wait
//!                   for resume
//!  - StateCancelling: task renewal is complete and the task is being cancelled
//!                     because it was put into the ready queue for cancellation
//!  - StateProcessing: task is being processed after fetching it either from the ready
//!                     queue (if it was put there for execution) or the sleeping queue
//!  - StateCompleting: task processing is complete and the task is being completed
//!  - StateCompleted: task is completed and is not used anywhere; it may be safely
//!                    destroyed or reused; this is also the initial task state
public:
    //! Initialize.
    //! @remarks
    //!  Starts background thread.

    //! Destroy.
    //! @remarks
    //!  stop_and_wait() should be called before destructor.
    virtual ~ControlTaskQueue();

    //! Check if the object was successfully constructed.
    bool valid() const;

    //! Enqueue a task for asynchronous execution as soon as possible.
    //!
    //! This is like schedule_at(), but the deadline is "as soon as possible".
    void schedule(ControlTask& task,
                  IControlTaskExecutor& executor,
                  IControlTaskCompleter* completer);

    //! Enqueue a task for asynchronous execution at a given point in time.
    //!
    //! - If the task is already completed, it's scheduled with the given deadline.
    //! - If the task is sleeping and waiting for its deadline, its deadline is updated.
    //! - If the task is in processing, completion or cancellation phase, it's scheduled
    //!   to be executed again after completion or cancellation finishes.
    //! - If the task is paused, re-scheduling is postponed until the task resumes.
    //!
    //! @p deadline should be in the same domain as core::timestamp().
    //! It can't be negative. Zero deadline means "execute as soon as possible".
    //!
    //! The @p executor is used to invoke the task function. It allows tasks to be
    //! implemented in different classes. If a class T wants to implement tasks, it
    //! should inherit ControlTaskExecutor<T>.
    //!
    //! If @p completer is present, the task should not be destroyed until completer is
    //! invoked. The completer is invoked on the event loop thread once and only once,
    //! after the task completes or is cancelled. Completer should never block.
    //!
    //! The event loop thread assumes that the task may be destroyed right after it is
    //! completed and its completer is called (if it's present), and doesn't touch the
    //! task after this, unless the user explicitly reschedules the task.
    void schedule_at(ControlTask& task,
                     core::nanoseconds_t deadline,
                     IControlTaskExecutor& executor,
                     IControlTaskCompleter* completer);

    //! Resume task if it's paused.
    //!
    //! - If the task is paused, schedule it for execution.
    //! - If the task is being processed right now (i.e. it's executing or will be
    //!   executing very soon), then postpone the decision until task execution ends.
    //!   After the task execution, if the task asked to pause, then immediately
    //!   resume it.
    //! - Otherwise, do nothing.
    //!
    //! If resume is called one or multiple times before task execution, those calls
    //! are ignored. Only calls made during or after task execution are honored, and
    //! only if the task execution left the task in paused state.
    //!
    //! Subsequent resume calls between task executions are collapsed into one; even if
    //! resume was called multiple times after the task paused and before it's executed
    //! again, the next pause will need a new resume call.
    void resume(ControlTask& task);

    //! Try to cancel scheduled task execution, if it's not executed yet.
    //!
    //! - If the task is already completed or is being completed or cancelled, do nothing.
    //! - If the task is sleeping or paused, cancel task execution.
    //! - If the task is being processed right now (i.e. it's executing or will be
    //!   executing very soon), then postpone the decision until task execution ends.
    //!   After the task execution, if the task asked to pause or continue, then the
    //!   cancellation request is fulfilled and the task is cancelled; otherwise the
    //!   cancellation request is ignored and the task is completed normally.
    //!
    //! When the task is being cancelled instead of completed, if it has a completer,
    //! the completer is invoked.
    void async_cancel(ControlTask& task);

    //! Wait until the task is completed.
    //!
    //! Blocks until the task is completed or cancelled.
    //! Does NOT wait until the task completer is called.
    //!
    //! Cannot be called concurrently for the same task (will cause crash).
    //! Cannot be called from the task completion handler (will cause deadlock).
    //!
    //! If this method is called, the task should not be destroyed until this method
    //! returns (as well as until the completer is invoked, if it's present).
    void wait(ControlTask& task);

    //! Stop thread and wait until it terminates.
    //!
    //! All tasks should be completed before calling stop_and_wait().
    //! stop_and_wait() should be called before calling destructor.
    void stop_and_wait();

private:
    virtual void run();

    void start_thread_();
    void stop_thread_();

    void setup_task_(ControlTask& task,
                     IControlTaskExecutor& executor,
                     IControlTaskCompleter* completer);

    void request_resume_(ControlTask& task);
    void request_renew_(ControlTask& task, core::nanoseconds_t deadline);
    void request_renew_guarded_(ControlTask& task, core::nanoseconds_t deadline);

    bool try_renew_inplace_(ControlTask& task,
                            core::nanoseconds_t deadline,

    ControlTask::State
    renew_state_(ControlTask& task, unsigned task_flags, core::nanoseconds_t deadline);
    bool renew_scheduling_(ControlTask& task,
                           unsigned task_flags,
                           core::nanoseconds_t deadline,

    bool reschedule_task_(ControlTask& task,
                          core::nanoseconds_t deadline,
    void cancel_task_(ControlTask& task, core::seqlock_version_t version);

    void reborn_task_(ControlTask& task, ControlTask::State from_state);
    void pause_task_(ControlTask& task, ControlTask::State from_state);
    void
    complete_task_(ControlTask& task, unsigned task_flags, ControlTask::State from_state);
    void wait_task_(ControlTask& task);

    void execute_task_(ControlTask& task);

    bool process_tasks_();

    ControlTask* fetch_task_();
    ControlTask* fetch_ready_task_();
    ControlTask* fetch_sleeping_task_();

    void insert_sleeping_task_(ControlTask& task);
    void remove_sleeping_task_(ControlTask& task);

    core::nanoseconds_t update_wakeup_timer_();

    bool started_;
    core::Atomic<int> stop_;
    bool fetch_ready_;

    core::Atomic<int> ready_queue_size_;

    core::Timer wakeup_timer_;
    core::Mutex task_mutex_;
};

} // namespace ctl
} // namespace roc

#endif // ROC_CTL_CONTROL_TASK_QUEUE_H_
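
The class comment describes how the wakeup time of the background thread is chosen: zero when the ready queue is non-empty, otherwise the deadline at the head of the sleeping queue, otherwise infinity (-1). The rule can be sketched as a small standalone function. This is only an illustration under stated assumptions: `next_wakeup_time`, its parameters, and the `nanoseconds_t` typedef are hypothetical stand-ins and are not part of the Roc API; the real logic presumably lives in `update_wakeup_timer_()`, whose body is not shown in this listing.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for core::nanoseconds_t (a 64-bit nanosecond count).
typedef std::int64_t nanoseconds_t;

// Returns the next wakeup time for the background thread:
//   0                 if the ready queue is non-empty (wake up immediately),
//   the head deadline if the sleeping queue is non-empty,
//   -1                otherwise ("infinity", no wakeup is scheduled).
// sleeping_deadlines is assumed to be sorted in ascending order, so the
// first element is the nearest deadline.
nanoseconds_t next_wakeup_time(std::size_t ready_queue_size,
                               const std::vector<nanoseconds_t>& sleeping_deadlines) {
    if (ready_queue_size != 0) {
        return 0;
    }
    if (!sleeping_deadlines.empty()) {
        return sleeping_deadlines.front();
    }
    return -1;
}
```

The ready queue takes priority over the sleeping queue because its tasks need attention right away, regardless of how far away the nearest sleeping deadline is.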
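
The class comment lists the kinds of tasks that can be found in `ready_queue_` and the field values that distinguish them. A minimal sketch of that classification follows. The names `ReadyAction` and `classify_ready_task`, the numeric value of `FlagResumed`, and the order in which the conditions are checked are all assumptions made for illustration; the listing does not show how the real implementation dispatches.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for core::nanoseconds_t.
typedef std::int64_t nanoseconds_t;

// Hypothetical flag value; the real FlagResumed constant is not shown in the listing.
const unsigned FlagResumed = 1u << 0;

// Hypothetical names for the four kinds of ready-queue entries.
enum ReadyAction { ActionResume, ActionExecute, ActionReschedule, ActionCancel };

// Decide what a task fetched from the ready queue is asking for.
// Assumption: the resume flag is checked before the renewed deadline.
ReadyAction classify_ready_task(unsigned flags, nanoseconds_t renewed_deadline) {
    if ((flags & FlagResumed) != 0) {
        return ActionResume; // task is being resumed after pause
    }
    if (renewed_deadline == 0) {
        return ActionExecute; // execute as soon as possible
    }
    if (renewed_deadline > 0) {
        return ActionReschedule; // move to the new deadline
    }
    return ActionCancel; // negative deadline means cancellation
}
```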
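
The class comment states that every move between task states is performed with an atomic CAS or exchange. The sketch below shows the CAS half of that idea using `std::atomic` from the standard library as a stand-in for `core::Atomic`. The helper `try_move_state` is hypothetical, and the numeric order of the enum values is an assumption; only the state names come from the header.

```cpp
#include <atomic>
#include <cassert>

// State names are taken from the class comment; their numeric order is an assumption.
enum State {
    StateCompleted,
    StateReady,
    StateSleeping,
    StateCancelling,
    StateProcessing,
    StateCompleting
};

// Hypothetical helper: move the task from state `from` to state `to`, but only
// if nobody else changed the state in the meantime. Returns true on success.
bool try_move_state(std::atomic<int>& state, State from, State to) {
    int expected = from;
    // compare_exchange_strong stores `to` only if the current value equals
    // `expected`; otherwise it leaves the state untouched and returns false.
    return state.compare_exchange_strong(expected, to);
}
```

When two threads race to schedule the same completed task, only one CAS from `StateCompleted` to `StateReady` succeeds, so the task is pushed to the ready queue once.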
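
The class comment describes two ways to complete a scheduling request: in-place under `task_mutex_` when the mutex is free, or by handing the task over to the event loop thread through the ready queue. The following is a simplified, single-threaded sketch of that decision, not the actual implementation. `MiniQueue`, `MiniTask`, and `renew` are hypothetical names; a plain `std::vector` stands in for the lock-free MPSC queue (so the fallback path here is not actually lock-free or thread-safe); and the check that the event loop thread is sleeping is omitted.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>
#include <vector>

// Hypothetical stand-in for core::nanoseconds_t.
typedef std::int64_t nanoseconds_t;

struct MiniTask {
    nanoseconds_t deadline;
    MiniTask() : deadline(0) {}
};

struct MiniQueue {
    std::mutex task_mutex;
    std::vector<MiniTask*> sleeping; // guarded by task_mutex
    std::vector<MiniTask*> ready;    // stand-in for the lock-free ready queue
    nanoseconds_t wakeup_time;       // -1 means "infinity"
    MiniQueue() : wakeup_time(-1) {}
};

// Returns true if the request was completed in-place, false if it was
// handed over to the event loop thread via the ready queue.
bool renew(MiniQueue& queue, MiniTask& task, nanoseconds_t deadline) {
    // The in-place path is allowed only when we change scheduling (non-zero
    // deadline) rather than execute right now, and only if the mutex is free.
    if (deadline > 0 && queue.task_mutex.try_lock()) {
        task.deadline = deadline;
        queue.sleeping.push_back(&task);
        queue.task_mutex.unlock();
        return true;
    }
    // Fallback path: never block; push to the ready queue and force a wakeup.
    task.deadline = deadline;
    queue.ready.push_back(&task);
    queue.wakeup_time = 0;
    return false;
}
```

The caller never waits for the mutex in either branch, which matches the stated priority of fast scheduling over strict deadline observance.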