Fawkes API  Fawkes Development Version
openprs_thread.cpp
1 
2 /***************************************************************************
3  * openprs_thread.cpp - OpenPRS environment providing Thread
4  *
5  * Created: Thu Aug 14 15:52:35 2014
6  * Copyright 2014-2015 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  * GNU Library General Public License for more details.
18  *
19  * Read the full text in the LICENSE.GPL file in the doc directory.
20  */
21 
22 #include "openprs_thread.h"
23 #include "utils/openprs_server_proxy.h"
24 #include "utils/openprs_mp_proxy.h"
25 #include "utils/proc.h"
26 
27 #include <logging/logger.h>
28 #include <baseapp/run.h>
29 #include <netcomm/fawkes/network_manager.h>
30 
31 #include <unistd.h>
32 #include <cstdio>
33 #include <cerrno>
34 #include <cstdlib>
35 #include <csignal>
36 
37 #include <boost/format.hpp>
38 #include <boost/bind.hpp>
39 #include <boost/lambda/lambda.hpp>
40 #include <boost/lambda/bind.hpp>
41 
42 using namespace fawkes;
43 
44 /** @class OpenPRSThread "openprs_thread.h"
45  * OpenPRS environment thread.
46  *
47  * @author Tim Niemueller
48  */
49 
50 /** Constructor. */
52  : Thread("OpenPRSThread", Thread::OPMODE_WAITFORWAKEUP),
53  BlockedTimingAspect(BlockedTimingAspect::WAKEUP_HOOK_WORLDSTATE),
54  AspectProviderAspect(inifin_list()),
55  server_socket_(io_service_), deadline_(io_service_)
56 {
57 }
58 
59 
60 /** Destructor. */
62 {
63 }
64 
65 
66 void
68 {
69  proc_srv_ = NULL;
70  proc_mp_ = NULL;
71  openprs_server_proxy_ = NULL;
72  openprs_mp_proxy_ = NULL;
73 
74  char hostname[HOST_NAME_MAX];
75  if (gethostname(hostname, HOST_NAME_MAX) == -1) {
76  strcpy(hostname, "localhost");
77  }
78 
79  cfg_mp_run_ = config->get_bool("/openprs/message-passer/run");
80  cfg_mp_bin_ = config->get_string("/openprs/message-passer/binary");
81  try {
82  cfg_mp_host_ = config->get_string("/openprs/message-passer/hostname");
83  } catch (Exception &e) {
84  cfg_mp_host_ = hostname;
85  }
86  cfg_mp_port_ = config->get_uint("/openprs/message-passer/tcp-port");
87  cfg_mp_port_s_ = boost::str(boost::format("%u") % cfg_mp_port_);
88  cfg_mp_use_proxy_ = config->get_bool("/openprs/message-passer/use-proxy");
89  cfg_mp_proxy_port_ = config->get_uint("/openprs/message-passer/proxy-tcp-port");
90 
91  cfg_server_run_ = config->get_bool("/openprs/server/run");
92  cfg_server_bin_ = config->get_string("/openprs/server/binary");
93  try {
94  cfg_server_host_ = config->get_string("/openprs/server/hostname");
95  } catch (Exception &e) {
96  cfg_server_host_ = hostname;
97  }
98  cfg_server_port_ = config->get_uint("/openprs/server/tcp-port");
99  cfg_server_port_s_ = boost::str(boost::format("%u") % cfg_server_port_);
100  cfg_server_proxy_port_ = config->get_uint("/openprs/server/proxy-tcp-port");
101 
102  cfg_server_timeout_ = config->get_float("/openprs/server/timeout");
103  cfg_kernel_timeout_ = config->get_float("/openprs/kernels/start-timeout");
104 
105  openprs_aspect_inifin_.set_kernel_timeout(cfg_kernel_timeout_);
106 
107  if (cfg_mp_run_) {
108  logger->log_warn(name(), "Running OPRS-mp");
109  const char *filename = cfg_mp_bin_.c_str();
110  const char *argv[] = { filename, "-j", cfg_mp_port_s_.c_str(), NULL };
111  proc_mp_ = new SubProcess("OPRS-mp", filename, argv, NULL, logger);
112  } else {
113  proc_mp_ = NULL;
114  }
115 
116  if (cfg_server_run_) {
117  logger->log_warn(name(), "Running OPRS-server");
118  const char *filename = cfg_server_bin_.c_str();
119  const char *argv[] = { filename,
120  "-j", cfg_mp_port_s_.c_str(),
121  "-i", cfg_server_port_s_.c_str(),
122  "-l", "lower", NULL };
123  proc_srv_ = new SubProcess("OPRS-server", filename, argv, NULL, logger);
124  } else {
125  proc_srv_ = NULL;
126  }
127 
128 #if BOOST_VERSION >= 104800
129  logger->log_info(name(), "Verifying OPRS-server availability");
130 
131  boost::asio::ip::tcp::resolver resolver(io_service_);
132  boost::asio::ip::tcp::resolver::query query(cfg_server_host_, cfg_server_port_s_);
133  boost::asio::ip::tcp::resolver::iterator iter = resolver.resolve(query);
134 
135  // this is just the overly complicated way to get a timeout on
136  // a synchronous connect, cf.
137  // http://www.boost.org/doc/libs/1_55_0/doc/html/boost_asio/example/cpp03/timeouts/blocking_tcp_client.cpp
138  deadline_.expires_at(boost::posix_time::pos_infin);
139  check_deadline(deadline_, server_socket_);
140 
141  deadline_.expires_from_now(boost::posix_time::seconds(cfg_server_timeout_));
142 
143  boost::system::error_code ec = boost::asio::error::would_block;
144  server_socket_.async_connect(iter->endpoint(),
145  boost::lambda::var(ec) = boost::lambda::_1);
146 
147  // Block until the asynchronous operation has completed.
148  do {
149  io_service_.run_one();
150 #if BOOST_VERSION >= 105400 && BOOST_VERSION < 105500
151  // Boost 1.54 has a bug that causes async_connect to report success
152  // if it cannot connect at all to the other side, cf.
153  // https://svn.boost.org/trac/boost/ticket/8795
154  // Work around by explicitly checking for connected status
155  if (! ec) {
156  server_socket_.remote_endpoint(ec);
157  if (ec == boost::system::errc::not_connected) {
158  // continue waiting for timeout
159  ec = boost::asio::error::would_block;
160  server_socket_.async_connect(iter->endpoint(),
161  boost::lambda::var(ec) = boost::lambda::_1);
162  }
163  }
164 #endif
165  } while (ec == boost::asio::error::would_block);
166 
167  // Determine whether a connection was successfully established.
168  if (ec || ! server_socket_.is_open()) {
169  finalize();
170  if (ec.value() == boost::system::errc::operation_canceled) {
171  throw Exception("OpenPRS waiting for server to come up timed out");
172  } else {
173  throw Exception("OpenPRS waiting for server failed: %s", ec.message().c_str());
174  }
175  }
176 #else
177  logger->log_warn(name(), "Cannot verify server aliveness, Boost too old");
178 #endif
179 
180  boost::asio::socket_base::keep_alive keep_alive_option(true);
181  server_socket_.set_option(keep_alive_option);
182 
183  // receive greeting
184  std::string greeting = OpenPRSServerProxy::read_string_from_socket(server_socket_);
185  //logger->log_info(name(), "Received server greeting: %s", greeting.c_str());
186  // send our greeting
187  OpenPRSServerProxy::write_string_to_socket(server_socket_, "fawkes");
188  OpenPRSServerProxy::write_int_to_socket(server_socket_, getpid());
189  OpenPRSServerProxy::write_int_to_socket(server_socket_, 0);
190 
191  io_service_thread_ = std::thread([this]() { this->io_service_.run(); });
192 
193  logger->log_info(name(), "Starting OpenPRS server proxy");
194 
195  openprs_server_proxy_ = new OpenPRSServerProxy(cfg_server_proxy_port_,
196  cfg_server_host_, cfg_server_port_, logger);
197 
198  if (cfg_mp_use_proxy_) {
199  logger->log_info(name(), "Starting OpenPRS message passer proxy");
200  openprs_mp_proxy_ = new OpenPRSMessagePasserProxy(cfg_mp_proxy_port_,
201  cfg_mp_host_, cfg_mp_port_, logger);
202  } else {
203  openprs_mp_proxy_ = NULL;
204  }
205 
206  logger->log_warn(name(), "Initializing kernel manager");
207  openprs_kernel_mgr_ = new OpenPRSKernelManager(hostname, cfg_server_proxy_port_,
208  cfg_mp_use_proxy_ ? hostname : cfg_mp_host_,
209  cfg_mp_use_proxy_ ? cfg_mp_proxy_port_ : cfg_mp_port_,
210  logger, clock, config);
211  openprs_aspect_inifin_.prepare("localhost", fawkes::runtime::network_manager->fawkes_port(),
212  openprs_kernel_mgr_, openprs_server_proxy_, openprs_mp_proxy_);
213  openprs_manager_aspect_inifin_.set_manager(openprs_kernel_mgr_);
214 }
215 
216 void
218 {
219  server_socket_.close();
220  io_service_.stop();
221  if (io_service_thread_.joinable()) {
222  io_service_thread_.join();
223  }
224 
225  if (proc_srv_) {
226  logger->log_info(name(), "Killing OpenPRS server");
227  proc_srv_->kill(SIGINT);
228  }
229  if (proc_mp_) {
230  logger->log_info(name(), "Killing OpenPRS message passer");
231  proc_mp_->kill(SIGINT);
232  }
233 
234  delete proc_srv_;
235  delete proc_mp_;
236 
237  delete openprs_server_proxy_;
238  delete openprs_mp_proxy_;
239  openprs_kernel_mgr_.clear();
240 }
241 
242 
243 void
245 {
246  if (proc_srv_) proc_srv_->check_proc();
247  if (proc_mp_) proc_mp_->check_proc();
248 }
249 
250 const std::list<AspectIniFin *>
251 OpenPRSThread::inifin_list()
252 {
253  std::list<AspectIniFin *> rv;
254  rv.push_back(&openprs_aspect_inifin_);
255  rv.push_back(&openprs_manager_aspect_inifin_);
256  return rv;
257 }
258 
259 
260 bool
261 OpenPRSThread::server_alive()
262 {
263  if (server_socket_.is_open()) {
264  boost::system::error_code ec;
265  server_socket_.remote_endpoint(ec);
266  return !ec;
267  } else {
268  return false;
269  }
270 }
271 
272 
273 void
274 OpenPRSThread::check_deadline(boost::asio::deadline_timer &deadline,
275  boost::asio::ip::tcp::socket &socket)
276 {
277  if (deadline.expires_at() <= boost::asio::deadline_timer::traits_type::now()) {
278  socket.close();
279  deadline.expires_at(boost::posix_time::pos_infin);
280  }
281 
282 #if BOOST_VERSION >= 104800
283  deadline.async_wait(boost::lambda::bind(&OpenPRSThread::check_deadline, this,
284  boost::ref(deadline), boost::ref(socket)));
285 #else
286  deadline.async_wait(boost::bind(&OpenPRSThread::check_deadline, this,
287  boost::ref(deadline), boost::ref(socket)));
288 #endif
289 }
OpenPRSThread()
Constructor.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Proxy for the OpenPRS server communication.
Fawkes library namespace.
virtual bool get_bool(const char *path)=0
Get value from configuration which is of type bool.
void kill(int signum)
Send a signal to the process.
Definition: proc.cpp:100
void check_proc()
Check if the process is still alive.
Definition: proc.cpp:240
void set_manager(LockPtr< OpenPRSKernelManager > &clips_kernel_mgr)
Set OpenPRS environment manger.
Thread class encapsulation of pthreads.
Definition: thread.h:42
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:44
virtual void init()
Initialize the thread.
Clock * clock
By means of this member access to the clock is given.
Definition: clock.h:45
Thread aspect to use blocked timing.
Base class for exceptions in Fawkes.
Definition: exception.h:36
virtual void finalize()
Finalize the thread.
const char * name() const
Get name of thread.
Definition: thread.h:95
void prepare(const std::string &fawkes_host, unsigned short fawkes_port, LockPtr< OpenPRSKernelManager > &openprs_kernel_mgr, OpenPRSServerProxy *openprs_server_proxy, OpenPRSMessagePasserProxy *openprs_mp_proxy)
Prepare OpenPRS aspect initializer.
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
Thread aspect provide a new aspect.
void clear()
Set underlying instance to 0, decrementing reference count of existing instance appropriately.
Definition: lockptr.h:492
Sub-process execution with stdin/stdout/stderr redirection.
Definition: proc.h:39
void set_kernel_timeout(float timeout_sec)
Set timeout for kernel creation.
virtual void loop()
Code to execute in the thread.
virtual unsigned int get_uint(const char *path)=0
Get value from configuration which is of type unsigned int.
Proxy for the OpenPRS server communication.
Configuration * config
This is the Configuration member used to access the configuration.
Definition: configurable.h:44
virtual ~OpenPRSThread()
Destructor.
virtual float get_float(const char *path)=0
Get value from configuration which is of type float.
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.