10 #ifndef __PION_SCHEDULER_HEADER__
11 #define __PION_SCHEDULER_HEADER__
14 #include <boost/asio.hpp>
15 #include <boost/assert.hpp>
16 #include <boost/bind.hpp>
17 #include <boost/function/function0.hpp>
18 #include <boost/cstdint.hpp>
19 #include <boost/shared_ptr.hpp>
20 #include <boost/noncopyable.hpp>
21 #include <boost/thread/thread.hpp>
22 #include <boost/thread/mutex.hpp>
23 #include <boost/thread/xtime.hpp>
24 #include <boost/thread/condition.hpp>
25 #include <pion/config.hpp>
26 #include <pion/logger.hpp>
35 private boost::noncopyable
41 : m_logger(PION_GET_LOGGER(
"pion.scheduler")),
42 m_num_threads(DEFAULT_NUM_THREADS), m_active_users(0), m_is_running(false)
52 virtual void shutdown(
void);
60 void add_active_user(
void);
63 void remove_active_user(
void);
/// Returns true if the scheduler is running (reports the m_is_running flag).
66 inline bool is_running(
void)
const {
return m_is_running; }
/// Returns an async I/O service used to schedule work.
/// Pure virtual: every concrete scheduler must provide its own implementation.
81 virtual boost::asio::io_service& get_io_service(
void) = 0;
88 virtual void post(boost::function0<void> work_func) {
89 get_io_service().post(work_func);
98 void keep_running(boost::asio::io_service& my_service,
99 boost::asio::deadline_timer& my_timer);
107 inline static void sleep(boost::uint32_t sleep_sec, boost::uint32_t sleep_nsec) {
108 boost::system_time wakeup_time(get_wakeup_time(sleep_sec, sleep_nsec));
109 boost::thread::sleep(wakeup_time);
121 template <
typename ConditionType,
typename LockType>
122 inline static void sleep(ConditionType& wakeup_condition, LockType& wakeup_lock,
123 boost::uint32_t sleep_sec, boost::uint32_t sleep_nsec)
125 boost::system_time wakeup_time(get_wakeup_time(sleep_sec, sleep_nsec));
126 wakeup_condition.timed_wait(wakeup_lock, wakeup_time);
131 void process_service_work(boost::asio::io_service& service);
144 static boost::system_time get_wakeup_time(boost::uint32_t sleep_sec,
145 boost::uint32_t sleep_nsec);
215 if (! m_thread_pool.empty()) {
216 PION_LOG_DEBUG(m_logger,
"Waiting for threads to shutdown");
219 boost::thread current_thread;
220 for (ThreadPool::iterator i = m_thread_pool.begin();
221 i != m_thread_pool.end(); ++i)
225 if (**i != current_thread) (*i)->join();
/// Typedef for a pool of worker threads: a vector of shared pointers to boost::thread objects.
235 typedef std::vector<boost::shared_ptr<boost::thread> >
ThreadPool;
253 : m_service(), m_timer(m_service)
263 virtual void startup(
void);
293 : m_service_pool(), m_next_service(0)
301 boost::mutex::scoped_lock scheduler_lock(m_mutex);
302 while (m_service_pool.size() < m_num_threads) {
304 m_service_pool.push_back(service_ptr);
306 if (++m_next_service >= m_num_threads)
308 BOOST_ASSERT(m_next_service < m_num_threads);
309 return m_service_pool[m_next_service]->first;
319 BOOST_ASSERT(n < m_num_threads);
320 BOOST_ASSERT(n < m_service_pool.size());
321 return m_service_pool[n]->first;
325 virtual void startup(
void);
332 for (service_pool_type::iterator i = m_service_pool.begin(); i != m_service_pool.end(); ++i) {
344 boost::asio::io_service first;
345 boost::asio::deadline_timer second;
virtual void post(boost::function0< void > work_func)
virtual ~multi_thread_scheduler()
virtual destructor
boost::condition m_scheduler_has_stopped
condition triggered when the scheduler has stopped
virtual void finish_threads(void)
finishes all threads used to perform work
bool is_running(void) const
returns true if the scheduler is running
virtual void stop_threads(void)
stops all threads used to perform work
virtual void stop_services(void)
stops all services used to schedule work
virtual ~one_to_one_scheduler()
virtual destructor
void set_logger(logger log_ptr)
sets the logger to be used
bool m_is_running
true if the thread scheduler is running
static const boost::uint32_t DEFAULT_NUM_THREADS
default number of worker threads in the thread pool
boost::asio::deadline_timer m_timer
timer used to periodically check for shutdown
single_service_scheduler(void)
constructs a new single_service_scheduler
multi_thread_scheduler(void)
constructs a new multi_thread_scheduler
virtual void finish_services(void)
finishes all services used to schedule work
virtual void stop_services(void)
stops all services used to schedule work
static void sleep(ConditionType &wakeup_condition, LockType &wakeup_lock, boost::uint32_t sleep_sec, boost::uint32_t sleep_nsec)
boost::condition m_no_more_active_users
condition triggered when there are no more active users
virtual void finish_services(void)
finishes all services used to schedule work
boost::asio::io_service m_service
service used to manage async I/O events
static const boost::uint32_t MICROSEC_IN_SECOND
number of microseconds in one full second (10 ^ 6)
boost::uint32_t m_num_threads
total number of worker threads in the pool
virtual boost::asio::io_service & get_io_service(void)
returns an async I/O service used to schedule work
ThreadPool m_thread_pool
pool of threads used to perform work
virtual ~scheduler()
virtual destructor
virtual boost::asio::io_service & get_io_service(void)
returns an async I/O service used to schedule work
scheduler(void)
constructs a new scheduler
boost::mutex m_mutex
mutex to make class thread-safe
static void sleep(boost::uint32_t sleep_sec, boost::uint32_t sleep_nsec)
static const boost::uint32_t NSEC_IN_SECOND
number of nanoseconds in one full second (10 ^ 9)
logger m_logger
primary logging interface used by this class
boost::uint32_t get_num_threads(void) const
returns the number of threads currently in use
boost::uint32_t m_active_users
the scheduler will not shut down until there are no more active users
void set_num_threads(const boost::uint32_t n)
sets the number of threads to be used (these are shared by all servers)
static const boost::uint32_t KEEP_RUNNING_TIMER_SECONDS
number of seconds a timer should wait in order to keep the IO services running
std::vector< boost::shared_ptr< service_pair_type > > service_pool_type
typedef for a pool of IO services
boost::uint32_t m_next_service
the next service to use for scheduling work
std::vector< boost::shared_ptr< boost::thread > > ThreadPool
typedef for a pool of worker threads
service_pair_type: typedef for a pair object where first is an IO service and second is a deadline timer
virtual void finish_services(void)
finishes all services used to schedule work
virtual boost::asio::io_service & get_io_service(boost::uint32_t n)
virtual void stop_services(void)
stops all services used to schedule work
virtual ~single_service_scheduler()
virtual destructor
logger get_logger(void)
returns the logger currently in use
virtual void startup(void)
Starts the thread scheduler (this is called automatically when necessary)
virtual void stop_threads(void)
stops all threads used to perform work
service_pool_type m_service_pool
pool of IO services used to schedule work
virtual void finish_threads(void)
finishes all threads used to perform work
one_to_one_scheduler(void)
constructs a new one_to_one_scheduler