Fawkes API  Fawkes Development Version
communicator.cpp
1 
2 /***************************************************************************
3  * communicator.cpp - protobuf network communication for CLIPS
4  *
5  * Created: Tue Apr 16 13:51:14 2013
6  * Copyright 2013 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* Redistribution and use in source and binary forms, with or without
10  * modification, are permitted provided that the following conditions
11  * are met:
12  *
13  * - Redistributions of source code must retain the above copyright
14  * notice, this list of conditions and the following disclaimer.
15  * - Redistributions in binary form must reproduce the above copyright
16  * notice, this list of conditions and the following disclaimer in
17  * the documentation and/or other materials provided with the
18  * distribution.
19  * - Neither the name of the authors nor the names of its contributors
20  * may be used to endorse or promote products derived from this
21  * software without specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
26  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
27  * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
28  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
29  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
30  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
31  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
32  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
34  * OF THE POSSIBILITY OF SUCH DAMAGE.
35  */
36 
37 #include <protobuf_clips/communicator.h>
38 
39 #include <core/threading/mutex_locker.h>
40 #include <logging/logger.h>
41 #include <protobuf_comm/client.h>
42 #include <protobuf_comm/server.h>
43 #include <protobuf_comm/peer.h>
44 
45 #include <google/protobuf/descriptor.h>
46 
47 #include <boost/format.hpp>
48 
49 using namespace google::protobuf;
50 using namespace protobuf_comm;
51 
52 namespace protobuf_clips {
53 #if 0 /* just to make Emacs auto-indent happy */
54 }
55 #endif
56 
57 /** @class ClipsProtobufCommunicator <protobuf_clips/communicator.h>
58  * CLIPS protobuf integration class.
59  * This class adds functionality related to protobuf to a given CLIPS
60  * environment. It supports the creation of communication channels
61  * through protobuf_comm. An instance maintains its own message register
62  * shared among server, peer, and clients.
63  * @author Tim Niemueller
64  */
65 
66 /** Constructor.
67  * @param env CLIPS environment to which to provide the protobuf functionality
68  * @param env_mutex mutex to lock when operating on the CLIPS environment.
69  * @param logger optional logger for informational output
70  */
71 ClipsProtobufCommunicator::ClipsProtobufCommunicator(CLIPS::Environment *env,
72  fawkes::Mutex &env_mutex,
73  fawkes::Logger *logger)
74  : clips_(env), clips_mutex_(env_mutex), logger_(logger), server_(NULL),
75  next_client_id_(0)
76 {
77  message_register_ = new MessageRegister();
78  setup_clips();
79 }
80 
81 /** Constructor.
82  * @param env CLIPS environment to which to provide the protobuf functionality
83  * @param env_mutex mutex to lock when operating on the CLIPS environment.
84  * @param proto_path proto path passed to a newly instantiated message register
85  * @param logger optional logger for informational output
86  */
88  fawkes::Mutex &env_mutex,
89  std::vector<std::string> &proto_path,
90  fawkes::Logger *logger)
91  : clips_(env), clips_mutex_(env_mutex), logger_(logger), server_(NULL),
92  next_client_id_(0)
93 {
94  message_register_ = new MessageRegister(proto_path);
95  setup_clips();
96 }
97 
98 
99 /** Destructor. */
101 {
102  {
103  fawkes::MutexLocker lock(&clips_mutex_);
104 
105  for (auto f : functions_) {
106  clips_->remove_function(f);
107  }
108  functions_.clear();
109  }
110 
111  for (auto c : clients_) {
112  delete c.second;
113  }
114  clients_.clear();
115 
116  delete message_register_;
117  delete server_;
118 }
119 
120 
/* Register slot s as CLIPS function n and remember the name in functions_
 * so the destructor can remove it again. Wrapped in do/while(0) so the macro
 * expands to exactly one statement and is safe inside unbraced if/else. */
#define ADD_FUNCTION(n, s)      \
  do {                          \
    clips_->add_function(n, s); \
    functions_.push_back(n);    \
  } while (0)
124 
125 
126 /** Setup CLIPS environment. */
127 void
128 ClipsProtobufCommunicator::setup_clips()
129 {
130  fawkes::MutexLocker lock(&clips_mutex_);
131 
132  ADD_FUNCTION("pb-register-type", (sigc::slot<CLIPS::Value, std::string>(sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_register_type))));
133  ADD_FUNCTION("pb-field-names", (sigc::slot<CLIPS::Values, void *>(sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_field_names))));
134  ADD_FUNCTION("pb-field-type", (sigc::slot<CLIPS::Value, void *, std::string>(sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_field_type))));
135  ADD_FUNCTION("pb-has-field", (sigc::slot<CLIPS::Value, void *, std::string>(sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_has_field))));
136  ADD_FUNCTION("pb-field-label", (sigc::slot<CLIPS::Value, void *, std::string>(sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_field_label))));
137  ADD_FUNCTION("pb-field-value", (sigc::slot<CLIPS::Value, void *, std::string>(sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_field_value))));
138  ADD_FUNCTION("pb-field-list", (sigc::slot<CLIPS::Values, void *, std::string>(sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_field_list))));
139  ADD_FUNCTION("pb-field-is-list", (sigc::slot<CLIPS::Value, void *, std::string>(sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_field_is_list))));
140  ADD_FUNCTION("pb-create", (sigc::slot<CLIPS::Value, std::string>(sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_create))));
141  ADD_FUNCTION("pb-destroy", (sigc::slot<void, void *>(sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_destroy))));
142  ADD_FUNCTION("pb-ref", (sigc::slot<CLIPS::Value, void *>(sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_ref))));
143  ADD_FUNCTION("pb-set-field", (sigc::slot<void, void *, std::string, CLIPS::Value>(sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_set_field))));
144  ADD_FUNCTION("pb-add-list", (sigc::slot<void, void *, std::string, CLIPS::Value>(sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_add_list))));
145  ADD_FUNCTION("pb-send", (sigc::slot<void, long int, void *>(sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_send))));
146  ADD_FUNCTION("pb-tostring", (sigc::slot<std::string, void *>(sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_tostring))));
147  ADD_FUNCTION("pb-server-enable", (sigc::slot<void, int>(sigc::mem_fun(*this, &ClipsProtobufCommunicator::enable_server))));
148  ADD_FUNCTION("pb-server-disable", (sigc::slot<void>(sigc::mem_fun(*this, &ClipsProtobufCommunicator::disable_server))));
149  ADD_FUNCTION("pb-peer-create", (sigc::slot<long int, std::string, int>(sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_peer_create))));
150  ADD_FUNCTION("pb-peer-create-local", (sigc::slot<long int, std::string, int, int>(sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_peer_create_local))));
151  ADD_FUNCTION("pb-peer-create-crypto",
152  (sigc::slot<long int, std::string, int, std::string, std::string>
153  (sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_peer_create_crypto))));
154  ADD_FUNCTION("pb-peer-create-local-crypto",
155  (sigc::slot<long int, std::string, int, int, std::string, std::string>
156  (sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_peer_create_local_crypto))));
157  ADD_FUNCTION("pb-peer-destroy", (sigc::slot<void, long int>(sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_peer_destroy))));
158  ADD_FUNCTION("pb-peer-setup-crypto", (sigc::slot<void, long int, std::string, std::string>(sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_peer_setup_crypto))));
159  ADD_FUNCTION("pb-broadcast", (sigc::slot<void, long int, void *>(sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_broadcast))));
160  ADD_FUNCTION("pb-connect", (sigc::slot<long int, std::string, int>(sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_client_connect))));
161  ADD_FUNCTION("pb-disconnect", (sigc::slot<void, long int>(sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_disconnect))));
162 }
163 
164 /** Enable protobuf stream server.
165  * @param port TCP port to listen on for connections
166  */
167 void
169 {
170  if ((port > 0) && ! server_) {
171  server_ = new protobuf_comm::ProtobufStreamServer(port, message_register_);
172 
173  server_->signal_connected()
174  .connect(boost::bind(&ClipsProtobufCommunicator::handle_server_client_connected, this, _1, _2));
175  server_->signal_disconnected()
176  .connect(boost::bind(&ClipsProtobufCommunicator::handle_server_client_disconnected, this, _1, _2));
177  server_->signal_received()
178  .connect(boost::bind(&ClipsProtobufCommunicator::handle_server_client_msg, this, _1, _2, _3, _4));
179  server_->signal_receive_failed()
180  .connect(boost::bind(&ClipsProtobufCommunicator::handle_server_client_fail, this, _1, _2, _3, _4));
181  }
182 
183 }
184 
185 
186 /** Disable protobu stream server. */
187 void
189 {
190  delete server_;
191  server_ = NULL;
192 }
193 
194 
195 /** Enable protobuf peer.
196  * @param address IP address to send messages to
197  * @param send_port UDP port to send messages to
198  * @param recv_port UDP port to receive messages on, 0 to use the same as the @p send_port
199  * @param crypto_key encryption key
200  * @param cipher cipher suite, see BufferEncryptor for supported types
201  * @return peer identifier
202  */
203 long int
204 ClipsProtobufCommunicator::clips_pb_peer_create_local_crypto(std::string address, int send_port, int recv_port,
205  std::string crypto_key, std::string cipher)
206 {
207  if (recv_port <= 0) recv_port = send_port;
208 
209  if (send_port > 0) {
211  new protobuf_comm::ProtobufBroadcastPeer(address, send_port, recv_port,
212  message_register_, crypto_key, cipher);
213 
214  long int peer_id;
215  {
216  fawkes::MutexLocker lock(&map_mutex_);
217  peer_id = ++next_client_id_;
218  peers_[peer_id] = peer;
219  }
220 
221  peer->signal_received()
222  .connect(boost::bind(&ClipsProtobufCommunicator::handle_peer_msg, this, peer_id, _1, _2, _3, _4));
223  peer->signal_recv_error()
224  .connect(boost::bind(&ClipsProtobufCommunicator::handle_peer_recv_error, this, peer_id, _1, _2));
225  peer->signal_send_error()
226  .connect(boost::bind(&ClipsProtobufCommunicator::handle_peer_send_error, this, peer_id, _1));
227 
228  return peer_id;
229  } else {
230  return 0;
231  }
232 }
233 
234 /** Enable protobuf peer.
235  * @param address IP address to send messages to
236  * @param port UDP port to send and receive messages
237  * @param crypto_key encryption key
238  * @param cipher cipher suite, see BufferEncryptor for supported types
239  * @return peer identifier
240  */
241 long int
242 ClipsProtobufCommunicator::clips_pb_peer_create_crypto(std::string address, int port,
243  std::string crypto_key, std::string cipher)
244 {
245  return clips_pb_peer_create_local_crypto(address, port, port, crypto_key, cipher);
246 }
247 
248 /** Enable protobuf peer.
249  * @param address IP address to send messages to
250  * @param port UDP port to send and receive messages
251  * @return peer identifier
252  */
253 long int
254 ClipsProtobufCommunicator::clips_pb_peer_create(std::string address, int port)
255 {
256  return clips_pb_peer_create_local_crypto(address, port, port);
257 }
258 
259 /** Enable protobuf peer.
260  * @param address IP address to send messages to
261  * @param send_port UDP port to send messages to
262  * @param recv_port UDP port to receive messages on, 0 to use the same as the @p send_port
263  * @return peer identifier
264  */
265 long int
266 ClipsProtobufCommunicator::clips_pb_peer_create_local(std::string address, int send_port,
267  int recv_port)
268 {
269  return clips_pb_peer_create_local_crypto(address, send_port, recv_port);
270 }
271 
272 
273 /** Disable peer.
274  * @param peer_id ID of the peer to destroy
275  */
276 void
277 ClipsProtobufCommunicator::clips_pb_peer_destroy(long int peer_id)
278 {
279  if (peers_.find(peer_id) != peers_.end()) {
280  delete peers_[peer_id];
281  peers_.erase(peer_id);
282  }
283 }
284 
285 
286 /** Setup crypto for peer.
287  * @param peer_id ID of the peer to destroy
288  * @param crypto_key encryption key
289  * @param cipher cipher suite, see BufferEncryptor for supported types
290  */
291 void
292 ClipsProtobufCommunicator::clips_pb_peer_setup_crypto(long int peer_id,
293  std::string crypto_key, std::string cipher)
294 {
295  if (peers_.find(peer_id) != peers_.end()) {
296  peers_[peer_id]->setup_crypto(crypto_key, cipher);
297  }
298 }
299 
300 
301 /** Register a new message type.
302  * @param full_name full name of type to register
303  * @return true if the type was successfully registered, false otherwise
304  */
305 CLIPS::Value
306 ClipsProtobufCommunicator::clips_pb_register_type(std::string full_name)
307 {
308  try {
309  message_register_->add_message_type(full_name);
310  return CLIPS::Value("TRUE", CLIPS::TYPE_SYMBOL);
311  } catch (std::runtime_error &e) {
312  if (logger_) {
313  logger_->log_error("CLIPS-Protobuf", "Registering type %s failed: %s",
314  full_name.c_str(), e.what());
315  }
316  return CLIPS::Value("FALSE", CLIPS::TYPE_SYMBOL);
317  }
318 }
319 
320 
321 
322 CLIPS::Value
323 ClipsProtobufCommunicator::clips_pb_create(std::string full_name)
324 {
325  try {
326  std::shared_ptr<google::protobuf::Message> m =
327  message_register_->new_message_for(full_name);
328  return CLIPS::Value(new std::shared_ptr<google::protobuf::Message>(m));
329  } catch (std::runtime_error &e) {
330  if (logger_) {
331  logger_->log_warn("CLIPS-Protobuf", "Cannot create message of type %s: %s",
332  full_name.c_str(), e.what());
333  }
334  return CLIPS::Value(new std::shared_ptr<google::protobuf::Message>());
335  }
336 }
337 
338 
339 CLIPS::Value
340 ClipsProtobufCommunicator::clips_pb_ref(void *msgptr)
341 {
342  std::shared_ptr<google::protobuf::Message> *m =
343  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
344  if (!*m) return new std::shared_ptr<google::protobuf::Message>();
345 
346  return CLIPS::Value(new std::shared_ptr<google::protobuf::Message>(*m));
347 }
348 
349 
350 void
351 ClipsProtobufCommunicator::clips_pb_destroy(void *msgptr)
352 {
353  std::shared_ptr<google::protobuf::Message> *m =
354  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
355  if (!*m) return;
356 
357  delete m;
358 }
359 
360 
361 CLIPS::Values
362 ClipsProtobufCommunicator::clips_pb_field_names(void *msgptr)
363 {
364  std::shared_ptr<google::protobuf::Message> *m =
365  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
366  if (!*m) return CLIPS::Values();
367 
368  const Descriptor *desc = (*m)->GetDescriptor();
369  const int field_count = desc->field_count();
370  CLIPS::Values field_names(field_count);
371  for (int i = 0; i < field_count; ++i) {
372  field_names[i].set(desc->field(i)->name(), true);
373  }
374  return field_names;
375 }
376 
377 CLIPS::Value
378 ClipsProtobufCommunicator::clips_pb_field_type(void *msgptr, std::string field_name)
379 {
380  std::shared_ptr<google::protobuf::Message> *m =
381  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
382  if (!*m) return CLIPS::Value("INVALID-MESSAGE", CLIPS::TYPE_SYMBOL);
383 
384  const Descriptor *desc = (*m)->GetDescriptor();
385  const FieldDescriptor *field = desc->FindFieldByName(field_name);
386  if (! field) {
387  return CLIPS::Value("DOES-NOT-EXIST", CLIPS::TYPE_SYMBOL);
388  }
389  switch (field->type()) {
390  case FieldDescriptor::TYPE_DOUBLE: return CLIPS::Value("DOUBLE", CLIPS::TYPE_SYMBOL);
391  case FieldDescriptor::TYPE_FLOAT: return CLIPS::Value("FLOAT", CLIPS::TYPE_SYMBOL);
392  case FieldDescriptor::TYPE_INT64: return CLIPS::Value("INT64", CLIPS::TYPE_SYMBOL);
393  case FieldDescriptor::TYPE_UINT64: return CLIPS::Value("UINT64", CLIPS::TYPE_SYMBOL);
394  case FieldDescriptor::TYPE_INT32: return CLIPS::Value("INT32", CLIPS::TYPE_SYMBOL);
395  case FieldDescriptor::TYPE_FIXED64: return CLIPS::Value("FIXED64", CLIPS::TYPE_SYMBOL);
396  case FieldDescriptor::TYPE_FIXED32: return CLIPS::Value("FIXED32", CLIPS::TYPE_SYMBOL);
397  case FieldDescriptor::TYPE_BOOL: return CLIPS::Value("BOOL", CLIPS::TYPE_SYMBOL);
398  case FieldDescriptor::TYPE_STRING: return CLIPS::Value("STRING", CLIPS::TYPE_SYMBOL);
399  case FieldDescriptor::TYPE_MESSAGE: return CLIPS::Value("MESSAGE", CLIPS::TYPE_SYMBOL);
400  case FieldDescriptor::TYPE_BYTES: return CLIPS::Value("BYTES", CLIPS::TYPE_SYMBOL);
401  case FieldDescriptor::TYPE_UINT32: return CLIPS::Value("UINT32", CLIPS::TYPE_SYMBOL);
402  case FieldDescriptor::TYPE_ENUM: return CLIPS::Value("ENUM", CLIPS::TYPE_SYMBOL);
403  case FieldDescriptor::TYPE_SFIXED32: return CLIPS::Value("SFIXED32", CLIPS::TYPE_SYMBOL);
404  case FieldDescriptor::TYPE_SFIXED64: return CLIPS::Value("SFIXED64", CLIPS::TYPE_SYMBOL);
405  case FieldDescriptor::TYPE_SINT32: return CLIPS::Value("SINT32", CLIPS::TYPE_SYMBOL);
406  case FieldDescriptor::TYPE_SINT64: return CLIPS::Value("SINT64", CLIPS::TYPE_SYMBOL);
407  default: return CLIPS::Value("UNKNOWN", CLIPS::TYPE_SYMBOL);
408  }
409 }
410 
411 CLIPS::Value
412 ClipsProtobufCommunicator::clips_pb_has_field(void *msgptr, std::string field_name)
413 {
414  std::shared_ptr<google::protobuf::Message> *m =
415  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
416  if (!*m) return false;
417 
418  const Descriptor *desc = (*m)->GetDescriptor();
419  const FieldDescriptor *field = desc->FindFieldByName(field_name);
420  if (! field) return false;
421 
422  const Reflection *refl = (*m)->GetReflection();
423 
424  if (field->is_repeated()) {
425  return CLIPS::Value((refl->FieldSize(**m, field) > 0) ? "TRUE" : "FALSE",
426  CLIPS::TYPE_SYMBOL);
427  } else if (field->is_optional()) {
428  return CLIPS::Value(refl->HasField(**m, field) ? "TRUE" : "FALSE",
429  CLIPS::TYPE_SYMBOL);
430  } else {
431  return CLIPS::Value("TRUE", CLIPS::TYPE_SYMBOL);
432  }
433 }
434 
435 
436 CLIPS::Value
437 ClipsProtobufCommunicator::clips_pb_field_label(void *msgptr, std::string field_name)
438 {
439  std::shared_ptr<google::protobuf::Message> *m =
440  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
441  if (!*m) return CLIPS::Value("INVALID-MESSAGE", CLIPS::TYPE_SYMBOL);
442 
443  const Descriptor *desc = (*m)->GetDescriptor();
444  const FieldDescriptor *field = desc->FindFieldByName(field_name);
445  if (! field) {
446  return CLIPS::Value("DOES-NOT-EXIST", CLIPS::TYPE_SYMBOL);
447  }
448  switch (field->label()) {
449  case FieldDescriptor::LABEL_OPTIONAL: return CLIPS::Value("OPTIONAL", CLIPS::TYPE_SYMBOL);
450  case FieldDescriptor::LABEL_REQUIRED: return CLIPS::Value("REQUIRED", CLIPS::TYPE_SYMBOL);
451  case FieldDescriptor::LABEL_REPEATED: return CLIPS::Value("REPEATED", CLIPS::TYPE_SYMBOL);
452  default: return CLIPS::Value("UNKNOWN", CLIPS::TYPE_SYMBOL);
453  }
454 }
455 
456 CLIPS::Value
457 ClipsProtobufCommunicator::clips_pb_field_value(void *msgptr, std::string field_name)
458 {
459  std::shared_ptr<google::protobuf::Message> *m =
460  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
461  if (!(m && *m)) {
462  if (logger_) {
463  logger_->log_warn("CLIPS-Protobuf", "Invalid message when setting %s", field_name.c_str());
464  }
465  return CLIPS::Value("INVALID-MESSAGE", CLIPS::TYPE_SYMBOL);
466  }
467 
468  const Descriptor *desc = (*m)->GetDescriptor();
469  const FieldDescriptor *field = desc->FindFieldByName(field_name);
470  if (! field) {
471  if (logger_) {
472  logger_->log_warn("CLIPS-Protobuf", "Field %s of %s does not exist",
473  field_name.c_str(), (*m)->GetTypeName().c_str());
474  }
475  return CLIPS::Value("DOES-NOT-EXIST", CLIPS::TYPE_SYMBOL);
476  }
477  const Reflection *refl = (*m)->GetReflection();
478  if (field->type() != FieldDescriptor::TYPE_MESSAGE && ! refl->HasField(**m, field)) {
479  if (logger_) {
480  logger_->log_warn("CLIPS-Protobuf", "Field %s of %s not set",
481  field_name.c_str(), (*m)->GetTypeName().c_str());
482  }
483  return CLIPS::Value("NOT-SET", CLIPS::TYPE_SYMBOL);
484  }
485  switch (field->type()) {
486  case FieldDescriptor::TYPE_DOUBLE: return CLIPS::Value(refl->GetDouble(**m, field));
487  case FieldDescriptor::TYPE_FLOAT: return CLIPS::Value(refl->GetFloat(**m, field));
488  case FieldDescriptor::TYPE_INT64: return CLIPS::Value(refl->GetInt64(**m, field));
489  case FieldDescriptor::TYPE_UINT64:
490  return CLIPS::Value((long int)refl->GetUInt64(**m, field));
491  case FieldDescriptor::TYPE_INT32: return CLIPS::Value(refl->GetInt32(**m, field));
492  case FieldDescriptor::TYPE_FIXED64:
493  return CLIPS::Value((long int)refl->GetUInt64(**m, field));
494  case FieldDescriptor::TYPE_FIXED32: return CLIPS::Value(refl->GetUInt32(**m, field));
495  case FieldDescriptor::TYPE_BOOL:
496  //Booleans are represented as Symbols in CLIPS
497  if(refl->GetBool(**m, field)){
498  return CLIPS::Value("TRUE", CLIPS::TYPE_SYMBOL);
499  }
500  else{
501  return CLIPS::Value("FALSE", CLIPS::TYPE_SYMBOL);
502  }
503  case FieldDescriptor::TYPE_STRING: return CLIPS::Value(refl->GetString(**m, field));
504  case FieldDescriptor::TYPE_MESSAGE:
505  {
506  const google::protobuf::Message &mfield = refl->GetMessage(**m, field);
507  google::protobuf::Message *mcopy = mfield.New();
508  mcopy->CopyFrom(mfield);
509  void *ptr = new std::shared_ptr<google::protobuf::Message>(mcopy);
510  return CLIPS::Value(ptr);
511  }
512  case FieldDescriptor::TYPE_BYTES: return CLIPS::Value((char *)"bytes");
513  case FieldDescriptor::TYPE_UINT32: return CLIPS::Value(refl->GetUInt32(**m, field));
514  case FieldDescriptor::TYPE_ENUM:
515  return CLIPS::Value(refl->GetEnum(**m, field)->name(), CLIPS::TYPE_SYMBOL);
516  case FieldDescriptor::TYPE_SFIXED32: return CLIPS::Value(refl->GetInt32(**m, field));
517  case FieldDescriptor::TYPE_SFIXED64: return CLIPS::Value(refl->GetInt64(**m, field));
518  case FieldDescriptor::TYPE_SINT32: return CLIPS::Value(refl->GetInt32(**m, field));
519  case FieldDescriptor::TYPE_SINT64: return CLIPS::Value(refl->GetInt64(**m, field));
520  default:
521  throw std::logic_error("Unknown protobuf field type encountered");
522  }
523 }
524 
525 
526 void
527 ClipsProtobufCommunicator::clips_pb_set_field(void *msgptr, std::string field_name, CLIPS::Value value)
528 {
529  std::shared_ptr<google::protobuf::Message> *m =
530  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
531  if (!(m && *m)) return;
532 
533  const Descriptor *desc = (*m)->GetDescriptor();
534  const FieldDescriptor *field = desc->FindFieldByName(field_name);
535  if (! field) {
536  if (logger_) {
537  logger_->log_warn("CLIPS-Protobuf",
538  "Could not find field %s", field_name.c_str());
539  }
540  return;
541  }
542  const Reflection *refl = (*m)->GetReflection();
543 
544  try {
545  switch (field->type()) {
546  case FieldDescriptor::TYPE_DOUBLE:
547  refl->SetDouble(m->get(), field, value.as_float()); break;
548  case FieldDescriptor::TYPE_FLOAT:
549  refl->SetFloat(m->get(), field, value.as_float()); break;
550  case FieldDescriptor::TYPE_SFIXED64:
551  case FieldDescriptor::TYPE_SINT64:
552  case FieldDescriptor::TYPE_INT64:
553  refl->SetInt64(m->get(), field, value.as_integer()); break;
554  case FieldDescriptor::TYPE_FIXED64:
555  case FieldDescriptor::TYPE_UINT64:
556  refl->SetUInt64(m->get(), field, value.as_integer()); break;
557  case FieldDescriptor::TYPE_SFIXED32:
558  case FieldDescriptor::TYPE_SINT32:
559  case FieldDescriptor::TYPE_INT32:
560  refl->SetInt32(m->get(), field, value.as_integer()); break;
561  case FieldDescriptor::TYPE_BOOL:
562  refl->SetBool(m->get(), field, (value == "TRUE"));
563  break;
564  case FieldDescriptor::TYPE_STRING:
565  refl->SetString(m->get(), field, value.as_string()); break;
566  case FieldDescriptor::TYPE_MESSAGE:
567  {
568  std::shared_ptr<google::protobuf::Message> *mfrom =
569  static_cast<std::shared_ptr<google::protobuf::Message> *>(value.as_address());
570  Message *mut_msg = refl->MutableMessage(m->get(), field);
571  mut_msg->CopyFrom(**mfrom);
572  delete mfrom;
573  }
574  break;
575  case FieldDescriptor::TYPE_BYTES: break;
576  case FieldDescriptor::TYPE_FIXED32:
577  case FieldDescriptor::TYPE_UINT32:
578  refl->SetUInt32(m->get(), field, value.as_integer()); break;
579  case FieldDescriptor::TYPE_ENUM:
580  {
581  const EnumDescriptor *enumdesc = field->enum_type();
582  const EnumValueDescriptor *enumval = enumdesc->FindValueByName(value);
583  if (enumval) {
584  refl->SetEnum(m->get(), field, enumval);
585  } else {
586  if (logger_) {
587  logger_->log_warn("CLIPS-Protobuf", "%s: cannot set invalid "
588  "enum value '%s' on '%s'",
589  (*m)->GetTypeName().c_str(),
590  value.as_string().c_str(), field_name.c_str());
591  }
592  }
593  }
594  break;
595  default:
596  throw std::logic_error("Unknown protobuf field type encountered");
597  }
598  } catch (std::logic_error &e) {
599  if (logger_) {
600  logger_->log_warn("CLIPS-Protobuf", "Failed to set field %s of %s: %s "
601  "(type %d, as string %s)",
602  field_name.c_str(), (*m)->GetTypeName().c_str(), e.what(),
603  value.type(), to_string(value).c_str());
604  }
605  }
606 }
607 
608 
609 void
610 ClipsProtobufCommunicator::clips_pb_add_list(void *msgptr, std::string field_name, CLIPS::Value value)
611 {
612  std::shared_ptr<google::protobuf::Message> *m =
613  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
614  if (!(m && *m)) return;
615 
616  const Descriptor *desc = (*m)->GetDescriptor();
617  const FieldDescriptor *field = desc->FindFieldByName(field_name);
618  if (! field) {
619  if (logger_) {
620  logger_->log_warn("CLIPS-Protobuf", "Could not find field %s",
621  field_name.c_str());
622  }
623  return;
624  }
625  const Reflection *refl = (*m)->GetReflection();
626 
627  try {
628  switch (field->type()) {
629  case FieldDescriptor::TYPE_DOUBLE: refl->AddDouble(m->get(), field, value); break;
630  case FieldDescriptor::TYPE_FLOAT: refl->AddFloat(m->get(), field, value); break;
631  case FieldDescriptor::TYPE_SFIXED64:
632  case FieldDescriptor::TYPE_SINT64:
633  case FieldDescriptor::TYPE_INT64:
634  refl->AddInt64(m->get(), field, value); break;
635  case FieldDescriptor::TYPE_FIXED64:
636  case FieldDescriptor::TYPE_UINT64:
637  refl->AddUInt64(m->get(), field, (long int)value); break;
638  case FieldDescriptor::TYPE_SFIXED32:
639  case FieldDescriptor::TYPE_SINT32:
640  case FieldDescriptor::TYPE_INT32:
641  refl->AddInt32(m->get(), field, value); break;
642  case FieldDescriptor::TYPE_BOOL:
643  refl->AddBool(m->get(), field, (value == "TRUE"));
644  break;
645  case FieldDescriptor::TYPE_STRING: refl->AddString(m->get(), field, value); break;
646  case FieldDescriptor::TYPE_MESSAGE:
647  {
648  std::shared_ptr<google::protobuf::Message> *mfrom =
649  static_cast<std::shared_ptr<google::protobuf::Message> *>(value.as_address());
650  Message *new_msg = refl->AddMessage(m->get(), field);
651  new_msg->CopyFrom(**mfrom);
652  delete mfrom;
653  }
654  break;
655  case FieldDescriptor::TYPE_BYTES: break;
656  case FieldDescriptor::TYPE_FIXED32:
657  case FieldDescriptor::TYPE_UINT32:
658  refl->AddUInt32(m->get(), field, value); break;
659  case FieldDescriptor::TYPE_ENUM:
660  {
661  const EnumDescriptor *enumdesc = field->enum_type();
662  const EnumValueDescriptor *enumval = enumdesc->FindValueByName(value);
663  if (enumval) refl->AddEnum(m->get(), field, enumval);
664  }
665  break;
666  default:
667  throw std::logic_error("Unknown protobuf field type encountered");
668  }
669  } catch (std::logic_error &e) {
670  if (logger_) {
671  logger_->log_warn("CLIPS-Protobuf", "Failed to add field %s of %s: %s",
672  field_name.c_str(), (*m)->GetTypeName().c_str(), e.what());
673  }
674  }
675 }
676 
677 
678 long int
679 ClipsProtobufCommunicator::clips_pb_client_connect(std::string host, int port)
680 {
681  if (port <= 0) return false;
682 
683  ProtobufStreamClient *client = new ProtobufStreamClient(message_register_);
684 
685  long int client_id;
686  {
687  fawkes::MutexLocker lock(&map_mutex_);
688  client_id = ++next_client_id_;
689  clients_[client_id] = client;
690  }
691 
692  client->signal_connected().connect(
693  boost::bind(&ClipsProtobufCommunicator::handle_client_connected, this, client_id));
694  client->signal_disconnected().connect(
695  boost::bind(&ClipsProtobufCommunicator::handle_client_disconnected,
696  this, client_id, boost::asio::placeholders::error));
697  client->signal_received().connect(
698  boost::bind(&ClipsProtobufCommunicator::handle_client_msg, this, client_id, _1, _2, _3));
699  client->signal_receive_failed().connect(
700  boost::bind(&ClipsProtobufCommunicator::handle_client_receive_fail, this, client_id, _1, _2, _3));
701 
702  client->async_connect(host.c_str(), port);
703  return CLIPS::Value(client_id);
704 }
705 
706 
707 void
708 ClipsProtobufCommunicator::clips_pb_send(long int client_id, void *msgptr)
709 {
710  std::shared_ptr<google::protobuf::Message> *m =
711  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
712  if (!(m && *m)) {
713  if (logger_) {
714  logger_->log_warn("CLIPS-Protobuf",
715  "Cannot send to %li: invalid message", client_id);
716  }
717  return;
718  }
719 
720  try {
721  fawkes::MutexLocker lock(&map_mutex_);
722 
723  if (server_ && server_clients_.find(client_id) != server_clients_.end()) {
724  //printf("***** SENDING via SERVER\n");
725  server_->send(server_clients_[client_id], *m);
726  sig_server_sent_(server_clients_[client_id], *m);
727  } else if (clients_.find(client_id) != clients_.end()) {
728  //printf("***** SENDING via CLIENT\n");
729  clients_[client_id]->send(*m);
730  std::pair<std::string, unsigned short> &client_endpoint = client_endpoints_[client_id];
731  sig_client_sent_(client_endpoint.first, client_endpoint.second, *m);
732  } else if (peers_.find(client_id) != peers_.end()) {
733  //printf("***** SENDING via CLIENT\n");
734  peers_[client_id]->send(*m);
735  sig_peer_sent_(client_id, *m);
736  } else {
737  //printf("Client ID %li is unknown, cannot send message of type %s\n",
738  // client_id, (*m)->GetTypeName().c_str());
739  }
740  } catch (google::protobuf::FatalException &e) {
741  if (logger_) {
742  logger_->log_warn("CLIPS-Profobuf", "Failed to send message of type %s: %s",
743  (*m)->GetTypeName().c_str(), e.what());
744  }
745  } catch (fawkes::Exception &e) {
746  if (logger_) {
747  logger_->log_warn("CLIPS-Protobuf", "Failed to send message of type %s: %s",
748  (*m)->GetTypeName().c_str(), e.what_no_backtrace());
749  }
750  } catch (std::runtime_error &e) {
751  if (logger_) {
752  logger_->log_warn("CLIPS-Protobuf", "Failed to send message of type %s: %s",
753  (*m)->GetTypeName().c_str(), e.what());
754  }
755  }
756 }
757 
758 std::string
759 ClipsProtobufCommunicator::clips_pb_tostring(void *msgptr)
760 {
761  std::shared_ptr<google::protobuf::Message> *m =
762  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
763  if (!(m && *m)) {
764  if (logger_) {
765  logger_->log_warn("CLIPS-Protobuf",
766  "Cannot convert message to string: invalid message");
767  }
768  return "";
769  }
770 
771  return (*m)->DebugString();
772 }
773 
774 
775 void
776 ClipsProtobufCommunicator::clips_pb_broadcast(long int peer_id, void *msgptr)
777 {
778  std::shared_ptr<google::protobuf::Message> *m =
779  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
780  if (!(m && *m)) {
781  if (logger_) {
782  logger_->log_warn("CLIPS-Protobuf", "Cannot send broadcast: invalid message");
783  }
784  return;
785  }
786 
787  fawkes::MutexLocker lock(&map_mutex_);
788  if (peers_.find(peer_id) == peers_.end()) return;
789 
790  //logger_->log_info("CLIPS-Protobuf", "Broadcasting %s", (*m)->GetTypeName().c_str());
791  try {
792  peers_[peer_id]->send(*m);
793  } catch (google::protobuf::FatalException &e) {
794  if (logger_) {
795  logger_->log_warn("CLIPS-Protobuf",
796  "Failed to broadcast message of type %s: %s",
797  (*m)->GetTypeName().c_str(), e.what());
798  }
799  } catch (fawkes::Exception &e) {
800  if (logger_) {
801  logger_->log_warn("CLIPS-Protobuf",
802  "Failed to broadcast message of type %s: %s",
803  (*m)->GetTypeName().c_str(), e.what_no_backtrace());
804  }
805  } catch (std::runtime_error &e) {
806  if (logger_) {
807  logger_->log_warn("CLIPS-Protobuf",
808  "Failed to broadcast message of type %s: %s",
809  (*m)->GetTypeName().c_str(), e.what());
810  }
811  }
812 
813  sig_peer_sent_(peer_id, *m);
814 }
815 
816 
817 void
818 ClipsProtobufCommunicator::clips_pb_disconnect(long int client_id)
819 {
820  //logger_->log_info("CLIPS-Protobuf", "Disconnecting client %li", client_id);
821 
822  try {
823  fawkes::MutexLocker lock(&map_mutex_);
824 
825  if (server_clients_.find(client_id) != server_clients_.end()) {
826  protobuf_comm::ProtobufStreamServer::ClientID srv_client = server_clients_[client_id];
827  server_->disconnect(srv_client);
828  server_clients_.erase(client_id);
829  rev_server_clients_.erase(srv_client);
830  } else if (clients_.find(client_id) != clients_.end()) {
831  delete clients_[client_id];
832  clients_.erase(client_id);
833  }
834  } catch (std::runtime_error &e) {
835  if (logger_) {
836  logger_->log_warn("CLIPS-Protobuf",
837  "Failed to disconnect from client %li: %s",
838  client_id, e.what());
839  }
840  }
841 }
842 
843 CLIPS::Values
844 ClipsProtobufCommunicator::clips_pb_field_list(void *msgptr, std::string field_name)
845 {
846  std::shared_ptr<google::protobuf::Message> *m =
847  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
848  if (!(m && *m)) return CLIPS::Values(1, CLIPS::Value("INVALID-MESSAGE", CLIPS::TYPE_SYMBOL));
849 
850  const Descriptor *desc = (*m)->GetDescriptor();
851  const FieldDescriptor *field = desc->FindFieldByName(field_name);
852  if (! field) {
853  return CLIPS::Values(1, CLIPS::Value("DOES-NOT-EXIST", CLIPS::TYPE_SYMBOL));
854  }
855  if (field->label() == FieldDescriptor::LABEL_REQUIRED ||
856  field->label() == FieldDescriptor::LABEL_OPTIONAL)
857  {
858  CLIPS::Values rv(1, clips_pb_field_value(msgptr, field_name));
859  return rv;
860  }
861 
862  const Reflection *refl = (*m)->GetReflection();
863  int field_size = refl->FieldSize(**m, field);
864  CLIPS::Values rv(field_size);
865  for (int i = 0; i < field_size; ++i) {
866  switch (field->type()) {
867  case FieldDescriptor::TYPE_DOUBLE:
868  rv[i] = CLIPS::Value(refl->GetRepeatedDouble(**m, field, i));
869  break;
870  case FieldDescriptor::TYPE_FLOAT:
871  rv[i] = CLIPS::Value(refl->GetRepeatedFloat(**m, field, i));
872  break;
873  break;
874  case FieldDescriptor::TYPE_UINT64:
875  case FieldDescriptor::TYPE_FIXED64:
876  rv[i] = CLIPS::Value((long int)refl->GetRepeatedUInt64(**m, field, i));
877  break;
878  case FieldDescriptor::TYPE_UINT32:
879  case FieldDescriptor::TYPE_FIXED32:
880  rv[i] = CLIPS::Value(refl->GetRepeatedUInt32(**m, field, i));
881  break;
882  case FieldDescriptor::TYPE_BOOL:
883  //Booleans are represented as Symbols in CLIPS
884  if(refl->GetRepeatedBool(**m, field, i)){
885  rv[i] = CLIPS::Value("TRUE", CLIPS::TYPE_SYMBOL);
886  }
887  else{
888  rv[i] = CLIPS::Value("FALSE", CLIPS::TYPE_SYMBOL);
889  }
890  break;
891  case FieldDescriptor::TYPE_STRING:
892  rv[i] = CLIPS::Value(refl->GetRepeatedString(**m, field, i));
893  break;
894  case FieldDescriptor::TYPE_MESSAGE:
895  {
896  const google::protobuf::Message &msg = refl->GetRepeatedMessage(**m, field, i);
897  google::protobuf::Message *mcopy = msg.New();
898  mcopy->CopyFrom(msg);
899  void *ptr = new std::shared_ptr<google::protobuf::Message>(mcopy);
900  rv[i] = CLIPS::Value(ptr);
901  }
902  break;
903  case FieldDescriptor::TYPE_BYTES:
904  rv[i] = CLIPS::Value((char *)"BYTES", CLIPS::TYPE_SYMBOL);
905  break;
906  case FieldDescriptor::TYPE_ENUM:
907  rv[i] = CLIPS::Value(refl->GetRepeatedEnum(**m, field, i)->name(), CLIPS::TYPE_SYMBOL);
908  break;
909  case FieldDescriptor::TYPE_SFIXED32:
910  case FieldDescriptor::TYPE_INT32:
911  case FieldDescriptor::TYPE_SINT32:
912  rv[i] = CLIPS::Value(refl->GetRepeatedInt32(**m, field, i));
913  break;
914  case FieldDescriptor::TYPE_SFIXED64:
915  case FieldDescriptor::TYPE_SINT64:
916  case FieldDescriptor::TYPE_INT64:
917  rv[i] = CLIPS::Value(refl->GetRepeatedInt64(**m, field, i));
918  break;
919  default:
920  throw std::logic_error("Unknown protobuf field type encountered");
921  }
922  }
923 
924  return rv;
925 }
926 
927 
928 CLIPS::Value
929 ClipsProtobufCommunicator::clips_pb_field_is_list(void *msgptr, std::string field_name)
930 {
931  std::shared_ptr<google::protobuf::Message> *m =
932  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
933  if (!(m && *m)) return CLIPS::Value("FALSE", CLIPS::TYPE_SYMBOL);
934 
935  const Descriptor *desc = (*m)->GetDescriptor();
936  const FieldDescriptor *field = desc->FindFieldByName(field_name);
937  if (! field) return CLIPS::Value("FALSE", CLIPS::TYPE_SYMBOL);
938  return CLIPS::Value(field->is_repeated() ? "TRUE" : "FALSE", CLIPS::TYPE_SYMBOL);
939 }
940 
941 
942 void
943 ClipsProtobufCommunicator::clips_assert_message(std::pair<std::string, unsigned short> &endpoint,
944  uint16_t comp_id, uint16_t msg_type,
945  std::shared_ptr<google::protobuf::Message> &msg,
946  ClipsProtobufCommunicator::ClientType ct,
947  unsigned int client_id)
948 {
949  CLIPS::Template::pointer temp = clips_->get_template("protobuf-msg");
950  if (temp) {
951  struct timeval tv;
952  gettimeofday(&tv, 0);
953  void *ptr = new std::shared_ptr<google::protobuf::Message>(msg);
954  CLIPS::Fact::pointer fact = CLIPS::Fact::create(*clips_, temp);
955  fact->set_slot("type", msg->GetTypeName());
956  fact->set_slot("comp-id", comp_id);
957  fact->set_slot("msg-type", msg_type);
958  fact->set_slot("rcvd-via",
959  CLIPS::Value((ct == CT_PEER) ? "BROADCAST" : "STREAM", CLIPS::TYPE_SYMBOL));
960  CLIPS::Values rcvd_at(2, CLIPS::Value(CLIPS::TYPE_INTEGER));
961  rcvd_at[0] = tv.tv_sec;
962  rcvd_at[1] = tv.tv_usec;
963  fact->set_slot("rcvd-at", rcvd_at);
964  CLIPS::Values host_port(2, CLIPS::Value(CLIPS::TYPE_STRING));
965  host_port[0] = endpoint.first;
966  host_port[1] = CLIPS::Value(endpoint.second);
967  fact->set_slot("rcvd-from", host_port);
968  fact->set_slot("client-type",
969  CLIPS::Value(ct == CT_CLIENT ? "CLIENT" :
970  (ct == CT_SERVER ? "SERVER" : "PEER"), CLIPS::TYPE_SYMBOL));
971  fact->set_slot("client-id", client_id);
972  fact->set_slot("ptr", CLIPS::Value(ptr));
973  CLIPS::Fact::pointer new_fact = clips_->assert_fact(fact);
974 
975  if (! new_fact) {
976  if (logger_) {
977  logger_->log_warn("CLIPS-Protobuf", "Asserting protobuf-msg fact failed");
978  }
979  delete static_cast<std::shared_ptr<google::protobuf::Message> *>(ptr);
980  }
981  } else {
982  if (logger_) {
983  logger_->log_warn("CLIPS-Protobuf",
984  "Did not get template, did you load protobuf.clp?");
985  }
986  }
987 }
988 
989 void
990 ClipsProtobufCommunicator::handle_server_client_connected(ProtobufStreamServer::ClientID client,
991  boost::asio::ip::tcp::endpoint &endpoint)
992 {
993 
994  long int client_id = -1;
995  {
996  fawkes::MutexLocker lock(&map_mutex_);
997  client_id = ++next_client_id_;
998  client_endpoints_[client_id] =
999  std::make_pair(endpoint.address().to_string(), endpoint.port());
1000  server_clients_[client_id] = client;
1001  rev_server_clients_[client] = client_id;
1002  }
1003 
1004  fawkes::MutexLocker lock(&clips_mutex_);
1005  clips_->assert_fact_f("(protobuf-server-client-connected %li %s %u)", client_id,
1006  endpoint.address().to_string().c_str(), endpoint.port());
1007 }
1008 
1009 
1010 void
1011 ClipsProtobufCommunicator::handle_server_client_disconnected(ProtobufStreamServer::ClientID client,
1012  const boost::system::error_code &error)
1013 {
1014  long int client_id = -1;
1015  {
1016  fawkes::MutexLocker lock(&map_mutex_);
1017  RevServerClientMap::iterator c;
1018  if ((c = rev_server_clients_.find(client)) != rev_server_clients_.end()) {
1019  client_id = c->second;
1020  rev_server_clients_.erase(c);
1021  server_clients_.erase(client_id);
1022  }
1023  }
1024 
1025  if (client_id >= 0) {
1026  fawkes::MutexLocker lock(&clips_mutex_);
1027  clips_->assert_fact_f("(protobuf-server-client-disconnected %li)", client_id);
1028  }
1029 }
1030 
1031 
1032 /** Handle message that came from a client.
1033  * @param client client ID
1034  * @param component_id component the message was addressed to
1035  * @param msg_type type of the message
1036  * @param msg the message
1037  */
1038 void
1039 ClipsProtobufCommunicator::handle_server_client_msg(ProtobufStreamServer::ClientID client,
1040  uint16_t component_id, uint16_t msg_type,
1041  std::shared_ptr<google::protobuf::Message> msg)
1042 {
1043  fawkes::MutexLocker lock(&clips_mutex_);
1044  fawkes::MutexLocker lock2(&map_mutex_);
1045  RevServerClientMap::iterator c;
1046  if ((c = rev_server_clients_.find(client)) != rev_server_clients_.end()) {
1047  clips_assert_message(client_endpoints_[c->second],
1048  component_id, msg_type, msg, CT_SERVER, c->second);
1049  }
1050 }
1051 
1052 /** Handle server reception failure
1053  * @param client client ID
1054  * @param component_id component the message was addressed to
1055  * @param msg_type type of the message
1056  * @param msg the message string
1057  */
1058 void
1059 ClipsProtobufCommunicator::handle_server_client_fail(ProtobufStreamServer::ClientID client,
1060  uint16_t component_id, uint16_t msg_type,
1061  std::string msg)
1062 {
1063  fawkes::MutexLocker lock(&map_mutex_);
1064  RevServerClientMap::iterator c;
1065  if ((c = rev_server_clients_.find(client)) != rev_server_clients_.end()) {
1066  fawkes::MutexLocker lock(&clips_mutex_);
1067  clips_->assert_fact_f("(protobuf-server-receive-failed (comp-id %u) (msg-type %u) "
1068  "(rcvd-via STREAM) (client-id %li) (message \"%s\") "
1069  "(rcvd-from (\"%s\" %u)))",
1070  component_id, msg_type, c->second, msg.c_str(),
1071  client_endpoints_[c->second].first.c_str(),
1072  client_endpoints_[c->second].second);
1073  }
1074 }
1075 
1076 
1077 /** Handle message that came from a peer/robot
1078  * @param endpoint the endpoint from which the message was received
1079  * @param component_id component the message was addressed to
1080  * @param msg_type type of the message
1081  * @param msg the message
1082  */
1083 void
1084 ClipsProtobufCommunicator::handle_peer_msg(long int peer_id,
1085  boost::asio::ip::udp::endpoint &endpoint,
1086  uint16_t component_id, uint16_t msg_type,
1087  std::shared_ptr<google::protobuf::Message> msg)
1088 {
1089  fawkes::MutexLocker lock(&clips_mutex_);
1090  std::pair<std::string, unsigned short> endpp =
1091  std::make_pair(endpoint.address().to_string(), endpoint.port());
1092  clips_assert_message(endpp, component_id, msg_type, msg, CT_PEER, peer_id);
1093 }
1094 
1095 
1096 /** Handle error during peer message processing.
1097  * @param endpoint endpoint of incoming message
1098  * @param msg error message
1099  */
1100 void
1101 ClipsProtobufCommunicator::handle_peer_recv_error(long int peer_id,
1102  boost::asio::ip::udp::endpoint &endpoint, std::string msg)
1103 {
1104  if (logger_) {
1105  logger_->log_warn("CLIPS-Protobuf",
1106  "Failed to receive peer message from %s:%u: %s",
1107  endpoint.address().to_string().c_str(), endpoint.port(),
1108  msg.c_str());
1109  }
1110 }
1111 
1112 /** Handle error during peer message processing.
1113  * @param msg error message
1114  */
1115 void
1116 ClipsProtobufCommunicator::handle_peer_send_error(long int peer_id, std::string msg)
1117 {
1118  if (logger_) {
1119  logger_->log_warn("CLIPS-Protobuf",
1120  "Failed to send peer message: %s", msg.c_str());
1121  }
1122 }
1123 
1124 
1125 void
1126 ClipsProtobufCommunicator::handle_client_connected(long int client_id)
1127 {
1128  fawkes::MutexLocker lock(&clips_mutex_);
1129  clips_->assert_fact_f("(protobuf-client-connected %li)", client_id);
1130 }
1131 
1132 void
1133 ClipsProtobufCommunicator::handle_client_disconnected(long int client_id,
1134  const boost::system::error_code &error)
1135 {
1136  fawkes::MutexLocker lock(&clips_mutex_);
1137  clips_->assert_fact_f("(protobuf-client-disconnected %li)", client_id);
1138 }
1139 
1140 void
1141 ClipsProtobufCommunicator::handle_client_msg(long int client_id,
1142  uint16_t comp_id, uint16_t msg_type,
1143  std::shared_ptr<google::protobuf::Message> msg)
1144 {
1145  fawkes::MutexLocker lock(&clips_mutex_);
1146  std::pair<std::string, unsigned short> endpp = std::make_pair(std::string(), 0);
1147  clips_assert_message(endpp, comp_id, msg_type, msg, CT_CLIENT, client_id);
1148 }
1149 
1150 
1151 void
1152 ClipsProtobufCommunicator::handle_client_receive_fail(long int client_id,
1153  uint16_t comp_id, uint16_t msg_type, std::string msg)
1154 {
1155  fawkes::MutexLocker lock(&clips_mutex_);
1156  clips_->assert_fact_f("(protobuf-receive-failed (client-id %li) (rcvd-via STREAM) "
1157  "(comp-id %u) (msg-type %u) (message \"%s\"))",
1158  client_id, comp_id, msg_type, msg.c_str());
1159 }
1160 
1161 std::string
1162 ClipsProtobufCommunicator::to_string(const CLIPS::Value &v)
1163 {
1164  switch (v.type()) {
1165  case CLIPS::TYPE_UNKNOWN: return "Unknown Type";
1166  case CLIPS::TYPE_FLOAT: return std::to_string(v.as_float());
1167  case CLIPS::TYPE_INTEGER: return std::to_string(v.as_integer());
1168  case CLIPS::TYPE_SYMBOL:
1169  case CLIPS::TYPE_INSTANCE_NAME:
1170  case CLIPS::TYPE_STRING: return v.as_string();
1171  case CLIPS::TYPE_INSTANCE_ADDRESS:
1172  case CLIPS::TYPE_EXTERNAL_ADDRESS:
1173  return boost::str(boost::format("%p") % v.as_address());
1174  }
1175  return "Implicit unknown type";
1176 }
1177 
1178 } // end namespace protobuf_clips
void enable_server(int port)
Enable protobuf stream server.
Mutex locking helper.
Definition: mutex_locker.h:33
void disable_server()
Disable protobuf stream server.
unsigned int ClientID
ID to identify connected clients.
Definition: server.h:68
Register to map msg type numbers to Protobuf messages.
boost::signals2::signal< void(ClientID, const boost::system::error_code &)> & signal_disconnected()
Signal that is invoked when a client has disconnected.
Definition: server.h:120
void disconnect(ClientID client)
Disconnect specific client.
Definition: server.cpp:450
ClipsProtobufCommunicator(CLIPS::Environment *env, fawkes::Mutex &env_mutex, fawkes::Logger *logger=NULL)
Constructor.
Communicate by broadcasting protobuf messages.
Definition: peer.h:60
signal_send_error_type & signal_send_error()
Signal that is invoked when sending a message failed.
Definition: peer.h:152
Base class for exceptions in Fawkes.
Definition: exception.h:36
boost::signals2::signal< void(uint16_t, uint16_t, std::shared_ptr< google::protobuf::Message >)> & signal_received()
Signal that is invoked when a message has been received.
Definition: client.h:93
signal_received_type & signal_received()
Signal that is invoked when a message has been received.
Definition: peer.h:132
std::shared_ptr< google::protobuf::Message > new_message_for(uint16_t component_id, uint16_t msg_type)
Create a new message instance.
boost::signals2::signal< void(uint16_t, uint16_t, std::string)> & signal_receive_failed()
Signal that is invoked when receiving a message failed.
Definition: client.h:99
virtual const char * what_no_backtrace() const
Get primary string (does not implicitly print the back trace).
Definition: exception.cpp:686
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
void async_connect(const char *host, unsigned short port)
Asynchronous connect.
Definition: client.cpp:154
virtual void log_error(const char *component, const char *format,...)=0
Log error message.
void send(ClientID client, uint16_t component_id, uint16_t msg_type, google::protobuf::Message &m)
Send a message to the given client.
Definition: server.cpp:324
Stream server for protobuf message transmission.
Definition: server.h:64
boost::signals2::signal< void()> & signal_connected()
Signal that is invoked when the connection has been established.
Definition: client.h:105
boost::signals2::signal< void(ClientID, boost::asio::ip::tcp::endpoint &)> & signal_connected()
Signal that is invoked when a new client has connected.
Definition: server.h:114
signal_recv_error_type & signal_recv_error()
Signal that is invoked when receiving a message failed.
Definition: peer.h:146
boost::signals2::signal< void(ClientID, uint16_t, uint16_t, std::string)> & signal_receive_failed()
Signal that is invoked when receiving a message failed.
Definition: server.h:108
boost::signals2::signal< void(const boost::system::error_code &)> & signal_disconnected()
Signal that is invoked when the connection is closed.
Definition: client.h:111
Mutex mutual exclusion lock.
Definition: mutex.h:32
Stream client for protobuf message transmission.
Definition: client.h:59
boost::signals2::signal< void(ClientID, uint16_t, uint16_t, std::shared_ptr< google::protobuf::Message >)> & signal_received()
Signal that is invoked when a message has been received.
Definition: server.h:102
void add_message_type(std::string msg_type)
Add a message type from generated pool.
Interface for logging.
Definition: logger.h:34