pion-net  4.0.9
PionScheduler.cpp
1 // -----------------------------------------------------------------------
2 // pion-common: a collection of common libraries used by the Pion Platform
3 // -----------------------------------------------------------------------
4 // Copyright (C) 2007-2008 Atomic Labs, Inc. (http://www.atomiclabs.com)
5 //
6 // Distributed under the Boost Software License, Version 1.0.
7 // See http://www.boost.org/LICENSE_1_0.txt
8 //
9 
10 #include <boost/date_time/posix_time/posix_time_duration.hpp>
11 #include <pion/PionScheduler.hpp>
12 
13 namespace pion { // begin namespace pion
14 
15 
16 // static members of PionScheduler
17 
18 const boost::uint32_t PionScheduler::DEFAULT_NUM_THREADS = 8;
19 const boost::uint32_t PionScheduler::NSEC_IN_SECOND = 1000000000; // (10^9)
20 const boost::uint32_t PionScheduler::MICROSEC_IN_SECOND = 1000000; // (10^6)
21 const boost::uint32_t PionScheduler::KEEP_RUNNING_TIMER_SECONDS = 5;
22 
23 
24 // PionScheduler member functions
25 
27 {
 // NOTE(review): the signature line for this body is absent from this listing
 // (the embedded numbering jumps from 25 to 27); presumably this is
 // PionScheduler::shutdown() -- confirm against the original file.
 //
 // Stops the scheduler while holding m_mutex. If it is running, this waits
 // for m_active_users to reach zero, clears m_is_running, stops the services
 // and threads, and finally wakes everyone waiting on m_scheduler_has_stopped.
 // If it is not running, the same stop/finish calls are still made and the
 // waiters are still notified.
28  // lock mutex for thread safety
29  boost::mutex::scoped_lock scheduler_lock(m_mutex);
30 
31  if (m_is_running) {
32 
33  PION_LOG_INFO(m_logger, "Shutting down the thread scheduler");
34 
35  while (m_active_users > 0) {
36  // first, wait for any active users to exit
 // the counter is re-tested after every wake-up; the wait is handed
 // scheduler_lock (presumably so the lock is released while blocked --
 // confirm the type of m_no_more_active_users)
37  PION_LOG_INFO(m_logger, "Waiting for " << m_active_users << " scheduler users to finish");
38  m_no_more_active_users.wait(scheduler_lock);
39  }
40 
41  // shut everything down
 // the flag is cleared first: processServiceWork() and keepRunning() both
 // test m_is_running before continuing
42  m_is_running = false;
43  stopServices();
44  stopThreads();
 // NOTE(review): the embedded numbering skips 45 here; a statement may be
 // missing from this listing -- confirm against the original file.
46  finishThreads();
47 
48  PION_LOG_INFO(m_logger, "The thread scheduler has shutdown");
49 
50  // Make sure anyone waiting on shutdown gets notified
51  m_scheduler_has_stopped.notify_all();
52 
53  } else {
54 
55  // stop and finish everything to be certain that no events are pending
56  stopServices();
57  stopThreads();
 // NOTE(review): the embedded numbering skips 58 here as well; a statement
 // may be missing from this listing -- confirm against the original file.
59  finishThreads();
60 
61  // Make sure anyone waiting on shutdown gets notified
62  // even if the scheduler did not startup successfully
63  m_scheduler_has_stopped.notify_all();
64  }
65 }
66 
68 {
 // NOTE(review): the signature line for this body is absent from this listing
 // (the embedded numbering skips 67); presumably this is PionScheduler::join()
 // -- confirm against the original file.
 //
 // Blocks the caller, holding m_mutex between waits, until m_is_running is
 // false. Returns immediately if the scheduler is not running.
69  boost::mutex::scoped_lock scheduler_lock(m_mutex);
70  while (m_is_running) {
71  // sleep until scheduler_has_stopped condition is signaled
 // the flag is re-tested after every wake-up, so waking while the
 // scheduler is still running simply waits again
72  m_scheduler_has_stopped.wait(scheduler_lock);
73  }
74 }
75 
76 void PionScheduler::keepRunning(boost::asio::io_service& my_service,
77  boost::asio::deadline_timer& my_timer)
78 {
79  if (m_is_running) {
80  // schedule this again to make sure the service doesn't complete
81  my_timer.expires_from_now(boost::posix_time::seconds(KEEP_RUNNING_TIMER_SECONDS));
82  my_timer.async_wait(boost::bind(&PionScheduler::keepRunning, this,
83  boost::ref(my_service), boost::ref(my_timer)));
84  }
85 }
86 
88 {
 // NOTE(review): the signature line for this body is absent from this listing
 // (the embedded numbering skips 87); presumably this is
 // PionScheduler::addActiveUser() -- confirm against the original file.
 //
 // Starts the scheduler on demand if it is not running. The startup() bodies
 // visible in this file lock m_mutex themselves, which is presumably why
 // startup() is called before this function takes the lock.
89  if (!m_is_running) startup();
90  boost::mutex::scoped_lock scheduler_lock(m_mutex);
 // NOTE(review): the embedded numbering skips 91 here and nothing visible is
 // done under the lock; a statement (presumably the m_active_users increment
 // that pairs with the decrement in the following function) appears to be
 // missing from this listing -- confirm against the original file.
92 }
93 
95 {
 // NOTE(review): the signature line for this body is absent from this listing
 // (the embedded numbering skips 94); presumably this is
 // PionScheduler::removeActiveUser() -- confirm against the original file.
 //
 // Decrements m_active_users while holding m_mutex. When the count reaches
 // zero it wakes every waiter on m_no_more_active_users, which is the
 // condition the shutdown code earlier in this file waits on.
96  boost::mutex::scoped_lock scheduler_lock(m_mutex);
97  if (--m_active_users == 0)
98  m_no_more_active_users.notify_all();
99 }
100 
101 boost::xtime PionScheduler::getWakeupTime(boost::uint32_t sleep_sec,
102  boost::uint32_t sleep_nsec)
103 {
104  boost::xtime wakeup_time;
105 #ifdef TIME_UTC
106  boost::xtime_get(&wakeup_time, TIME_UTC);
107 #else
108  boost::xtime_get(&wakeup_time, boost::TIME_UTC);
109 #endif
110  wakeup_time.sec += sleep_sec;
111  wakeup_time.nsec += sleep_nsec;
112  if (static_cast<boost::uint32_t>(wakeup_time.nsec) >= NSEC_IN_SECOND) {
113  wakeup_time.sec++;
114  wakeup_time.nsec -= NSEC_IN_SECOND;
115  }
116  return wakeup_time;
117 }
118 
119 void PionScheduler::processServiceWork(boost::asio::io_service& service) {
120  while (m_is_running) {
121  try {
122  service.run();
123  } catch (std::exception& e) {
124  PION_LOG_ERROR(m_logger, e.what());
125  } catch (...) {
126  PION_LOG_ERROR(m_logger, "caught unrecognized exception");
127  }
128  }
129 }
130 
131 
132 // PionSingleServiceScheduler member functions
133 
135 {
 // NOTE(review): the signature line for this body is absent from this listing
 // (the embedded numbering skips 134); given the section comment above it is
 // presumably PionSingleServiceScheduler::startup() -- confirm against the
 // original file.
 //
 // Starts the scheduler if it is not already running: marks it running,
 // resets m_service, then creates m_num_threads threads that all execute
 // PionScheduler::processServiceWork on the one shared m_service. Does
 // nothing when m_is_running is already true.
136  // lock mutex for thread safety
137  boost::mutex::scoped_lock scheduler_lock(m_mutex);
138 
139  if (! m_is_running) {
140  PION_LOG_INFO(m_logger, "Starting thread scheduler");
141  m_is_running = true;
142 
143  // schedule a work item to make sure that the service doesn't complete
144  m_service.reset();
 // NOTE(review): the embedded numbering skips 145 here; the statement the
 // comment above describes (scheduling the keep-alive work item) is not
 // visible and appears to be missing from this listing -- confirm against
 // the original file.
146 
147  // start multiple threads to handle async tasks
148  for (boost::uint32_t n = 0; n < m_num_threads; ++n) {
 // each thread object is held by shared_ptr and recorded in m_thread_pool
149  boost::shared_ptr<boost::thread> new_thread(new boost::thread( boost::bind(&PionScheduler::processServiceWork,
150  this, boost::ref(m_service)) ));
151  m_thread_pool.push_back(new_thread);
152  }
153  }
154 }
155 
156 
157 // PionOneToOneScheduler member functions
158 
160 {
 // NOTE(review): the signature line for this body is absent from this listing
 // (the embedded numbering skips 159); given the section comment above it is
 // presumably PionOneToOneScheduler::startup() -- confirm against the
 // original file.
 //
 // Starts the scheduler if it is not already running: marks it running,
 // grows m_service_pool to at least m_num_threads entries, arms the
 // keep-alive timer of every pooled entry, then creates m_num_threads
 // threads where thread n runs processServiceWork on pool entry n's service.
 // Does nothing when m_is_running is already true.
161  // lock mutex for thread safety
162  boost::mutex::scoped_lock scheduler_lock(m_mutex);
163 
164  if (! m_is_running) {
165  PION_LOG_INFO(m_logger, "Starting thread scheduler");
166  m_is_running = true;
167 
168  // make sure there are enough services initialized
 // existing entries are kept; new ones are only appended
169  while (m_service_pool.size() < m_num_threads) {
170  boost::shared_ptr<ServicePair> service_ptr(new ServicePair());
171  m_service_pool.push_back(service_ptr);
172  }
173 
174  // schedule a work item for each service to make sure that it doesn't complete
 // 'first' and 'second' are passed where keepRunning() takes an io_service
 // and a deadline_timer respectively
175  for (ServicePool::iterator i = m_service_pool.begin(); i != m_service_pool.end(); ++i) {
176  keepRunning((*i)->first, (*i)->second);
177  }
178 
179  // start multiple threads to handle async tasks
 // NOTE(review): the loop above covers the whole pool, while this one only
 // covers the first m_num_threads entries; if the pool is ever larger, the
 // extra services get a timer but no thread -- confirm whether that can occur
180  for (boost::uint32_t n = 0; n < m_num_threads; ++n) {
181  boost::shared_ptr<boost::thread> new_thread(new boost::thread( boost::bind(&PionScheduler::processServiceWork,
182  this, boost::ref(m_service_pool[n]->first)) ));
183  m_thread_pool.push_back(new_thread);
184  }
185  }
186 }
187 
188 
189 } // end namespace pion