22 #include "openprs_thread.h" 23 #include "utils/openprs_server_proxy.h" 24 #include "utils/openprs_mp_proxy.h" 25 #include "utils/proc.h" 27 #include <logging/logger.h> 28 #include <baseapp/run.h> 29 #include <netcomm/fawkes/network_manager.h> 37 #include <boost/format.hpp> 38 #include <boost/bind.hpp> 39 #include <boost/lambda/lambda.hpp> 40 #include <boost/lambda/bind.hpp> 55 server_socket_(io_service_), deadline_(io_service_)
71 openprs_server_proxy_ = NULL;
72 openprs_mp_proxy_ = NULL;
74 char hostname[HOST_NAME_MAX];
75 if (gethostname(hostname, HOST_NAME_MAX) == -1) {
76 strcpy(hostname,
"localhost");
84 cfg_mp_host_ = hostname;
86 cfg_mp_port_ =
config->
get_uint(
"/openprs/message-passer/tcp-port");
87 cfg_mp_port_s_ = boost::str(boost::format(
"%u") % cfg_mp_port_);
88 cfg_mp_use_proxy_ =
config->
get_bool(
"/openprs/message-passer/use-proxy");
89 cfg_mp_proxy_port_ =
config->
get_uint(
"/openprs/message-passer/proxy-tcp-port");
96 cfg_server_host_ = hostname;
99 cfg_server_port_s_ = boost::str(boost::format(
"%u") % cfg_server_port_);
100 cfg_server_proxy_port_ =
config->
get_uint(
"/openprs/server/proxy-tcp-port");
103 cfg_kernel_timeout_ =
config->
get_float(
"/openprs/kernels/start-timeout");
109 const char *filename = cfg_mp_bin_.c_str();
110 const char *argv[] = { filename,
"-j", cfg_mp_port_s_.c_str(), NULL };
116 if (cfg_server_run_) {
118 const char *filename = cfg_server_bin_.c_str();
119 const char *argv[] = { filename,
120 "-j", cfg_mp_port_s_.c_str(),
121 "-i", cfg_server_port_s_.c_str(),
122 "-l",
"lower", NULL };
128 #if BOOST_VERSION >= 104800 131 boost::asio::ip::tcp::resolver resolver(io_service_);
132 boost::asio::ip::tcp::resolver::query query(cfg_server_host_, cfg_server_port_s_);
133 boost::asio::ip::tcp::resolver::iterator iter = resolver.resolve(query);
138 deadline_.expires_at(boost::posix_time::pos_infin);
139 check_deadline(deadline_, server_socket_);
141 deadline_.expires_from_now(boost::posix_time::seconds(cfg_server_timeout_));
143 boost::system::error_code ec = boost::asio::error::would_block;
144 server_socket_.async_connect(iter->endpoint(),
145 boost::lambda::var(ec) = boost::lambda::_1);
149 io_service_.run_one();
150 #if BOOST_VERSION >= 105400 && BOOST_VERSION < 105500 156 server_socket_.remote_endpoint(ec);
157 if (ec == boost::system::errc::not_connected) {
159 ec = boost::asio::error::would_block;
160 server_socket_.async_connect(iter->endpoint(),
161 boost::lambda::var(ec) = boost::lambda::_1);
165 }
while (ec == boost::asio::error::would_block);
168 if (ec || ! server_socket_.is_open()) {
170 if (ec.value() == boost::system::errc::operation_canceled) {
171 throw Exception(
"OpenPRS waiting for server to come up timed out");
173 throw Exception(
"OpenPRS waiting for server failed: %s", ec.message().c_str());
180 boost::asio::socket_base::keep_alive keep_alive_option(
true);
181 server_socket_.set_option(keep_alive_option);
184 std::string greeting = OpenPRSServerProxy::read_string_from_socket(server_socket_);
187 OpenPRSServerProxy::write_string_to_socket(server_socket_,
"fawkes");
188 OpenPRSServerProxy::write_int_to_socket(server_socket_, getpid());
189 OpenPRSServerProxy::write_int_to_socket(server_socket_, 0);
191 io_service_thread_ = std::thread([
this]() { this->io_service_.run(); });
196 cfg_server_host_, cfg_server_port_,
logger);
198 if (cfg_mp_use_proxy_) {
201 cfg_mp_host_, cfg_mp_port_,
logger);
203 openprs_mp_proxy_ = NULL;
208 cfg_mp_use_proxy_ ? hostname : cfg_mp_host_,
209 cfg_mp_use_proxy_ ? cfg_mp_proxy_port_ : cfg_mp_port_,
211 openprs_aspect_inifin_.
prepare(
"localhost", fawkes::runtime::network_manager->fawkes_port(),
212 openprs_kernel_mgr_, openprs_server_proxy_, openprs_mp_proxy_);
213 openprs_manager_aspect_inifin_.
set_manager(openprs_kernel_mgr_);
219 server_socket_.close();
221 if (io_service_thread_.joinable()) {
222 io_service_thread_.join();
227 proc_srv_->
kill(SIGINT);
231 proc_mp_->
kill(SIGINT);
237 delete openprs_server_proxy_;
238 delete openprs_mp_proxy_;
239 openprs_kernel_mgr_.
clear();
250 const std::list<AspectIniFin *>
251 OpenPRSThread::inifin_list()
253 std::list<AspectIniFin *> rv;
254 rv.push_back(&openprs_aspect_inifin_);
255 rv.push_back(&openprs_manager_aspect_inifin_);
261 OpenPRSThread::server_alive()
263 if (server_socket_.is_open()) {
264 boost::system::error_code ec;
265 server_socket_.remote_endpoint(ec);
274 OpenPRSThread::check_deadline(boost::asio::deadline_timer &deadline,
275 boost::asio::ip::tcp::socket &socket)
277 if (deadline.expires_at() <= boost::asio::deadline_timer::traits_type::now()) {
279 deadline.expires_at(boost::posix_time::pos_infin);
282 #if BOOST_VERSION >= 104800 283 deadline.async_wait(boost::lambda::bind(&OpenPRSThread::check_deadline,
this,
284 boost::ref(deadline), boost::ref(socket)));
286 deadline.async_wait(boost::bind(&OpenPRSThread::check_deadline,
this,
287 boost::ref(deadline), boost::ref(socket)));
OpenPRSThread()
Constructor.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Fawkes library namespace.
virtual bool get_bool(const char *path)=0
Get value from configuration which is of type bool.
void kill(int signum)
Send a signal to the process.
void check_proc()
Check if the process is still alive.
void set_manager(LockPtr< OpenPRSKernelManager > &clips_kernel_mgr)
Set OpenPRS environment manger.
Thread class encapsulation of pthreads.
Logger * logger
This is the Logger member used to access the logger.
virtual void init()
Initialize the thread.
Clock * clock
By means of this member access to the clock is given.
Thread aspect to use blocked timing.
Base class for exceptions in Fawkes.
virtual void finalize()
Finalize the thread.
const char * name() const
Get name of thread.
void prepare(const std::string &fawkes_host, unsigned short fawkes_port, LockPtr< OpenPRSKernelManager > &openprs_kernel_mgr, OpenPRSServerProxy *openprs_server_proxy, OpenPRSMessagePasserProxy *openprs_mp_proxy)
Prepare OpenPRS aspect initializer.
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
Thread aspect provide a new aspect.
void clear()
Set underlying instance to 0, decrementing reference count of existing instance appropriately.
Sub-process execution with stdin/stdout/stderr redirection.
void set_kernel_timeout(float timeout_sec)
Set timeout for kernel creation.
virtual void loop()
Code to execute in the thread.
virtual unsigned int get_uint(const char *path)=0
Get value from configuration which is of type unsigned int.
Proxy for the OpenPRS server communication.
Configuration * config
This is the Configuration member used to access the configuration.
virtual ~OpenPRSThread()
Destructor.
virtual float get_float(const char *path)=0
Get value from configuration which is of type float.
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.