Fawkes API  Fawkes Development Version
openprs_server_proxy.cpp
1 
2 /***************************************************************************
3  * openprs_server_proxy.h - OpenPRS server proxy
4  *
5  * Created: Tue Aug 19 16:59:27 2014
6  * Copyright 2014 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version. A runtime exception applies to
13  * this software (see LICENSE.GPL_WRE file mentioned below for details).
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU Library General Public License for more details.
19  *
20  * Read the full text in the LICENSE.GPL_WRE file in the doc directory.
21  */
22 
23 #include "openprs_server_proxy.h"
24 
25 #include <core/exception.h>
26 #include <core/exceptions/system.h>
27 #include <core/threading/mutex_locker.h>
28 #include <logging/logger.h>
29 #include <boost/bind.hpp>
30 #include <boost/lexical_cast.hpp>
31 
32 using namespace boost::asio;
33 
34 // Types copied from OPRS because they are not public there
35 /// @cond EXTERN
36 namespace OPRS {
37  typedef enum {MESSAGE_MT = 1, BROADCAST_MT, MULTICAST_MT, DISCONNECT_MT } Message_Type;
38  typedef enum {REGISTER_OK, REGISTER_NAME_CONFLICT, REGISTER_DENIED} Register_Type;
39  typedef enum {MESSAGES_PT, STRINGS_PT} Protocol_Type;
40 }
41 /// @endcond
42 
43 namespace fawkes {
44 #if 0 /* just to make Emacs auto-indent happy */
45 }
46 #endif
47 
48 /** @class OpenPRSServerProxy "openprs_server_proxy.h"
49  * Proxy for the OpenPRS server communication.
50  * Using this proxy allows to inject commands into the communication between
51  * oprs-server and oprs (or xoprs).
52  * @author Tim Niemueller
53  */
54 
55 
56 /** Constructor.
57  * @param tcp_port port to listen on for incoming connections
58  * @param server_host host of oprs-server to connect to
59  * @param server_port TCP port that oprs-server listens on
60  * @param logger logger for informational messages
61  */
62 OpenPRSServerProxy::OpenPRSServerProxy(unsigned short tcp_port,
63  const std::string &server_host, unsigned short server_port,
64  fawkes::Logger *logger)
65  : io_service_work_(io_service_), acceptor_(io_service_, ip::tcp::endpoint(ip::tcp::v6(), tcp_port)),
66  server_host_(server_host), server_port_(server_port), logger_(logger)
67 {
68  acceptor_.set_option(socket_base::reuse_address(true));
69  io_service_thread_ = std::thread([this]() { this->io_service_.run(); });
70  start_accept();
71 }
72 
73 
74 /** Destructor. */
76 {
77  io_service_.stop();
78  io_service_thread_.join();
79 }
80 
81 
82 /** Check if a kernel connected to the proxy.
83  * @param kernel_name name of the kernel to look for
84  * @return true if the kernel connected, false otherwise
85  */
86 bool
87 OpenPRSServerProxy::has_kernel(const std::string &kernel_name)
88 {
89  auto map_it = find_if(mappings_.begin(), mappings_.end(),
90  [&kernel_name] (const Mapping::Ptr &mapping)
91  { return mapping->client_name == kernel_name; });
92  return (map_it != mappings_.end());
93 }
94 
95 
96 OpenPRSServerProxy::Mapping::Ptr
97 OpenPRSServerProxy::find_mapping(const std::string &recipient)
98 {
99  auto map_it = find_if(mappings_.begin(), mappings_.end(),
100  [&recipient] (const Mapping::Ptr &mapping)
101  { return mapping->client_name == recipient; });
102  if (map_it != mappings_.end()) {
103  return *map_it;
104  } else {
105  throw Exception("Client %s is not connected to OpenPRS server proxy", recipient.c_str());
106  }
107 }
108 
109 /** Transmit a command to an OpenPRS kernel.
110  * This works equivalent to the transmit oprs-server console command.
111  * @param recipient OpenPRS kernel name to send to
112  * @param command command to send, cf. OpenPRS manual for valid commands
113  */
114 void
115 OpenPRSServerProxy::transmit_command(const std::string &recipient, const std::string &command)
116 {
117  MutexLocker lock(mappings_.mutex());
118  Mapping::Ptr mapping = find_mapping(recipient);
119  mapping->transmit_command(command);
120 }
121 
122 
123 /** Transmit a command to an OpenPRS kernel.
124  * This works equivalent to the transmit oprs-server console command.
125  * This function allows to pass a format according to the sprintf()
126  * format and its arguments.
127  * @param recipient OpenPRS kernel name to send to
128  * @param format format string for the command, must be followed by the
129  * appropriate number and types of arguments.
130  */
131 void
132 OpenPRSServerProxy::transmit_command_f(const std::string &recipient, const char *format, ...)
133 {
134  MutexLocker lock(mappings_.mutex());
135  Mapping::Ptr mapping = find_mapping(recipient);
136 
137  va_list arg;
138  va_start(arg, format);
139 
140  char *msg;
141  if (vasprintf(&msg, format, arg) == -1) {
142  throw OutOfMemoryException("Cannot format OpenPRS client command string");
143  }
144  va_end(arg);
145  std::string command = msg;
146  free(msg);
147 
148  mapping->transmit_command(command);
149 }
150 
151 /** Transmit a command to an OpenPRS kernel.
152  * This works equivalent to the transmit oprs-server console command.
153  * This function allows to pass a format according to the sprintf()
154  * format and its arguments. The arguments are read from the @p arg list.
155  * @param recipient OpenPRS kernel name to send to
156  * @param format format string for the command, must be followed by the
157  * appropriate number and types of arguments.
158  * @param arg argument list for the string format
159  */
160 void
161 OpenPRSServerProxy::transmit_command_v(const std::string &recipient, const char *format, va_list arg)
162 {
163  MutexLocker lock(mappings_.mutex());
164  Mapping::Ptr mapping = find_mapping(recipient);
165 
166  char *msg;
167  if (vasprintf(&msg, format, arg) == -1) {
168  throw OutOfMemoryException("Cannot format OpenPRS client command string");
169  }
170  std::string command = msg;
171  free(msg);
172 
173  mapping->transmit_command(command);
174 }
175 
176 
177 /** Start accepting connections. */
178 void
179 OpenPRSServerProxy::start_accept()
180 {
181  Mapping::Ptr mapping(new Mapping(io_service_, server_host_, server_port_, logger_));
182  acceptor_.async_accept(mapping->client_socket,
183  boost::bind(&OpenPRSServerProxy::handle_accept, this,
184  mapping, boost::asio::placeholders::error));
185 }
186 
187 void
188 OpenPRSServerProxy::handle_accept(Mapping::Ptr mapping,
189  const boost::system::error_code& error)
190 {
191  if (! error) {
192  MutexLocker lock(mappings_.mutex());
193  mappings_.push_back(mapping);
194  mapping->start();
195  }
196 
197  start_accept();
198 }
199 
200 
201 OpenPRSServerProxy::Mapping::Mapping(boost::asio::io_service &io_service,
202  const std::string &server_host, unsigned short server_port,
203  fawkes::Logger *logger)
204  : io_service_(io_service), resolver_(io_service_),
205  server_host_(server_host), server_port_(server_port), logger_(logger),
206  client_socket(io_service_), server_socket(io_service_)
207 {
208 }
209 
210 
211 /** Destruct mapping.
212  * This closes both, client and server sockets. This destructor
213  * assumes that the io_service has been cancelled.
214  */
215 OpenPRSServerProxy::Mapping::~Mapping()
216 {
217  boost::system::error_code err;
218  client_socket.shutdown(ip::tcp::socket::shutdown_both, err);
219  client_socket.close();
220  server_socket.shutdown(ip::tcp::socket::shutdown_both, err);
221  server_socket.close();
222 }
223 
224 
225 /** A client has connected, start this mapping. */
226 void
227 OpenPRSServerProxy::Mapping::start()
228 {
229  logger_->log_info("OPRS-server-proxy", "Client connected, connecting to server");
230  ip::tcp::resolver::query query(server_host_, boost::lexical_cast<std::string>(server_port_));
231  resolver_.async_resolve(query,
232  boost::bind(&OpenPRSServerProxy::Mapping::handle_resolve, this,
233  boost::asio::placeholders::error,
234  boost::asio::placeholders::iterator));
235 
236 }
237 
238 
239 bool
240 OpenPRSServerProxy::Mapping::alive() const
241 {
242  return client_socket.is_open();
243 }
244 
245 
246 void
247 OpenPRSServerProxy::Mapping::disconnect()
248 {
249  logger_->log_info("OPRS-server-proxy", "Disconnecting %s", client_name.c_str());
250  boost::system::error_code ec;
251  client_socket.shutdown(ip::tcp::socket::shutdown_both, ec);
252  client_socket.close();
253 }
254 
255 
256 void
257 OpenPRSServerProxy::Mapping::handle_resolve(const boost::system::error_code& err,
258  ip::tcp::resolver::iterator endpoint_iterator)
259 {
260  if (! err) {
261  // Attempt a connection to each endpoint in the list until we
262  // successfully establish a connection.
263 #if BOOST_ASIO_VERSION > 100409
264  boost::asio::async_connect(server_socket, endpoint_iterator,
265 #else
266  server_socket.async_connect(*endpoint_iterator,
267 #endif
268  boost::bind(&OpenPRSServerProxy::Mapping::handle_connect, this,
269  boost::asio::placeholders::error));
270  } else {
271  disconnect();
272  }
273 }
274 
275 void
276 OpenPRSServerProxy::Mapping::handle_connect(const boost::system::error_code &err)
277 {
278  if (! err) {
279 
280  try {
281  // forward greeting
282  std::string greeting = read_string_from_socket(server_socket);
283  logger_->log_info("OPRS-server-proxy", "Forwarding greeting '%s'", greeting.c_str());
284  write_string_to_socket(client_socket, greeting);
285 
286  int client_pid = 0;
287  int client_use_x = 0;
288 
289  logger_->log_info("OPRS-server-proxy", "Reading client details");
290  // now read connection details
291  client_name = read_string_from_socket(client_socket);
292  client_pid = read_int_from_socket(client_socket);
293  client_use_x = read_int_from_socket(client_socket);
294 
295  logger_->log_info("OPRS-server-proxy", "Got client info: %s %i %s",
296  client_name.c_str(), client_pid, client_use_x ? "XOPRS" : "OPRS");
297 
298  // forward to server
299  write_string_to_socket(server_socket, client_name);
300  write_int_to_socket(server_socket, client_pid);
301  write_int_to_socket(server_socket, client_use_x);
302 
303  start_recv_client();
304  start_recv_server();
305  } catch (Exception &e) {
306  disconnect();
307  }
308  } else {
309  disconnect();
310  }
311 }
312 
313 
314 void
315 OpenPRSServerProxy::Mapping::start_recv_client()
316 {
317  boost::asio::async_read(client_socket,
318  boost::asio::buffer(&client_in_num_completions_, sizeof(client_in_num_completions_)),
319  boost::bind(&OpenPRSServerProxy::Mapping::handle_recv_client,
320  this, boost::asio::placeholders::error));
321 }
322 
323 void
324 OpenPRSServerProxy::Mapping::start_recv_server()
325 {
326  boost::asio::async_read_until(server_socket, server_buffer_, '\n',
327  boost::bind(&OpenPRSServerProxy::Mapping::handle_recv_server,
328  this, boost::asio::placeholders::error));
329 }
330 
331 
332 void
333 OpenPRSServerProxy::Mapping::handle_recv_server(const boost::system::error_code &err)
334 {
335  if (! err) {
336  std::string line;
337  std::istream in_stream(&server_buffer_);
338  std::getline(in_stream, line);
339 
340  logger_->log_info("OPRS-server-proxy", "Forwarding S->C line '%s'", line.c_str());
341  write_string_newline_to_socket(client_socket, line);
342 
343  start_recv_server();
344  } else {
345  disconnect();
346  }
347 }
348 
349 
350 void
351 OpenPRSServerProxy::Mapping::handle_recv_client(const boost::system::error_code &err)
352 {
353  if (! err) {
354  client_in_num_completions_ = ntohl(client_in_num_completions_);
355  for (int i = 0; i < client_in_num_completions_; ++i) {
356  std::string c = read_string_from_socket(client_socket);
357  write_string_to_socket(server_socket, c);
358  }
359 
360  start_recv_client();
361  } else {
362  disconnect();
363  }
364 }
365 
366 
367 void
368 OpenPRSServerProxy::Mapping::transmit_command(const std::string &command)
369 {
370  write_string_newline_to_socket(client_socket, command);
371 }
372 
373 /** Read an int from a given socket.
374  * @param socket socket to read from
375  * @return read value
376  */
377 int
378 OpenPRSServerProxy::read_int_from_socket(boost::asio::ip::tcp::socket &socket)
379 {
380  int32_t value;
381  boost::system::error_code ec;
382  boost::asio::read(socket, boost::asio::buffer(&value, sizeof(value)), ec);
383  if (ec) {
384  throw Exception("Failed to read int from socket: %s", ec.message().c_str());
385  } else {
386  return ntohl(value);
387  }
388 }
389 
390 /** Read a string from a given socket.
391  * @param socket socket to read from
392  * @return read value
393  */
394 std::string
395 OpenPRSServerProxy::read_string_from_socket(boost::asio::ip::tcp::socket &socket)
396 {
397  uint32_t s_size = 0;
398  boost::system::error_code ec;
399  boost::asio::read(socket, boost::asio::buffer(&s_size, sizeof(s_size)), ec);
400  if (ec) {
401  throw Exception("Failed to read string size from socket: %s", ec.message().c_str());
402  }
403  s_size = ntohl(s_size);
404 
405  char s[s_size + 1];
406  boost::asio::read(socket, boost::asio::buffer(s, s_size), ec);
407  if (ec) {
408  throw Exception("Failed to read string content from socket: %s", ec.message().c_str());
409  }
410  s[s_size] = 0;
411 
412  return s;
413 }
414 
415 
416 /** Write an int to a given socket.
417  * @param socket socket to write to
418  * @param i value to write
419  */
420 void
421 OpenPRSServerProxy::write_int_to_socket(boost::asio::ip::tcp::socket &socket, int i)
422 {
423  boost::system::error_code ec;
424  int32_t value = htonl(i);
425  boost::asio::write(socket, boost::asio::buffer(&value, sizeof(value)), ec);
426  if (ec) {
427  throw Exception("Failed to write int to socket: %s", ec.message().c_str());
428  }
429 }
430 
431 /** Write a string to a given socket.
432  * @param socket socket to write to
433  * @param str string value to write
434  */
435 void
436 OpenPRSServerProxy::write_string_to_socket(boost::asio::ip::tcp::socket &socket,
437  const std::string &str)
438 {
439  boost::system::error_code ec;
440  uint32_t s_size = htonl(str.size());
441  std::array<boost::asio::const_buffer, 2> buffers;
442  buffers[0] = boost::asio::buffer(&s_size, sizeof(s_size));
443  buffers[1] = boost::asio::buffer(str.c_str(), str.size());
444 
445  boost::asio::write(socket, buffers, ec);
446  if (ec) {
447  throw Exception("Failed to write string to socket: %s", ec.message().c_str());
448  }
449 }
450 
451 
452 /** Write a string followed by a newline character to a given socket.
453  * @param socket socket to write to
454  * @param str string value to write
455  */
456 void
457 OpenPRSServerProxy::write_string_newline_to_socket(boost::asio::ip::tcp::socket &socket,
458  const std::string &str)
459 {
460  boost::system::error_code ec;
461  std::string s = str + "\n";
462  boost::asio::write(socket, boost::asio::buffer(s.c_str(), s.size()), ec);
463  if (ec) {
464  throw Exception("Failed to write string to socket: %s", ec.message().c_str());
465  }
466 }
467 
468 
469 } // end namespace fawkes
void transmit_command_v(const std::string &client_name, const char *format, va_list arg)
Transmit a command to an OpenPRS kernel.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
static void write_string_to_socket(boost::asio::ip::tcp::socket &socket, const std::string &str)
Write a string to a given socket.
Fawkes library namespace.
Mutex locking helper.
Definition: mutex_locker.h:33
static void write_string_newline_to_socket(boost::asio::ip::tcp::socket &socket, const std::string &str)
Write a string followed by a newline character to a given socket.
void transmit_command(const std::string &client_name, const std::string &command)
Transmit a command to an OpenPRS kernel.
RefPtr< Mutex > mutex() const
Get access to the internal mutex.
Definition: lock_list.h:182
virtual ~OpenPRSServerProxy()
Destructor.
Base class for exceptions in Fawkes.
Definition: exception.h:36
static void write_int_to_socket(boost::asio::ip::tcp::socket &socket, int i)
Write an int to a given socket.
static std::string read_string_from_socket(boost::asio::ip::tcp::socket &socket)
Read a string from a given socket.
static int read_int_from_socket(boost::asio::ip::tcp::socket &socket)
Read an int from a given socket.
bool has_kernel(const std::string &kernel_name)
Check if a kernel connected to the proxy.
void transmit_command_f(const std::string &client_name, const char *format,...)
Transmit a command to an OpenPRS kernel.
System ran out of memory and desired operation could not be fulfilled.
Definition: system.h:32
Interface for logging.
Definition: logger.h:34