36 #include <protobuf_comm/peer.h> 37 #include <protobuf_comm/crypto.h> 39 #include <boost/lexical_cast.hpp> 61 ProtobufBroadcastPeer::ProtobufBroadcastPeer(
const std::string address,
unsigned short port)
62 : io_service_(), resolver_(io_service_),
63 socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), port))
66 own_message_register_ =
true;
80 unsigned short send_to_port,
81 unsigned short recv_on_port)
82 : io_service_(), resolver_(io_service_),
83 socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), recv_on_port))
86 own_message_register_ =
true;
87 ctor(address, send_to_port);
96 std::vector<std::string> &proto_path)
97 : io_service_(), resolver_(io_service_),
98 socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), port))
101 own_message_register_ =
true;
116 unsigned short send_to_port,
117 unsigned short recv_on_port,
118 std::vector<std::string> &proto_path)
119 : io_service_(), resolver_(io_service_),
120 socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), recv_on_port))
123 own_message_register_ =
true;
124 ctor(address, send_to_port);
135 : io_service_(), resolver_(io_service_),
136 socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), port)),
137 message_register_(mr), own_message_register_(false)
150 unsigned short send_to_port,
unsigned short recv_on_port,
151 const std::string crypto_key,
const std::string cipher)
152 : io_service_(), resolver_(io_service_),
153 socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), recv_on_port))
155 ctor(address, send_to_port, crypto_key, cipher);
157 own_message_register_ =
true;
169 unsigned short send_to_port,
unsigned short recv_on_port,
171 const std::string crypto_key,
const std::string cipher)
172 : io_service_(), resolver_(io_service_),
173 socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), recv_on_port)),
174 message_register_(mr), own_message_register_(false)
176 ctor(address, send_to_port, crypto_key, cipher);
186 const std::string crypto_key,
const std::string cipher)
187 : io_service_(), resolver_(io_service_),
188 socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), port))
190 ctor(address, port, crypto_key, cipher);
192 own_message_register_ =
true;
204 const std::string crypto_key,
const std::string cipher)
205 : io_service_(), resolver_(io_service_),
206 socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), port)),
207 message_register_(mr), own_message_register_(false)
209 ctor(address, port, crypto_key, cipher);
224 unsigned short send_to_port,
225 unsigned short recv_on_port,
227 frame_header_version_t header_version)
228 : io_service_(), resolver_(io_service_),
229 socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), recv_on_port)),
230 message_register_(mr), own_message_register_(false)
232 ctor(address, send_to_port,
"",
"", header_version);
244 ProtobufBroadcastPeer::ctor(
const std::string &address,
unsigned int send_to_port,
245 const std::string crypto_key,
const std::string cipher,
246 frame_header_version_t header_version)
252 frame_header_version_ = header_version;
254 in_data_size_ = max_packet_length;
255 in_data_ = malloc(in_data_size_);
258 socket_.set_option(socket_base::broadcast(
true));
259 socket_.set_option(socket_base::reuse_address(
true));
260 determine_local_endpoints();
262 outbound_active_ =
true;
263 ip::udp::resolver::query query(address, boost::lexical_cast<std::string>(send_to_port));
264 resolver_.async_resolve(query,
265 boost::bind(&ProtobufBroadcastPeer::handle_resolve,
this,
266 boost::asio::placeholders::error,
267 boost::asio::placeholders::iterator));
269 if (! crypto_key.empty())
setup_crypto(crypto_key, cipher);
272 asio_thread_ = std::thread(&ProtobufBroadcastPeer::run_asio,
this);
279 if (asio_thread_.joinable()) {
284 if (enc_in_data_) free(enc_in_data_);
285 if (own_message_register_) {
286 delete message_register_;
306 if (frame_header_version_ == PB_FRAME_V1) {
307 throw std::runtime_error(
"Crypto support only available with V2+ frame header");
317 if (key !=
"" && cipher !=
"") {
320 if (! enc_in_data_) {
322 enc_in_data_size_ = 2 * in_data_size_;
323 enc_in_data_ = malloc(enc_in_data_size_);
333 ProtobufBroadcastPeer::determine_local_endpoints()
335 struct ifaddrs *ifap;
336 if (getifaddrs(&ifap) == 0){
337 for (
struct ifaddrs *iter = ifap; iter != NULL; iter = iter->ifa_next){
338 if (iter->ifa_addr == NULL)
continue;
339 if (iter->ifa_addr->sa_family == AF_INET) {
340 boost::asio::ip::address_v4
341 addr(ntohl(reinterpret_cast<sockaddr_in*>(iter->ifa_addr)->sin_addr.s_addr));
343 local_endpoints_.push_back(
344 boost::asio::ip::udp::endpoint(addr, socket_.local_endpoint().port()));
349 local_endpoints_.sort();
359 filter_self_ = filter;
365 ProtobufBroadcastPeer::run_asio()
367 #if BOOST_ASIO_VERSION > 100409 368 while (! io_service_.stopped()) {
373 #if BOOST_ASIO_VERSION > 100409 380 ProtobufBroadcastPeer::handle_resolve(
const boost::system::error_code& err,
381 ip::udp::resolver::iterator endpoint_iterator)
384 std::lock_guard<std::mutex> lock(outbound_mutex_);
385 outbound_active_ =
false;
386 outbound_endpoint_ = endpoint_iterator->endpoint();
388 sig_send_error_(
"Resolving endpoint failed");
394 ProtobufBroadcastPeer::handle_recv(
const boost::system::error_code& error,
397 const size_t expected_min_size =
398 (frame_header_version_ == PB_FRAME_V1)
401 if (!error && bytes_rcvd >= expected_min_size ) {
404 if (frame_header_version_ == PB_FRAME_V1) {
407 frame_header.
cipher = PB_ENCRYPTION_NONE;
411 memcpy(&frame_header, crypto_buf_ ? enc_in_data_ : in_data_,
sizeof(
frame_header_t));
415 sig_rcvd_raw_(in_endpoint_, frame_header,
419 sig_rcvd_raw_(in_endpoint_, frame_header,
424 if (sig_rcvd_.num_slots() > 0) {
425 if (! crypto_buf_ && (frame_header.
cipher != PB_ENCRYPTION_NONE)) {
426 sig_recv_error_(in_endpoint_,
"Received encrypted message but encryption is disabled");
427 }
else if (crypto_buf_ && (frame_header.
cipher == PB_ENCRYPTION_NONE)) {
428 sig_recv_error_(in_endpoint_,
"Received plain text message but encryption is enabled");
431 if (crypto_buf_ && (frame_header.
cipher != PB_ENCRYPTION_NONE)) {
437 (
unsigned char *)enc_in_data_ +
sizeof(
frame_header_t), to_decrypt,
438 (
unsigned char *)in_data_ +
sizeof(
frame_header_t), in_data_size_);
441 }
catch (std::runtime_error &e) {
442 sig_recv_error_(in_endpoint_, std::string(
"Decryption fail: ") + e.what());
452 if (sig_rcvd_.num_slots() > 0) {
453 if (bytes_rcvd == (header_size + payload_size)) {
454 if (! filter_self_ ||
455 ! std::binary_search(local_endpoints_.begin(), local_endpoints_.end(), in_endpoint_))
460 if (frame_header_version_ == PB_FRAME_V1) {
476 uint16_t msg_type = ntohs(message_header.
msg_type);
479 std::shared_ptr<google::protobuf::Message> m =
480 message_register_->
deserialize(frame_header, message_header, data);
482 sig_rcvd_(in_endpoint_, comp_id, msg_type, m);
483 }
catch (std::runtime_error &e) {
484 sig_recv_error_(in_endpoint_, std::string(
"Deserialization fail: ") + e.what());
488 sig_recv_error_(in_endpoint_,
"Invalid number of bytes received");
493 sig_recv_error_(in_endpoint_,
"General receiving error or truncated message");
501 ProtobufBroadcastPeer::handle_sent(
const boost::system::error_code& error,
507 std::lock_guard<std::mutex> lock(outbound_mutex_);
508 outbound_active_ =
false;
512 sig_send_error_(
"Sending message failed");
526 google::protobuf::Message &m)
529 message_register_->
serialize(component_id, msg_type, m,
534 throw std::runtime_error(
"Serialized message too big");
537 if (frame_header_version_ == PB_FRAME_V1) {
543 entry->
buffers[1] = boost::asio::const_buffer();
551 std::lock_guard<std::mutex> lock(outbound_mutex_);
552 outbound_queue_.push(entry);
566 const void *data,
size_t data_size)
570 entry->
serialized_message = std::string(reinterpret_cast<const char *>(data), data_size);
573 entry->
buffers[1] = boost::asio::const_buffer();
577 std::lock_guard<std::mutex> lock(outbound_mutex_);
578 outbound_queue_.push(entry);
591 std::shared_ptr<google::protobuf::Message> m)
593 send(component_id, msg_type, *m);
615 const google::protobuf::Descriptor *desc = m.GetDescriptor();
616 const google::protobuf::EnumDescriptor *enumdesc = desc->FindEnumTypeByName(
"CompType");
618 throw std::logic_error(
"Message does not have CompType enum");
620 const google::protobuf::EnumValueDescriptor *compdesc =
621 enumdesc->FindValueByName(
"COMP_ID");
622 const google::protobuf::EnumValueDescriptor *msgtdesc =
623 enumdesc->FindValueByName(
"MSG_TYPE");
624 if (! compdesc || ! msgtdesc) {
625 throw std::logic_error(
"Message CompType enum hs no COMP_ID or MSG_TYPE value");
627 int comp_id = compdesc->number();
628 int msg_type = msgtdesc->number();
629 if (comp_id < 0 || comp_id > std::numeric_limits<uint16_t>::max()) {
630 throw std::logic_error(
"Message has invalid COMP_ID");
632 if (msg_type < 0 || msg_type > std::numeric_limits<uint16_t>::max()) {
633 throw std::logic_error(
"Message has invalid MSG_TYPE");
636 send(comp_id, msg_type, m);
640 ProtobufBroadcastPeer::start_recv()
642 crypto_buf_ = crypto_;
643 socket_.async_receive_from(boost::asio::buffer(crypto_ ? enc_in_data_ : in_data_, in_data_size_),
645 boost::bind(&ProtobufBroadcastPeer::handle_recv,
646 this, boost::asio::placeholders::error,
647 boost::asio::placeholders::bytes_transferred));
651 ProtobufBroadcastPeer::start_send()
653 std::lock_guard<std::mutex> lock(outbound_mutex_);
654 if (outbound_queue_.empty() || outbound_active_)
return;
656 outbound_active_ =
true;
659 outbound_queue_.pop();
662 size_t plain_size = boost::asio::buffer_size(entry->
buffers[1])
663 + boost::asio::buffer_size(entry->
buffers[2]);
666 std::string plain_buf = std::string(plain_size,
'\0');
669 boost::asio::buffer_size(entry->
buffers[1]),
670 boost::asio::buffer_cast<const char *>(entry->
buffers[1]),
671 boost::asio::buffer_size(entry->
buffers[1]));
673 plain_buf.replace(boost::asio::buffer_size(entry->
buffers[1]),
674 boost::asio::buffer_size(entry->
buffers[2]),
675 boost::asio::buffer_cast<const char *>(entry->
buffers[2]),
676 boost::asio::buffer_size(entry->
buffers[2]));
684 entry->
buffers[2] = boost::asio::const_buffer();
687 socket_.async_send_to(entry->
buffers, outbound_endpoint_,
688 boost::bind(&ProtobufBroadcastPeer::handle_sent,
this,
689 boost::asio::placeholders::error,
690 boost::asio::placeholders::bytes_transferred,
void send(uint16_t component_id, uint16_t msg_type, google::protobuf::Message &m)
Send a message to other peers.
void send_raw(const frame_header_t &frame_header, const void *data, size_t data_size)
Send a raw message.
Register to map msg type numbers to Protobuf messages.
std::shared_ptr< google::protobuf::Message > deserialize(frame_header_t &frame_header, message_header_t &message_header, void *data)
Deserialize message.
std::array< boost::asio::const_buffer, 3 > buffers
outgoing buffers
void setup_crypto(const std::string &key, const std::string &cipher)
Setup encryption.
void serialize(uint16_t component_id, uint16_t msg_type, google::protobuf::Message &msg, frame_header_t &frame_header, message_header_t &message_header, std::string &data)
Serialize a message.
Decrypt buffers encrypted with BufferEncryptor.
ProtobufBroadcastPeer(const std::string address, unsigned short port)
Constructor.
frame_header_t frame_header
Frame header (network byte order), never encrypted.
void set_filter_self(bool filter)
Set whether to filter out own messages.
size_t encrypted_buffer_size(size_t plain_length)
Get required size for an encrypted buffer of the given plain text length.
size_t decrypt(int cipher, const void *enc, size_t enc_size, void *plain, size_t plain_size)
Decrypt a buffer.
int cipher_id() const
Get cipher ID.
~ProtobufBroadcastPeer()
Destructor.
Encrypt buffers using AES128 in ECB mode.
void encrypt(const std::string &plain, std::string &enc)
Encrypt a buffer.
frame_header_v1_t frame_header_v1
Frame header (network byte order), never encrypted.
message_header_t message_header
Message header (network byte order)
std::string encrypted_message
encrypted buffer if encryption is used
std::string serialized_message
serialized protobuf message