23 #include "openprs_mp_proxy.h" 25 #include <core/exception.h> 26 #include <logging/logger.h> 27 #include <boost/bind.hpp> 28 #include <boost/lexical_cast.hpp> 34 typedef enum {MESSAGE_MT = 1, BROADCAST_MT, MULTICAST_MT, DISCONNECT_MT } Message_Type;
35 typedef enum {REGISTER_OK, REGISTER_NAME_CONFLICT, REGISTER_DENIED} Register_Type;
36 typedef enum {MESSAGES_PT, STRINGS_PT} Protocol_Type;
58 OpenPRSMessagePasserProxy::OpenPRSMessagePasserProxy(
unsigned short tcp_port,
59 const std::string &mp_host,
unsigned short mp_port,
61 : io_service_work_(io_service_), acceptor_(io_service_, ip::tcp::endpoint(ip::tcp::v6(), tcp_port)),
62 mp_host_(mp_host), mp_port_(mp_port), logger_(logger)
64 acceptor_.set_option(socket_base::reuse_address(
true));
65 io_service_thread_ = std::thread([
this]() { this->io_service_.run(); });
73 io_service_thread_.join();
78 OpenPRSMessagePasserProxy::start_accept()
80 Mapping::Ptr mapping(
new Mapping(io_service_, mp_host_, mp_port_, logger_));
81 acceptor_.async_accept(mapping->client_socket,
82 boost::bind(&OpenPRSMessagePasserProxy::handle_accept,
this,
83 mapping, boost::asio::placeholders::error));
87 OpenPRSMessagePasserProxy::handle_accept(Mapping::Ptr mapping,
88 const boost::system::error_code& error)
91 mappings_.push_back(mapping);
99 OpenPRSMessagePasserProxy::Mapping::Mapping(boost::asio::io_service &io_service,
100 const std::string &mp_host,
unsigned short mp_port,
102 : io_service_(io_service), resolver_(io_service_),
103 server_host_(mp_host), server_port_(mp_port), logger_(logger),
104 client_socket(io_service_), server_socket(io_service_)
113 OpenPRSMessagePasserProxy::Mapping::~Mapping()
115 boost::system::error_code err;
116 client_socket.shutdown(ip::tcp::socket::shutdown_both, err);
117 client_socket.close();
118 server_socket.shutdown(ip::tcp::socket::shutdown_both, err);
119 server_socket.close();
125 OpenPRSMessagePasserProxy::Mapping::start()
127 client_prot = read_int_from_socket(client_socket);
128 client_name = read_string_from_socket(client_socket);
130 logger_->
log_info(
"OPRS-mp-proxy",
"Client %s connected", client_name.c_str());
132 ip::tcp::resolver::query query(server_host_, boost::lexical_cast<std::string>(server_port_));
133 resolver_.async_resolve(query,
134 boost::bind(&OpenPRSMessagePasserProxy::Mapping::handle_resolve,
this,
135 boost::asio::placeholders::error,
136 boost::asio::placeholders::iterator));
142 OpenPRSMessagePasserProxy::Mapping::alive()
const 144 return client_socket.is_open();
150 OpenPRSMessagePasserProxy::Mapping::disconnect()
152 disconnect(
"disconnect",
"API call");
157 OpenPRSMessagePasserProxy::Mapping::disconnect(
const char *where,
const char *reason)
159 logger_->
log_warn(
"OPRS-mp-proxy",
"Client %s disconnected (%s: %s)",
160 client_name.c_str(), where, reason);
161 boost::system::error_code ec;
162 client_socket.shutdown(ip::tcp::socket::shutdown_both, ec);
163 client_socket.close();
168 OpenPRSMessagePasserProxy::Mapping::handle_resolve(
const boost::system::error_code& err,
169 ip::tcp::resolver::iterator endpoint_iterator)
174 #if BOOST_ASIO_VERSION > 100409 175 boost::asio::async_connect(server_socket, endpoint_iterator,
177 server_socket.async_connect(*endpoint_iterator,
179 boost::bind(&OpenPRSMessagePasserProxy::Mapping::handle_connect,
this,
180 boost::asio::placeholders::error));
182 disconnect(
"handle_resolve", err.message().c_str());
187 OpenPRSMessagePasserProxy::Mapping::handle_connect(
const boost::system::error_code &err)
190 write_int_to_socket(server_socket, client_prot);
191 write_string_to_socket(server_socket, client_name);
194 boost::asio::async_read(server_socket,
195 boost::asio::buffer(&server_in_reg_reply_,
sizeof(server_in_reg_reply_)),
196 boost::bind(&OpenPRSMessagePasserProxy::Mapping::handle_recv_server_reg_reply,
197 this, boost::asio::placeholders::error));
200 disconnect(
"handle_connect", err.message().c_str());
206 OpenPRSMessagePasserProxy::Mapping::handle_recv_server_reg_reply(
const boost::system::error_code &err)
208 write_int_to_socket(client_socket, server_in_reg_reply_);
210 if (server_in_reg_reply_ == OPRS::REGISTER_OK) {
214 disconnect(
"recv_server_reg_reply", err.message().c_str());
220 OpenPRSMessagePasserProxy::Mapping::start_recv_client()
222 boost::asio::async_read(client_socket,
223 boost::asio::buffer(&client_in_msg_type_,
sizeof(client_in_msg_type_)),
224 boost::bind(&OpenPRSMessagePasserProxy::Mapping::handle_recv_client,
225 this, boost::asio::placeholders::error));
230 OpenPRSMessagePasserProxy::Mapping::handle_recv_client(
const boost::system::error_code &err)
234 std::vector<std::string> multicast_recipients;
236 std::string recipient;
238 client_in_msg_type_ = ntohl(client_in_msg_type_);
240 switch (client_in_msg_type_) {
241 case OPRS::DISCONNECT_MT:
242 logger_->
log_info(
"OPRS-mp-proxy",
"Disconnecting %s", client_name.c_str());
243 disconnect(
"recv_client",
"Client disconnected");
246 case OPRS::MESSAGE_MT:
247 recipient = read_string_from_socket(client_socket);
250 case OPRS::MULTICAST_MT:
251 multicast_recipients.resize(read_int_from_socket(client_socket));
254 case OPRS::BROADCAST_MT:
break;
257 disconnect(
"recv_client",
"Unknown message type");
261 message = read_string_from_socket(client_socket);
263 if (client_in_msg_type_ == OPRS::MULTICAST_MT) {
264 for (
size_t i = 0; i < multicast_recipients.size(); ++i) {
265 multicast_recipients[i] = read_string_from_socket(client_socket);
270 switch (client_in_msg_type_) {
271 case OPRS::MESSAGE_MT:
272 logger_->
log_info(
"OPRS-mp-proxy",
"Forwarding unicast %s->%s: '%s'",
273 client_name.c_str(), recipient.c_str(), message.c_str());
276 case OPRS::MULTICAST_MT:
278 std::string recipients;
279 for (
size_t i = 0; i < multicast_recipients.size(); ++i) {
280 if (i > 0) recipients +=
", ";
281 recipients += multicast_recipients[i];
284 logger_->
log_info(
"OPRS-mp-proxy",
"Forwarding multicast %s->(%s): '%s'",
285 client_name.c_str(), recipients.c_str(), message.c_str());
289 case OPRS::BROADCAST_MT:
290 logger_->
log_info(
"OPRS-mp-proxy",
"Forwarding broadcast %s->*: '%s'",
291 client_name.c_str(), message.c_str());
298 write_int_to_socket(server_socket, client_in_msg_type_);
300 switch (client_in_msg_type_) {
301 case OPRS::MESSAGE_MT:
302 write_string_to_socket(server_socket, recipient);
303 write_string_to_socket(server_socket, message);
306 case OPRS::MULTICAST_MT:
307 write_string_to_socket(server_socket, message);
308 for (
size_t i = 0; i < multicast_recipients.size(); ++i) {
309 write_string_to_socket(server_socket, multicast_recipients[i]);
313 case OPRS::BROADCAST_MT:
314 write_string_to_socket(server_socket, message);
325 disconnect(
"recv_client", err.message().c_str());
331 OpenPRSMessagePasserProxy::Mapping::start_recv_server()
333 if (client_prot == OPRS::MESSAGES_PT) {
334 logger_->
log_warn(
"OPRS-mp-proxy",
"Starting listening for %s in MESSAGES_PT mode", client_name.c_str());
335 boost::asio::async_read_until(server_socket, server_buffer_,
'\n',
336 boost::bind(&OpenPRSMessagePasserProxy::Mapping::handle_recv_server_message_pt,
337 this, boost::asio::placeholders::error));
340 logger_->
log_warn(
"OPRS-mp-proxy",
"Starting listening for %s in STRINGS_PT mode", client_name.c_str());
341 server_socket.async_read_some(boost::asio::null_buffers(),
342 boost::bind(&OpenPRSMessagePasserProxy::Mapping::handle_recv_server_strings_pt,
343 this, boost::asio::placeholders::error));
349 OpenPRSMessagePasserProxy::Mapping::handle_recv_server_message_pt(
const boost::system::error_code &err)
353 std::istream in_stream(&server_buffer_);
354 std::getline(in_stream, line);
356 logger_->
log_info(
"OPRS-mp-proxy",
"Forwarding server ->%s: '%s\\n'",
357 client_name.c_str(), line.c_str());
360 write_string_newline_to_socket(client_socket, line);
364 disconnect(
"recv_server_message_pt", err.message().c_str());
370 OpenPRSMessagePasserProxy::Mapping::handle_recv_server_strings_pt(
const boost::system::error_code &err)
374 std::string sender = read_string_from_socket(server_socket);
375 std::string message = read_string_from_socket(server_socket);
377 logger_->
log_info(
"OPRS-mp-proxy",
"Forwarding server %s->%s: '%s'",
378 sender.c_str(), client_name.c_str(), message.c_str());
381 write_string_to_socket(client_socket, sender);
382 write_string_to_socket(client_socket, message);
389 disconnect(
"recv_server_strings_pt", err.message().c_str());
395 OpenPRSMessagePasserProxy::Mapping::read_int_from_socket(boost::asio::ip::tcp::socket &socket)
398 boost::system::error_code ec;
399 boost::asio::read(socket, boost::asio::buffer(&value,
sizeof(value)), ec);
401 throw Exception(
"Failed to read int from socket: %s", ec.message().c_str());
408 OpenPRSMessagePasserProxy::Mapping::read_string_from_socket(boost::asio::ip::tcp::socket &socket)
412 boost::system::error_code ec;
413 boost::asio::read(socket, boost::asio::buffer(&s_size,
sizeof(s_size)), ec);
415 throw Exception(
"Failed to read string size from socket: %s", ec.message().c_str());
417 s_size = ntohl(s_size);
420 boost::asio::read(socket, boost::asio::buffer(s, s_size), ec);
422 throw Exception(
"Failed to read string content from socket: %s", ec.message().c_str());
431 OpenPRSMessagePasserProxy::Mapping::write_int_to_socket(boost::asio::ip::tcp::socket &socket,
int i)
433 boost::system::error_code ec;
434 int32_t value = htonl(i);
435 boost::asio::write(socket, boost::asio::buffer(&value,
sizeof(value)), ec);
437 throw Exception(
"Failed to write int to socket: %s", ec.message().c_str());
443 OpenPRSMessagePasserProxy::Mapping::write_string_to_socket(boost::asio::ip::tcp::socket &socket, std::string &str)
445 boost::system::error_code ec;
446 uint32_t s_size = htonl(str.size());
447 std::array<boost::asio::const_buffer, 2> buffers;
448 buffers[0] = boost::asio::buffer(&s_size,
sizeof(s_size));
449 buffers[1] = boost::asio::buffer(str.c_str(), str.size());
451 boost::asio::write(socket, buffers, ec);
453 throw Exception(
"Failed to write string to socket: %s", ec.message().c_str());
459 OpenPRSMessagePasserProxy::Mapping::write_string_newline_to_socket(boost::asio::ip::tcp::socket &socket,
460 const std::string &str)
462 boost::system::error_code ec;
463 std::string s = str +
"\n";
464 boost::asio::write(socket, boost::asio::buffer(s.c_str(), s.size()), ec);
466 throw Exception(
"Failed to write string to socket: %s", ec.message().c_str());
virtual ~OpenPRSMessagePasserProxy()
Destructor.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Fawkes library namespace.
Base class for exceptions in Fawkes.
virtual const char * what_no_backtrace() const
Get primary string (does not implicitly print the back trace).
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.