Fawkes API  Fawkes Development Version
openprs_mp_proxy.cpp
1 
2 /***************************************************************************
3  * openprs_mp_proxy.h - OpenPRS message passer proxy
4  *
5  * Created: Tue Aug 19 16:59:27 2014
6  * Copyright 2014 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version. A runtime exception applies to
13  * this software (see LICENSE.GPL_WRE file mentioned below for details).
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU Library General Public License for more details.
19  *
20  * Read the full text in the LICENSE.GPL_WRE file in the doc directory.
21  */
22 
23 #include "openprs_mp_proxy.h"
24 
25 #include <core/exception.h>
26 #include <logging/logger.h>
27 #include <boost/bind.hpp>
28 #include <boost/lexical_cast.hpp>
29 
30 using namespace boost::asio;
31 
32 // Types copied from OPRS because they are not public there
33 namespace OPRS {
34  typedef enum {MESSAGE_MT = 1, BROADCAST_MT, MULTICAST_MT, DISCONNECT_MT } Message_Type;
35  typedef enum {REGISTER_OK, REGISTER_NAME_CONFLICT, REGISTER_DENIED} Register_Type;
36  typedef enum {MESSAGES_PT, STRINGS_PT} Protocol_Type;
37 }
38 
39 namespace fawkes {
40 #if 0 /* just to make Emacs auto-indent happy */
41 }
42 #endif
43 
44 /** @class OpenPRSMessagePasserProxy "openprs_mp_proxy.h"
45  * Proxy for the OpenPRS server communication.
46  * Using this proxy allows to inject commands into the communication between
47  * oprs-server and oprs (or xoprs).
48  * @author Tim Niemueller
49  */
50 
51 
52 /** Constructor.
53  * @param tcp_port port to listen on for incoming connections
54  * @param mp_host host of mp-oprs to connect to
55  * @param mp_port TCP port that mp-oprs listens on
56  * @param logger logger for informational messages
57  */
58 OpenPRSMessagePasserProxy::OpenPRSMessagePasserProxy(unsigned short tcp_port,
59  const std::string &mp_host, unsigned short mp_port,
60  fawkes::Logger *logger)
61  : io_service_work_(io_service_), acceptor_(io_service_, ip::tcp::endpoint(ip::tcp::v6(), tcp_port)),
62  mp_host_(mp_host), mp_port_(mp_port), logger_(logger)
63 {
64  acceptor_.set_option(socket_base::reuse_address(true));
65  io_service_thread_ = std::thread([this]() { this->io_service_.run(); });
66  start_accept();
67 }
68 
69 /** Destructor. */
71 {
72  io_service_.stop();
73  io_service_thread_.join();
74 }
75 
76 /** Start accepting connections. */
77 void
78 OpenPRSMessagePasserProxy::start_accept()
79 {
80  Mapping::Ptr mapping(new Mapping(io_service_, mp_host_, mp_port_, logger_));
81  acceptor_.async_accept(mapping->client_socket,
82  boost::bind(&OpenPRSMessagePasserProxy::handle_accept, this,
83  mapping, boost::asio::placeholders::error));
84 }
85 
86 void
87 OpenPRSMessagePasserProxy::handle_accept(Mapping::Ptr mapping,
88  const boost::system::error_code& error)
89 {
90  if (! error) {
91  mappings_.push_back(mapping);
92  mapping->start();
93  }
94 
95  start_accept();
96 }
97 
98 
99 OpenPRSMessagePasserProxy::Mapping::Mapping(boost::asio::io_service &io_service,
100  const std::string &mp_host, unsigned short mp_port,
101  fawkes::Logger *logger)
102  : io_service_(io_service), resolver_(io_service_),
103  server_host_(mp_host), server_port_(mp_port), logger_(logger),
104  client_socket(io_service_), server_socket(io_service_)
105 {
106 }
107 
108 
109 /** Destruct mapping.
110  * This closes both, client and server sockets. This destructor
111  * assumes that the io_service has been cancelled.
112  */
113 OpenPRSMessagePasserProxy::Mapping::~Mapping()
114 {
115  boost::system::error_code err;
116  client_socket.shutdown(ip::tcp::socket::shutdown_both, err);
117  client_socket.close();
118  server_socket.shutdown(ip::tcp::socket::shutdown_both, err);
119  server_socket.close();
120 }
121 
122 
123 /** A client has connected, start this mapping. */
124 void
125 OpenPRSMessagePasserProxy::Mapping::start()
126 {
127  client_prot = read_int_from_socket(client_socket);
128  client_name = read_string_from_socket(client_socket);
129 
130  logger_->log_info("OPRS-mp-proxy", "Client %s connected", client_name.c_str());
131 
132  ip::tcp::resolver::query query(server_host_, boost::lexical_cast<std::string>(server_port_));
133  resolver_.async_resolve(query,
134  boost::bind(&OpenPRSMessagePasserProxy::Mapping::handle_resolve, this,
135  boost::asio::placeholders::error,
136  boost::asio::placeholders::iterator));
137 
138 }
139 
140 
141 bool
142 OpenPRSMessagePasserProxy::Mapping::alive() const
143 {
144  return client_socket.is_open();
145 }
146 
147 
148 /** Disconnect this client. */
149 void
150 OpenPRSMessagePasserProxy::Mapping::disconnect()
151 {
152  disconnect("disconnect", "API call");
153 }
154 
155 
156 void
157 OpenPRSMessagePasserProxy::Mapping::disconnect(const char *where, const char *reason)
158 {
159  logger_->log_warn("OPRS-mp-proxy", "Client %s disconnected (%s: %s)",
160  client_name.c_str(), where, reason);
161  boost::system::error_code ec;
162  client_socket.shutdown(ip::tcp::socket::shutdown_both, ec);
163  client_socket.close();
164 }
165 
166 
167 void
168 OpenPRSMessagePasserProxy::Mapping::handle_resolve(const boost::system::error_code& err,
169  ip::tcp::resolver::iterator endpoint_iterator)
170 {
171  if (! err) {
172  // Attempt a connection to each endpoint in the list until we
173  // successfully establish a connection.
174 #if BOOST_ASIO_VERSION > 100409
175  boost::asio::async_connect(server_socket, endpoint_iterator,
176 #else
177  server_socket.async_connect(*endpoint_iterator,
178 #endif
179  boost::bind(&OpenPRSMessagePasserProxy::Mapping::handle_connect, this,
180  boost::asio::placeholders::error));
181  } else {
182  disconnect("handle_resolve", err.message().c_str());
183  }
184 }
185 
186 void
187 OpenPRSMessagePasserProxy::Mapping::handle_connect(const boost::system::error_code &err)
188 {
189  if (! err) {
190  write_int_to_socket(server_socket, client_prot);
191  write_string_to_socket(server_socket, client_name);
192 
193  // asynchronously read registration reply
194  boost::asio::async_read(server_socket,
195  boost::asio::buffer(&server_in_reg_reply_, sizeof(server_in_reg_reply_)),
196  boost::bind(&OpenPRSMessagePasserProxy::Mapping::handle_recv_server_reg_reply,
197  this, boost::asio::placeholders::error));
198 
199  } else {
200  disconnect("handle_connect", err.message().c_str());
201  }
202 }
203 
204 
205 void
206 OpenPRSMessagePasserProxy::Mapping::handle_recv_server_reg_reply(const boost::system::error_code &err)
207 {
208  write_int_to_socket(client_socket, server_in_reg_reply_);
209 
210  if (server_in_reg_reply_ == OPRS::REGISTER_OK) {
211  start_recv_client();
212  start_recv_server();
213  } else {
214  disconnect("recv_server_reg_reply", err.message().c_str());
215  }
216 }
217 
218 
219 void
220 OpenPRSMessagePasserProxy::Mapping::start_recv_client()
221 {
222  boost::asio::async_read(client_socket,
223  boost::asio::buffer(&client_in_msg_type_, sizeof(client_in_msg_type_)),
224  boost::bind(&OpenPRSMessagePasserProxy::Mapping::handle_recv_client,
225  this, boost::asio::placeholders::error));
226 }
227 
228 
229 void
230 OpenPRSMessagePasserProxy::Mapping::handle_recv_client(const boost::system::error_code &err)
231 {
232  if (! err) {
233  try {
234  std::vector<std::string> multicast_recipients;
235  std::string message;
236  std::string recipient;
237 
238  client_in_msg_type_ = ntohl(client_in_msg_type_);
239 
240  switch (client_in_msg_type_) {
241  case OPRS::DISCONNECT_MT:
242  logger_->log_info("OPRS-mp-proxy", "Disconnecting %s", client_name.c_str());
243  disconnect("recv_client", "Client disconnected");
244  return;
245 
246  case OPRS::MESSAGE_MT:
247  recipient = read_string_from_socket(client_socket);
248  break;
249 
250  case OPRS::MULTICAST_MT:
251  multicast_recipients.resize(read_int_from_socket(client_socket));
252  break;
253 
254  case OPRS::BROADCAST_MT: break; // nothing to do here
255 
256  default:
257  disconnect("recv_client", "Unknown message type");
258  return;
259  }
260 
261  message = read_string_from_socket(client_socket);
262 
263  if (client_in_msg_type_ == OPRS::MULTICAST_MT) {
264  for (size_t i = 0; i < multicast_recipients.size(); ++i) {
265  multicast_recipients[i] = read_string_from_socket(client_socket);
266  }
267  }
268 
269  // debug output
270  switch (client_in_msg_type_) {
271  case OPRS::MESSAGE_MT:
272  logger_->log_info("OPRS-mp-proxy", "Forwarding unicast %s->%s: '%s'",
273  client_name.c_str(), recipient.c_str(), message.c_str());
274  break;
275 
276  case OPRS::MULTICAST_MT:
277  {
278  std::string recipients;
279  for (size_t i = 0; i < multicast_recipients.size(); ++i) {
280  if (i > 0) recipients += ", ";
281  recipients += multicast_recipients[i];
282  }
283 
284  logger_->log_info("OPRS-mp-proxy", "Forwarding multicast %s->(%s): '%s'",
285  client_name.c_str(), recipients.c_str(), message.c_str());
286  }
287  break;
288 
289  case OPRS::BROADCAST_MT:
290  logger_->log_info("OPRS-mp-proxy", "Forwarding broadcast %s->*: '%s'",
291  client_name.c_str(), message.c_str());
292  break;
293 
294  default: break;
295  }
296 
297  // now re-send message to server
298  write_int_to_socket(server_socket, client_in_msg_type_);
299 
300  switch (client_in_msg_type_) {
301  case OPRS::MESSAGE_MT:
302  write_string_to_socket(server_socket, recipient);
303  write_string_to_socket(server_socket, message);
304  break;
305 
306  case OPRS::MULTICAST_MT:
307  write_string_to_socket(server_socket, message);
308  for (size_t i = 0; i < multicast_recipients.size(); ++i) {
309  write_string_to_socket(server_socket, multicast_recipients[i]);
310  }
311  break;
312 
313  case OPRS::BROADCAST_MT: // nothing to do here
314  write_string_to_socket(server_socket, message);
315  break;
316 
317  default: break; // cannot happen here anymore
318  }
319 
320  start_recv_client();
321  } catch (Exception &e) {
322  disconnect("recv_client", e.what_no_backtrace());
323  }
324  } else {
325  disconnect("recv_client", err.message().c_str());
326  }
327 }
328 
329 
330 void
331 OpenPRSMessagePasserProxy::Mapping::start_recv_server()
332 {
333  if (client_prot == OPRS::MESSAGES_PT) {
334  logger_->log_warn("OPRS-mp-proxy", "Starting listening for %s in MESSAGES_PT mode", client_name.c_str());
335  boost::asio::async_read_until(server_socket, server_buffer_, '\n',
336  boost::bind(&OpenPRSMessagePasserProxy::Mapping::handle_recv_server_message_pt,
337  this, boost::asio::placeholders::error));
338  } else {
339  // tried async_read_some with null buffers but always immediately fires without data available
340  logger_->log_warn("OPRS-mp-proxy", "Starting listening for %s in STRINGS_PT mode", client_name.c_str());
341  server_socket.async_read_some(boost::asio::null_buffers(),
342  boost::bind(&OpenPRSMessagePasserProxy::Mapping::handle_recv_server_strings_pt,
343  this, boost::asio::placeholders::error));
344  }
345 }
346 
347 
348 void
349 OpenPRSMessagePasserProxy::Mapping::handle_recv_server_message_pt(const boost::system::error_code &err)
350 {
351  if (! err) {
352  std::string line;
353  std::istream in_stream(&server_buffer_);
354  std::getline(in_stream, line);
355 
356  logger_->log_info("OPRS-mp-proxy", "Forwarding server ->%s: '%s\\n'",
357  client_name.c_str(), line.c_str());
358 
359  // resend to client
360  write_string_newline_to_socket(client_socket, line);
361 
362  start_recv_server();
363  } else {
364  disconnect("recv_server_message_pt", err.message().c_str());
365  }
366 }
367 
368 
369 void
370 OpenPRSMessagePasserProxy::Mapping::handle_recv_server_strings_pt(const boost::system::error_code &err)
371 {
372  if (! err) {
373  try {
374  std::string sender = read_string_from_socket(server_socket);
375  std::string message = read_string_from_socket(server_socket);
376 
377  logger_->log_info("OPRS-mp-proxy", "Forwarding server %s->%s: '%s'",
378  sender.c_str(), client_name.c_str(), message.c_str());
379 
380  // resend to client
381  write_string_to_socket(client_socket, sender);
382  write_string_to_socket(client_socket, message);
383 
384  start_recv_server();
385  } catch (Exception &e) {
386  disconnect("recv_server_strings_pt", e.what_no_backtrace());
387  }
388  } else {
389  disconnect("recv_server_strings_pt", err.message().c_str());
390  }
391 }
392 
393 
394 int
395 OpenPRSMessagePasserProxy::Mapping::read_int_from_socket(boost::asio::ip::tcp::socket &socket)
396 {
397  int32_t value;
398  boost::system::error_code ec;
399  boost::asio::read(socket, boost::asio::buffer(&value, sizeof(value)), ec);
400  if (ec) {
401  throw Exception("Failed to read int from socket: %s", ec.message().c_str());
402  } else {
403  return ntohl(value);
404  }
405 }
406 
407 std::string
408 OpenPRSMessagePasserProxy::Mapping::read_string_from_socket(boost::asio::ip::tcp::socket &socket)
409 {
410 
411  uint32_t s_size = 0;
412  boost::system::error_code ec;
413  boost::asio::read(socket, boost::asio::buffer(&s_size, sizeof(s_size)), ec);
414  if (ec) {
415  throw Exception("Failed to read string size from socket: %s", ec.message().c_str());
416  }
417  s_size = ntohl(s_size);
418 
419  char s[s_size + 1];
420  boost::asio::read(socket, boost::asio::buffer(s, s_size), ec);
421  if (ec) {
422  throw Exception("Failed to read string content from socket: %s", ec.message().c_str());
423  }
424  s[s_size] = 0;
425 
426  return s;
427 }
428 
429 
430 void
431 OpenPRSMessagePasserProxy::Mapping::write_int_to_socket(boost::asio::ip::tcp::socket &socket, int i)
432 {
433  boost::system::error_code ec;
434  int32_t value = htonl(i);
435  boost::asio::write(socket, boost::asio::buffer(&value, sizeof(value)), ec);
436  if (ec) {
437  throw Exception("Failed to write int to socket: %s", ec.message().c_str());
438  }
439 }
440 
441 
442 void
443 OpenPRSMessagePasserProxy::Mapping::write_string_to_socket(boost::asio::ip::tcp::socket &socket, std::string &str)
444 {
445  boost::system::error_code ec;
446  uint32_t s_size = htonl(str.size());
447  std::array<boost::asio::const_buffer, 2> buffers;
448  buffers[0] = boost::asio::buffer(&s_size, sizeof(s_size));
449  buffers[1] = boost::asio::buffer(str.c_str(), str.size());
450 
451  boost::asio::write(socket, buffers, ec);
452  if (ec) {
453  throw Exception("Failed to write string to socket: %s", ec.message().c_str());
454  }
455 }
456 
457 
458 void
459 OpenPRSMessagePasserProxy::Mapping::write_string_newline_to_socket(boost::asio::ip::tcp::socket &socket,
460  const std::string &str)
461 {
462  boost::system::error_code ec;
463  std::string s = str + "\n";
464  boost::asio::write(socket, boost::asio::buffer(s.c_str(), s.size()), ec);
465  if (ec) {
466  throw Exception("Failed to write string to socket: %s", ec.message().c_str());
467  }
468 }
469 
470 
471 } // end namespace fawkes
virtual ~OpenPRSMessagePasserProxy()
Destructor.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Fawkes library namespace.
Base class for exceptions in Fawkes.
Definition: exception.h:36
virtual const char * what_no_backtrace() const
Get primary string (does not implicitly print the back trace).
Definition: exception.cpp:686
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
Interface for logging.
Definition: logger.h:34