Fawkes API  Fawkes Development Version
oprs_protobuf.h
1 
2 /***************************************************************************
3  * oprs_protobuf.h - protobuf network communication for OpenPRS
4  *
5  * Created: Tue Sep 02 16:34:09 2014 (based on CLIPS version)
6  * Copyright 2013-2014 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* Redistribution and use in source and binary forms, with or without
10  * modification, are permitted provided that the following conditions
11  * are met:
12  *
13  * - Redistributions of source code must retain the above copyright
14  * notice, this list of conditions and the following disclaimer.
15  * - Redistributions in binary form must reproduce the above copyright
16  * notice, this list of conditions and the following disclaimer in
17  * the documentation and/or other materials provided with the
18  * distribution.
19  * - Neither the name of the authors nor the names of its contributors
20  * may be used to endorse or promote products derived from this
21  * software without specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
26  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
27  * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
28  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
29  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
30  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
31  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
32  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
34  * OF THE POSSIBILITY OF SUCH DAMAGE.
35  */
36 
37 #ifndef __OPENPRS_AGENT_OPRS_PROTOBUF_H_
38 #define __OPENPRS_AGENT_OPRS_PROTOBUF_H_
39 
40 #include <oprs-type-pub.h>
41 #include <oprs-type_f-pub.h>
42 
43 #include <list>
44 #include <map>
45 
46 #include <protobuf_comm/server.h>
47 #include <core/threading/mutex.h>
48 #include <core/utils/lock_queue.h>
49 
50 namespace protobuf_comm {
51  class ProtobufStreamClient;
52  class ProtobufBroadcastPeer;
53 }
54 
55 namespace oprs_protobuf {
56 #if 0 /* just to make Emacs auto-indent happy */
57 }
58 #endif
59 
61 {
62  public:
63  OpenPRSProtobuf(std::vector<std::string> &proto_path);
64  ~OpenPRSProtobuf();
65 
66  /** Get Protobuf server.
67  * @return protobuf server */
69  { return server_; }
70 
71  /** Get protobuf_comm peers.
72  * @return protobuf_comm peer */
73  const std::map<long int, protobuf_comm::ProtobufBroadcastPeer *> &
74  peers() const
75  { return peers_; }
76 
77  /** Get the communicator's message register.
78  * @return message register */
80  { return *message_register_; }
81 
82  /** Signal invoked for a message that has been sent to a server client.
83  * @return signal
84  */
85  boost::signals2::signal<void (protobuf_comm::ProtobufStreamServer::ClientID,
86  std::shared_ptr<google::protobuf::Message>)> &
87  signal_server_sent() { return sig_server_sent_; }
88 
89  /** Signal invoked for a message that has been sent to a client.
90  * @return signal
91  */
92  boost::signals2::signal<void (std::string, unsigned short,
93  std::shared_ptr<google::protobuf::Message>)> &
94  signal_client_sent() { return sig_client_sent_; }
95 
96  /** Signal invoked for a message that has been sent via broadcast.
97  * @return signal
98  */
99  boost::signals2::signal<void (long int, std::shared_ptr<google::protobuf::Message>)> &
100  signal_peer_sent() { return sig_peer_sent_; }
101 
102  bool oprs_pb_register_type(std::string full_name);
103  Term * oprs_pb_field_names(void *msgptr);
104  bool oprs_pb_has_field(void *msgptr, std::string field_name);
105  Term * oprs_pb_field_value(void *msgptr, std::string field_name);
106  Term * oprs_pb_field_type(void *msgptr, std::string field_name);
107  Term * oprs_pb_field_label(void *msgptr, std::string field_name);
108  Term * oprs_pb_field_list(void *msgptr, std::string field_name);
109  bool oprs_pb_field_is_list(void *msgptr, std::string field_name);
110  std::shared_ptr<google::protobuf::Message> * oprs_create_msg(std::string full_name);
111  Term * oprs_pb_ref(void *msgptr);
112  Term * oprs_pb_destroy(void *msgptr);
113  void oprs_pb_set_field(void *msgptr, std::string field_name, Term *value);
114  void oprs_pb_add_list(void *msgptr, std::string field_name, Term *value);
115  void oprs_pb_send(long int client_id, void *msgptr);
116  Term * oprs_pb_client_connect(std::string host, int port);
117  void oprs_pb_disconnect(long int client_id);
118  void oprs_pb_broadcast(long int peer_id, void *msgptr);
119  void oprs_pb_enable_server(int port);
120  void oprs_pb_disable_server();
121 
122  Term * oprs_pb_peer_create(std::string host, int port);
123  Term * oprs_pb_peer_create_local(std::string host,
124  int send_port, int recv_port);
125  Term * oprs_pb_peer_create_crypto(std::string host, int port,
126  std::string crypto_key = "", std::string cipher = "");
127  Term * oprs_pb_peer_create_local_crypto(std::string host,
128  int send_port, int recv_port,
129  std::string crypto_key = "", std::string cipher = "");
130  void oprs_pb_peer_destroy(long int peer_id);
131  void oprs_pb_peer_setup_crypto(long int peer_id,
132  std::string crypto_key, std::string cipher);
133 
134  bool oprs_pb_events_pending();
135  void oprs_pb_process();
136 
137  private:
138  typedef enum {
139  CT_SERVER, CT_CLIENT, CT_PEER
140  } ClientType;
141  void clips_assert_message(std::pair<std::string, unsigned short> &endpoint,
142  uint16_t comp_id, uint16_t msg_type,
143  std::shared_ptr<google::protobuf::Message> &msg,
144  ClientType ct, unsigned int client_id = 0);
145  void handle_server_client_connected(protobuf_comm::ProtobufStreamServer::ClientID client,
146  boost::asio::ip::tcp::endpoint &endpoint);
147  void handle_server_client_disconnected(protobuf_comm::ProtobufStreamServer::ClientID client,
148  const boost::system::error_code &error);
149 
150  void handle_server_client_msg(protobuf_comm::ProtobufStreamServer::ClientID client,
151  uint16_t component_id, uint16_t msg_type,
152  std::shared_ptr<google::protobuf::Message> msg);
153 
154  void handle_server_client_fail(protobuf_comm::ProtobufStreamServer::ClientID client,
155  uint16_t component_id, uint16_t msg_type,
156  std::string msg);
157 
158  void handle_peer_msg(long int peer_id,
159  boost::asio::ip::udp::endpoint &endpoint,
160  uint16_t component_id, uint16_t msg_type,
161  std::shared_ptr<google::protobuf::Message> msg);
162  void handle_peer_recv_error(long int peer_id, boost::asio::ip::udp::endpoint &endpoint, std::string msg);
163  void handle_peer_send_error(long int peer_id, std::string msg);
164 
165  void handle_client_connected(long int client_id);
166  void handle_client_disconnected(long int client_id,
167  const boost::system::error_code &error);
168  void handle_client_msg(long int client_id,
169  uint16_t comp_id, uint16_t msg_type,
170  std::shared_ptr<google::protobuf::Message> msg);
171  void handle_client_receive_fail(long int client_id,
172  uint16_t comp_id, uint16_t msg_type, std::string msg);
173  void oprs_assert_message(std::string &endpoint_host, unsigned short endpoint_port,
174  uint16_t comp_id, uint16_t msg_type,
175  std::shared_ptr<google::protobuf::Message> &msg,
176  OpenPRSProtobuf::ClientType ct,
177  unsigned int client_id);
178  void oprs_assert_server_client_event(long int client_id,
179  std::string &host, unsigned short port, bool connect);
180  void oprs_assert_client_event(long int client_id, bool connect);
181 
182  private:
183  protobuf_comm::MessageRegister *message_register_;
185 
186  boost::signals2::signal<void (protobuf_comm::ProtobufStreamServer::ClientID,
187  std::shared_ptr<google::protobuf::Message>)> sig_server_sent_;
188  boost::signals2::signal<void (std::string, unsigned short,
189  std::shared_ptr<google::protobuf::Message>)> sig_client_sent_;
190  boost::signals2::signal<void (long int, std::shared_ptr<google::protobuf::Message>)> sig_peer_sent_;
191 
192  fawkes::Mutex map_mutex_;
193  long int next_client_id_;
194 
195 
196  std::map<long int, protobuf_comm::ProtobufStreamServer::ClientID> server_clients_;
197  typedef std::map<protobuf_comm::ProtobufStreamServer::ClientID, long int> RevServerClientMap;
198  RevServerClientMap rev_server_clients_;
199  std::map<long int, protobuf_comm::ProtobufStreamClient *> clients_;
200  std::map<long int, protobuf_comm::ProtobufBroadcastPeer *> peers_;
201 
202  std::map<long int, std::pair<std::string, unsigned short>> client_endpoints_;
203 
205  std::tuple<std::string, unsigned short, uint16_t, uint16_t,
206  std::shared_ptr<google::protobuf::Message>, ClientType, unsigned int>> q_msgs_;
209 };
210 
211 } // end namespace protobuf_clips
212 
213 #endif
OpenPRS protobuf integration class.
Definition: oprs_protobuf.h:60
const std::map< long int, protobuf_comm::ProtobufBroadcastPeer * > & peers() const
Get protobuf_comm peers.
Definition: oprs_protobuf.h:74
unsigned int ClientID
ID to identify connected clients.
Definition: server.h:68
Register to map msg type numbers to Protobuf messages.
protobuf_comm::MessageRegister & message_register()
Get the communicator&#39;s message register.
Definition: oprs_protobuf.h:79
protobuf_comm::ProtobufStreamServer * server() const
Get Protobuf server.
Definition: oprs_protobuf.h:68
boost::signals2::signal< void(std::string, unsigned short, std::shared_ptr< google::protobuf::Message >)> & signal_client_sent()
Signal invoked for a message that has been sent to a client.
Definition: oprs_protobuf.h:94
Queue with a lock.
Definition: lock_queue.h:43
Stream server for protobuf message transmission.
Definition: server.h:64
Mutex mutual exclusion lock.
Definition: mutex.h:32
boost::signals2::signal< void(long int, std::shared_ptr< google::protobuf::Message >)> & signal_peer_sent()
Signal invoked for a message that has been sent via broadcast.
boost::signals2::signal< void(protobuf_comm::ProtobufStreamServer::ClientID, std::shared_ptr< google::protobuf::Message >)> & signal_server_sent()
Signal invoked for a message that has been sent to a server client.
Definition: oprs_protobuf.h:87