Fawkes API  Fawkes Development Version
oprs_protobuf.cpp
1 
2 /***************************************************************************
3  * oprs_protobuf.cpp - protobuf network communication for OpenPRS
4  *
5  * Created: Tue Sep 02 16:53:26 2014 (based on CLIPS version)
6  * Copyright 2013-2014 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* Redistribution and use in source and binary forms, with or without
10  * modification, are permitted provided that the following conditions
11  * are met:
12  *
13  * - Redistributions of source code must retain the above copyright
14  * notice, this list of conditions and the following disclaimer.
15  * - Redistributions in binary form must reproduce the above copyright
16  * notice, this list of conditions and the following disclaimer in
17  * the documentation and/or other materials provided with the
18  * distribution.
19  * - Neither the name of the authors nor the names of its contributors
20  * may be used to endorse or promote products derived from this
21  * software without specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
26  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
27  * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
28  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
29  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
30  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
31  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
32  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
34  * OF THE POSSIBILITY OF SUCH DAMAGE.
35  */
36 
37 #include "oprs_protobuf.h"
38 
39 #include <core/threading/mutex_locker.h>
40 #include <core/exception.h>
41 #include <protobuf_comm/client.h>
42 #include <protobuf_comm/server.h>
43 #include <protobuf_comm/peer.h>
44 
45 #include <oprs_f-pub.h>
46 #include <google/protobuf/descriptor.h>
47 
48 #include <algorithm>
49 
50 using namespace google::protobuf;
51 using namespace protobuf_comm;
52 
53 namespace oprs_protobuf {
54 #if 0 /* just to make Emacs auto-indent happy */
55 }
56 #endif
57 
58 /** @class OpenPRSProtobuf "oprs_protobuf.h"
59  * OpenPRS protobuf integration class.
60  * This class adds functionality related to protobuf to OpenPRS.
61  * It supports the creation of communication channels through protobuf_comm.
62  * An instance maintains its own message register shared among server, peer,
63  * and clients.
64  * @author Tim Niemueller
65  */
66 
67 /** Constructor.
68  * @param proto_path proto path passed to a newly instantiated message register
69  */
70 OpenPRSProtobuf::OpenPRSProtobuf(std::vector<std::string> &proto_path)
71  : server_(NULL), next_client_id_(0)
72 {
73  message_register_ = new MessageRegister(proto_path);
74 }
75 
76 
77 /** Destructor. */
79 {
80  for (auto c : clients_) {
81  delete c.second;
82  }
83  clients_.clear();
84 
85  delete message_register_;
86  delete server_;
87 }
88 
89 
90 /** Enable protobuf stream server.
91  * @param port TCP port to listen on for connections
92  */
93 void
95 {
96  if ((port > 0) && ! server_) {
97  server_ = new protobuf_comm::ProtobufStreamServer(port, message_register_);
98 
99  server_->signal_connected()
100  .connect(boost::bind(&OpenPRSProtobuf::handle_server_client_connected, this, _1, _2));
101  server_->signal_disconnected()
102  .connect(boost::bind(&OpenPRSProtobuf::handle_server_client_disconnected, this, _1, _2));
103  server_->signal_received()
104  .connect(boost::bind(&OpenPRSProtobuf::handle_server_client_msg, this, _1, _2, _3, _4));
105  server_->signal_receive_failed()
106  .connect(boost::bind(&OpenPRSProtobuf::handle_server_client_fail, this, _1, _2, _3, _4));
107  }
108 
109 }
110 
111 
112 /** Disable protobuf stream server. */
113 void
115 {
116  delete server_;
117  server_ = NULL;
118 }
119 
120 
121 /** Enable protobuf peer.
122  * @param address IP address to send messages to
123  * @param send_port UDP port to send messages to
124  * @param recv_port UDP port to receive messages on, 0 to use the same as the @p send_port
125  * @param crypto_key encryption key
126  * @param cipher cipher suite, see BufferEncryptor for supported types
127  * @return peer identifier
128  */
129 Term *
130 OpenPRSProtobuf::oprs_pb_peer_create_local_crypto(std::string address, int send_port, int recv_port,
131  std::string crypto_key, std::string cipher)
132 {
133  if (recv_port <= 0) recv_port = send_port;
134 
135  if (send_port > 0) {
137  new protobuf_comm::ProtobufBroadcastPeer(address, send_port, recv_port,
138  message_register_, crypto_key, cipher);
139 
140  long int peer_id;
141  {
142  fawkes::MutexLocker lock(&map_mutex_);
143  peer_id = ++next_client_id_;
144  peers_[peer_id] = peer;
145  }
146 
147  peer->signal_received()
148  .connect(boost::bind(&OpenPRSProtobuf::handle_peer_msg, this, peer_id, _1, _2, _3, _4));
149  peer->signal_recv_error()
150  .connect(boost::bind(&OpenPRSProtobuf::handle_peer_recv_error, this, peer_id, _1, _2));
151  peer->signal_send_error()
152  .connect(boost::bind(&OpenPRSProtobuf::handle_peer_send_error, this, peer_id, _1));
153 
154  return build_long_long(peer_id);
155  } else {
156  return build_long_long(0);
157  }
158 }
159 
160 /** Enable protobuf peer.
161  * @param address IP address to send messages to
162  * @param port UDP port to send and receive messages
163  * @param crypto_key encryption key
164  * @param cipher cipher suite, see BufferEncryptor for supported types
165  * @return peer identifier
166  */
167 Term *
168 OpenPRSProtobuf::oprs_pb_peer_create_crypto(std::string address, int port,
169  std::string crypto_key, std::string cipher)
170 {
171  return oprs_pb_peer_create_local_crypto(address, port, port, crypto_key, cipher);
172 }
173 
174 /** Enable protobuf peer.
175  * @param address IP address to send messages to
176  * @param port UDP port to send and receive messages
177  * @return peer identifier
178  */
179 Term *
180 OpenPRSProtobuf::oprs_pb_peer_create(std::string address, int port)
181 {
182  return oprs_pb_peer_create_local_crypto(address, port, port);
183 }
184 
185 /** Enable protobuf peer.
186  * @param address IP address to send messages to
187  * @param send_port UDP port to send messages to
188  * @param recv_port UDP port to receive messages on, 0 to use the same as the @p send_port
189  * @return peer identifier
190  */
191 Term *
192 OpenPRSProtobuf::oprs_pb_peer_create_local(std::string address, int send_port,
193  int recv_port)
194 {
195  return oprs_pb_peer_create_local_crypto(address, send_port, recv_port);
196 }
197 
198 
199 /** Disable peer.
200  * @param peer_id ID of the peer to destroy
201  */
202 void
204 {
205  if (peers_.find(peer_id) != peers_.end()) {
206  delete peers_[peer_id];
207  peers_.erase(peer_id);
208  }
209 }
210 
211 
212 /** Setup crypto for peer.
213  * @param peer_id ID of the peer to set up encryption for
214  * @param crypto_key encryption key
215  * @param cipher cipher suite, see BufferEncryptor for supported types
216  */
217 void
219  std::string crypto_key, std::string cipher)
220 {
221  if (peers_.find(peer_id) != peers_.end()) {
222  peers_[peer_id]->setup_crypto(crypto_key, cipher);
223  }
224 }
225 
226 
227 /** Register a new message type.
228  * @param full_name full name of type to register
229  * @return true if the type was successfully registered, false otherwise
230  */
231 bool
233 {
234  try {
235  message_register_->add_message_type(full_name);
236  return true;
237  } catch (std::runtime_error &e) {
238  //logger_->log_error("RefBox", "Registering type %s failed: %s", full_name.c_str(), e.what());
239  return false;
240  }
241 }
242 
243 
244 
245 /** Create a new message of given type.
246  * @param full_name name of message type (fully qualified, i.e. including package name)
247  * @return shared pointer to new mesage
248  * @exception std::runtime_error thrown if creating the message failed
249  */
250 std::shared_ptr<google::protobuf::Message> *
251 OpenPRSProtobuf::oprs_create_msg(std::string full_name)
252 {
253  std::shared_ptr<google::protobuf::Message> m =
254  message_register_->new_message_for(full_name);
255  return new std::shared_ptr<google::protobuf::Message>(m);
256 }
257 
258 
259 /** Create new reference to message.
260  * @param msgptr message to create reference for
261  * @return new message reference pointing to the very same message as @p msgptr
262  */
263 Term *
265 {
266  std::shared_ptr<google::protobuf::Message> *m =
267  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
268  if (!*m) return build_pointer(new std::shared_ptr<google::protobuf::Message>());
269 
270  return build_pointer(new std::shared_ptr<google::protobuf::Message>(*m));
271 }
272 
273 
274 /** Destroy given message (reference).
275  * This will decrement the reference count to the message and delete it.
276  * The message itself is deleted if the reference counter reaches zero.
277  * @param msgptr message (reference) to delete, any access to this message
278  * afterwards is illegal.
279  * @return T if the reference was deleted, NIL if it did not refer to a message
280  */
281 Term *
283 {
284  std::shared_ptr<google::protobuf::Message> *m =
285  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
286  if (!*m) return build_nil();
287 
288  delete m;
289  return build_t();
290 }
291 
292 
293 /** Get field names of message.
294  * @param msgptr user pointer to message
295  * @return term containing lisp list of field names
296  */
297 Term *
299 {
300  TermList tl = sl_make_slist();
301 
302  std::shared_ptr<google::protobuf::Message> *m =
303  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
304  if (!*m) return build_term_l_list_from_c_list(tl);
305 
306 
307  const Descriptor *desc = (*m)->GetDescriptor();
308  const int field_count = desc->field_count();
309  for (int i = 0; i < field_count; ++i) {
310  tl = build_term_list(tl, build_string(desc->field(i)->name().c_str()));
311  }
312  return build_term_l_list_from_c_list(tl);
313 }
314 
315 
316 /** Get type if a specific field.
317  * @param msgptr message for which to get the field type
318  * @param field_name name of the field
319  * @return term with a symbol for the type
320  */
321 Term *
322 OpenPRSProtobuf::oprs_pb_field_type(void *msgptr, std::string field_name)
323 {
324  std::shared_ptr<google::protobuf::Message> *m =
325  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
326  if (!*m) return build_id(declare_atom("INVALID-MESSAGE"));
327 
328  const Descriptor *desc = (*m)->GetDescriptor();
329  const FieldDescriptor *field = desc->FindFieldByName(field_name);
330  if (! field) {
331  return build_id(declare_atom("DOES-NOT-EXIST"));
332  }
333  switch (field->type()) {
334  case FieldDescriptor::TYPE_DOUBLE: return build_id(declare_atom("DOUBLE"));
335  case FieldDescriptor::TYPE_FLOAT: return build_id(declare_atom("FLOAT"));
336  case FieldDescriptor::TYPE_INT64: return build_id(declare_atom("INT64"));
337  case FieldDescriptor::TYPE_UINT64: return build_id(declare_atom("UINT64"));
338  case FieldDescriptor::TYPE_INT32: return build_id(declare_atom("INT32"));
339  case FieldDescriptor::TYPE_FIXED64: return build_id(declare_atom("FIXED64"));
340  case FieldDescriptor::TYPE_FIXED32: return build_id(declare_atom("FIXED32"));
341  case FieldDescriptor::TYPE_BOOL: return build_id(declare_atom("BOOL"));
342  case FieldDescriptor::TYPE_STRING: return build_id(declare_atom("STRING"));
343  case FieldDescriptor::TYPE_MESSAGE: return build_id(declare_atom("MESSAGE"));
344  case FieldDescriptor::TYPE_BYTES: return build_id(declare_atom("BYTES"));
345  case FieldDescriptor::TYPE_UINT32: return build_id(declare_atom("UINT32"));
346  case FieldDescriptor::TYPE_ENUM: return build_id(declare_atom("ENUM"));
347  case FieldDescriptor::TYPE_SFIXED32: return build_id(declare_atom("SFIXED32"));
348  case FieldDescriptor::TYPE_SFIXED64: return build_id(declare_atom("SFIXED64"));
349  case FieldDescriptor::TYPE_SINT32: return build_id(declare_atom("SINT32"));
350  case FieldDescriptor::TYPE_SINT64: return build_id(declare_atom("SINT64"));
351  default: return build_id(declare_atom("UNKNOWN"));
352  }
353 }
354 
355 
356 /** Check if message has a specific field.
357  * This is relevant in particular for optional fields.
358  * @param msgptr message
359  * @param field_name name of the field
360  * @return true if the field is present, false otherwise
361  */
362 bool
363 OpenPRSProtobuf::oprs_pb_has_field(void *msgptr, std::string field_name)
364 {
365  std::shared_ptr<google::protobuf::Message> *m =
366  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
367  if (!*m) return false;
368 
369  const Descriptor *desc = (*m)->GetDescriptor();
370  const FieldDescriptor *field = desc->FindFieldByName(field_name);
371  if (! field) return false;
372 
373  const Reflection *refl = (*m)->GetReflection();
374 
375  if (field->is_repeated()) {
376  return (refl->FieldSize(**m, field) > 0);
377  } else {
378  return refl->HasField(**m, field);
379  }
380 }
381 
382 
383 /** Get a fields label.
384  * @param msgptr message for which to get the field type
385  * @param field_name name of the field
386  * @return Term with Symbol, one of INVALID-MESSAGE, DOES-NOT-EXIST, OPTIONAL, REPEATED, UNKNOWN
387  */
388 Term *
389 OpenPRSProtobuf::oprs_pb_field_label(void *msgptr, std::string field_name)
390 {
391  std::shared_ptr<google::protobuf::Message> *m =
392  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
393  if (!*m) return build_id(declare_atom("INVALID-MESSAGE"));
394 
395  const Descriptor *desc = (*m)->GetDescriptor();
396  const FieldDescriptor *field = desc->FindFieldByName(field_name);
397  if (! field) return build_id(declare_atom("DOES-NOT-EXIST"));
398  switch (field->label()) {
399  case FieldDescriptor::LABEL_OPTIONAL: return build_id(declare_atom("OPTIONAL"));
400  case FieldDescriptor::LABEL_REQUIRED: return build_id(declare_atom("REQUIRED"));
401  case FieldDescriptor::LABEL_REPEATED: return build_id(declare_atom("REPEATED"));
402  default: return build_id(declare_atom("UNKNOWN"));
403  }
404 }
405 
406 /** Get properly typed field value.
407  * @param msgptr message for which to get the field type
408  * @param field_name name of the field
409  * @return Term with value of proper type
410  */
411 Term *
412 OpenPRSProtobuf::oprs_pb_field_value(void *msgptr, std::string field_name)
413 {
414  std::shared_ptr<google::protobuf::Message> *m =
415  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
416  if (!*m) return build_id(declare_atom("INVALID-MESSAGE"));
417 
418  const Descriptor *desc = (*m)->GetDescriptor();
419  const FieldDescriptor *field = desc->FindFieldByName(field_name);
420  if (! field) return build_id(declare_atom("DOES-NOT-EXIST"));
421  const Reflection *refl = (*m)->GetReflection();
422  if (field->type() != FieldDescriptor::TYPE_MESSAGE && ! refl->HasField(**m, field)) {
423  //logger_->log_warn("RefBox", "Field %s of %s not set",
424  // field_name.c_str(), (*m)->GetTypeName().c_str());
425  return build_id(declare_atom("NOT-SET"));
426  }
427  switch (field->type()) {
428  case FieldDescriptor::TYPE_DOUBLE: return build_float(refl->GetDouble(**m, field));
429  case FieldDescriptor::TYPE_FLOAT: return build_float(refl->GetFloat(**m, field));
430  case FieldDescriptor::TYPE_INT64: return build_long_long(refl->GetInt64(**m, field));
431  case FieldDescriptor::TYPE_UINT64:
432  return build_long_long((long int)refl->GetUInt64(**m, field));
433  case FieldDescriptor::TYPE_INT32: return build_integer(refl->GetInt32(**m, field));
434  case FieldDescriptor::TYPE_FIXED64:
435  return build_long_long((long int)refl->GetUInt64(**m, field));
436  case FieldDescriptor::TYPE_FIXED32: return build_long_long(refl->GetUInt32(**m, field));
437  case FieldDescriptor::TYPE_BOOL: return refl->GetBool(**m, field) ? build_t() : build_nil();
438  case FieldDescriptor::TYPE_STRING: return build_string(refl->GetString(**m, field).c_str());
439  case FieldDescriptor::TYPE_MESSAGE:
440  {
441  const google::protobuf::Message &mfield = refl->GetMessage(**m, field);
442  google::protobuf::Message *mcopy = mfield.New();
443  mcopy->CopyFrom(mfield);
444  void *ptr = new std::shared_ptr<google::protobuf::Message>(mcopy);
445  return build_pointer(ptr);
446  }
447  case FieldDescriptor::TYPE_BYTES: return build_string((char *)"bytes");
448  case FieldDescriptor::TYPE_UINT32: return build_long_long(refl->GetUInt32(**m, field));
449  case FieldDescriptor::TYPE_ENUM:
450  return build_id(declare_atom(refl->GetEnum(**m, field)->name().c_str()));
451  case FieldDescriptor::TYPE_SFIXED32: return build_integer(refl->GetInt32(**m, field));
452  case FieldDescriptor::TYPE_SFIXED64: return build_long_long(refl->GetInt64(**m, field));
453  case FieldDescriptor::TYPE_SINT32: return build_integer(refl->GetInt32(**m, field));
454  case FieldDescriptor::TYPE_SINT64: return build_long_long(refl->GetInt64(**m, field));
455  default:
456  throw std::logic_error("Unknown protobuf field type encountered");
457  }
458 }
459 
460 
461 /** Set a field.
462  * @param msgptr message for which to get the field type
463  * @param field_name name of the field
464  * @param value term which must contain a single properly typed value.
465  */
466 void
467 OpenPRSProtobuf::oprs_pb_set_field(void *msgptr, std::string field_name, Term *value)
468 {
469  std::shared_ptr<google::protobuf::Message> *m =
470  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
471  if (!*m) return;
472 
473  const Descriptor *desc = (*m)->GetDescriptor();
474  const FieldDescriptor *field = desc->FindFieldByName(field_name);
475  if (! field) {
476  //logger_->log_warn("RefBox", "Could not find field %s", field_name.c_str());
477  return;
478  }
479  const Reflection *refl = (*m)->GetReflection();
480 
481  try {
482  switch (field->type()) {
483  case FieldDescriptor::TYPE_DOUBLE:
484  if (value->type == TT_FLOAT) {
485  refl->SetDouble(m->get(), field, *(value->u.doubleptr));
486  } else {
487  throw std::logic_error(std::string("Invalid type, required float for ") +
488  (*m)->GetTypeName() + field_name);
489  }
490  break;
491  case FieldDescriptor::TYPE_FLOAT:
492  if (value->type == TT_FLOAT) {
493  refl->SetFloat(m->get(), field, *(value->u.doubleptr));
494  } else {
495  throw std::logic_error(std::string("Invalid type, required float for ") +
496  (*m)->GetTypeName() + field_name);
497  }
498  break;
499  case FieldDescriptor::TYPE_SFIXED64:
500  case FieldDescriptor::TYPE_SINT64:
501  case FieldDescriptor::TYPE_INT64:
502  if (value->type == INTEGER) {
503  refl->SetInt64(m->get(), field, value->u.intval);
504  } else if (value->type == LONG_LONG) {
505  refl->SetInt64(m->get(), field, value->u.llintval);
506  } else {
507  throw std::logic_error(std::string("Invalid type, required integer or long long for ") +
508  (*m)->GetTypeName() + field_name);
509  }
510  break;
511  case FieldDescriptor::TYPE_FIXED64:
512  case FieldDescriptor::TYPE_UINT64:
513  if (value->type == INTEGER) {
514  refl->SetUInt64(m->get(), field, value->u.intval);
515  } else if (value->type == LONG_LONG) {
516  refl->SetUInt64(m->get(), field, value->u.llintval);
517  } else {
518  throw std::logic_error(std::string("Invalid type, required integer or long long for ") +
519  (*m)->GetTypeName() + field_name);
520  }
521  break;
522  case FieldDescriptor::TYPE_SFIXED32:
523  case FieldDescriptor::TYPE_SINT32:
524  case FieldDescriptor::TYPE_INT32:
525  if (value->type == INTEGER) {
526  refl->SetInt32(m->get(), field, value->u.intval);
527  } else {
528  throw std::logic_error(std::string("Invalid type, required integer for ") +
529  (*m)->GetTypeName() + field_name);
530  }
531  break;
532  case FieldDescriptor::TYPE_BOOL:
533  if (value->type == TT_ATOM) {
534  if (value->u.id == lisp_t_sym || value->u.id == nil_sym) {
535  refl->SetBool(m->get(), field, (value->u.id == lisp_t_sym));
536  } else {
537  throw std::logic_error(std::string("Invalid value, allowed are T or NIL for field ") +
538  (*m)->GetTypeName() + field_name);
539  }
540  } else {
541  throw std::logic_error(std::string("Invalid type, required symbol for ") +
542  (*m)->GetTypeName() + field_name);
543  }
544  break;
545  case FieldDescriptor::TYPE_STRING:
546  if (value->type == STRING) {
547  refl->SetString(m->get(), field, value->u.string);
548  } else {
549  throw std::logic_error(std::string("Invalid type, required string for ") +
550  (*m)->GetTypeName() + field_name);
551  }
552  break;
553  case FieldDescriptor::TYPE_MESSAGE:
554  if (value->type == U_POINTER) {
555  std::shared_ptr<google::protobuf::Message> *mfrom =
556  static_cast<std::shared_ptr<google::protobuf::Message> *>(value->u.u_pointer);
557  Message *mut_msg = refl->MutableMessage(m->get(), field);
558  mut_msg->CopyFrom(**mfrom);
559  delete mfrom;
560  } else {
561  throw std::logic_error(std::string("Invalid type, required user pointer for ") +
562  (*m)->GetTypeName() + field_name);
563  }
564  break;
565  case FieldDescriptor::TYPE_BYTES: break;
566  case FieldDescriptor::TYPE_FIXED32:
567  case FieldDescriptor::TYPE_UINT32:
568  if (value->type == INTEGER) {
569  refl->SetUInt32(m->get(), field, value->u.intval);
570  } else if (value->type == LONG_LONG) {
571  refl->SetUInt32(m->get(), field, value->u.llintval);
572  } else {
573  throw std::logic_error(std::string("Invalid type, required integer or long long for ") +
574  (*m)->GetTypeName() + field_name);
575  }
576  break;
577  case FieldDescriptor::TYPE_ENUM:
578  {
579  const char *sym_name = NULL;
580  if (value->type == TT_ATOM) {
581  sym_name = value->u.id;
582  } else if (value->type == STRING) {
583  sym_name = value->u.string;
584  } else {
585  throw std::logic_error(std::string("Invalid type, required symbol or string for ") +
586  (*m)->GetTypeName() + field_name);
587  }
588 
589  const EnumDescriptor *enumdesc = field->enum_type();
590  const EnumValueDescriptor *enumval = enumdesc->FindValueByName(sym_name);
591  if (enumval) {
592  refl->SetEnum(m->get(), field, enumval);
593  } else {
594  std::string sym_str(sym_name);
595  std::transform(sym_str.begin(), sym_str.end(), sym_str.begin(), std::ptr_fun<int, int>(std::toupper));
596 
597  const EnumValueDescriptor *enumval = enumdesc->FindValueByName(sym_str);
598 
599  if (enumval) {
600  refl->SetEnum(m->get(), field, enumval);
601  } else {
602  fprintf(stderr, "%s: cannot set invalid enum value '%s' (neither '%s') on '%s'",
603  (*m)->GetTypeName().c_str(), sym_name, sym_str.c_str(), field_name.c_str());
604  }
605  }
606  }
607  break;
608 
609  default:
610  throw std::logic_error("Unknown protobuf field type encountered");
611  }
612  } catch (std::logic_error &e) {
613  //logger_->log_warn("RefBox", "Failed to set field %s of %s: %s", field_name.c_str(),
614  // (*m)->GetTypeName().c_str(), e.what());
615  }
616 }
617 
618 
619 /** Add value to a repeated field.
620  * @param msgptr message
621  * @param field_name name of the field
622  * @param value term which must contain a single properly typed value.
623  */
624 void
625 OpenPRSProtobuf::oprs_pb_add_list(void *msgptr, std::string field_name, Term *value)
626 {
627  std::shared_ptr<google::protobuf::Message> *m =
628  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
629  if (!(m || *m)) return;
630 
631  const Descriptor *desc = (*m)->GetDescriptor();
632  const FieldDescriptor *field = desc->FindFieldByName(field_name);
633  if (! field) {
634  //logger_->log_warn("RefBox", "Could not find field %s", field_name.c_str());
635  return;
636  }
637  const Reflection *refl = (*m)->GetReflection();
638 
639  try {
640  switch (field->type()) {
641  case FieldDescriptor::TYPE_DOUBLE:
642  if (value->type == TT_FLOAT) {
643  refl->AddDouble(m->get(), field, *(value->u.doubleptr));
644  } else {
645  throw std::logic_error(std::string("Invalid type, required float for ") +
646  (*m)->GetTypeName() + field_name);
647  }
648  break;
649  case FieldDescriptor::TYPE_FLOAT:
650  if (value->type == TT_FLOAT) {
651  refl->AddFloat(m->get(), field, *(value->u.doubleptr));
652  } else {
653  throw std::logic_error(std::string("Invalid type, required float for ") +
654  (*m)->GetTypeName() + field_name);
655  }
656  break;
657 
658  case FieldDescriptor::TYPE_SFIXED64:
659  case FieldDescriptor::TYPE_SINT64:
660  case FieldDescriptor::TYPE_INT64:
661  if (value->type == INTEGER) {
662  refl->AddInt64(m->get(), field, value->u.intval);
663  } else if (value->type == LONG_LONG) {
664  refl->AddInt64(m->get(), field, value->u.llintval);
665  } else {
666  throw std::logic_error(std::string("Invalid type, required integer or long long for ") +
667  (*m)->GetTypeName() + field_name);
668  }
669  break;
670 
671  case FieldDescriptor::TYPE_SFIXED32:
672  case FieldDescriptor::TYPE_SINT32:
673  case FieldDescriptor::TYPE_INT32:
674  if (value->type == INTEGER) {
675  refl->AddInt32(m->get(), field, value->u.intval);
676  } else {
677  throw std::logic_error(std::string("Invalid type, required integer for ") +
678  (*m)->GetTypeName() + field_name);
679  }
680  break;
681  case FieldDescriptor::TYPE_BOOL:
682  if (value->type == TT_ATOM) {
683  if (value->u.id == lisp_t_sym || value->u.id == nil_sym) {
684  refl->AddBool(m->get(), field, (value->u.id == lisp_t_sym));
685  } else {
686  throw std::logic_error(std::string("Invalid value, allowed are T or NIL for field ") +
687  (*m)->GetTypeName() + field_name);
688  }
689  } else {
690  throw std::logic_error(std::string("Invalid type, required symbol for ") +
691  (*m)->GetTypeName() + field_name);
692  }
693  break;
694  case FieldDescriptor::TYPE_STRING:
695  if (value->type == STRING) {
696  refl->AddString(m->get(), field, value->u.string);
697  } else {
698  throw std::logic_error(std::string("Invalid type, required string for ") +
699  (*m)->GetTypeName() + field_name);
700  }
701  break;
702  case FieldDescriptor::TYPE_MESSAGE:
703  if (value->type == U_POINTER) {
704  std::shared_ptr<google::protobuf::Message> *mfrom =
705  static_cast<std::shared_ptr<google::protobuf::Message> *>(value->u.u_pointer);
706  Message *mut_msg = refl->AddMessage(m->get(), field);
707  mut_msg->CopyFrom(**mfrom);
708  delete mfrom;
709  } else {
710  throw std::logic_error(std::string("Invalid type, required user pointer for ") +
711  (*m)->GetTypeName() + field_name);
712  }
713  break;
714 
715  case FieldDescriptor::TYPE_BYTES: break;
716 
717  case FieldDescriptor::TYPE_FIXED32:
718  case FieldDescriptor::TYPE_UINT32:
719  if (value->type == INTEGER) {
720  refl->AddUInt32(m->get(), field, value->u.intval);
721  } else if (value->type == LONG_LONG) {
722  refl->AddUInt32(m->get(), field, value->u.llintval);
723  } else {
724  throw std::logic_error(std::string("Invalid type, required integer or long long for ") +
725  (*m)->GetTypeName() + field_name);
726  }
727  break;
728 
729  case FieldDescriptor::TYPE_ENUM:
730  {
731  const char *sym_name = NULL;
732  if (value->type == TT_ATOM) {
733  sym_name = value->u.id;
734  } else if (value->type == STRING) {
735  sym_name = value->u.string;
736  } else {
737  throw std::logic_error(std::string("Invalid type, required symbol or string for ") +
738  (*m)->GetTypeName() + field_name);
739  }
740  const EnumDescriptor *enumdesc = field->enum_type();
741  const EnumValueDescriptor *enumval = enumdesc->FindValueByName(sym_name);
742  if (enumval) {
743  refl->AddEnum(m->get(), field, enumval);
744  } else {
745  //logger_->log_warn("RefBox", "%s: cannot set invalid enum value '%s' on '%s'",
746  // (*m)->GetTypeName().c_str(), value.as_string().c_str(), field_name.c_str());
747  }
748  }
749  break;
750 
751  default:
752  throw std::logic_error("Unknown protobuf field type encountered");
753  }
754  } catch (std::logic_error &e) {
755  //logger_->log_warn("RefBox", "Failed to add field %s of %s: %s", field_name.c_str(),
756  // (*m)->GetTypeName().c_str(), e.what());
757  }
758 }
759 
760 
761 /** Connect as a client to the given server.
762  * Note that this will perform an asynchronous connect. A
763  * (protobuf-client-connected) or (protobuf-client-disconnected) fact
764  * is asserted during (pb-process) in the case of success or failure.
765  * @param host host to connect to
766  * @param port TCP port to connect to
767  * @return Term with a long long of the client ID
768  */
769 Term *
770 OpenPRSProtobuf::oprs_pb_client_connect(std::string host, int port)
771 {
772  if (port <= 0) return build_nil();
773 
774  ProtobufStreamClient *client = new ProtobufStreamClient(message_register_);
775 
776  long int client_id;
777  {
778  fawkes::MutexLocker lock(&map_mutex_);
779  client_id = ++next_client_id_;
780  clients_[client_id] = client;
781  }
782 
783  client->signal_connected().connect(
784  boost::bind(&OpenPRSProtobuf::handle_client_connected, this, client_id));
785  client->signal_disconnected().connect(
786  boost::bind(&OpenPRSProtobuf::handle_client_disconnected,
787  this, client_id, boost::asio::placeholders::error));
788  client->signal_received().connect(
789  boost::bind(&OpenPRSProtobuf::handle_client_msg, this, client_id, _1, _2, _3));
790  client->signal_receive_failed().connect(
791  boost::bind(&OpenPRSProtobuf::handle_client_receive_fail, this, client_id, _1, _2, _3));
792 
793  client->async_connect(host.c_str(), port);
794  return build_long_long(client_id);
795 }
796 
797 
798 /** Send message to a specific client.
799  * @param client_id ID of the client, this can be a server client ID, a client
800  * ID, or a peer ID (message will then be broadcasted).
801  * @param msgptr message to send
802  */
803 void
804 OpenPRSProtobuf::oprs_pb_send(long int client_id, void *msgptr)
805 {
806  std::shared_ptr<google::protobuf::Message> *m =
807  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
808  if (!(m || *m)) {
809  //logger_->log_warn("RefBox", "Cannot send to %li: invalid message", client_id);
810  return;
811  }
812 
813  try {
814  fawkes::MutexLocker lock(&map_mutex_);
815 
816  if (server_ && server_clients_.find(client_id) != server_clients_.end()) {
817  //printf("***** SENDING via SERVER\n");
818  server_->send(server_clients_[client_id], *m);
819  sig_server_sent_(server_clients_[client_id], *m);
820  } else if (clients_.find(client_id) != clients_.end()) {
821  //printf("***** SENDING via CLIENT\n");
822  clients_[client_id]->send(*m);
823  std::pair<std::string, unsigned short> &client_endpoint = client_endpoints_[client_id];
824  sig_client_sent_(client_endpoint.first, client_endpoint.second, *m);
825  } else if (peers_.find(client_id) != peers_.end()) {
826  //printf("***** SENDING via CLIENT\n");
827  peers_[client_id]->send(*m);
828  sig_peer_sent_(client_id, *m);
829  } else {
830  //printf("Client ID %li is unknown, cannot send message of type %s\n",
831  // client_id, (*m)->GetTypeName().c_str());
832  }
833  } catch (google::protobuf::FatalException &e) {
834  //logger_->log_warn("RefBox", "Failed to send message of type %s: %s",
835  // (*m)->GetTypeName().c_str(), e.what());
836  } catch (std::runtime_error &e) {
837  //logger_->log_warn("RefBox", "Failed to send message of type %s: %s",
838  // (*m)->GetTypeName().c_str(), e.what());
839  }
840 }
841 
842 
843 /** Broadcast a message through a peer.
844  * @param peer_id ID broadcast peer to send through
845  * @param msgptr message to send
846  */
847 void
848 OpenPRSProtobuf::oprs_pb_broadcast(long int peer_id, void *msgptr)
849 {
850  std::shared_ptr<google::protobuf::Message> *m =
851  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
852  if (!(m || *m)) {
853  fprintf(stderr, "Cannot send broadcast: invalid message");
854  return;
855  }
856 
857  fawkes::MutexLocker lock(&map_mutex_);
858  if (peers_.find(peer_id) == peers_.end()) return;
859 
860  try {
861  peers_[peer_id]->send(*m);
862  } catch (google::protobuf::FatalException &e) {
863  fprintf(stderr, "pb-broadcast: failed to broadcast message of type %s: %s\n",
864  (*m)->GetTypeName().c_str(), e.what());
865  }
866 
867  sig_peer_sent_(peer_id, *m);
868 }
869 
870 
871 /** Disconnect a given client.
872  * @param client_id ID of client to disconnect, can be a server client ID or a client ID
873  */
874 void
876 {
877  //logger_->log_info("RefBox", "Disconnecting client %li", client_id);
878 
879  try {
880  fawkes::MutexLocker lock(&map_mutex_);
881 
882  if (server_clients_.find(client_id) != server_clients_.end()) {
883  protobuf_comm::ProtobufStreamServer::ClientID srv_client = server_clients_[client_id];
884  server_->disconnect(srv_client);
885  server_clients_.erase(client_id);
886  rev_server_clients_.erase(srv_client);
887  } else if (clients_.find(client_id) != clients_.end()) {
888  delete clients_[client_id];
889  clients_.erase(client_id);
890  }
891  } catch (std::runtime_error &e) {
892  throw fawkes::Exception("Failed to disconnect from client %li: %s", client_id, e.what());
893  }
894 }
895 
896 
897 /** Get list of values of a given message field.
898  * @param msgptr message
899  * @param field_name field to retrieve
900  * @return term which contains a Lisp list with properly typed values, or a symbol in
901  * case of an error
902  */
903 Term *
904 OpenPRSProtobuf::oprs_pb_field_list(void *msgptr, std::string field_name)
905 {
906  std::shared_ptr<google::protobuf::Message> *m =
907  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
908  if (!(m || *m)) return build_id(declare_atom("INVALID-MESSAGE"));
909 
910  const Descriptor *desc = (*m)->GetDescriptor();
911  const FieldDescriptor *field = desc->FindFieldByName(field_name);
912  if (! field) return build_id(declare_atom("DOES-NOT-EXIST"));
913 
914  TermList tl = sl_make_slist();
915 
916  if (field->label() == FieldDescriptor::LABEL_REQUIRED ||
917  field->label() == FieldDescriptor::LABEL_OPTIONAL)
918  {
919  tl = build_term_list(tl, oprs_pb_field_value(msgptr, field_name));
920  return build_term_l_list_from_c_list(tl);
921  }
922 
923  const Reflection *refl = (*m)->GetReflection();
924  int field_size = refl->FieldSize(**m, field);
925  for (int i = 0; i < field_size; ++i) {
926  switch (field->type()) {
927  case FieldDescriptor::TYPE_DOUBLE:
928  tl = build_term_list(tl, build_float(refl->GetRepeatedDouble(**m, field, i)));
929  break;
930  case FieldDescriptor::TYPE_FLOAT:
931  tl = build_term_list(tl, build_float(refl->GetRepeatedFloat(**m, field, i)));
932  break;
933  break;
934  case FieldDescriptor::TYPE_UINT64:
935  case FieldDescriptor::TYPE_FIXED64:
936  tl = build_term_list(tl, build_long_long(refl->GetRepeatedUInt64(**m, field, i)));
937  break;
938  case FieldDescriptor::TYPE_UINT32:
939  case FieldDescriptor::TYPE_FIXED32:
940  tl = build_term_list(tl, build_long_long(refl->GetRepeatedUInt32(**m, field, i)));
941  break;
942  case FieldDescriptor::TYPE_BOOL:
943  tl = build_term_list(tl, refl->GetRepeatedBool(**m, field, i) ? build_t() : build_nil());
944  break;
945  case FieldDescriptor::TYPE_STRING:
946  tl = build_term_list(tl, build_string(refl->GetRepeatedString(**m, field, i).c_str()));
947  break;
948  case FieldDescriptor::TYPE_MESSAGE:
949  {
950  const google::protobuf::Message &msg = refl->GetRepeatedMessage(**m, field, i);
951  google::protobuf::Message *mcopy = msg.New();
952  mcopy->CopyFrom(msg);
953  void *ptr = new std::shared_ptr<google::protobuf::Message>(mcopy);
954  tl = build_term_list(tl, build_pointer(ptr));
955  }
956  break;
957  case FieldDescriptor::TYPE_BYTES:
958  tl = build_term_list(tl, build_string((char *)"bytes"));
959  break;
960  case FieldDescriptor::TYPE_ENUM:
961  tl = build_term_list(tl, build_id(declare_atom(refl->GetRepeatedEnum(**m, field, i)->name().c_str())));
962  break;
963  case FieldDescriptor::TYPE_SFIXED32:
964  case FieldDescriptor::TYPE_INT32:
965  case FieldDescriptor::TYPE_SINT32:
966  tl = build_term_list(tl, build_integer(refl->GetRepeatedInt32(**m, field, i)));
967  break;
968  case FieldDescriptor::TYPE_SFIXED64:
969  case FieldDescriptor::TYPE_SINT64:
970  case FieldDescriptor::TYPE_INT64:
971  tl = build_term_list(tl, build_long_long(refl->GetRepeatedInt64(**m, field, i)));
972  break;
973  default:
974  throw std::logic_error("Unknown protobuf field type encountered");
975  }
976  }
977 
978  return build_term_l_list_from_c_list(tl);
979 }
980 
981 
982 /** Check if a given field is a list (repeated field).
983  * @param msgptr message
984  * @param field_name name of the field
985  * @return true if the field is a list, false otherwise
986  */
987 bool
988 OpenPRSProtobuf::oprs_pb_field_is_list(void *msgptr, std::string field_name)
989 {
990  std::shared_ptr<google::protobuf::Message> *m =
991  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
992  if (!(m || *m)) return false;
993 
994  const Descriptor *desc = (*m)->GetDescriptor();
995  const FieldDescriptor *field = desc->FindFieldByName(field_name);
996  if (! field) {
997  return false;
998  }
999  return (field->label() == FieldDescriptor::LABEL_REPEATED);
1000 }
1001 
1002 
1003 /** Process all pending events.
1004  * This will process events and assert appropriate facts.
1005  */
1006 void
1008 {
1009  { fawkes::MutexLocker lock(q_server_client_.mutex());
1010  while (! q_server_client_.empty()) {
1011  auto &sc = q_server_client_.front();
1012  oprs_assert_server_client_event(std::get<0>(sc), std::get<1>(sc),
1013  std::get<2>(sc), std::get<3>(sc));
1014  q_server_client_.pop();
1015  }
1016  }
1017 
1018  { fawkes::MutexLocker lock(q_client_.mutex());
1019  while (! q_client_.empty()) {
1020  auto &c = q_client_.front();
1021  oprs_assert_client_event(std::get<0>(c), std::get<1>(c));
1022  q_client_.pop();
1023  }
1024  }
1025 
1026  { fawkes::MutexLocker lock(q_msgs_.mutex());
1027  while (! q_msgs_.empty()) {
1028  auto &m = q_msgs_.front();
1029  oprs_assert_message(std::get<0>(m), std::get<1>(m), std::get<2>(m),
1030  std::get<3>(m), std::get<4>(m), std::get<5>(m), std::get<6>(m));
1031  q_msgs_.pop();
1032  }
1033  }
1034 }
1035 
1036 
1037 /** Check if there are pending events.
1038  * @return true if there are pending events, false otherwise
1039  */
1040 bool
1042 {
1043  fawkes::MutexLocker lock1(q_server_client_.mutex());
1044  fawkes::MutexLocker lock2(q_client_.mutex());
1045  fawkes::MutexLocker lock3(q_msgs_.mutex());
1046 
1047  return (! (q_server_client_.empty() && q_client_.empty() && q_msgs_.empty()));
1048 }
1049 
1050 
1051 void
1052 OpenPRSProtobuf::oprs_assert_server_client_event(long int client_id,
1053  std::string &host, unsigned short port, bool connect)
1054 {
1055  TermList tl = sl_make_slist();
1056  tl = build_term_list(tl, build_long_long(client_id));
1057  if (connect) {
1058  tl = build_term_list(tl, build_string(host.c_str()));
1059  tl = build_term_list(tl, build_integer(port));
1060  add_external_fact((char *)"protobuf-server-client-connected", tl);
1061  } else {
1062  add_external_fact((char *)"protobuf-server-client-disconnected", tl);
1063  }
1064 }
1065 
1066 
1067 void
1068 OpenPRSProtobuf::oprs_assert_client_event(long int client_id, bool connect)
1069 {
1070  TermList tl = sl_make_slist();
1071  tl = build_term_list(tl, build_long_long(client_id));
1072  if (connect) {
1073  add_external_fact((char *)"protobuf-client-connected", tl);
1074  } else {
1075  add_external_fact((char *)"protobuf-client-disconnected", tl);
1076  }
1077 }
1078 
1079 void
1080 OpenPRSProtobuf::oprs_assert_message(std::string &endpoint_host, unsigned short endpoint_port,
1081  uint16_t comp_id, uint16_t msg_type,
1082  std::shared_ptr<google::protobuf::Message> &msg,
1083  OpenPRSProtobuf::ClientType ct,
1084  unsigned int client_id)
1085 {
1086  TermList tl = sl_make_slist();
1087 
1088  struct timeval tv;
1089  gettimeofday(&tv, 0);
1090  void *ptr = new std::shared_ptr<google::protobuf::Message>(msg);
1091  //tl = build_term_list(tl, build_string((char *)"type"));
1092  tl = build_term_list(tl, build_string(msg->GetTypeName().c_str()));
1093  //tl = build_term_list(tl, build_string((char *)"comp-id"));
1094  tl = build_term_list(tl, build_integer(comp_id));
1095  //tl = build_term_list(tl, build_string((char *)"msg-type"));
1096  tl = build_term_list(tl, build_integer(msg_type));
1097  //tl = build_term_list(tl, build_string((char *)"rcvd-via"));
1098  tl = build_term_list(tl, build_string((client_id == 0) ? "BROADCAST" : "STREAM"));
1099  //tl = build_term_list(tl, build_string((char *)"rcvd-at"));
1100  tl = build_term_list(tl, build_long_long(tv.tv_sec));
1101  tl = build_term_list(tl, build_long_long(tv.tv_usec));
1102  //tl = build_term_list(tl, build_string((char *)"rcvd-from"));
1103  tl = build_term_list(tl, build_string(endpoint_host.c_str()));
1104  tl = build_term_list(tl, build_integer(endpoint_port));
1105  //tl = build_term_list(tl, build_string((char *)"client-type"));
1106  tl = build_term_list(tl, build_string(ct == CT_CLIENT ? "CLIENT" :
1107  (ct == CT_SERVER ? "SERVER" : "PEER")));
1108  //tl = build_term_list(tl, build_string((char *)"client-id"));
1109  tl = build_term_list(tl, build_integer(client_id));
1110  //tl = build_term_list(tl, build_string((char *)"ptr"));
1111  tl = build_term_list(tl, build_pointer(ptr));
1112 
1113  add_external_fact((char *)"protobuf-msg", tl);
1114 }
1115 
1116 void
1117 OpenPRSProtobuf::handle_server_client_connected(ProtobufStreamServer::ClientID client,
1118  boost::asio::ip::tcp::endpoint &endpoint)
1119 {
1120 
1121  long int client_id = -1;
1122  {
1123  fawkes::MutexLocker lock(&map_mutex_);
1124  client_id = ++next_client_id_;
1125  client_endpoints_[client_id] =
1126  std::make_pair(endpoint.address().to_string(), endpoint.port());
1127  server_clients_[client_id] = client;
1128  rev_server_clients_[client] = client_id;
1129  }
1130 
1131  q_server_client_.push_locked(
1132  std::make_tuple(client_id, endpoint.address().to_string(), endpoint.port(), true));
1133 }
1134 
1135 
1136 void
1137 OpenPRSProtobuf::handle_server_client_disconnected(ProtobufStreamServer::ClientID client,
1138  const boost::system::error_code &error)
1139 {
1140  long int client_id = -1;
1141  {
1142  fawkes::MutexLocker lock(&map_mutex_);
1143  RevServerClientMap::iterator c;
1144  if ((c = rev_server_clients_.find(client)) != rev_server_clients_.end()) {
1145  client_id = c->second;
1146  rev_server_clients_.erase(c);
1147  server_clients_.erase(client_id);
1148  }
1149  }
1150 
1151  if (client_id >= 0) {
1152  q_server_client_.push_locked(std::make_tuple(client_id, "", 0, false));
1153  }
1154 }
1155 
1156 
1157 /** Handle message that came from a client.
1158  * @param client client ID
1159  * @param component_id component the message was addressed to
1160  * @param msg_type type of the message
1161  * @param msg the message
1162  */
1163 void
1164 OpenPRSProtobuf::handle_server_client_msg(ProtobufStreamServer::ClientID client,
1165  uint16_t component_id, uint16_t msg_type,
1166  std::shared_ptr<google::protobuf::Message> msg)
1167 {
1168  fawkes::MutexLocker lock(&map_mutex_);
1169  RevServerClientMap::iterator c;
1170  if ((c = rev_server_clients_.find(client)) != rev_server_clients_.end()) {
1171  q_msgs_.push_locked(std::make_tuple(client_endpoints_[c->second].first,
1172  client_endpoints_[c->second].second,
1173  component_id, msg_type, msg, CT_SERVER, c->second));
1174  }
1175 }
1176 
1177 /** Handle server reception failure
1178  * @param client client ID
1179  * @param component_id component the message was addressed to
1180  * @param msg_type type of the message
1181  * @param msg the message string
1182  */
1183 void
1184 OpenPRSProtobuf::handle_server_client_fail(ProtobufStreamServer::ClientID client,
1185  uint16_t component_id, uint16_t msg_type,
1186  std::string msg)
1187 {
1188  fawkes::MutexLocker lock(&map_mutex_);
1189  RevServerClientMap::iterator c;
1190  if ((c = rev_server_clients_.find(client)) != rev_server_clients_.end()) {
1191  /*
1192  fawkes::MutexLocker lock(&oprs_mutex_);
1193  oprs_->assert_fact_f("(protobuf-server-receive-failed (comp-id %u) (msg-type %u) "
1194  "(rcvd-via STREAM) (client-id %li) (message \"%s\") "
1195  "(rcvd-from (\"%s\" %u)))",
1196  component_id, msg_type, c->second, msg.c_str(),
1197  client_endpoints_[c->second].first.c_str(),
1198  client_endpoints_[c->second].second);
1199  */
1200  }
1201 }
1202 
1203 
1204 /** Handle message that came from a peer/robot
1205  * @param endpoint the endpoint from which the message was received
1206  * @param component_id component the message was addressed to
1207  * @param msg_type type of the message
1208  * @param msg the message
1209  */
1210 void
1211 OpenPRSProtobuf::handle_peer_msg(long int peer_id,
1212  boost::asio::ip::udp::endpoint &endpoint,
1213  uint16_t component_id, uint16_t msg_type,
1214  std::shared_ptr<google::protobuf::Message> msg)
1215 {
1216  q_msgs_.push_locked(std::make_tuple(endpoint.address().to_string(), endpoint.port(),
1217  component_id, msg_type, msg, CT_PEER, peer_id));
1218 }
1219 
1220 
1221 /** Handle error during peer message processing.
1222  * @param endpoint endpoint of incoming message
1223  * @param msg error message
1224  */
1225 void
1226 OpenPRSProtobuf::handle_peer_recv_error(long int peer_id,
1227  boost::asio::ip::udp::endpoint &endpoint, std::string msg)
1228 {
1229  fprintf(stderr, "Failed to receive peer message from %s:%u: %s\n",
1230  endpoint.address().to_string().c_str(), endpoint.port(), msg.c_str());
1231 }
1232 
1233 /** Handle error during peer message processing.
1234  * @param msg error message
1235  */
1236 void
1237 OpenPRSProtobuf::handle_peer_send_error(long int peer_id, std::string msg)
1238 {
1239  //logger_->log_warn("RefBox", "Failed to send peer message: %s", msg.c_str());
1240 }
1241 
1242 
1243 void
1244 OpenPRSProtobuf::handle_client_connected(long int client_id)
1245 {
1246  q_client_.push_locked(std::make_tuple(client_id, true));
1247 }
1248 
1249 void
1250 OpenPRSProtobuf::handle_client_disconnected(long int client_id,
1251  const boost::system::error_code &error)
1252 {
1253  q_client_.push_locked(std::make_tuple(client_id, false));
1254 }
1255 
1256 void
1257 OpenPRSProtobuf::handle_client_msg(long int client_id,
1258  uint16_t comp_id, uint16_t msg_type,
1259  std::shared_ptr<google::protobuf::Message> msg)
1260 {
1261  q_msgs_.push_locked(std::make_tuple("", 0, comp_id, msg_type, msg, CT_CLIENT, client_id));
1262 }
1263 
1264 
1265 void
1266 OpenPRSProtobuf::handle_client_receive_fail(long int client_id,
1267  uint16_t comp_id, uint16_t msg_type, std::string msg)
1268 {
1269  /*
1270  oprs_->assert_fact_f("(protobuf-receive-failed (client-id %li) (rcvd-via STREAM) "
1271  "(comp-id %u) (msg-type %u) (message \"%s\"))",
1272  client_id, comp_id, msg_type, msg.c_str());
1273  */
1274 }
1275 
1276 } // end namespace protobuf_clips
std::shared_ptr< google::protobuf::Message > * oprs_create_msg(std::string full_name)
Create a new message of given type.
void oprs_pb_peer_destroy(long int peer_id)
Disable peer.
bool oprs_pb_has_field(void *msgptr, std::string field_name)
Check if message has a specific field.
void oprs_pb_set_field(void *msgptr, std::string field_name, Term *value)
Set a field.
Term * oprs_pb_field_label(void *msgptr, std::string field_name)
Get a fields label.
Term * oprs_pb_field_value(void *msgptr, std::string field_name)
Get properly typed field value.
Mutex locking helper.
Definition: mutex_locker.h:33
unsigned int ClientID
ID to identify connected clients.
Definition: server.h:68
Register to map msg type numbers to Protobuf messages.
void oprs_pb_broadcast(long int peer_id, void *msgptr)
Broadcast a message through a peer.
boost::signals2::signal< void(ClientID, const boost::system::error_code &)> & signal_disconnected()
Signal that is invoked when a new client has disconnected.
Definition: server.h:120
Term * oprs_pb_peer_create(std::string host, int port)
Enable protobuf peer.
void oprs_pb_process()
Process all pending events.
Term * oprs_pb_client_connect(std::string host, int port)
Connect as a client to the given server.
void disconnect(ClientID client)
Disconnect specific client.
Definition: server.cpp:450
Term * oprs_pb_destroy(void *msgptr)
Destroy given message (reference).
Term * oprs_pb_peer_create_crypto(std::string host, int port, std::string crypto_key="", std::string cipher="")
Enable protobuf peer.
Term * oprs_pb_field_type(void *msgptr, std::string field_name)
Get type if a specific field.
void oprs_pb_enable_server(int port)
Enable protobuf stream server.
Communicate by broadcasting protobuf messages.
Definition: peer.h:60
signal_send_error_type & signal_send_error()
Signal that is invoked when sending a message failed.
Definition: peer.h:152
void oprs_pb_disconnect(long int client_id)
Disconnect a given client.
Term * oprs_pb_field_names(void *msgptr)
Get field names of message.
Term * oprs_pb_peer_create_local_crypto(std::string host, int send_port, int recv_port, std::string crypto_key="", std::string cipher="")
Enable protobuf peer.
Base class for exceptions in Fawkes.
Definition: exception.h:36
RefPtr< Mutex > mutex() const
Get access to the internal mutex.
Definition: lock_queue.h:72
boost::signals2::signal< void(uint16_t, uint16_t, std::shared_ptr< google::protobuf::Message >)> & signal_received()
Signal that is invoked when a message has been received.
Definition: client.h:93
signal_received_type & signal_received()
Signal that is invoked when a message has been received.
Definition: peer.h:132
std::shared_ptr< google::protobuf::Message > new_message_for(uint16_t component_id, uint16_t msg_type)
Create a new message instance.
boost::signals2::signal< void(uint16_t, uint16_t, std::string)> & signal_receive_failed()
Signal that is invoked when receiving a message failed.
Definition: client.h:99
void async_connect(const char *host, unsigned short port)
Asynchronous connect.
Definition: client.cpp:154
void send(ClientID client, uint16_t component_id, uint16_t msg_type, google::protobuf::Message &m)
Send a message to the given client.
Definition: server.cpp:324
bool oprs_pb_register_type(std::string full_name)
Register a new message type.
Term * oprs_pb_ref(void *msgptr)
Create new reference to message.
bool oprs_pb_field_is_list(void *msgptr, std::string field_name)
Check if a given field is a list (repeated field).
Stream server for protobuf message transmission.
Definition: server.h:64
boost::signals2::signal< void()> & signal_connected()
Signal that is invoked when the connection has been established.
Definition: client.h:105
boost::signals2::signal< void(ClientID, boost::asio::ip::tcp::endpoint &)> & signal_connected()
Signal that is invoked when a new client has connected.
Definition: server.h:114
signal_recv_error_type & signal_recv_error()
Signal that is invoked when receiving a message failed.
Definition: peer.h:146
void push_locked(const Type &x)
Push element to queue with lock protection.
Definition: lock_queue.h:139
boost::signals2::signal< void(ClientID, uint16_t, uint16_t, std::string)> & signal_receive_failed()
Signal that is invoked when receiving a message failed.
Definition: server.h:108
Term * oprs_pb_peer_create_local(std::string host, int send_port, int recv_port)
Enable protobuf peer.
void oprs_pb_disable_server()
Disable protobuf stream server.
void oprs_pb_add_list(void *msgptr, std::string field_name, Term *value)
Add value to a repeated field.
boost::signals2::signal< void(const boost::system::error_code &)> & signal_disconnected()
Signal that is invoked when the connection is closed.
Definition: client.h:111
Stream client for protobuf message transmission.
Definition: client.h:59
bool oprs_pb_events_pending()
Check if there are pending events.
boost::signals2::signal< void(ClientID, uint16_t, uint16_t, std::shared_ptr< google::protobuf::Message >)> & signal_received()
Signal that is invoked when a message has been received.
Definition: server.h:102
void oprs_pb_send(long int client_id, void *msgptr)
Send message to a specific client.
void add_message_type(std::string msg_type)
Add a message type from generated pool.
void oprs_pb_peer_setup_crypto(long int peer_id, std::string crypto_key, std::string cipher)
Setup crypto for peer.
Term * oprs_pb_field_list(void *msgptr, std::string field_name)
Get list of values of a given message field.