Fawkes API  Fawkes Development Version
client.cpp
1 
2 /***************************************************************************
3  * client.cpp - Protobuf stream protocol - client
4  *
5  * Created: Thu Jan 31 17:38:04 2013
6  * Copyright 2013 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* Redistribution and use in source and binary forms, with or without
10  * modification, are permitted provided that the following conditions
11  * are met:
12  *
13  * - Redistributions of source code must retain the above copyright
14  * notice, this list of conditions and the following disclaimer.
15  * - Redistributions in binary form must reproduce the above copyright
16  * notice, this list of conditions and the following disclaimer in
17  * the documentation and/or other materials provided with the
18  * distribution.
19  * - Neither the name of the authors nor the names of its contributors
20  * may be used to endorse or promote products derived from this
21  * software without specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
26  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
27  * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
28  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
29  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
30  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
31  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
32  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
34  * OF THE POSSIBILITY OF SUCH DAMAGE.
35  */
36 
37 #include <protobuf_comm/client.h>
38 
39 #include <boost/lexical_cast.hpp>
40 
41 using namespace boost::asio;
42 using namespace boost::system;
43 
44 namespace protobuf_comm {
45 #if 0 /* just to make Emacs auto-indent happy */
46 }
47 #endif
48 
49 /** @class ProtobufStreamClient <protobuf_comm/client.h>
50  * Stream client for protobuf message transmission.
51  * The client opens a TCP connection (IPv4) to a specified server and
52  * sends messages to and receives messages from the remote.
53  * @author Tim Niemueller
54  */
55 
56 /** Constructor. */
57 ProtobufStreamClient::ProtobufStreamClient()
58  : resolver_(io_service_), socket_(io_service_), io_service_work_(io_service_)
59 {
60  message_register_ = new MessageRegister();
61  own_message_register_ = true;
62  connected_ = false;
63  outbound_active_ = false;
64  in_data_size_ = 1024;
65  frame_header_version_ = PB_FRAME_V2;
66  in_frame_header_size_ = sizeof(frame_header_t);
67  in_frame_header_ = malloc(in_frame_header_size_);
68  in_data_ = malloc(in_data_size_);
69  run_asio();
70 }
71 
72 /** Constructor.
73  * @param proto_path file paths to search for proto files. All message types
74  * within these files will automatically be registered and available for dynamic
75  * message creation.
76  */
77 ProtobufStreamClient::ProtobufStreamClient(std::vector<std::string> &proto_path)
78  : resolver_(io_service_), socket_(io_service_), io_service_work_(io_service_)
79 {
80  message_register_ = new MessageRegister(proto_path);
81  own_message_register_ = true;
82  connected_ = false;
83  outbound_active_ = false;
84  in_data_size_ = 1024;
85  in_data_ = malloc(in_data_size_);
86  frame_header_version_ = PB_FRAME_V2;
87  in_frame_header_size_ = sizeof(frame_header_t);
88  in_frame_header_ = malloc(in_frame_header_size_);
89  run_asio();
90 }
91 
92 
93 /** Constructor.
94  * @param mr message register to use to (de)serialize messages
95  * @param header_version protobuf protocol frame header version to use,
96  */
98  frame_header_version_t header_version)
99  : resolver_(io_service_), socket_(io_service_), io_service_work_(io_service_),
100  message_register_(mr), own_message_register_(false),
101  frame_header_version_(header_version)
102 {
103  connected_ = false;
104  outbound_active_ = false;
105  in_data_size_ = 1024;
106  in_data_ = malloc(in_data_size_);
107  if (frame_header_version_ == PB_FRAME_V1) {
108  in_frame_header_size_ = sizeof(frame_header_v1_t);
109  } else {
110  in_frame_header_size_ = sizeof(frame_header_t);
111  }
112  in_frame_header_ = malloc(in_frame_header_size_);
113  run_asio();
114 }
115 
116 
117 /** Destructor. */
119 {
120  disconnect_nosig();
121  io_service_.stop();
122  asio_thread_.join();
123  free(in_data_);
124  free(in_frame_header_);
125  if (own_message_register_) {
126  delete message_register_;
127  }
128 }
129 
#if defined(__GNUC__) && (__GNUC__ < 4 || (__GNUC__ == 4 && __GNUC_MINOR__ < 6))
/** Thread entry point used with GCC older than 4.6.
 * Newer compilers use a lambda in run_asio() instead.
 * @param io_service I/O service whose run() is executed in the thread
 */
static void
run_asio_thread(boost::asio::io_service &io_service)
{
  io_service.run();
}
#endif
134 
135 void
136 ProtobufStreamClient::run_asio()
137 {
138 #if defined(__GNUC__) && (__GNUC__ < 4 || (__GNUC__ == 4 && __GNUC_MINOR__ < 6))
139  asio_thread_ = std::thread(run_asio_thread, std::ref(io_service_));
140 #else
141  asio_thread_ = std::thread([this]() { this->io_service_.run(); });
142 #endif
143 }
144 
145 
146 /** Asynchronous connect.
147  * This triggers connection establishment. The method does not block,
148  * i.e. it returns immediately and does not wait for the connection to
149  * be established.
150  * @param host host to connect to
151  * @param port TCP port to connect to
152  */
153 void
154 ProtobufStreamClient::async_connect(const char *host, unsigned short port)
155 {
156  ip::tcp::resolver::query query(host, boost::lexical_cast<std::string>(port));
157  resolver_.async_resolve(query,
158  boost::bind(&ProtobufStreamClient::handle_resolve, this,
159  boost::asio::placeholders::error,
160  boost::asio::placeholders::iterator));
161 
162 }
163 
164 
165 void
166 ProtobufStreamClient::handle_resolve(const boost::system::error_code& err,
167  ip::tcp::resolver::iterator endpoint_iterator)
168 {
169  if (! err) {
170  // Attempt a connection to each endpoint in the list until we
171  // successfully establish a connection.
172 #if BOOST_ASIO_VERSION > 100409
173  boost::asio::async_connect(socket_, endpoint_iterator,
174 #else
175  socket_.async_connect(*endpoint_iterator,
176 #endif
177  boost::bind(&ProtobufStreamClient::handle_connect, this,
178  boost::asio::placeholders::error));
179  } else {
180  disconnect_nosig();
181  sig_disconnected_(err);
182  }
183 }
184 
185 void
186 ProtobufStreamClient::handle_connect(const boost::system::error_code &err)
187 {
188  if (! err) {
189 #if BOOST_VERSION >= 105400 && BOOST_VERSION < 105500
190  // Boost 1.54 has a bug that causes async_connect to report success
191  // if it cannot connect at all to the other side, cf.
192  // https://svn.boost.org/trac/boost/ticket/8795
193  // Work around by explicitly checking for connected status
194  boost::system::error_code ec;
195  socket_.remote_endpoint(ec);
196  if (ec == boost::system::errc::not_connected) {
197  disconnect_nosig();
198  sig_disconnected_(ec);
199  return;
200  }
201 #endif
202  connected_ = true;
203  start_recv();
204  sig_connected_();
205  } else {
206  disconnect_nosig();
207  sig_disconnected_(err);
208  }
209 }
210 
211 void
212 ProtobufStreamClient::disconnect_nosig()
213 {
214  boost::system::error_code err;
215  if (socket_.is_open()) {
216  socket_.shutdown(ip::tcp::socket::shutdown_both, err);
217  socket_.close();
218  }
219  connected_ = false;
220 }
221 
222 
223 /** Disconnect from remote host. */
224 void
226 {
227  disconnect_nosig();
228  sig_disconnected_(boost::system::error_code());
229 }
230 
231 
232 void
233 ProtobufStreamClient::start_recv()
234 {
235  boost::asio::async_read(socket_,
236  boost::asio::buffer(in_frame_header_, in_frame_header_size_),
237  boost::bind(&ProtobufStreamClient::handle_read_header,
238  this, boost::asio::placeholders::error));
239 }
240 
241 void
242 ProtobufStreamClient::handle_read_header(const boost::system::error_code& error)
243 {
244  if (! error) {
245  size_t to_read;
246  if (frame_header_version_ == PB_FRAME_V1) {
247  frame_header_v1_t *frame_header = (frame_header_v1_t *)in_frame_header_;
248  to_read = ntohl(frame_header->payload_size);
249  } else {
250  frame_header_t *frame_header = (frame_header_t *)in_frame_header_;
251  to_read = ntohl(frame_header->payload_size);
252  }
253  if (to_read > in_data_size_) {
254  void *new_data = realloc(in_data_, to_read);
255  if (new_data) {
256  in_data_size_ = to_read;
257  in_data_ = new_data;
258  } else {
259  disconnect_nosig();
260  sig_disconnected_(errc::make_error_code(errc::not_enough_memory));
261  }
262  }
263  // setup new read
264  boost::asio::async_read(socket_,
265  boost::asio::buffer(in_data_, to_read),
266  boost::bind(&ProtobufStreamClient::handle_read_message,
267  this, boost::asio::placeholders::error));
268  } else {
269  disconnect_nosig();
270  sig_disconnected_(error);
271  }
272 }
273 
274 void
275 ProtobufStreamClient::handle_read_message(const boost::system::error_code& error)
276 {
277  if (! error) {
278  frame_header_t frame_header;
279  message_header_t message_header;
280  void *data;
281 
282  if (frame_header_version_ == PB_FRAME_V1) {
283  frame_header_v1_t *frame_header_v1 = (frame_header_v1_t *)in_frame_header_;
284  frame_header.header_version = PB_FRAME_V1;
285  frame_header.cipher = PB_ENCRYPTION_NONE;
286  frame_header.payload_size = htonl(ntohl(frame_header_v1->payload_size) + sizeof(message_header_t));
287  message_header.component_id = frame_header_v1->component_id;
288  message_header.msg_type = frame_header_v1->msg_type;
289  data = in_data_;
290  } else {
291  memcpy(&frame_header, in_frame_header_, sizeof(frame_header_t));
292 
293  message_header_t *msg_header = static_cast<message_header_t *>(in_data_);
294  message_header.component_id = msg_header->component_id;
295  message_header.msg_type = msg_header->msg_type;
296 
297  data = (char *)in_data_ + sizeof(message_header);
298  }
299 
300  uint16_t comp_id = ntohs(message_header.component_id);
301  uint16_t msg_type = ntohs(message_header.msg_type);
302  try {
303  std::shared_ptr<google::protobuf::Message> m =
304  message_register_->deserialize(frame_header, message_header, data);
305 
306  sig_rcvd_(comp_id, msg_type, m);
307  } catch (std::runtime_error &e) {
308  sig_recv_failed_(comp_id, msg_type, e.what());
309  }
310 
311  start_recv();
312  } else {
313  disconnect_nosig();
314  sig_disconnected_(error);
315  }
316 }
317 
318 void
319 ProtobufStreamClient::handle_write(const boost::system::error_code& error,
320  size_t /*bytes_transferred*/,
321  QueueEntry *entry)
322 {
323  delete entry;
324 
325  if (! error) {
326  std::lock_guard<std::mutex> lock(outbound_mutex_);
327  if (! outbound_queue_.empty()) {
328  QueueEntry *entry = outbound_queue_.front();
329  outbound_queue_.pop();
330  boost::asio::async_write(socket_, entry->buffers,
331  boost::bind(&ProtobufStreamClient::handle_write, this,
332  boost::asio::placeholders::error,
333  boost::asio::placeholders::bytes_transferred,
334  entry));
335  } else {
336  outbound_active_ = false;
337  }
338  } else {
339  disconnect_nosig();
340  sig_disconnected_(error);
341  }
342 }
343 
344 
345 /** Send a message to the server.
346  * @param component_id ID of the component to address
347  * @param msg_type numeric message type
348  * @param m message to send
349  */
350 void
351 ProtobufStreamClient::send(uint16_t component_id, uint16_t msg_type,
352  google::protobuf::Message &m)
353 {
354  if (!connected_) {
355  throw std::runtime_error("Cannot send while not connected");
356  }
357 
358  QueueEntry *entry = new QueueEntry();
359  message_register_->serialize(component_id, msg_type, m,
360  entry->frame_header, entry->message_header,
361  entry->serialized_message);
362 
363  if (frame_header_version_ == PB_FRAME_V1) {
367  htonl(ntohl(entry->frame_header.payload_size) - sizeof(message_header_t));
368 
369  entry->buffers[0] = boost::asio::buffer(&entry->frame_header_v1, sizeof(frame_header_v1_t));
370  entry->buffers[1] = boost::asio::const_buffer();
371  } else {
372  entry->buffers[0] = boost::asio::buffer(&entry->frame_header, sizeof(frame_header_t));
373  entry->buffers[1] = boost::asio::buffer(&entry->message_header, sizeof(message_header_t));
374  }
375  entry->buffers[2] = boost::asio::buffer(entry->serialized_message);
376 
377  std::lock_guard<std::mutex> lock(outbound_mutex_);
378  if (outbound_active_) {
379  outbound_queue_.push(entry);
380  } else {
381  outbound_active_ = true;
382  boost::asio::async_write(socket_, entry->buffers,
383  boost::bind(&ProtobufStreamClient::handle_write, this,
384  boost::asio::placeholders::error,
385  boost::asio::placeholders::bytes_transferred,
386  entry));
387  }
388 }
389 
390 
391 /** Send a message to the server.
392  * @param m message to send, the message must be of a type with a suitable CompType
393  * enum indicating component ID and message type.
394  */
395 void
396 ProtobufStreamClient::send(google::protobuf::Message &m)
397 {
398  const google::protobuf::Descriptor *desc = m.GetDescriptor();
399  const google::protobuf::EnumDescriptor *enumdesc = desc->FindEnumTypeByName("CompType");
400  if (! enumdesc) {
401  throw std::logic_error("Message does not have CompType enum");
402  }
403  const google::protobuf::EnumValueDescriptor *compdesc =
404  enumdesc->FindValueByName("COMP_ID");
405  const google::protobuf::EnumValueDescriptor *msgtdesc =
406  enumdesc->FindValueByName("MSG_TYPE");
407  if (! compdesc || ! msgtdesc) {
408  throw std::logic_error("Message CompType enum hs no COMP_ID or MSG_TYPE value");
409  }
410  int comp_id = compdesc->number();
411  int msg_type = msgtdesc->number();
412  if (comp_id < 0 || comp_id > std::numeric_limits<uint16_t>::max()) {
413  throw std::logic_error("Message has invalid COMP_ID");
414  }
415  if (msg_type < 0 || msg_type > std::numeric_limits<uint16_t>::max()) {
416  throw std::logic_error("Message has invalid MSG_TYPE");
417  }
418 
419  send(comp_id, msg_type, m);
420 }
421 
422 
423 /** Send a message to the server.
424  * @param m message to send, the message must be of a type with a suitable CompType
425  * enum indicating component ID and message type.
426  */
427 void
428 ProtobufStreamClient::send(std::shared_ptr<google::protobuf::Message> m)
429 {
430  send(*m);
431 }
432 
433 } // end namespace protobuf_comm
uint16_t msg_type
message type
Definition: frame_header.h:103
Old network message framing header.
Definition: frame_header.h:117
Outgoing queue entry.
Definition: queue_entry.h:49
uint32_t payload_size
payload size in bytes
Definition: frame_header.h:123
uint8_t cipher
One of PB_ENCRYPTION_*.
Definition: frame_header.h:78
Register to map msg type numbers to Protobuf messages.
std::shared_ptr< google::protobuf::Message > deserialize(frame_header_t &frame_header, message_header_t &message_header, void *data)
Deserialize message.
Network framing header.
Definition: frame_header.h:74
ProtobufStreamClient()
Constructor.
Definition: client.cpp:57
std::array< boost::asio::const_buffer, 3 > buffers
outgoing buffers
Definition: queue_entry.h:61
void disconnect()
Disconnect from remote host.
Definition: client.cpp:225
void serialize(uint16_t component_id, uint16_t msg_type, google::protobuf::Message &msg, frame_header_t &frame_header, message_header_t &message_header, std::string &data)
Serialize a message.
uint32_t payload_size
payload size in bytes includes message and header, not IV
Definition: frame_header.h:86
uint16_t component_id
component id
Definition: frame_header.h:119
frame_header_t frame_header
Frame header (network byte order), never encrypted.
Definition: queue_entry.h:58
void send(uint16_t component_id, uint16_t msg_type, google::protobuf::Message &m)
Send a message to the server.
Definition: client.cpp:351
void async_connect(const char *host, unsigned short port)
Asynchronous connect.
Definition: client.cpp:154
uint8_t header_version
Frame header version.
Definition: frame_header.h:76
uint16_t msg_type
message type
Definition: frame_header.h:121
Network message header.
Definition: frame_header.h:99
uint16_t component_id
component id
Definition: frame_header.h:101
frame_header_v1_t frame_header_v1
Frame header (network byte order), never encrypted.
Definition: queue_entry.h:59
message_header_t message_header
Frame header (network byte order)
Definition: queue_entry.h:60
std::string serialized_message
serialized protobuf message
Definition: queue_entry.h:56