37 #include <protobuf_clips/communicator.h> 39 #include <core/threading/mutex_locker.h> 40 #include <logging/logger.h> 41 #include <protobuf_comm/client.h> 42 #include <protobuf_comm/server.h> 43 #include <protobuf_comm/peer.h> 45 #include <google/protobuf/descriptor.h> 47 #include <boost/format.hpp> 71 ClipsProtobufCommunicator::ClipsProtobufCommunicator(CLIPS::Environment *env,
74 : clips_(env), clips_mutex_(env_mutex), logger_(logger), server_(NULL),
89 std::vector<std::string> &proto_path,
91 : clips_(env), clips_mutex_(env_mutex), logger_(logger), server_(NULL),
105 for (
auto f : functions_) {
106 clips_->remove_function(f);
111 for (
auto c : clients_) {
116 delete message_register_;
121 #define ADD_FUNCTION(n, s) \ 122 clips_->add_function(n, s); \ 123 functions_.push_back(n); 128 ClipsProtobufCommunicator::setup_clips()
132 ADD_FUNCTION(
"pb-register-type", (sigc::slot<CLIPS::Value, std::string>(sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_register_type))));
133 ADD_FUNCTION(
"pb-field-names", (sigc::slot<CLIPS::Values, void *>(sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_field_names))));
134 ADD_FUNCTION(
"pb-field-type", (sigc::slot<CLIPS::Value, void *, std::string>(sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_field_type))));
135 ADD_FUNCTION(
"pb-has-field", (sigc::slot<CLIPS::Value, void *, std::string>(sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_has_field))));
136 ADD_FUNCTION(
"pb-field-label", (sigc::slot<CLIPS::Value, void *, std::string>(sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_field_label))));
137 ADD_FUNCTION(
"pb-field-value", (sigc::slot<CLIPS::Value, void *, std::string>(sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_field_value))));
138 ADD_FUNCTION(
"pb-field-list", (sigc::slot<CLIPS::Values, void *, std::string>(sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_field_list))));
139 ADD_FUNCTION(
"pb-field-is-list", (sigc::slot<CLIPS::Value, void *, std::string>(sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_field_is_list))));
140 ADD_FUNCTION(
"pb-create", (sigc::slot<CLIPS::Value, std::string>(sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_create))));
141 ADD_FUNCTION(
"pb-destroy", (sigc::slot<void, void *>(sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_destroy))));
142 ADD_FUNCTION(
"pb-ref", (sigc::slot<CLIPS::Value, void *>(sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_ref))));
143 ADD_FUNCTION(
"pb-set-field", (sigc::slot<void, void *, std::string, CLIPS::Value>(sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_set_field))));
144 ADD_FUNCTION(
"pb-add-list", (sigc::slot<void, void *, std::string, CLIPS::Value>(sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_add_list))));
145 ADD_FUNCTION(
"pb-send", (sigc::slot<void, long int, void *>(sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_send))));
146 ADD_FUNCTION(
"pb-tostring", (sigc::slot<std::string, void *>(sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_tostring))));
149 ADD_FUNCTION(
"pb-peer-create", (sigc::slot<long int, std::string, int>(sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_peer_create))));
150 ADD_FUNCTION(
"pb-peer-create-local", (sigc::slot<long int, std::string, int, int>(sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_peer_create_local))));
151 ADD_FUNCTION(
"pb-peer-create-crypto",
152 (sigc::slot<long int, std::string, int, std::string, std::string>
153 (sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_peer_create_crypto))));
154 ADD_FUNCTION(
"pb-peer-create-local-crypto",
155 (sigc::slot<long int, std::string, int, int, std::string, std::string>
156 (sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_peer_create_local_crypto))));
157 ADD_FUNCTION(
"pb-peer-destroy", (sigc::slot<void, long int>(sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_peer_destroy))));
158 ADD_FUNCTION(
"pb-peer-setup-crypto", (sigc::slot<void, long int, std::string, std::string>(sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_peer_setup_crypto))));
159 ADD_FUNCTION(
"pb-broadcast", (sigc::slot<void, long int, void *>(sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_broadcast))));
160 ADD_FUNCTION(
"pb-connect", (sigc::slot<long int, std::string, int>(sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_client_connect))));
161 ADD_FUNCTION(
"pb-disconnect", (sigc::slot<void, long int>(sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_disconnect))));
170 if ((port > 0) && ! server_) {
174 .connect(boost::bind(&ClipsProtobufCommunicator::handle_server_client_connected,
this, _1, _2));
176 .connect(boost::bind(&ClipsProtobufCommunicator::handle_server_client_disconnected,
this, _1, _2));
178 .connect(boost::bind(&ClipsProtobufCommunicator::handle_server_client_msg,
this, _1, _2, _3, _4));
180 .connect(boost::bind(&ClipsProtobufCommunicator::handle_server_client_fail,
this, _1, _2, _3, _4));
204 ClipsProtobufCommunicator::clips_pb_peer_create_local_crypto(std::string address,
int send_port,
int recv_port,
205 std::string crypto_key, std::string cipher)
207 if (recv_port <= 0) recv_port = send_port;
212 message_register_, crypto_key, cipher);
217 peer_id = ++next_client_id_;
218 peers_[peer_id] = peer;
222 .connect(boost::bind(&ClipsProtobufCommunicator::handle_peer_msg,
this, peer_id, _1, _2, _3, _4));
224 .connect(boost::bind(&ClipsProtobufCommunicator::handle_peer_recv_error,
this, peer_id, _1, _2));
226 .connect(boost::bind(&ClipsProtobufCommunicator::handle_peer_send_error,
this, peer_id, _1));
242 ClipsProtobufCommunicator::clips_pb_peer_create_crypto(std::string address,
int port,
243 std::string crypto_key, std::string cipher)
245 return clips_pb_peer_create_local_crypto(address, port, port, crypto_key, cipher);
254 ClipsProtobufCommunicator::clips_pb_peer_create(std::string address,
int port)
256 return clips_pb_peer_create_local_crypto(address, port, port);
266 ClipsProtobufCommunicator::clips_pb_peer_create_local(std::string address,
int send_port,
269 return clips_pb_peer_create_local_crypto(address, send_port, recv_port);
277 ClipsProtobufCommunicator::clips_pb_peer_destroy(
long int peer_id)
279 if (peers_.find(peer_id) != peers_.end()) {
280 delete peers_[peer_id];
281 peers_.erase(peer_id);
292 ClipsProtobufCommunicator::clips_pb_peer_setup_crypto(
long int peer_id,
293 std::string crypto_key, std::string cipher)
295 if (peers_.find(peer_id) != peers_.end()) {
296 peers_[peer_id]->setup_crypto(crypto_key, cipher);
306 ClipsProtobufCommunicator::clips_pb_register_type(std::string full_name)
310 return CLIPS::Value(
"TRUE", CLIPS::TYPE_SYMBOL);
311 }
catch (std::runtime_error &e) {
313 logger_->
log_error(
"CLIPS-Protobuf",
"Registering type %s failed: %s",
314 full_name.c_str(), e.what());
316 return CLIPS::Value(
"FALSE", CLIPS::TYPE_SYMBOL);
323 ClipsProtobufCommunicator::clips_pb_create(std::string full_name)
326 std::shared_ptr<google::protobuf::Message> m =
328 return CLIPS::Value(
new std::shared_ptr<google::protobuf::Message>(m));
329 }
catch (std::runtime_error &e) {
331 logger_->
log_warn(
"CLIPS-Protobuf",
"Cannot create message of type %s: %s",
332 full_name.c_str(), e.what());
334 return CLIPS::Value(
new std::shared_ptr<google::protobuf::Message>());
340 ClipsProtobufCommunicator::clips_pb_ref(
void *msgptr)
342 std::shared_ptr<google::protobuf::Message> *m =
343 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
344 if (!*m)
return new std::shared_ptr<google::protobuf::Message>();
346 return CLIPS::Value(
new std::shared_ptr<google::protobuf::Message>(*m));
351 ClipsProtobufCommunicator::clips_pb_destroy(
void *msgptr)
353 std::shared_ptr<google::protobuf::Message> *m =
354 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
362 ClipsProtobufCommunicator::clips_pb_field_names(
void *msgptr)
364 std::shared_ptr<google::protobuf::Message> *m =
365 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
366 if (!*m)
return CLIPS::Values();
368 const Descriptor *desc = (*m)->GetDescriptor();
369 const int field_count = desc->field_count();
370 CLIPS::Values field_names(field_count);
371 for (
int i = 0; i < field_count; ++i) {
372 field_names[i].set(desc->field(i)->name(),
true);
378 ClipsProtobufCommunicator::clips_pb_field_type(
void *msgptr, std::string field_name)
380 std::shared_ptr<google::protobuf::Message> *m =
381 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
382 if (!*m)
return CLIPS::Value(
"INVALID-MESSAGE", CLIPS::TYPE_SYMBOL);
384 const Descriptor *desc = (*m)->GetDescriptor();
385 const FieldDescriptor *field = desc->FindFieldByName(field_name);
387 return CLIPS::Value(
"DOES-NOT-EXIST", CLIPS::TYPE_SYMBOL);
389 switch (field->type()) {
390 case FieldDescriptor::TYPE_DOUBLE:
return CLIPS::Value(
"DOUBLE", CLIPS::TYPE_SYMBOL);
391 case FieldDescriptor::TYPE_FLOAT:
return CLIPS::Value(
"FLOAT", CLIPS::TYPE_SYMBOL);
392 case FieldDescriptor::TYPE_INT64:
return CLIPS::Value(
"INT64", CLIPS::TYPE_SYMBOL);
393 case FieldDescriptor::TYPE_UINT64:
return CLIPS::Value(
"UINT64", CLIPS::TYPE_SYMBOL);
394 case FieldDescriptor::TYPE_INT32:
return CLIPS::Value(
"INT32", CLIPS::TYPE_SYMBOL);
395 case FieldDescriptor::TYPE_FIXED64:
return CLIPS::Value(
"FIXED64", CLIPS::TYPE_SYMBOL);
396 case FieldDescriptor::TYPE_FIXED32:
return CLIPS::Value(
"FIXED32", CLIPS::TYPE_SYMBOL);
397 case FieldDescriptor::TYPE_BOOL:
return CLIPS::Value(
"BOOL", CLIPS::TYPE_SYMBOL);
398 case FieldDescriptor::TYPE_STRING:
return CLIPS::Value(
"STRING", CLIPS::TYPE_SYMBOL);
399 case FieldDescriptor::TYPE_MESSAGE:
return CLIPS::Value(
"MESSAGE", CLIPS::TYPE_SYMBOL);
400 case FieldDescriptor::TYPE_BYTES:
return CLIPS::Value(
"BYTES", CLIPS::TYPE_SYMBOL);
401 case FieldDescriptor::TYPE_UINT32:
return CLIPS::Value(
"UINT32", CLIPS::TYPE_SYMBOL);
402 case FieldDescriptor::TYPE_ENUM:
return CLIPS::Value(
"ENUM", CLIPS::TYPE_SYMBOL);
403 case FieldDescriptor::TYPE_SFIXED32:
return CLIPS::Value(
"SFIXED32", CLIPS::TYPE_SYMBOL);
404 case FieldDescriptor::TYPE_SFIXED64:
return CLIPS::Value(
"SFIXED64", CLIPS::TYPE_SYMBOL);
405 case FieldDescriptor::TYPE_SINT32:
return CLIPS::Value(
"SINT32", CLIPS::TYPE_SYMBOL);
406 case FieldDescriptor::TYPE_SINT64:
return CLIPS::Value(
"SINT64", CLIPS::TYPE_SYMBOL);
407 default:
return CLIPS::Value(
"UNKNOWN", CLIPS::TYPE_SYMBOL);
412 ClipsProtobufCommunicator::clips_pb_has_field(
void *msgptr, std::string field_name)
414 std::shared_ptr<google::protobuf::Message> *m =
415 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
416 if (!*m)
return false;
418 const Descriptor *desc = (*m)->GetDescriptor();
419 const FieldDescriptor *field = desc->FindFieldByName(field_name);
420 if (! field)
return false;
422 const Reflection *refl = (*m)->GetReflection();
424 if (field->is_repeated()) {
425 return CLIPS::Value((refl->FieldSize(**m, field) > 0) ?
"TRUE" :
"FALSE",
427 }
else if (field->is_optional()) {
428 return CLIPS::Value(refl->HasField(**m, field) ?
"TRUE" :
"FALSE",
431 return CLIPS::Value(
"TRUE", CLIPS::TYPE_SYMBOL);
437 ClipsProtobufCommunicator::clips_pb_field_label(
void *msgptr, std::string field_name)
439 std::shared_ptr<google::protobuf::Message> *m =
440 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
441 if (!*m)
return CLIPS::Value(
"INVALID-MESSAGE", CLIPS::TYPE_SYMBOL);
443 const Descriptor *desc = (*m)->GetDescriptor();
444 const FieldDescriptor *field = desc->FindFieldByName(field_name);
446 return CLIPS::Value(
"DOES-NOT-EXIST", CLIPS::TYPE_SYMBOL);
448 switch (field->label()) {
449 case FieldDescriptor::LABEL_OPTIONAL:
return CLIPS::Value(
"OPTIONAL", CLIPS::TYPE_SYMBOL);
450 case FieldDescriptor::LABEL_REQUIRED:
return CLIPS::Value(
"REQUIRED", CLIPS::TYPE_SYMBOL);
451 case FieldDescriptor::LABEL_REPEATED:
return CLIPS::Value(
"REPEATED", CLIPS::TYPE_SYMBOL);
452 default:
return CLIPS::Value(
"UNKNOWN", CLIPS::TYPE_SYMBOL);
457 ClipsProtobufCommunicator::clips_pb_field_value(
void *msgptr, std::string field_name)
459 std::shared_ptr<google::protobuf::Message> *m =
460 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
463 logger_->
log_warn(
"CLIPS-Protobuf",
"Invalid message when setting %s", field_name.c_str());
465 return CLIPS::Value(
"INVALID-MESSAGE", CLIPS::TYPE_SYMBOL);
468 const Descriptor *desc = (*m)->GetDescriptor();
469 const FieldDescriptor *field = desc->FindFieldByName(field_name);
472 logger_->
log_warn(
"CLIPS-Protobuf",
"Field %s of %s does not exist",
473 field_name.c_str(), (*m)->GetTypeName().c_str());
475 return CLIPS::Value(
"DOES-NOT-EXIST", CLIPS::TYPE_SYMBOL);
477 const Reflection *refl = (*m)->GetReflection();
478 if (field->type() != FieldDescriptor::TYPE_MESSAGE && ! refl->HasField(**m, field)) {
480 logger_->
log_warn(
"CLIPS-Protobuf",
"Field %s of %s not set",
481 field_name.c_str(), (*m)->GetTypeName().c_str());
483 return CLIPS::Value(
"NOT-SET", CLIPS::TYPE_SYMBOL);
485 switch (field->type()) {
486 case FieldDescriptor::TYPE_DOUBLE:
return CLIPS::Value(refl->GetDouble(**m, field));
487 case FieldDescriptor::TYPE_FLOAT:
return CLIPS::Value(refl->GetFloat(**m, field));
488 case FieldDescriptor::TYPE_INT64:
return CLIPS::Value(refl->GetInt64(**m, field));
489 case FieldDescriptor::TYPE_UINT64:
490 return CLIPS::Value((
long int)refl->GetUInt64(**m, field));
491 case FieldDescriptor::TYPE_INT32:
return CLIPS::Value(refl->GetInt32(**m, field));
492 case FieldDescriptor::TYPE_FIXED64:
493 return CLIPS::Value((
long int)refl->GetUInt64(**m, field));
494 case FieldDescriptor::TYPE_FIXED32:
return CLIPS::Value(refl->GetUInt32(**m, field));
495 case FieldDescriptor::TYPE_BOOL:
497 if(refl->GetBool(**m, field)){
498 return CLIPS::Value(
"TRUE", CLIPS::TYPE_SYMBOL);
501 return CLIPS::Value(
"FALSE", CLIPS::TYPE_SYMBOL);
503 case FieldDescriptor::TYPE_STRING:
return CLIPS::Value(refl->GetString(**m, field));
504 case FieldDescriptor::TYPE_MESSAGE:
506 const google::protobuf::Message &mfield = refl->GetMessage(**m, field);
507 google::protobuf::Message *mcopy = mfield.New();
508 mcopy->CopyFrom(mfield);
509 void *ptr =
new std::shared_ptr<google::protobuf::Message>(mcopy);
510 return CLIPS::Value(ptr);
512 case FieldDescriptor::TYPE_BYTES:
return CLIPS::Value((
char *)
"bytes");
513 case FieldDescriptor::TYPE_UINT32:
return CLIPS::Value(refl->GetUInt32(**m, field));
514 case FieldDescriptor::TYPE_ENUM:
515 return CLIPS::Value(refl->GetEnum(**m, field)->name(), CLIPS::TYPE_SYMBOL);
516 case FieldDescriptor::TYPE_SFIXED32:
return CLIPS::Value(refl->GetInt32(**m, field));
517 case FieldDescriptor::TYPE_SFIXED64:
return CLIPS::Value(refl->GetInt64(**m, field));
518 case FieldDescriptor::TYPE_SINT32:
return CLIPS::Value(refl->GetInt32(**m, field));
519 case FieldDescriptor::TYPE_SINT64:
return CLIPS::Value(refl->GetInt64(**m, field));
521 throw std::logic_error(
"Unknown protobuf field type encountered");
527 ClipsProtobufCommunicator::clips_pb_set_field(
void *msgptr, std::string field_name, CLIPS::Value value)
529 std::shared_ptr<google::protobuf::Message> *m =
530 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
531 if (!(m && *m))
return;
533 const Descriptor *desc = (*m)->GetDescriptor();
534 const FieldDescriptor *field = desc->FindFieldByName(field_name);
538 "Could not find field %s", field_name.c_str());
542 const Reflection *refl = (*m)->GetReflection();
545 switch (field->type()) {
546 case FieldDescriptor::TYPE_DOUBLE:
547 refl->SetDouble(m->get(), field, value.as_float());
break;
548 case FieldDescriptor::TYPE_FLOAT:
549 refl->SetFloat(m->get(), field, value.as_float());
break;
550 case FieldDescriptor::TYPE_SFIXED64:
551 case FieldDescriptor::TYPE_SINT64:
552 case FieldDescriptor::TYPE_INT64:
553 refl->SetInt64(m->get(), field, value.as_integer());
break;
554 case FieldDescriptor::TYPE_FIXED64:
555 case FieldDescriptor::TYPE_UINT64:
556 refl->SetUInt64(m->get(), field, value.as_integer());
break;
557 case FieldDescriptor::TYPE_SFIXED32:
558 case FieldDescriptor::TYPE_SINT32:
559 case FieldDescriptor::TYPE_INT32:
560 refl->SetInt32(m->get(), field, value.as_integer());
break;
561 case FieldDescriptor::TYPE_BOOL:
562 refl->SetBool(m->get(), field, (value ==
"TRUE"));
564 case FieldDescriptor::TYPE_STRING:
565 refl->SetString(m->get(), field, value.as_string());
break;
566 case FieldDescriptor::TYPE_MESSAGE:
568 std::shared_ptr<google::protobuf::Message> *mfrom =
569 static_cast<std::shared_ptr<google::protobuf::Message> *
>(value.as_address());
570 Message *mut_msg = refl->MutableMessage(m->get(), field);
571 mut_msg->CopyFrom(**mfrom);
575 case FieldDescriptor::TYPE_BYTES:
break;
576 case FieldDescriptor::TYPE_FIXED32:
577 case FieldDescriptor::TYPE_UINT32:
578 refl->SetUInt32(m->get(), field, value.as_integer());
break;
579 case FieldDescriptor::TYPE_ENUM:
581 const EnumDescriptor *enumdesc = field->enum_type();
582 const EnumValueDescriptor *enumval = enumdesc->FindValueByName(value);
584 refl->SetEnum(m->get(), field, enumval);
587 logger_->
log_warn(
"CLIPS-Protobuf",
"%s: cannot set invalid " 588 "enum value '%s' on '%s'",
589 (*m)->GetTypeName().c_str(),
590 value.as_string().c_str(), field_name.c_str());
596 throw std::logic_error(
"Unknown protobuf field type encountered");
598 }
catch (std::logic_error &e) {
600 logger_->
log_warn(
"CLIPS-Protobuf",
"Failed to set field %s of %s: %s " 601 "(type %d, as string %s)",
602 field_name.c_str(), (*m)->GetTypeName().c_str(), e.what(),
603 value.type(), to_string(value).c_str());
610 ClipsProtobufCommunicator::clips_pb_add_list(
void *msgptr, std::string field_name, CLIPS::Value value)
612 std::shared_ptr<google::protobuf::Message> *m =
613 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
614 if (!(m && *m))
return;
616 const Descriptor *desc = (*m)->GetDescriptor();
617 const FieldDescriptor *field = desc->FindFieldByName(field_name);
620 logger_->
log_warn(
"CLIPS-Protobuf",
"Could not find field %s",
625 const Reflection *refl = (*m)->GetReflection();
628 switch (field->type()) {
629 case FieldDescriptor::TYPE_DOUBLE: refl->AddDouble(m->get(), field, value);
break;
630 case FieldDescriptor::TYPE_FLOAT: refl->AddFloat(m->get(), field, value);
break;
631 case FieldDescriptor::TYPE_SFIXED64:
632 case FieldDescriptor::TYPE_SINT64:
633 case FieldDescriptor::TYPE_INT64:
634 refl->AddInt64(m->get(), field, value);
break;
635 case FieldDescriptor::TYPE_FIXED64:
636 case FieldDescriptor::TYPE_UINT64:
637 refl->AddUInt64(m->get(), field, (
long int)value);
break;
638 case FieldDescriptor::TYPE_SFIXED32:
639 case FieldDescriptor::TYPE_SINT32:
640 case FieldDescriptor::TYPE_INT32:
641 refl->AddInt32(m->get(), field, value);
break;
642 case FieldDescriptor::TYPE_BOOL:
643 refl->AddBool(m->get(), field, (value ==
"TRUE"));
645 case FieldDescriptor::TYPE_STRING: refl->AddString(m->get(), field, value);
break;
646 case FieldDescriptor::TYPE_MESSAGE:
648 std::shared_ptr<google::protobuf::Message> *mfrom =
649 static_cast<std::shared_ptr<google::protobuf::Message> *
>(value.as_address());
650 Message *new_msg = refl->AddMessage(m->get(), field);
651 new_msg->CopyFrom(**mfrom);
655 case FieldDescriptor::TYPE_BYTES:
break;
656 case FieldDescriptor::TYPE_FIXED32:
657 case FieldDescriptor::TYPE_UINT32:
658 refl->AddUInt32(m->get(), field, value);
break;
659 case FieldDescriptor::TYPE_ENUM:
661 const EnumDescriptor *enumdesc = field->enum_type();
662 const EnumValueDescriptor *enumval = enumdesc->FindValueByName(value);
663 if (enumval) refl->AddEnum(m->get(), field, enumval);
667 throw std::logic_error(
"Unknown protobuf field type encountered");
669 }
catch (std::logic_error &e) {
671 logger_->
log_warn(
"CLIPS-Protobuf",
"Failed to add field %s of %s: %s",
672 field_name.c_str(), (*m)->GetTypeName().c_str(), e.what());
679 ClipsProtobufCommunicator::clips_pb_client_connect(std::string host,
int port)
681 if (port <= 0)
return false;
688 client_id = ++next_client_id_;
689 clients_[client_id] = client;
693 boost::bind(&ClipsProtobufCommunicator::handle_client_connected,
this, client_id));
695 boost::bind(&ClipsProtobufCommunicator::handle_client_disconnected,
696 this, client_id, boost::asio::placeholders::error));
698 boost::bind(&ClipsProtobufCommunicator::handle_client_msg,
this, client_id, _1, _2, _3));
700 boost::bind(&ClipsProtobufCommunicator::handle_client_receive_fail,
this, client_id, _1, _2, _3));
703 return CLIPS::Value(client_id);
708 ClipsProtobufCommunicator::clips_pb_send(
long int client_id,
void *msgptr)
710 std::shared_ptr<google::protobuf::Message> *m =
711 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
715 "Cannot send to %li: invalid message", client_id);
723 if (server_ && server_clients_.find(client_id) != server_clients_.end()) {
725 server_->
send(server_clients_[client_id], *m);
726 sig_server_sent_(server_clients_[client_id], *m);
727 }
else if (clients_.find(client_id) != clients_.end()) {
729 clients_[client_id]->send(*m);
730 std::pair<std::string, unsigned short> &client_endpoint = client_endpoints_[client_id];
731 sig_client_sent_(client_endpoint.first, client_endpoint.second, *m);
732 }
else if (peers_.find(client_id) != peers_.end()) {
734 peers_[client_id]->send(*m);
735 sig_peer_sent_(client_id, *m);
740 }
catch (google::protobuf::FatalException &e) {
742 logger_->
log_warn(
"CLIPS-Profobuf",
"Failed to send message of type %s: %s",
743 (*m)->GetTypeName().c_str(), e.what());
747 logger_->
log_warn(
"CLIPS-Protobuf",
"Failed to send message of type %s: %s",
750 }
catch (std::runtime_error &e) {
752 logger_->
log_warn(
"CLIPS-Protobuf",
"Failed to send message of type %s: %s",
753 (*m)->GetTypeName().c_str(), e.what());
759 ClipsProtobufCommunicator::clips_pb_tostring(
void *msgptr)
761 std::shared_ptr<google::protobuf::Message> *m =
762 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
766 "Cannot convert message to string: invalid message");
771 return (*m)->DebugString();
776 ClipsProtobufCommunicator::clips_pb_broadcast(
long int peer_id,
void *msgptr)
778 std::shared_ptr<google::protobuf::Message> *m =
779 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
782 logger_->
log_warn(
"CLIPS-Protobuf",
"Cannot send broadcast: invalid message");
788 if (peers_.find(peer_id) == peers_.end())
return;
792 peers_[peer_id]->send(*m);
793 }
catch (google::protobuf::FatalException &e) {
796 "Failed to broadcast message of type %s: %s",
797 (*m)->GetTypeName().c_str(), e.what());
802 "Failed to broadcast message of type %s: %s",
805 }
catch (std::runtime_error &e) {
808 "Failed to broadcast message of type %s: %s",
809 (*m)->GetTypeName().c_str(), e.what());
813 sig_peer_sent_(peer_id, *m);
818 ClipsProtobufCommunicator::clips_pb_disconnect(
long int client_id)
825 if (server_clients_.find(client_id) != server_clients_.end()) {
828 server_clients_.erase(client_id);
829 rev_server_clients_.erase(srv_client);
830 }
else if (clients_.find(client_id) != clients_.end()) {
831 delete clients_[client_id];
832 clients_.erase(client_id);
834 }
catch (std::runtime_error &e) {
837 "Failed to disconnect from client %li: %s",
838 client_id, e.what());
844 ClipsProtobufCommunicator::clips_pb_field_list(
void *msgptr, std::string field_name)
846 std::shared_ptr<google::protobuf::Message> *m =
847 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
848 if (!(m && *m))
return CLIPS::Values(1, CLIPS::Value(
"INVALID-MESSAGE", CLIPS::TYPE_SYMBOL));
850 const Descriptor *desc = (*m)->GetDescriptor();
851 const FieldDescriptor *field = desc->FindFieldByName(field_name);
853 return CLIPS::Values(1, CLIPS::Value(
"DOES-NOT-EXIST", CLIPS::TYPE_SYMBOL));
855 if (field->label() == FieldDescriptor::LABEL_REQUIRED ||
856 field->label() == FieldDescriptor::LABEL_OPTIONAL)
858 CLIPS::Values rv(1, clips_pb_field_value(msgptr, field_name));
862 const Reflection *refl = (*m)->GetReflection();
863 int field_size = refl->FieldSize(**m, field);
864 CLIPS::Values rv(field_size);
865 for (
int i = 0; i < field_size; ++i) {
866 switch (field->type()) {
867 case FieldDescriptor::TYPE_DOUBLE:
868 rv[i] = CLIPS::Value(refl->GetRepeatedDouble(**m, field, i));
870 case FieldDescriptor::TYPE_FLOAT:
871 rv[i] = CLIPS::Value(refl->GetRepeatedFloat(**m, field, i));
874 case FieldDescriptor::TYPE_UINT64:
875 case FieldDescriptor::TYPE_FIXED64:
876 rv[i] = CLIPS::Value((
long int)refl->GetRepeatedUInt64(**m, field, i));
878 case FieldDescriptor::TYPE_UINT32:
879 case FieldDescriptor::TYPE_FIXED32:
880 rv[i] = CLIPS::Value(refl->GetRepeatedUInt32(**m, field, i));
882 case FieldDescriptor::TYPE_BOOL:
884 if(refl->GetRepeatedBool(**m, field, i)){
885 rv[i] = CLIPS::Value(
"TRUE", CLIPS::TYPE_SYMBOL);
888 rv[i] = CLIPS::Value(
"FALSE", CLIPS::TYPE_SYMBOL);
891 case FieldDescriptor::TYPE_STRING:
892 rv[i] = CLIPS::Value(refl->GetRepeatedString(**m, field, i));
894 case FieldDescriptor::TYPE_MESSAGE:
896 const google::protobuf::Message &msg = refl->GetRepeatedMessage(**m, field, i);
897 google::protobuf::Message *mcopy = msg.New();
898 mcopy->CopyFrom(msg);
899 void *ptr =
new std::shared_ptr<google::protobuf::Message>(mcopy);
900 rv[i] = CLIPS::Value(ptr);
903 case FieldDescriptor::TYPE_BYTES:
904 rv[i] = CLIPS::Value((
char *)
"BYTES", CLIPS::TYPE_SYMBOL);
906 case FieldDescriptor::TYPE_ENUM:
907 rv[i] = CLIPS::Value(refl->GetRepeatedEnum(**m, field, i)->name(), CLIPS::TYPE_SYMBOL);
909 case FieldDescriptor::TYPE_SFIXED32:
910 case FieldDescriptor::TYPE_INT32:
911 case FieldDescriptor::TYPE_SINT32:
912 rv[i] = CLIPS::Value(refl->GetRepeatedInt32(**m, field, i));
914 case FieldDescriptor::TYPE_SFIXED64:
915 case FieldDescriptor::TYPE_SINT64:
916 case FieldDescriptor::TYPE_INT64:
917 rv[i] = CLIPS::Value(refl->GetRepeatedInt64(**m, field, i));
920 throw std::logic_error(
"Unknown protobuf field type encountered");
929 ClipsProtobufCommunicator::clips_pb_field_is_list(
void *msgptr, std::string field_name)
931 std::shared_ptr<google::protobuf::Message> *m =
932 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
933 if (!(m && *m))
return CLIPS::Value(
"FALSE", CLIPS::TYPE_SYMBOL);
935 const Descriptor *desc = (*m)->GetDescriptor();
936 const FieldDescriptor *field = desc->FindFieldByName(field_name);
937 if (! field)
return CLIPS::Value(
"FALSE", CLIPS::TYPE_SYMBOL);
938 return CLIPS::Value(field->is_repeated() ?
"TRUE" :
"FALSE", CLIPS::TYPE_SYMBOL);
943 ClipsProtobufCommunicator::clips_assert_message(std::pair<std::string, unsigned short> &endpoint,
944 uint16_t comp_id, uint16_t msg_type,
945 std::shared_ptr<google::protobuf::Message> &msg,
946 ClipsProtobufCommunicator::ClientType ct,
947 unsigned int client_id)
949 CLIPS::Template::pointer temp = clips_->get_template(
"protobuf-msg");
952 gettimeofday(&tv, 0);
953 void *ptr =
new std::shared_ptr<google::protobuf::Message>(msg);
954 CLIPS::Fact::pointer fact = CLIPS::Fact::create(*clips_, temp);
955 fact->set_slot(
"type", msg->GetTypeName());
956 fact->set_slot(
"comp-id", comp_id);
957 fact->set_slot(
"msg-type", msg_type);
958 fact->set_slot(
"rcvd-via",
959 CLIPS::Value((ct == CT_PEER) ?
"BROADCAST" :
"STREAM", CLIPS::TYPE_SYMBOL));
960 CLIPS::Values rcvd_at(2, CLIPS::Value(CLIPS::TYPE_INTEGER));
961 rcvd_at[0] = tv.tv_sec;
962 rcvd_at[1] = tv.tv_usec;
963 fact->set_slot(
"rcvd-at", rcvd_at);
964 CLIPS::Values host_port(2, CLIPS::Value(CLIPS::TYPE_STRING));
965 host_port[0] = endpoint.first;
966 host_port[1] = CLIPS::Value(endpoint.second);
967 fact->set_slot(
"rcvd-from", host_port);
968 fact->set_slot(
"client-type",
969 CLIPS::Value(ct == CT_CLIENT ?
"CLIENT" :
970 (ct == CT_SERVER ?
"SERVER" :
"PEER"), CLIPS::TYPE_SYMBOL));
971 fact->set_slot(
"client-id", client_id);
972 fact->set_slot(
"ptr", CLIPS::Value(ptr));
973 CLIPS::Fact::pointer new_fact = clips_->assert_fact(fact);
977 logger_->
log_warn(
"CLIPS-Protobuf",
"Asserting protobuf-msg fact failed");
979 delete static_cast<std::shared_ptr<google::protobuf::Message> *
>(ptr);
984 "Did not get template, did you load protobuf.clp?");
991 boost::asio::ip::tcp::endpoint &endpoint)
994 long int client_id = -1;
997 client_id = ++next_client_id_;
998 client_endpoints_[client_id] =
999 std::make_pair(endpoint.address().to_string(), endpoint.port());
1000 server_clients_[client_id] = client;
1001 rev_server_clients_[client] = client_id;
1005 clips_->assert_fact_f(
"(protobuf-server-client-connected %li %s %u)", client_id,
1006 endpoint.address().to_string().c_str(), endpoint.port());
1012 const boost::system::error_code &error)
1014 long int client_id = -1;
1017 RevServerClientMap::iterator c;
1018 if ((c = rev_server_clients_.find(client)) != rev_server_clients_.end()) {
1019 client_id = c->second;
1020 rev_server_clients_.erase(c);
1021 server_clients_.erase(client_id);
1025 if (client_id >= 0) {
1027 clips_->assert_fact_f(
"(protobuf-server-client-disconnected %li)", client_id);
1040 uint16_t component_id, uint16_t msg_type,
1041 std::shared_ptr<google::protobuf::Message> msg)
1045 RevServerClientMap::iterator c;
1046 if ((c = rev_server_clients_.find(client)) != rev_server_clients_.end()) {
1047 clips_assert_message(client_endpoints_[c->second],
1048 component_id, msg_type, msg, CT_SERVER, c->second);
1060 uint16_t component_id, uint16_t msg_type,
1064 RevServerClientMap::iterator c;
1065 if ((c = rev_server_clients_.find(client)) != rev_server_clients_.end()) {
1067 clips_->assert_fact_f(
"(protobuf-server-receive-failed (comp-id %u) (msg-type %u) " 1068 "(rcvd-via STREAM) (client-id %li) (message \"%s\") " 1069 "(rcvd-from (\"%s\" %u)))",
1070 component_id, msg_type, c->second, msg.c_str(),
1071 client_endpoints_[c->second].first.c_str(),
1072 client_endpoints_[c->second].second);
1084 ClipsProtobufCommunicator::handle_peer_msg(
long int peer_id,
1085 boost::asio::ip::udp::endpoint &endpoint,
1086 uint16_t component_id, uint16_t msg_type,
1087 std::shared_ptr<google::protobuf::Message> msg)
1090 std::pair<std::string, unsigned short> endpp =
1091 std::make_pair(endpoint.address().to_string(), endpoint.port());
1092 clips_assert_message(endpp, component_id, msg_type, msg, CT_PEER, peer_id);
1101 ClipsProtobufCommunicator::handle_peer_recv_error(
long int peer_id,
1102 boost::asio::ip::udp::endpoint &endpoint, std::string msg)
1105 logger_->
log_warn(
"CLIPS-Protobuf",
1106 "Failed to receive peer message from %s:%u: %s",
1107 endpoint.address().to_string().c_str(), endpoint.port(),
1116 ClipsProtobufCommunicator::handle_peer_send_error(
long int peer_id, std::string msg)
1119 logger_->
log_warn(
"CLIPS-Protobuf",
1120 "Failed to send peer message: %s", msg.c_str());
1126 ClipsProtobufCommunicator::handle_client_connected(
long int client_id)
1129 clips_->assert_fact_f(
"(protobuf-client-connected %li)", client_id);
1133 ClipsProtobufCommunicator::handle_client_disconnected(
long int client_id,
1134 const boost::system::error_code &error)
1137 clips_->assert_fact_f(
"(protobuf-client-disconnected %li)", client_id);
1141 ClipsProtobufCommunicator::handle_client_msg(
long int client_id,
1142 uint16_t comp_id, uint16_t msg_type,
1143 std::shared_ptr<google::protobuf::Message> msg)
1146 std::pair<std::string, unsigned short> endpp = std::make_pair(std::string(), 0);
1147 clips_assert_message(endpp, comp_id, msg_type, msg, CT_CLIENT, client_id);
1152 ClipsProtobufCommunicator::handle_client_receive_fail(
long int client_id,
1153 uint16_t comp_id, uint16_t msg_type, std::string msg)
1156 clips_->assert_fact_f(
"(protobuf-receive-failed (client-id %li) (rcvd-via STREAM) " 1157 "(comp-id %u) (msg-type %u) (message \"%s\"))",
1158 client_id, comp_id, msg_type, msg.c_str());
1162 ClipsProtobufCommunicator::to_string(
const CLIPS::Value &v)
1165 case CLIPS::TYPE_UNKNOWN:
return "Unknown Type";
1166 case CLIPS::TYPE_FLOAT:
return std::to_string(v.as_float());
1167 case CLIPS::TYPE_INTEGER:
return std::to_string(v.as_integer());
1168 case CLIPS::TYPE_SYMBOL:
1169 case CLIPS::TYPE_INSTANCE_NAME:
1170 case CLIPS::TYPE_STRING:
return v.as_string();
1171 case CLIPS::TYPE_INSTANCE_ADDRESS:
1172 case CLIPS::TYPE_EXTERNAL_ADDRESS:
1173 return boost::str(boost::format(
"%p") % v.as_address());
1175 return "Implicit unknown type";
void enable_server(int port)
Enable protobuf stream server.
void disable_server()
Disable protobu stream server.
unsigned int ClientID
ID to identify connected clients.
Register to map msg type numbers to Protobuf messages.
~ClipsProtobufCommunicator()
Destructor.
boost::signals2::signal< void(ClientID, const boost::system::error_code &)> & signal_disconnected()
Signal that is invoked when a new client has disconnected.
void disconnect(ClientID client)
Disconnect specific client.
ClipsProtobufCommunicator(CLIPS::Environment *env, fawkes::Mutex &env_mutex, fawkes::Logger *logger=NULL)
Constructor.
Communicate by broadcasting protobuf messages.
signal_send_error_type & signal_send_error()
Signal that is invoked when sending a message failed.
Base class for exceptions in Fawkes.
boost::signals2::signal< void(uint16_t, uint16_t, std::shared_ptr< google::protobuf::Message >)> & signal_received()
Signal that is invoked when a message has been received.
signal_received_type & signal_received()
Signal that is invoked when a message has been received.
std::shared_ptr< google::protobuf::Message > new_message_for(uint16_t component_id, uint16_t msg_type)
Create a new message instance.
boost::signals2::signal< void(uint16_t, uint16_t, std::string)> & signal_receive_failed()
Signal that is invoked when receiving a message failed.
virtual const char * what_no_backtrace() const
Get primary string (does not implicitly print the back trace).
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
void async_connect(const char *host, unsigned short port)
Asynchronous connect.
virtual void log_error(const char *component, const char *format,...)=0
Log error message.
void send(ClientID client, uint16_t component_id, uint16_t msg_type, google::protobuf::Message &m)
Send a message to the given client.
Stream server for protobuf message transmission.
boost::signals2::signal< void()> & signal_connected()
Signal that is invoked when the connection has been established.
boost::signals2::signal< void(ClientID, boost::asio::ip::tcp::endpoint &)> & signal_connected()
Signal that is invoked when a new client has connected.
signal_recv_error_type & signal_recv_error()
Signal that is invoked when receiving a message failed.
boost::signals2::signal< void(ClientID, uint16_t, uint16_t, std::string)> & signal_receive_failed()
Signal that is invoked when receiving a message failed.
boost::signals2::signal< void(const boost::system::error_code &)> & signal_disconnected()
Signal that is invoked when the connection is closed.
Mutex mutual exclusion lock.
Stream client for protobuf message transmission.
boost::signals2::signal< void(ClientID, uint16_t, uint16_t, std::shared_ptr< google::protobuf::Message >)> & signal_received()
Signal that is invoked when a message has been received.
void add_message_type(std::string msg_type)
Add a message type from generated pool.