Fawkes API  Fawkes Development Version
server.cpp
1 
2 /***************************************************************************
3  * server.cpp - Protobuf stream protocol - server
4  *
5  * Created: Thu Jan 31 14:57:16 2013
6  * Copyright 2013 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* Redistribution and use in source and binary forms, with or without
10  * modification, are permitted provided that the following conditions
11  * are met:
12  *
13  * - Redistributions of source code must retain the above copyright
14  * notice, this list of conditions and the following disclaimer.
15  * - Redistributions in binary form must reproduce the above copyright
16  * notice, this list of conditions and the following disclaimer in
17  * the documentation and/or other materials provided with the
18  * distribution.
19  * - Neither the name of the authors nor the names of its contributors
20  * may be used to endorse or promote products derived from this
21  * software without specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
26  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
27  * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
28  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
29  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
30  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
31  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
32  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
34  * OF THE POSSIBILITY OF SUCH DAMAGE.
35  */
36 
37 #include <protobuf_comm/server.h>
38 
39 #include <cstdlib>
40 
41 using namespace boost::asio;
42 using namespace boost::system;
43 
44 namespace protobuf_comm {
45 #if 0 /* just to make Emacs auto-indent happy */
46 }
47 #endif
48 
49 /** @class ProtobufStreamServer::Session <protobuf_comm/server.h>
50  * Internal class representing a client session.
51  * This class represents a connection to a particular client. It handles
52  * connection management, reading from, and writing to the client.
53  * @author Tim Niemueller
54  */
55 
56 /** Constructor.
57  * @param id ID of the client, used to address messages from within your
58  * application.
59  * @param parent Parent stream server notified about events.
60  * @param io_service ASIO I/O service to use for communication
61  */
62 ProtobufStreamServer::Session::Session(ClientID id, ProtobufStreamServer *parent,
63  boost::asio::io_service& io_service)
64  : id_(id), parent_(parent), socket_(io_service)
65 {
66  in_data_size_ = 1024;
67  in_data_ = malloc(in_data_size_);
68  outbound_active_ = false;
69 }
70 
71 /** Destructor. */
72 ProtobufStreamServer::Session::~Session()
73 {
74  boost::system::error_code err;
75  if (socket_.is_open()) {
76  socket_.shutdown(ip::tcp::socket::shutdown_both, err);
77  socket_.close();
78  }
79  free(in_data_);
80 }
81 
82 /** Do processing required to start a session.
83  */
84 void
85 ProtobufStreamServer::Session::start_session()
86 {
87  remote_endpoint_ = socket_.remote_endpoint();
88 }
89 
90 /** Start reading a message on this session.
91  * This sets up a read handler to read incoming messages. It also notifies
92  * the parent server of the initiated connection.
93  */
94 void
95 ProtobufStreamServer::Session::start_read()
96 {
97  boost::asio::async_read(socket_,
98  boost::asio::buffer(&in_frame_header_, sizeof(frame_header_t)),
99  boost::bind(&ProtobufStreamServer::Session::handle_read_header,
100  shared_from_this(), boost::asio::placeholders::error));
101 }
102 
103 
104  /** Send a message.
105  * @param component_id ID of the component to address
106  * @param msg_type numeric message type
107  * @param m Message to send
108  */
109 void
110 ProtobufStreamServer::Session::send(uint16_t component_id, uint16_t msg_type,
111  google::protobuf::Message &m)
112 {
113  QueueEntry *entry = new QueueEntry();
114  parent_->message_register().serialize(component_id, msg_type, m,
115  entry->frame_header, entry->message_header,
116  entry->serialized_message);
117 
118  entry->buffers[0] = boost::asio::buffer(&entry->frame_header, sizeof(frame_header_t));
119  entry->buffers[1] = boost::asio::buffer(&entry->message_header, sizeof(message_header_t));
120  entry->buffers[2] = boost::asio::buffer(entry->serialized_message);
121 
122  std::lock_guard<std::mutex> lock(outbound_mutex_);
123  if (outbound_active_) {
124  outbound_queue_.push(entry);
125  } else {
126  outbound_active_ = true;
127  boost::asio::async_write(socket_, entry->buffers,
128  boost::bind(&ProtobufStreamServer::Session::handle_write,
129  shared_from_this(),
130  boost::asio::placeholders::error,
131  boost::asio::placeholders::bytes_transferred,
132  entry));
133  }
134 }
135 
136 
137 /** Disconnect from client. */
138 void
139 ProtobufStreamServer::Session::disconnect()
140 {
141  boost::system::error_code err;
142  if (socket_.is_open()) {
143  socket_.shutdown(ip::tcp::socket::shutdown_both, err);
144  socket_.close();
145  }
146 }
147 
148 
149 /** Write completion handler. */
150 void
151 ProtobufStreamServer::Session::handle_write(const boost::system::error_code& error,
152  size_t /*bytes_transferred*/,
153  QueueEntry *entry)
154 {
155  delete entry;
156 
157  if (! error) {
158  std::lock_guard<std::mutex> lock(outbound_mutex_);
159  if (! outbound_queue_.empty()) {
160  QueueEntry *entry = outbound_queue_.front();
161  outbound_queue_.pop();
162  boost::asio::async_write(socket_, entry->buffers,
163  boost::bind(&ProtobufStreamServer::Session::handle_write,
164  shared_from_this(),
165  boost::asio::placeholders::error,
166  boost::asio::placeholders::bytes_transferred,
167  entry));
168  } else {
169  outbound_active_ = false;
170  }
171  } else {
172  parent_->disconnected(shared_from_this(), error);
173  }
174 }
175 
176 
177 /** Incoming data handler for header.
178  * This method is called if an error occurs while waiting for data (e.g. if
179  * the remote peer closes the connection), or if new data is available. This
180  * callback expectes header information to be received.
181  * @param error error code
182  */
183 void
184 ProtobufStreamServer::Session::handle_read_header(const boost::system::error_code& error)
185 {
186  if (! error) {
187  size_t to_read = ntohl(in_frame_header_.payload_size);
188  if (to_read > in_data_size_) {
189  void *new_data = realloc(in_data_, to_read);
190  if (new_data) {
191  in_data_size_ = to_read;
192  in_data_ = new_data;
193  } else {
194  parent_->disconnected(shared_from_this(),
195  errc::make_error_code(errc::not_enough_memory));
196  }
197  }
198  // setup new read
199  boost::asio::async_read(socket_,
200  boost::asio::buffer(in_data_, to_read),
201  boost::bind(&ProtobufStreamServer::Session::handle_read_message,
202  shared_from_this(), boost::asio::placeholders::error));
203  } else {
204  parent_->disconnected(shared_from_this(), error);
205  }
206 }
207 
208 
209 /** Incoming data handler for message content.
210  * This method is called if an error occurs while waiting for data (e.g. if
211  * the remote peer closes the connection), or if new data is available. This
212  * callback expectes message to be received that conforms to a previously
213  * received header.
214  * @param error error code
215  */
216 void
217 ProtobufStreamServer::Session::handle_read_message(const boost::system::error_code& error)
218 {
219  if (! error) {
220  message_header_t *message_header = static_cast<message_header_t *>(in_data_);
221 
222  uint16_t comp_id = ntohs(message_header->component_id);
223  uint16_t msg_type = ntohs(message_header->msg_type);
224  try {
225  std::shared_ptr<google::protobuf::Message> m =
226  parent_->message_register().deserialize(in_frame_header_, *message_header,
227  (char *)in_data_ + sizeof(message_header_t));
228  parent_->sig_rcvd_(id_, comp_id, msg_type, m);
229  } catch (std::runtime_error &e) {
230  // ignored, most likely unknown message tpye
231  parent_->sig_recv_failed_(id_, comp_id, msg_type, e.what());
232  }
233  start_read();
234  } else {
235  parent_->disconnected(shared_from_this(), error);
236  }
237 }
238 
239 
240 /** @class ProtobufStreamServer <protobuf_comm/server.h>
241  * Stream server for protobuf message transmission.
242  * The server opens a TCP socket (IPv4) and waits for incoming connections.
243  * Each incoming connection is given a unique client ID. Signals are
244  * provided that can be used to react to connections and incoming data.
245  * @author Tim Niemueller
246  */
247 
248 /** Constructor.
249  * @param port port to listen on
250  */
252  : io_service_(),
253  acceptor_(io_service_, ip::tcp::endpoint(ip::tcp::v6(), port))
254 {
255  message_register_ = new MessageRegister();
256  own_message_register_ = true;
257  next_cid_ = 1;
258 
259  acceptor_.set_option(socket_base::reuse_address(true));
260 
261  start_accept();
262  asio_thread_ = std::thread(&ProtobufStreamServer::run_asio, this);
263 }
264 
265 
266 /** Constructor.
267  * @param port port to listen on
268  * @param proto_path file paths to search for proto files. All message types
269  * within these files will automatically be registered and available for dynamic
270  * message creation.
271  */
273  std::vector<std::string> &proto_path)
274  : io_service_(),
275  acceptor_(io_service_, ip::tcp::endpoint(ip::tcp::v6(), port))
276 {
277  message_register_ = new MessageRegister(proto_path);
278  own_message_register_ = true;
279  next_cid_ = 1;
280 
281  acceptor_.set_option(socket_base::reuse_address(true));
282 
283  start_accept();
284  asio_thread_ = std::thread(&ProtobufStreamServer::run_asio, this);
285 }
286 
287 /** Constructor.
288  * @param port port to listen on
289  * @param mr message register to use to (de)serialize messages
290  */
292  MessageRegister *mr)
293  : io_service_(),
294  acceptor_(io_service_, ip::tcp::endpoint(ip::tcp::v6(), port)),
295  message_register_(mr), own_message_register_(false)
296 {
297  next_cid_ = 1;
298 
299  acceptor_.set_option(socket_base::reuse_address(true));
300 
301  start_accept();
302  asio_thread_ = std::thread(&ProtobufStreamServer::run_asio, this);
303 }
304 
305 
306 /** Destructor. */
308 {
309  io_service_.stop();
310  asio_thread_.join();
311  if (own_message_register_) {
312  delete message_register_;
313  }
314 }
315 
316 
317 /** Send a message to the given client.
318  * @param client ID of the client to addresss
319  * @param component_id ID of the component to address
320  * @param msg_type numeric message type
321  * @param m message to send
322  */
323 void
324 ProtobufStreamServer::send(ClientID client, uint16_t component_id, uint16_t msg_type,
325  google::protobuf::Message &m)
326 {
327  if (sessions_.find(client) == sessions_.end()) {
328  throw std::runtime_error("Client does not exist");
329  }
330 
331  sessions_[client]->send(component_id, msg_type, m);
332 }
333 
334 
335 /** Send a message.
336  * @param client ID of the client to addresss
337  * @param component_id ID of the component to address
338  * @param msg_type numeric message type
339  * @param m Message to send
340  */
341 void
342 ProtobufStreamServer::send(ClientID client, uint16_t component_id, uint16_t msg_type,
343  std::shared_ptr<google::protobuf::Message> m)
344 {
345  send(client, component_id, msg_type, *m);
346 }
347 
348 /** Send a message.
349  * @param client ID of the client to addresss
350  * @param m Message to send, the message must have an CompType enum type to
351  * specify component ID and message type.
352  */
353 void
354 ProtobufStreamServer::send(ClientID client, google::protobuf::Message &m)
355 {
356  const google::protobuf::Descriptor *desc = m.GetDescriptor();
357  const google::protobuf::EnumDescriptor *enumdesc = desc->FindEnumTypeByName("CompType");
358  if (! enumdesc) {
359  throw std::logic_error("Message does not have CompType enum");
360  }
361  const google::protobuf::EnumValueDescriptor *compdesc =
362  enumdesc->FindValueByName("COMP_ID");
363  const google::protobuf::EnumValueDescriptor *msgtdesc =
364  enumdesc->FindValueByName("MSG_TYPE");
365  if (! compdesc || ! msgtdesc) {
366  throw std::logic_error("Message CompType enum hs no COMP_ID or MSG_TYPE value");
367  }
368  int comp_id = compdesc->number();
369  int msg_type = msgtdesc->number();
370  if (comp_id < 0 || comp_id > std::numeric_limits<uint16_t>::max()) {
371  throw std::logic_error("Message has invalid COMP_ID");
372  }
373  if (msg_type < 0 || msg_type > std::numeric_limits<uint16_t>::max()) {
374  throw std::logic_error("Message has invalid MSG_TYPE");
375  }
376 
377  send(client, comp_id, msg_type, m);
378 }
379 
380 /** Send a message.
381  * @param client ID of the client to addresss
382  * @param m Message to send, the message must have an CompType enum type to
383  * specify component ID and message type.
384  */
385 void
386 ProtobufStreamServer::send(ClientID client, std::shared_ptr<google::protobuf::Message> m)
387 {
388  send(client, *m);
389 }
390 
391 /** Send a message to all clients.
392  * @param component_id ID of the component to address
393  * @param msg_type numeric message type
394  * @param m message to send
395  */
396 void
397 ProtobufStreamServer::send_to_all(uint16_t component_id, uint16_t msg_type,
398  google::protobuf::Message &m)
399 {
400  std::map<ClientID, boost::shared_ptr<Session>>::iterator s;
401  for (s = sessions_.begin(); s != sessions_.end(); ++s) {
402  send(s->first, component_id, msg_type, m);
403  }
404 }
405 
406 /** Send a message to all clients.
407  * @param component_id ID of the component to address
408  * @param msg_type numeric message type
409  * @param m message to send
410  */
411 void
412 ProtobufStreamServer::send_to_all(uint16_t component_id, uint16_t msg_type,
413  std::shared_ptr<google::protobuf::Message> m)
414 {
415  std::map<ClientID, boost::shared_ptr<Session>>::iterator s;
416  for (s = sessions_.begin(); s != sessions_.end(); ++s) {
417  send(s->first, component_id, msg_type, m);
418  }
419 }
420 
421 /** Send a message to all clients.
422  * @param m message to send
423  */
424 void
425 ProtobufStreamServer::send_to_all(std::shared_ptr<google::protobuf::Message> m)
426 {
427  std::map<ClientID, boost::shared_ptr<Session>>::iterator s;
428  for (s = sessions_.begin(); s != sessions_.end(); ++s) {
429  send(s->first, m);
430  }
431 }
432 
433 /** Send a message to all clients.
434  * @param m message to send
435  */
436 void
437 ProtobufStreamServer::send_to_all(google::protobuf::Message &m)
438 {
439  std::map<ClientID, boost::shared_ptr<Session>>::iterator s;
440  for (s = sessions_.begin(); s != sessions_.end(); ++s) {
441  send(s->first, m);
442  }
443 }
444 
445 
446 /** Disconnect specific client.
447  * @param client client ID to disconnect from
448  */
449 void
451 {
452  if (sessions_.find(client) != sessions_.end()) {
453  boost::shared_ptr<Session> session = sessions_[client];
454  session->disconnect();
455  }
456 }
457 
458 /** Start accepting connections. */
459 void
460 ProtobufStreamServer::start_accept()
461 {
462 #if defined(__GNUC__) && (__GNUC__ < 4 || (__GNUC__ == 4 && __GNUC_MINOR__ < 7))
463  std::lock_guard<std::mutex> lock(next_cid_mutex_);
464 #endif
465  Session::Ptr new_session(new Session(next_cid_++, this, io_service_));
466  acceptor_.async_accept(new_session->socket(),
467  boost::bind(&ProtobufStreamServer::handle_accept, this,
468  new_session, boost::asio::placeholders::error));
469 }
470 
471 void
472 ProtobufStreamServer::disconnected(boost::shared_ptr<Session> session,
473  const boost::system::error_code &error)
474 {
475  sessions_.erase(session->id());
476  sig_disconnected_(session->id(), error);
477 }
478 
479 void
480 ProtobufStreamServer::handle_accept(Session::Ptr new_session,
481  const boost::system::error_code& error)
482 {
483  if (!error) {
484  new_session->start_session();
485  sessions_[new_session->id()] = new_session;
486  sig_connected_(new_session->id(), new_session->remote_endpoint());
487  new_session->start_read();
488  }
489 
490  start_accept();
491 }
492 
493 
494 void
495 ProtobufStreamServer::run_asio()
496 {
497 #if BOOST_ASIO_VERSION > 100409
498  while (! io_service_.stopped()) {
499 #endif
500  usleep(0);
501  io_service_.reset();
502  io_service_.run();
503 #if BOOST_ASIO_VERSION > 100409
504  }
505 #endif
506 }
507 
508 } // end namespace protobuf_comm
unsigned int ClientID
ID to identify connected clients.
Definition: server.h:68
Register to map msg type numbers to Protobuf messages.
void disconnect(ClientID client)
Disconnect specific client.
Definition: server.cpp:450
void send_to_all(uint16_t component_id, uint16_t msg_type, google::protobuf::Message &m)
Send a message to all clients.
Definition: server.cpp:397
ProtobufStreamServer(unsigned short port)
Constructor.
Definition: server.cpp:251
QueueEntry()
Constructor.
Definition: queue_entry.h:52
void send(ClientID client, uint16_t component_id, uint16_t msg_type, google::protobuf::Message &m)
Send a message to the given client.
Definition: server.cpp:324
message_header_t message_header
Frame header (network byte order)
Definition: queue_entry.h:60