10 #ifndef __PION_PIONSCHEDULER_HEADER__
11 #define __PION_PIONSCHEDULER_HEADER__
14 #include <boost/asio.hpp>
15 #include <boost/bind.hpp>
16 #include <boost/function/function0.hpp>
17 #include <boost/cstdint.hpp>
18 #include <boost/shared_ptr.hpp>
19 #include <boost/noncopyable.hpp>
20 #include <boost/thread/thread.hpp>
21 #include <boost/thread/mutex.hpp>
22 #include <boost/thread/xtime.hpp>
23 #include <boost/thread/condition.hpp>
24 #include <pion/PionConfig.hpp>
25 #include <pion/PionException.hpp>
26 #include <pion/PionLogger.hpp>
35 private boost::noncopyable
41 : m_logger(PION_GET_LOGGER(
"pion.PionScheduler")),
42 m_num_threads(DEFAULT_NUM_THREADS), m_active_users(0), m_is_running(false)
52 virtual void shutdown(
void);
60 void addActiveUser(
void);
63 void removeActiveUser(
void);
66 inline bool isRunning(
void)
const {
return m_is_running; }
69 inline void setNumThreads(
const boost::uint32_t n) { m_num_threads = n; }
72 inline boost::uint32_t
getNumThreads(
void)
const {
return m_num_threads; }
81 virtual boost::asio::io_service& getIOService(
void) = 0;
88 virtual void post(boost::function0<void> work_func) {
89 getIOService().post(work_func);
98 void keepRunning(boost::asio::io_service& my_service,
99 boost::asio::deadline_timer& my_timer);
107 inline static void sleep(boost::uint32_t sleep_sec, boost::uint32_t sleep_nsec) {
108 boost::xtime wakeup_time(getWakeupTime(sleep_sec, sleep_nsec));
109 boost::thread::sleep(wakeup_time);
121 template <
typename ConditionType,
typename LockType>
122 inline static void sleep(ConditionType& wakeup_condition, LockType& wakeup_lock,
123 boost::uint32_t sleep_sec, boost::uint32_t sleep_nsec)
125 boost::xtime wakeup_time(getWakeupTime(sleep_sec, sleep_nsec));
126 wakeup_condition.timed_wait(wakeup_lock, wakeup_time);
131 void processServiceWork(boost::asio::io_service& service);
144 static boost::xtime getWakeupTime(boost::uint32_t sleep_sec,
145 boost::uint32_t sleep_nsec);
215 if (! m_thread_pool.empty()) {
216 PION_LOG_DEBUG(m_logger,
"Waiting for threads to shutdown");
219 boost::thread current_thread;
220 for (ThreadPool::iterator i = m_thread_pool.begin();
221 i != m_thread_pool.end(); ++i)
225 if (**i != current_thread) (*i)->join();
235 typedef std::vector<boost::shared_ptr<boost::thread> >
ThreadPool;
253 : m_service(), m_timer(m_service)
260 virtual boost::asio::io_service&
getIOService(
void) {
return m_service; }
263 virtual void startup(
void);
293 : m_service_pool(), m_next_service(0)
301 boost::mutex::scoped_lock scheduler_lock(m_mutex);
302 while (m_service_pool.size() < m_num_threads) {
303 boost::shared_ptr<ServicePair> service_ptr(
new ServicePair());
304 m_service_pool.push_back(service_ptr);
306 if (++m_next_service >= m_num_threads)
308 PION_ASSERT(m_next_service < m_num_threads);
309 return m_service_pool[m_next_service]->first;
319 PION_ASSERT(n < m_num_threads);
320 PION_ASSERT(n < m_service_pool.size());
321 return m_service_pool[n]->first;
325 virtual void startup(
void);
332 for (ServicePool::iterator i = m_service_pool.begin(); i != m_service_pool.end(); ++i) {
344 boost::asio::io_service first;
345 boost::asio::deadline_timer second;