Fawkes API  Fawkes Development Version
peer.cpp
1 /***************************************************************************
2  * peer.cpp - Protobuf stream protocol - broadcast peer
3  *
4  * Created: Mon Feb 04 17:19:17 2013
5  * Copyright 2013 Tim Niemueller [www.niemueller.de]
6  ****************************************************************************/
7 
8 /* Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * - Redistributions of source code must retain the above copyright
13  * notice, this list of conditions and the following disclaimer.
14  * - Redistributions in binary form must reproduce the above copyright
15  * notice, this list of conditions and the following disclaimer in
16  * the documentation and/or other materials provided with the
17  * distribution.
18  * - Neither the name of the authors nor the names of its contributors
19  * may be used to endorse or promote products derived from this
20  * software without specific prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
26  * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
27  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
28  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
29  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
30  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
31  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
33  * OF THE POSSIBILITY OF SUCH DAMAGE.
34  */
35 
36 #include <protobuf_comm/peer.h>
37 #include <protobuf_comm/crypto.h>
38 
39 #include <boost/lexical_cast.hpp>
40 #include <ifaddrs.h>
41 
42 using namespace boost::asio;
43 using namespace boost::system;
44 
45 namespace protobuf_comm {
46 #if 0 /* just to make Emacs auto-indent happy */
47 }
48 #endif
49 
50 /** @class ProtobufBroadcastPeer <protobuf_comm/peer.h>
51  * Communicate by broadcasting protobuf messages.
52  * This class allows to communicate via UDP by broadcasting messages to the
53  * network.
54  * @author Tim Niemueller
55  */
56 
57 /** Constructor.
58  * @param address IPv4 broadcast address to send to
59  * @param port IPv4 UDP port to listen on and to send to
60  */
61 ProtobufBroadcastPeer::ProtobufBroadcastPeer(const std::string address, unsigned short port)
62  : io_service_(), resolver_(io_service_),
63  socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), port))
64 {
65  message_register_ = new MessageRegister();
66  own_message_register_ = true;
67  ctor(address, port);
68 }
69 
70 
71 /** Testing constructor.
72  * This constructor listens and sends to different ports. It can be used to
73  * send and receive on the same host or even from within the same process.
74  * It is most useful for communication tests.
75  * @param address IPv4 address to send to
76  * @param send_to_port IPv4 UDP port to send data to
77  * @param recv_on_port IPv4 UDP port to receive data on
78  */
80  unsigned short send_to_port,
81  unsigned short recv_on_port)
82  : io_service_(), resolver_(io_service_),
83  socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), recv_on_port))
84 {
85  message_register_ = new MessageRegister();
86  own_message_register_ = true;
87  ctor(address, send_to_port);
88 }
89 
90 /** Constructor.
91  * @param address IPv4 broadcast address to send to
92  * @param port IPv4 UDP port to listen on and to send to
93  * @param proto_path list of file system paths where to look for proto files
94  */
95 ProtobufBroadcastPeer::ProtobufBroadcastPeer(const std::string address, unsigned short port,
96  std::vector<std::string> &proto_path)
97  : io_service_(), resolver_(io_service_),
98  socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), port))
99 {
100  message_register_ = new MessageRegister(proto_path);
101  own_message_register_ = true;
102  ctor(address, port);
103 }
104 
105 
106 /** Testing constructor.
107  * This constructor listens and sends to different ports. It can be used to
108  * send and receive on the same host or even from within the same process.
109  * It is most useful for communication tests.
110  * @param address IPv4 address to send to
111  * @param send_to_port IPv4 UDP port to send data to
112  * @param recv_on_port IPv4 UDP port to receive data on
113  * @param proto_path list of file system paths where to look for proto files
114  */
116  unsigned short send_to_port,
117  unsigned short recv_on_port,
118  std::vector<std::string> &proto_path)
119  : io_service_(), resolver_(io_service_),
120  socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), recv_on_port))
121 {
122  message_register_ = new MessageRegister(proto_path);
123  own_message_register_ = true;
124  ctor(address, send_to_port);
125 }
126 
127 
128 /** Constructor.
129  * @param address IPv4 broadcast address to send to
130  * @param port IPv4 UDP port to listen on and to send to
131  * @param mr message register to query for message types
132  */
133 ProtobufBroadcastPeer::ProtobufBroadcastPeer(const std::string address, unsigned short port,
134  MessageRegister *mr)
135  : io_service_(), resolver_(io_service_),
136  socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), port)),
137  message_register_(mr), own_message_register_(false)
138 {
139  ctor(address, port);
140 }
141 
142 /** Constructor with encryption.
143  * @param address IPv4 broadcast address to send to
144  * @param send_to_port IPv4 UDP port to send data to
145  * @param recv_on_port IPv4 UDP port to receive data on
146  * @param crypto_key encryption key for messages
147  * @param cipher cipher to use for encryption
148  */
150  unsigned short send_to_port, unsigned short recv_on_port,
151  const std::string crypto_key, const std::string cipher)
152  : io_service_(), resolver_(io_service_),
153  socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), recv_on_port))
154 {
155  ctor(address, send_to_port, crypto_key, cipher);
156  message_register_ = new MessageRegister();
157  own_message_register_ = true;
158 }
159 
160 /** Constructor with encryption.
161  * @param address IPv4 broadcast address to send to
162  * @param send_to_port IPv4 UDP port to send data to
163  * @param recv_on_port IPv4 UDP port to receive data on
164  * @param mr message register to query for message types
165  * @param crypto_key encryption key for messages
166  * @param cipher cipher to use for encryption
167  */
169  unsigned short send_to_port, unsigned short recv_on_port,
170  MessageRegister *mr,
171  const std::string crypto_key, const std::string cipher)
172  : io_service_(), resolver_(io_service_),
173  socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), recv_on_port)),
174  message_register_(mr), own_message_register_(false)
175 {
176  ctor(address, send_to_port, crypto_key, cipher);
177 }
178 
179 /** Constructor with encryption.
180  * @param address IPv4 broadcast address to send to
181  * @param port IPv4 UDP port to listen on and to send to
182  * @param crypto_key encryption key for messages
183  * @param cipher cipher to use for encryption
184  */
185 ProtobufBroadcastPeer::ProtobufBroadcastPeer(const std::string address, unsigned short port,
186  const std::string crypto_key, const std::string cipher)
187  : io_service_(), resolver_(io_service_),
188  socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), port))
189 {
190  ctor(address, port, crypto_key, cipher);
191  message_register_ = new MessageRegister();
192  own_message_register_ = true;
193 }
194 
195 /** Constructor with encryption.
196  * @param address IPv4 broadcast address to send to
197  * @param port IPv4 UDP port to listen on and to send to
198  * @param mr message register to query for message types
199  * @param crypto_key encryption key for messages
200  * @param cipher cipher to use for encryption
201  */
202 ProtobufBroadcastPeer::ProtobufBroadcastPeer(const std::string address, unsigned short port,
203  MessageRegister *mr,
204  const std::string crypto_key, const std::string cipher)
205  : io_service_(), resolver_(io_service_),
206  socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), port)),
207  message_register_(mr), own_message_register_(false)
208 {
209  ctor(address, port, crypto_key, cipher);
210 }
211 
212 
213 /** Testing constructor.
214  * This constructor listens and sends to different ports. It can be used to
215  * send and receive on the same host or even from within the same process.
216  * It is most useful for communication tests.
217  * @param address IPv4 address to send to
218  * @param send_to_port IPv4 UDP port to send data to
219  * @param recv_on_port IPv4 UDP port to receive data on
220  * @param mr message register to query for message types
221  * @param header_version which frame header version to send, use with caution
222  */
224  unsigned short send_to_port,
225  unsigned short recv_on_port,
226  MessageRegister *mr,
227  frame_header_version_t header_version)
228  : io_service_(), resolver_(io_service_),
229  socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), recv_on_port)),
230  message_register_(mr), own_message_register_(false)
231 {
232  ctor(address, send_to_port, "", "", header_version);
233 }
234 
235 
236 /** Constructor helper.
237  * @param address hostname/address to send to
238  * @param send_to_port UDP port to send messages to
239  * @param crypto_key encryption key for messages
240  * @param cipher cipher to use for encryption
241  * @þaram header_version which frame header version to send, use with caution
242  */
243 void
244 ProtobufBroadcastPeer::ctor(const std::string &address, unsigned int send_to_port,
245  const std::string crypto_key, const std::string cipher,
246  frame_header_version_t header_version)
247 {
248  filter_self_ = true;
249  crypto_ = false;
250  crypto_enc_ = NULL;
251  crypto_dec_ = NULL;
252  frame_header_version_ = header_version;
253 
254  in_data_size_ = max_packet_length;
255  in_data_ = malloc(in_data_size_);
256  enc_in_data_ = NULL;
257 
258  socket_.set_option(socket_base::broadcast(true));
259  socket_.set_option(socket_base::reuse_address(true));
260  determine_local_endpoints();
261 
262  outbound_active_ = true;
263  ip::udp::resolver::query query(address, boost::lexical_cast<std::string>(send_to_port));
264  resolver_.async_resolve(query,
265  boost::bind(&ProtobufBroadcastPeer::handle_resolve, this,
266  boost::asio::placeholders::error,
267  boost::asio::placeholders::iterator));
268 
269  if (! crypto_key.empty()) setup_crypto(crypto_key, cipher);
270 
271  start_recv();
272  asio_thread_ = std::thread(&ProtobufBroadcastPeer::run_asio, this);
273 }
274 
275 
276 /** Destructor. */
278 {
279  if (asio_thread_.joinable()) {
280  io_service_.stop();
281  asio_thread_.join();
282  }
283  free(in_data_);
284  if (enc_in_data_) free(enc_in_data_);
285  if (own_message_register_) {
286  delete message_register_;
287  }
288 
289  delete crypto_enc_;
290  delete crypto_dec_;
291 }
292 
293 
294 /** Setup encryption.
295  * After this call communication will be encrypted. Note that the first
296  * received message might be considered invalid because we are still
297  * listening for plain text messages. To avoid this use the constructor
298  * which takes the encryption key as parameter.
299  * @param key encryption key
300  * @param cipher cipher to use for encryption
301  * @see BufferEncryptor for supported ciphers
302  */
303 void
304 ProtobufBroadcastPeer::setup_crypto(const std::string &key, const std::string &cipher)
305 {
306  if (frame_header_version_ == PB_FRAME_V1) {
307  throw std::runtime_error("Crypto support only available with V2+ frame header");
308  }
309 
310  delete crypto_enc_;
311  delete crypto_dec_;
312  crypto_enc_ = NULL;
313  crypto_dec_ = NULL;
314  crypto_ = false;
315  crypto_buf_ = false;
316 
317  if (key != "" && cipher != "") {
318  crypto_enc_ = new BufferEncryptor(key, cipher);
319 
320  if (! enc_in_data_) {
321  // this depends on the cipher, but nothing is two times the incoming buffer...
322  enc_in_data_size_ = 2 * in_data_size_;
323  enc_in_data_ = malloc(enc_in_data_size_);
324  }
325 
326  crypto_dec_ = new BufferDecryptor(key);
327  crypto_ = true;
328  crypto_buf_ = false;
329  }
330 }
331 
332 void
333 ProtobufBroadcastPeer::determine_local_endpoints()
334 {
335  struct ifaddrs *ifap;
336  if (getifaddrs(&ifap) == 0){
337  for (struct ifaddrs *iter = ifap; iter != NULL; iter = iter->ifa_next){
338  if (iter->ifa_addr == NULL) continue;
339  if (iter->ifa_addr->sa_family == AF_INET) {
340  boost::asio::ip::address_v4
341  addr(ntohl(reinterpret_cast<sockaddr_in*>(iter->ifa_addr)->sin_addr.s_addr));
342 
343  local_endpoints_.push_back(
344  boost::asio::ip::udp::endpoint(addr, socket_.local_endpoint().port()));
345  }
346  }
347  freeifaddrs(ifap);
348  }
349  local_endpoints_.sort();
350 }
351 
352 
353 /** Set if to filter out own messages.
354  * @param filter true to filter out own messages, false to receive them
355  */
356 void
358 {
359  filter_self_ = filter;
360 }
361 
362 
363 /** ASIO thread runnable. */
364 void
365 ProtobufBroadcastPeer::run_asio()
366 {
367 #if BOOST_ASIO_VERSION > 100409
368  while (! io_service_.stopped()) {
369 #endif
370  usleep(0);
371  io_service_.reset();
372  io_service_.run();
373 #if BOOST_ASIO_VERSION > 100409
374  }
375 #endif
376 }
377 
378 
379 void
380 ProtobufBroadcastPeer::handle_resolve(const boost::system::error_code& err,
381  ip::udp::resolver::iterator endpoint_iterator)
382 {
383  if (! err) {
384  std::lock_guard<std::mutex> lock(outbound_mutex_);
385  outbound_active_ = false;
386  outbound_endpoint_ = endpoint_iterator->endpoint();
387  } else {
388  sig_send_error_("Resolving endpoint failed");
389  }
390  start_send();
391 }
392 
393 void
394 ProtobufBroadcastPeer::handle_recv(const boost::system::error_code& error,
395  size_t bytes_rcvd)
396 {
397  const size_t expected_min_size =
398  (frame_header_version_ == PB_FRAME_V1)
399  ? sizeof(frame_header_v1_t) : (sizeof(frame_header_t) + sizeof(message_header_t));
400 
401  if (!error && bytes_rcvd >= expected_min_size ) {
402  frame_header_t frame_header;
403  size_t header_size;
404  if (frame_header_version_ == PB_FRAME_V1) {
405  frame_header_v1_t *frame_header_v1 = static_cast<frame_header_v1_t *>(in_data_);
406  frame_header.header_version = PB_FRAME_V1;
407  frame_header.cipher = PB_ENCRYPTION_NONE;
408  frame_header.payload_size = frame_header_v1->payload_size;
409  header_size = sizeof(frame_header_v1_t);
410  } else {
411  memcpy(&frame_header, crypto_buf_ ? enc_in_data_ : in_data_, sizeof(frame_header_t));
412  header_size = sizeof(frame_header_t);
413 
414  if (crypto_buf_) {
415  sig_rcvd_raw_(in_endpoint_, frame_header,
416  (unsigned char *)enc_in_data_ + sizeof(frame_header_t),
417  bytes_rcvd - sizeof(frame_header_t));
418  } else {
419  sig_rcvd_raw_(in_endpoint_, frame_header,
420  (unsigned char *)in_data_ + sizeof(frame_header_t),
421  bytes_rcvd - sizeof(frame_header_t));
422  }
423 
424  if (sig_rcvd_.num_slots() > 0) {
425  if (! crypto_buf_ && (frame_header.cipher != PB_ENCRYPTION_NONE)) {
426  sig_recv_error_(in_endpoint_, "Received encrypted message but encryption is disabled");
427  } else if (crypto_buf_ && (frame_header.cipher == PB_ENCRYPTION_NONE)) {
428  sig_recv_error_(in_endpoint_, "Received plain text message but encryption is enabled");
429  } else {
430 
431  if (crypto_buf_ && (frame_header.cipher != PB_ENCRYPTION_NONE)) {
432  // we need to decrypt first
433  try {
434  memcpy(in_data_, enc_in_data_, sizeof(frame_header_t));
435  size_t to_decrypt = bytes_rcvd - sizeof(frame_header_t);
436  bytes_rcvd = crypto_dec_->decrypt(frame_header.cipher,
437  (unsigned char *)enc_in_data_ + sizeof(frame_header_t), to_decrypt,
438  (unsigned char *)in_data_ + sizeof(frame_header_t), in_data_size_);
439  frame_header.payload_size = htonl(bytes_rcvd);
440  bytes_rcvd += sizeof(frame_header_t);
441  } catch (std::runtime_error &e) {
442  sig_recv_error_(in_endpoint_, std::string("Decryption fail: ") + e.what());
443  bytes_rcvd = 0;
444  }
445  }
446  }
447  } // else nobody cares about deserialized message
448  }
449 
450  size_t payload_size = ntohl(frame_header.payload_size);
451 
452  if (sig_rcvd_.num_slots() > 0) {
453  if (bytes_rcvd == (header_size + payload_size)) {
454  if (! filter_self_ ||
455  ! std::binary_search(local_endpoints_.begin(), local_endpoints_.end(), in_endpoint_))
456  {
457  void *data;
458  message_header_t message_header;
459 
460  if (frame_header_version_ == PB_FRAME_V1) {
461  frame_header_v1_t *frame_header_v1 = static_cast<frame_header_v1_t *>(in_data_);
462  message_header.component_id = frame_header_v1->component_id;
463  message_header.msg_type = frame_header_v1->msg_type;
464  data = (char *)in_data_ + sizeof(frame_header_v1_t);
465  // message register expects payload size to include message header
466  frame_header.payload_size = htonl(ntohl(frame_header.payload_size) + sizeof(message_header_t));
467  } else {
468  message_header_t *msg_header =
469  static_cast<message_header_t *>((void*)((char *)in_data_ + sizeof(frame_header_t)));
470  message_header.component_id = msg_header->component_id;
471  message_header.msg_type = msg_header->msg_type;
472  data = (char *)in_data_ + sizeof(frame_header_t) + sizeof(message_header_t);
473  }
474 
475  uint16_t comp_id = ntohs(message_header.component_id);
476  uint16_t msg_type = ntohs(message_header.msg_type);
477 
478  try {
479  std::shared_ptr<google::protobuf::Message> m =
480  message_register_->deserialize(frame_header, message_header, data);
481 
482  sig_rcvd_(in_endpoint_, comp_id, msg_type, m);
483  } catch (std::runtime_error &e) {
484  sig_recv_error_(in_endpoint_, std::string("Deserialization fail: ") + e.what());
485  }
486  }
487  } else {
488  sig_recv_error_(in_endpoint_, "Invalid number of bytes received");
489  }
490  } // else nobody cares (no one registered to signal)
491 
492  } else {
493  sig_recv_error_(in_endpoint_, "General receiving error or truncated message");
494  }
495 
496  start_recv();
497 }
498 
499 
500 void
501 ProtobufBroadcastPeer::handle_sent(const boost::system::error_code& error,
502  size_t bytes_transferred, QueueEntry *entry)
503 {
504  delete entry;
505 
506  {
507  std::lock_guard<std::mutex> lock(outbound_mutex_);
508  outbound_active_ = false;
509  }
510 
511  if (error) {
512  sig_send_error_("Sending message failed");
513  }
514 
515  start_send();
516 }
517 
518 
519 /** Send a message to other peers.
520  * @param component_id ID of the component to address
521  * @param msg_type numeric message type
522  * @param m message to send
523  */
524 void
525 ProtobufBroadcastPeer::send(uint16_t component_id, uint16_t msg_type,
526  google::protobuf::Message &m)
527 {
528  QueueEntry *entry = new QueueEntry();
529  message_register_->serialize(component_id, msg_type, m,
530  entry->frame_header, entry->message_header,
531  entry->serialized_message);
532 
533  if (entry->serialized_message.size() > max_packet_length) {
534  throw std::runtime_error("Serialized message too big");
535  }
536 
537  if (frame_header_version_ == PB_FRAME_V1) {
541 
542  entry->buffers[0] = boost::asio::buffer(&entry->frame_header_v1, sizeof(frame_header_v1_t));
543  entry->buffers[1] = boost::asio::const_buffer();
544  } else {
545  entry->buffers[0] = boost::asio::buffer(&entry->frame_header, sizeof(frame_header_t));
546  entry->buffers[1] = boost::asio::buffer(&entry->message_header, sizeof(message_header_t));
547  }
548  entry->buffers[2] = boost::asio::buffer(entry->serialized_message);
549 
550  {
551  std::lock_guard<std::mutex> lock(outbound_mutex_);
552  outbound_queue_.push(entry);
553  }
554  start_send();
555 }
556 
557 /** Send a raw message.
558  * The message is sent as-is (frame_header appended by message data) over the wire.
559  * @param frame_header frame header to prepend, must be completely and properly
560  * setup.
561  * @param data data buffer, maybe encrypted (if indicated in frame header)
562  * @param data_size size in bytes of @p data
563  */
564 void
566  const void *data, size_t data_size)
567 {
568  QueueEntry *entry = new QueueEntry();
569  entry->frame_header = frame_header;
570  entry->serialized_message = std::string(reinterpret_cast<const char *>(data), data_size);
571 
572  entry->buffers[0] = boost::asio::buffer(&entry->frame_header, sizeof(frame_header_t));
573  entry->buffers[1] = boost::asio::const_buffer();
574  entry->buffers[2] = boost::asio::buffer(entry->serialized_message);
575 
576  {
577  std::lock_guard<std::mutex> lock(outbound_mutex_);
578  outbound_queue_.push(entry);
579  }
580  start_send();
581 }
582 
583 
584 /** Send a message to other peers.
585  * @param component_id ID of the component to address
586  * @param msg_type numeric message type
587  * @param m message to send
588  */
589 void
590 ProtobufBroadcastPeer::send(uint16_t component_id, uint16_t msg_type,
591  std::shared_ptr<google::protobuf::Message> m)
592 {
593  send(component_id, msg_type, *m);
594 }
595 
596 
597 /** Send a message to other peers.
598  * @param m Message to send, the message must have an CompType enum type to
599  * specify component ID and message type.
600  */
601 void
602 ProtobufBroadcastPeer::send(std::shared_ptr<google::protobuf::Message> m)
603 {
604  send(*m);
605 }
606 
607 
608 /** Send a message to other peers.
609  * @param m Message to send, the message must have an CompType enum type to
610  * specify component ID and message type.
611  */
612 void
613 ProtobufBroadcastPeer::send(google::protobuf::Message &m)
614 {
615  const google::protobuf::Descriptor *desc = m.GetDescriptor();
616  const google::protobuf::EnumDescriptor *enumdesc = desc->FindEnumTypeByName("CompType");
617  if (! enumdesc) {
618  throw std::logic_error("Message does not have CompType enum");
619  }
620  const google::protobuf::EnumValueDescriptor *compdesc =
621  enumdesc->FindValueByName("COMP_ID");
622  const google::protobuf::EnumValueDescriptor *msgtdesc =
623  enumdesc->FindValueByName("MSG_TYPE");
624  if (! compdesc || ! msgtdesc) {
625  throw std::logic_error("Message CompType enum hs no COMP_ID or MSG_TYPE value");
626  }
627  int comp_id = compdesc->number();
628  int msg_type = msgtdesc->number();
629  if (comp_id < 0 || comp_id > std::numeric_limits<uint16_t>::max()) {
630  throw std::logic_error("Message has invalid COMP_ID");
631  }
632  if (msg_type < 0 || msg_type > std::numeric_limits<uint16_t>::max()) {
633  throw std::logic_error("Message has invalid MSG_TYPE");
634  }
635 
636  send(comp_id, msg_type, m);
637 }
638 
639 void
640 ProtobufBroadcastPeer::start_recv()
641 {
642  crypto_buf_ = crypto_;
643  socket_.async_receive_from(boost::asio::buffer(crypto_ ? enc_in_data_ : in_data_, in_data_size_),
644  in_endpoint_,
645  boost::bind(&ProtobufBroadcastPeer::handle_recv,
646  this, boost::asio::placeholders::error,
647  boost::asio::placeholders::bytes_transferred));
648 }
649 
650 void
651 ProtobufBroadcastPeer::start_send()
652 {
653  std::lock_guard<std::mutex> lock(outbound_mutex_);
654  if (outbound_queue_.empty() || outbound_active_) return;
655 
656  outbound_active_ = true;
657 
658  QueueEntry *entry = outbound_queue_.front();
659  outbound_queue_.pop();
660 
661  if (crypto_) {
662  size_t plain_size = boost::asio::buffer_size(entry->buffers[1])
663  + boost::asio::buffer_size(entry->buffers[2]);
664  size_t enc_size = crypto_enc_->encrypted_buffer_size(plain_size);
665 
666  std::string plain_buf = std::string(plain_size, '\0');
667 
668  plain_buf.replace(0,
669  boost::asio::buffer_size(entry->buffers[1]),
670  boost::asio::buffer_cast<const char *>(entry->buffers[1]),
671  boost::asio::buffer_size(entry->buffers[1]));
672 
673  plain_buf.replace(boost::asio::buffer_size(entry->buffers[1]),
674  boost::asio::buffer_size(entry->buffers[2]),
675  boost::asio::buffer_cast<const char *>(entry->buffers[2]),
676  boost::asio::buffer_size(entry->buffers[2]));
677 
678  entry->encrypted_message.resize(enc_size);
679  crypto_enc_->encrypt(plain_buf, entry->encrypted_message);
680 
681  entry->frame_header.payload_size = htonl(entry->encrypted_message.size());
682  entry->frame_header.cipher = crypto_enc_->cipher_id();
683  entry->buffers[1] = boost::asio::buffer(entry->encrypted_message);
684  entry->buffers[2] = boost::asio::const_buffer();
685  }
686 
687  socket_.async_send_to(entry->buffers, outbound_endpoint_,
688  boost::bind(&ProtobufBroadcastPeer::handle_sent, this,
689  boost::asio::placeholders::error,
690  boost::asio::placeholders::bytes_transferred,
691  entry));
692 }
693 
694 
695 } // end namespace protobuf_comm
uint16_t msg_type
message type
Definition: frame_header.h:103
Old network message framing header.
Definition: frame_header.h:117
void send(uint16_t component_id, uint16_t msg_type, google::protobuf::Message &m)
Send a message to other peers.
Definition: peer.cpp:525
Outgoing queue entry.
Definition: queue_entry.h:49
uint32_t payload_size
payload size in bytes
Definition: frame_header.h:123
void send_raw(const frame_header_t &frame_header, const void *data, size_t data_size)
Send a raw message.
Definition: peer.cpp:565
uint8_t cipher
One of PB_ENCRYPTION_*.
Definition: frame_header.h:78
Register to map msg type numbers to Protobuf messages.
std::shared_ptr< google::protobuf::Message > deserialize(frame_header_t &frame_header, message_header_t &message_header, void *data)
Deserialize message.
Network framing header.
Definition: frame_header.h:74
std::array< boost::asio::const_buffer, 3 > buffers
outgoing buffers
Definition: queue_entry.h:61
void setup_crypto(const std::string &key, const std::string &cipher)
Setup encryption.
Definition: peer.cpp:304
void serialize(uint16_t component_id, uint16_t msg_type, google::protobuf::Message &msg, frame_header_t &frame_header, message_header_t &message_header, std::string &data)
Serialize a message.
uint32_t payload_size
payload size in bytes includes message and header, not IV
Definition: frame_header.h:86
Decrypt buffers encrypted with BufferEncryptor.
Definition: crypto.h:76
ProtobufBroadcastPeer(const std::string address, unsigned short port)
Constructor.
Definition: peer.cpp:61
uint16_t component_id
component id
Definition: frame_header.h:119
frame_header_t frame_header
Frame header (network byte order), never encrypted.
Definition: queue_entry.h:58
void set_filter_self(bool filter)
Set if to filter out own messages.
Definition: peer.cpp:357
uint8_t header_version
Frame header version.
Definition: frame_header.h:76
uint16_t msg_type
message type
Definition: frame_header.h:121
Network message header.
Definition: frame_header.h:99
size_t encrypted_buffer_size(size_t plain_length)
Get required size for an encrypted buffer of the given plain text length.
Definition: crypto.cpp:154
size_t decrypt(int cipher, const void *enc, size_t enc_size, void *plain, size_t plain_size)
Decrypt a buffer.
Definition: crypto.cpp:224
uint16_t component_id
component id
Definition: frame_header.h:101
int cipher_id() const
Get cipher ID.
Definition: crypto.h:61
~ProtobufBroadcastPeer()
Destructor.
Definition: peer.cpp:277
Encrypt buffers using AES128 in ECB mode.
Definition: crypto.h:52
void encrypt(const std::string &plain, std::string &enc)
Encrypt a buffer.
Definition: crypto.cpp:98
frame_header_v1_t frame_header_v1
Frame header (network byte order), never encrypted.
Definition: queue_entry.h:59
message_header_t message_header
Frame header (network byte order)
Definition: queue_entry.h:60
std::string encrypted_message
encrypted buffer if encryption is used
Definition: queue_entry.h:62
std::string serialized_message
serialized protobuf message
Definition: queue_entry.h:56