Fawkes API  Fawkes Development Version
server.h
1 
2 /***************************************************************************
3  * server.h - Protobuf stream protocol - server
4  *
5  * Created: Wed Jan 30 16:41:22 2013
6  * Copyright 2013 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* Redistribution and use in source and binary forms, with or without
10  * modification, are permitted provided that the following conditions
11  * are met:
12  *
13  * - Redistributions of source code must retain the above copyright
14  * notice, this list of conditions and the following disclaimer.
15  * - Redistributions in binary form must reproduce the above copyright
16  * notice, this list of conditions and the following disclaimer in
17  * the documentation and/or other materials provided with the
18  * distribution.
19  * - Neither the name of the authors nor the names of its contributors
20  * may be used to endorse or promote products derived from this
21  * software without specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
26  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
27  * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
28  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
29  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
30  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
31  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
32  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
34  * OF THE POSSIBILITY OF SUCH DAMAGE.
35  */
36 
37 #ifndef __PROTOBUF_COMM_SERVER_H_
38 #define __PROTOBUF_COMM_SERVER_H_
39 
40 #include <protobuf_comm/frame_header.h>
41 #include <protobuf_comm/message_register.h>
42 #include <protobuf_comm/queue_entry.h>
43 
44 #include <boost/asio.hpp>
45 #include <boost/signals2.hpp>
46 #include <boost/enable_shared_from_this.hpp>
47 #include <google/protobuf/message.h>
48 
49 #ifndef _GLIBCXX_USE_SCHED_YIELD
50 # define _GLIBCXX_USE_SCHED_YIELD
51 #endif
52 #include <thread>
53 #include <mutex>
54 #include <queue>
55 #if defined(__GNUC__) && (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 7))
56 # include <atomic>
57 #endif
58 
59 namespace protobuf_comm {
60 #if 0 /* just to make Emacs auto-indent happy */
61 }
62 #endif
63 
65 {
66  public:
67  /** ID to identify connected clients. */
68  typedef unsigned int ClientID;
69 
70  ProtobufStreamServer(unsigned short port);
71  ProtobufStreamServer(unsigned short port, std::vector<std::string> &proto_path);
72  ProtobufStreamServer(unsigned short port, MessageRegister *mr);
74 
75  void send(ClientID client, uint16_t component_id, uint16_t msg_type,
76  google::protobuf::Message &m);
77  void send(ClientID client, uint16_t component_id, uint16_t msg_type,
78  std::shared_ptr<google::protobuf::Message> m);
79  void send(ClientID client, std::shared_ptr<google::protobuf::Message> m);
80  void send(ClientID client, google::protobuf::Message &m);
81 
82  void send_to_all(uint16_t component_id, uint16_t msg_type,
83  google::protobuf::Message &m);
84  void send_to_all(uint16_t component_id, uint16_t msg_type,
85  std::shared_ptr<google::protobuf::Message> m);
86  void send_to_all(std::shared_ptr<google::protobuf::Message> m);
87  void send_to_all(google::protobuf::Message &m);
88 
89  void disconnect(ClientID client);
90 
91  /** Get the server's message register.
92  * @return message register
93  */
95  { return *message_register_; }
96 
97  /** Signal that is invoked when a message has been received.
98  * @return signal
99  */
100  boost::signals2::signal<void (ClientID, uint16_t, uint16_t,
101  std::shared_ptr<google::protobuf::Message>)> &
102  signal_received() { return sig_rcvd_; }
103 
104  /** Signal that is invoked when receiving a message failed.
105  * @return signal
106  */
107  boost::signals2::signal<void (ClientID, uint16_t, uint16_t, std::string)> &
108  signal_receive_failed() { return sig_recv_failed_; }
109 
110  /** Signal that is invoked when a new client has connected.
111  * @return signal
112  */
113  boost::signals2::signal<void (ClientID, boost::asio::ip::tcp::endpoint &)> &
114  signal_connected() { return sig_connected_; }
115 
116  /** Signal that is invoked when a new client has disconnected.
117  * @return signal
118  */
119  boost::signals2::signal<void (ClientID, const boost::system::error_code &)> &
120  signal_disconnected() { return sig_disconnected_; }
121 
122  private:
123  class Session : public boost::enable_shared_from_this<Session>
124  {
125  public:
126  /** Shortcut for shared pointer of session. */
127  typedef boost::shared_ptr<Session> Ptr;
128 
129  Session(ClientID id, ProtobufStreamServer *parent,
130  boost::asio::io_service &io_service);
131  ~Session();
132 
133  /** Get underlying socket.
134  * @return socket */
135  boost::asio::ip::tcp::socket & socket() { return socket_; }
136 
137  /** Get client ID.
138  * @return client ID */
139  ClientID id() const { return id_; }
140  /** Get client's endpoint.
141  * @return remote client's endpoint */
142  boost::asio::ip::tcp::endpoint & remote_endpoint()
143  { return remote_endpoint_; }
144 
145  void start_session();
146  void start_read();
147  void send(uint16_t component_id, uint16_t msg_type,
148  google::protobuf::Message &m);
149  void disconnect();
150 
151  private:
152  void handle_read_message(const boost::system::error_code& error);
153  void handle_read_header(const boost::system::error_code& error);
154  void handle_write(const boost::system::error_code& error,
155  size_t /*bytes_transferred*/, QueueEntry *entry);
156 
157  private:
158  ClientID id_;
159  ProtobufStreamServer *parent_;
160  boost::asio::ip::tcp::socket socket_;
161  boost::asio::ip::tcp::endpoint remote_endpoint_;
162 
163  frame_header_t in_frame_header_;
164  size_t in_data_size_;
165  void * in_data_;
166 
167  std::queue<QueueEntry *> outbound_queue_;
168  std::mutex outbound_mutex_;
169  bool outbound_active_;
170  };
171 
172  private: // methods
173  void run_asio();
174  void start_accept();
175  void handle_accept(Session::Ptr new_session, const boost::system::error_code& error);
176 
177  void disconnected(boost::shared_ptr<Session> session,
178  const boost::system::error_code &error);
179 
180  private: // members
181  boost::asio::io_service io_service_;
182  boost::asio::ip::tcp::acceptor acceptor_;
183  boost::signals2::signal<void (ClientID, uint16_t, uint16_t,
184  std::shared_ptr<google::protobuf::Message>)> sig_rcvd_;
185  boost::signals2::signal<void (ClientID, uint16_t, uint16_t, std::string)> sig_recv_failed_;
186  boost::signals2::signal<void (ClientID, boost::asio::ip::tcp::endpoint &)> sig_connected_;
187  boost::signals2::signal<void (ClientID, const boost::system::error_code &)>
188  sig_disconnected_;
189 
190  std::thread asio_thread_;
191 
192  std::map<ClientID, boost::shared_ptr<Session>> sessions_;
193 
194 #if defined(__GNUC__) && (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 7))
195  std::atomic<ClientID> next_cid_;
196 #else
197  ClientID next_cid_;
198  std::mutex next_cid_mutex_;
199 #endif
200 
201  MessageRegister *message_register_;
202  bool own_message_register_;
203 };
204 
205 } // end namespace protobuf_comm
206 
207 #endif
Outgoing queue entry.
Definition: queue_entry.h:49
unsigned int ClientID
ID to identify connected clients.
Definition: server.h:68
Register to map msg type numbers to Protobuf messages.
boost::signals2::signal< void(ClientID, const boost::system::error_code &)> & signal_disconnected()
Signal that is invoked when a new client has disconnected.
Definition: server.h:120
Network framing header.
Definition: frame_header.h:74
void disconnect(ClientID client)
Disconnect specific client.
Definition: server.cpp:450
void send_to_all(uint16_t component_id, uint16_t msg_type, google::protobuf::Message &m)
Send a message to all clients.
Definition: server.cpp:397
ProtobufStreamServer(unsigned short port)
Constructor.
Definition: server.cpp:251
void send(ClientID client, uint16_t component_id, uint16_t msg_type, google::protobuf::Message &m)
Send a message to the given client.
Definition: server.cpp:324
Stream server for protobuf message transmission.
Definition: server.h:64
boost::signals2::signal< void(ClientID, boost::asio::ip::tcp::endpoint &)> & signal_connected()
Signal that is invoked when a new client has connected.
Definition: server.h:114
boost::signals2::signal< void(ClientID, uint16_t, uint16_t, std::string)> & signal_receive_failed()
Signal that is invoked when receiving a message failed.
Definition: server.h:108
MessageRegister & message_register()
Get the server&#39;s message register.
Definition: server.h:94
boost::signals2::signal< void(ClientID, uint16_t, uint16_t, std::shared_ptr< google::protobuf::Message >)> & signal_received()
Signal that is invoked when a message has been received.
Definition: server.h:102