37 #include "oprs_protobuf.h" 39 #include <core/threading/mutex_locker.h> 40 #include <core/exception.h> 41 #include <protobuf_comm/client.h> 42 #include <protobuf_comm/server.h> 43 #include <protobuf_comm/peer.h> 45 #include <oprs_f-pub.h> 46 #include <google/protobuf/descriptor.h> 70 OpenPRSProtobuf::OpenPRSProtobuf(std::vector<std::string> &proto_path)
71 : server_(NULL), next_client_id_(0)
80 for (
auto c : clients_) {
85 delete message_register_;
96 if ((port > 0) && ! server_) {
100 .connect(boost::bind(&OpenPRSProtobuf::handle_server_client_connected,
this, _1, _2));
102 .connect(boost::bind(&OpenPRSProtobuf::handle_server_client_disconnected,
this, _1, _2));
104 .connect(boost::bind(&OpenPRSProtobuf::handle_server_client_msg,
this, _1, _2, _3, _4));
106 .connect(boost::bind(&OpenPRSProtobuf::handle_server_client_fail,
this, _1, _2, _3, _4));
131 std::string crypto_key, std::string cipher)
133 if (recv_port <= 0) recv_port = send_port;
138 message_register_, crypto_key, cipher);
143 peer_id = ++next_client_id_;
144 peers_[peer_id] = peer;
148 .connect(boost::bind(&OpenPRSProtobuf::handle_peer_msg,
this, peer_id, _1, _2, _3, _4));
150 .connect(boost::bind(&OpenPRSProtobuf::handle_peer_recv_error,
this, peer_id, _1, _2));
152 .connect(boost::bind(&OpenPRSProtobuf::handle_peer_send_error,
this, peer_id, _1));
154 return build_long_long(peer_id);
156 return build_long_long(0);
169 std::string crypto_key, std::string cipher)
205 if (peers_.find(peer_id) != peers_.end()) {
206 delete peers_[peer_id];
207 peers_.erase(peer_id);
219 std::string crypto_key, std::string cipher)
221 if (peers_.find(peer_id) != peers_.end()) {
222 peers_[peer_id]->setup_crypto(crypto_key, cipher);
237 }
catch (std::runtime_error &e) {
250 std::shared_ptr<google::protobuf::Message> *
253 std::shared_ptr<google::protobuf::Message> m =
255 return new std::shared_ptr<google::protobuf::Message>(m);
266 std::shared_ptr<google::protobuf::Message> *m =
267 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
268 if (!*m)
return build_pointer(
new std::shared_ptr<google::protobuf::Message>());
270 return build_pointer(
new std::shared_ptr<google::protobuf::Message>(*m));
284 std::shared_ptr<google::protobuf::Message> *m =
285 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
286 if (!*m)
return build_nil();
300 TermList tl = sl_make_slist();
302 std::shared_ptr<google::protobuf::Message> *m =
303 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
304 if (!*m)
return build_term_l_list_from_c_list(tl);
307 const Descriptor *desc = (*m)->GetDescriptor();
308 const int field_count = desc->field_count();
309 for (
int i = 0; i < field_count; ++i) {
310 tl = build_term_list(tl, build_string(desc->field(i)->name().c_str()));
312 return build_term_l_list_from_c_list(tl);
324 std::shared_ptr<google::protobuf::Message> *m =
325 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
326 if (!*m)
return build_id(declare_atom(
"INVALID-MESSAGE"));
328 const Descriptor *desc = (*m)->GetDescriptor();
329 const FieldDescriptor *field = desc->FindFieldByName(field_name);
331 return build_id(declare_atom(
"DOES-NOT-EXIST"));
333 switch (field->type()) {
334 case FieldDescriptor::TYPE_DOUBLE:
return build_id(declare_atom(
"DOUBLE"));
335 case FieldDescriptor::TYPE_FLOAT:
return build_id(declare_atom(
"FLOAT"));
336 case FieldDescriptor::TYPE_INT64:
return build_id(declare_atom(
"INT64"));
337 case FieldDescriptor::TYPE_UINT64:
return build_id(declare_atom(
"UINT64"));
338 case FieldDescriptor::TYPE_INT32:
return build_id(declare_atom(
"INT32"));
339 case FieldDescriptor::TYPE_FIXED64:
return build_id(declare_atom(
"FIXED64"));
340 case FieldDescriptor::TYPE_FIXED32:
return build_id(declare_atom(
"FIXED32"));
341 case FieldDescriptor::TYPE_BOOL:
return build_id(declare_atom(
"BOOL"));
342 case FieldDescriptor::TYPE_STRING:
return build_id(declare_atom(
"STRING"));
343 case FieldDescriptor::TYPE_MESSAGE:
return build_id(declare_atom(
"MESSAGE"));
344 case FieldDescriptor::TYPE_BYTES:
return build_id(declare_atom(
"BYTES"));
345 case FieldDescriptor::TYPE_UINT32:
return build_id(declare_atom(
"UINT32"));
346 case FieldDescriptor::TYPE_ENUM:
return build_id(declare_atom(
"ENUM"));
347 case FieldDescriptor::TYPE_SFIXED32:
return build_id(declare_atom(
"SFIXED32"));
348 case FieldDescriptor::TYPE_SFIXED64:
return build_id(declare_atom(
"SFIXED64"));
349 case FieldDescriptor::TYPE_SINT32:
return build_id(declare_atom(
"SINT32"));
350 case FieldDescriptor::TYPE_SINT64:
return build_id(declare_atom(
"SINT64"));
351 default:
return build_id(declare_atom(
"UNKNOWN"));
365 std::shared_ptr<google::protobuf::Message> *m =
366 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
367 if (!*m)
return false;
369 const Descriptor *desc = (*m)->GetDescriptor();
370 const FieldDescriptor *field = desc->FindFieldByName(field_name);
371 if (! field)
return false;
373 const Reflection *refl = (*m)->GetReflection();
375 if (field->is_repeated()) {
376 return (refl->FieldSize(**m, field) > 0);
378 return refl->HasField(**m, field);
391 std::shared_ptr<google::protobuf::Message> *m =
392 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
393 if (!*m)
return build_id(declare_atom(
"INVALID-MESSAGE"));
395 const Descriptor *desc = (*m)->GetDescriptor();
396 const FieldDescriptor *field = desc->FindFieldByName(field_name);
397 if (! field)
return build_id(declare_atom(
"DOES-NOT-EXIST"));
398 switch (field->label()) {
399 case FieldDescriptor::LABEL_OPTIONAL:
return build_id(declare_atom(
"OPTIONAL"));
400 case FieldDescriptor::LABEL_REQUIRED:
return build_id(declare_atom(
"REQUIRED"));
401 case FieldDescriptor::LABEL_REPEATED:
return build_id(declare_atom(
"REPEATED"));
402 default:
return build_id(declare_atom(
"UNKNOWN"));
414 std::shared_ptr<google::protobuf::Message> *m =
415 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
416 if (!*m)
return build_id(declare_atom(
"INVALID-MESSAGE"));
418 const Descriptor *desc = (*m)->GetDescriptor();
419 const FieldDescriptor *field = desc->FindFieldByName(field_name);
420 if (! field)
return build_id(declare_atom(
"DOES-NOT-EXIST"));
421 const Reflection *refl = (*m)->GetReflection();
422 if (field->type() != FieldDescriptor::TYPE_MESSAGE && ! refl->HasField(**m, field)) {
425 return build_id(declare_atom(
"NOT-SET"));
427 switch (field->type()) {
428 case FieldDescriptor::TYPE_DOUBLE:
return build_float(refl->GetDouble(**m, field));
429 case FieldDescriptor::TYPE_FLOAT:
return build_float(refl->GetFloat(**m, field));
430 case FieldDescriptor::TYPE_INT64:
return build_long_long(refl->GetInt64(**m, field));
431 case FieldDescriptor::TYPE_UINT64:
432 return build_long_long((
long int)refl->GetUInt64(**m, field));
433 case FieldDescriptor::TYPE_INT32:
return build_integer(refl->GetInt32(**m, field));
434 case FieldDescriptor::TYPE_FIXED64:
435 return build_long_long((
long int)refl->GetUInt64(**m, field));
436 case FieldDescriptor::TYPE_FIXED32:
return build_long_long(refl->GetUInt32(**m, field));
437 case FieldDescriptor::TYPE_BOOL:
return refl->GetBool(**m, field) ? build_t() : build_nil();
438 case FieldDescriptor::TYPE_STRING:
return build_string(refl->GetString(**m, field).c_str());
439 case FieldDescriptor::TYPE_MESSAGE:
441 const google::protobuf::Message &mfield = refl->GetMessage(**m, field);
442 google::protobuf::Message *mcopy = mfield.New();
443 mcopy->CopyFrom(mfield);
444 void *ptr =
new std::shared_ptr<google::protobuf::Message>(mcopy);
445 return build_pointer(ptr);
447 case FieldDescriptor::TYPE_BYTES:
return build_string((
char *)
"bytes");
448 case FieldDescriptor::TYPE_UINT32:
return build_long_long(refl->GetUInt32(**m, field));
449 case FieldDescriptor::TYPE_ENUM:
450 return build_id(declare_atom(refl->GetEnum(**m, field)->name().c_str()));
451 case FieldDescriptor::TYPE_SFIXED32:
return build_integer(refl->GetInt32(**m, field));
452 case FieldDescriptor::TYPE_SFIXED64:
return build_long_long(refl->GetInt64(**m, field));
453 case FieldDescriptor::TYPE_SINT32:
return build_integer(refl->GetInt32(**m, field));
454 case FieldDescriptor::TYPE_SINT64:
return build_long_long(refl->GetInt64(**m, field));
456 throw std::logic_error(
"Unknown protobuf field type encountered");
469 std::shared_ptr<google::protobuf::Message> *m =
470 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
473 const Descriptor *desc = (*m)->GetDescriptor();
474 const FieldDescriptor *field = desc->FindFieldByName(field_name);
479 const Reflection *refl = (*m)->GetReflection();
482 switch (field->type()) {
483 case FieldDescriptor::TYPE_DOUBLE:
484 if (value->type == TT_FLOAT) {
485 refl->SetDouble(m->get(), field, *(value->u.doubleptr));
487 throw std::logic_error(std::string(
"Invalid type, required float for ") +
488 (*m)->GetTypeName() + field_name);
491 case FieldDescriptor::TYPE_FLOAT:
492 if (value->type == TT_FLOAT) {
493 refl->SetFloat(m->get(), field, *(value->u.doubleptr));
495 throw std::logic_error(std::string(
"Invalid type, required float for ") +
496 (*m)->GetTypeName() + field_name);
499 case FieldDescriptor::TYPE_SFIXED64:
500 case FieldDescriptor::TYPE_SINT64:
501 case FieldDescriptor::TYPE_INT64:
502 if (value->type == INTEGER) {
503 refl->SetInt64(m->get(), field, value->u.intval);
504 }
else if (value->type == LONG_LONG) {
505 refl->SetInt64(m->get(), field, value->u.llintval);
507 throw std::logic_error(std::string(
"Invalid type, required integer or long long for ") +
508 (*m)->GetTypeName() + field_name);
511 case FieldDescriptor::TYPE_FIXED64:
512 case FieldDescriptor::TYPE_UINT64:
513 if (value->type == INTEGER) {
514 refl->SetUInt64(m->get(), field, value->u.intval);
515 }
else if (value->type == LONG_LONG) {
516 refl->SetUInt64(m->get(), field, value->u.llintval);
518 throw std::logic_error(std::string(
"Invalid type, required integer or long long for ") +
519 (*m)->GetTypeName() + field_name);
522 case FieldDescriptor::TYPE_SFIXED32:
523 case FieldDescriptor::TYPE_SINT32:
524 case FieldDescriptor::TYPE_INT32:
525 if (value->type == INTEGER) {
526 refl->SetInt32(m->get(), field, value->u.intval);
528 throw std::logic_error(std::string(
"Invalid type, required integer for ") +
529 (*m)->GetTypeName() + field_name);
532 case FieldDescriptor::TYPE_BOOL:
533 if (value->type == TT_ATOM) {
534 if (value->u.id == lisp_t_sym || value->u.id == nil_sym) {
535 refl->SetBool(m->get(), field, (value->u.id == lisp_t_sym));
537 throw std::logic_error(std::string(
"Invalid value, allowed are T or NIL for field ") +
538 (*m)->GetTypeName() + field_name);
541 throw std::logic_error(std::string(
"Invalid type, required symbol for ") +
542 (*m)->GetTypeName() + field_name);
545 case FieldDescriptor::TYPE_STRING:
546 if (value->type == STRING) {
547 refl->SetString(m->get(), field, value->u.string);
549 throw std::logic_error(std::string(
"Invalid type, required string for ") +
550 (*m)->GetTypeName() + field_name);
553 case FieldDescriptor::TYPE_MESSAGE:
554 if (value->type == U_POINTER) {
555 std::shared_ptr<google::protobuf::Message> *mfrom =
556 static_cast<std::shared_ptr<google::protobuf::Message> *
>(value->u.u_pointer);
557 Message *mut_msg = refl->MutableMessage(m->get(), field);
558 mut_msg->CopyFrom(**mfrom);
561 throw std::logic_error(std::string(
"Invalid type, required user pointer for ") +
562 (*m)->GetTypeName() + field_name);
565 case FieldDescriptor::TYPE_BYTES:
break;
566 case FieldDescriptor::TYPE_FIXED32:
567 case FieldDescriptor::TYPE_UINT32:
568 if (value->type == INTEGER) {
569 refl->SetUInt32(m->get(), field, value->u.intval);
570 }
else if (value->type == LONG_LONG) {
571 refl->SetUInt32(m->get(), field, value->u.llintval);
573 throw std::logic_error(std::string(
"Invalid type, required integer or long long for ") +
574 (*m)->GetTypeName() + field_name);
577 case FieldDescriptor::TYPE_ENUM:
579 const char *sym_name = NULL;
580 if (value->type == TT_ATOM) {
581 sym_name = value->u.id;
582 }
else if (value->type == STRING) {
583 sym_name = value->u.string;
585 throw std::logic_error(std::string(
"Invalid type, required symbol or string for ") +
586 (*m)->GetTypeName() + field_name);
589 const EnumDescriptor *enumdesc = field->enum_type();
590 const EnumValueDescriptor *enumval = enumdesc->FindValueByName(sym_name);
592 refl->SetEnum(m->get(), field, enumval);
594 std::string sym_str(sym_name);
595 std::transform(sym_str.begin(), sym_str.end(), sym_str.begin(), std::ptr_fun<int, int>(std::toupper));
597 const EnumValueDescriptor *enumval = enumdesc->FindValueByName(sym_str);
600 refl->SetEnum(m->get(), field, enumval);
602 fprintf(stderr,
"%s: cannot set invalid enum value '%s' (neither '%s') on '%s'",
603 (*m)->GetTypeName().c_str(), sym_name, sym_str.c_str(), field_name.c_str());
610 throw std::logic_error(
"Unknown protobuf field type encountered");
612 }
catch (std::logic_error &e) {
627 std::shared_ptr<google::protobuf::Message> *m =
628 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
629 if (!(m || *m))
return;
631 const Descriptor *desc = (*m)->GetDescriptor();
632 const FieldDescriptor *field = desc->FindFieldByName(field_name);
637 const Reflection *refl = (*m)->GetReflection();
640 switch (field->type()) {
641 case FieldDescriptor::TYPE_DOUBLE:
642 if (value->type == TT_FLOAT) {
643 refl->AddDouble(m->get(), field, *(value->u.doubleptr));
645 throw std::logic_error(std::string(
"Invalid type, required float for ") +
646 (*m)->GetTypeName() + field_name);
649 case FieldDescriptor::TYPE_FLOAT:
650 if (value->type == TT_FLOAT) {
651 refl->AddFloat(m->get(), field, *(value->u.doubleptr));
653 throw std::logic_error(std::string(
"Invalid type, required float for ") +
654 (*m)->GetTypeName() + field_name);
658 case FieldDescriptor::TYPE_SFIXED64:
659 case FieldDescriptor::TYPE_SINT64:
660 case FieldDescriptor::TYPE_INT64:
661 if (value->type == INTEGER) {
662 refl->AddInt64(m->get(), field, value->u.intval);
663 }
else if (value->type == LONG_LONG) {
664 refl->AddInt64(m->get(), field, value->u.llintval);
666 throw std::logic_error(std::string(
"Invalid type, required integer or long long for ") +
667 (*m)->GetTypeName() + field_name);
671 case FieldDescriptor::TYPE_SFIXED32:
672 case FieldDescriptor::TYPE_SINT32:
673 case FieldDescriptor::TYPE_INT32:
674 if (value->type == INTEGER) {
675 refl->AddInt32(m->get(), field, value->u.intval);
677 throw std::logic_error(std::string(
"Invalid type, required integer for ") +
678 (*m)->GetTypeName() + field_name);
681 case FieldDescriptor::TYPE_BOOL:
682 if (value->type == TT_ATOM) {
683 if (value->u.id == lisp_t_sym || value->u.id == nil_sym) {
684 refl->AddBool(m->get(), field, (value->u.id == lisp_t_sym));
686 throw std::logic_error(std::string(
"Invalid value, allowed are T or NIL for field ") +
687 (*m)->GetTypeName() + field_name);
690 throw std::logic_error(std::string(
"Invalid type, required symbol for ") +
691 (*m)->GetTypeName() + field_name);
694 case FieldDescriptor::TYPE_STRING:
695 if (value->type == STRING) {
696 refl->AddString(m->get(), field, value->u.string);
698 throw std::logic_error(std::string(
"Invalid type, required string for ") +
699 (*m)->GetTypeName() + field_name);
702 case FieldDescriptor::TYPE_MESSAGE:
703 if (value->type == U_POINTER) {
704 std::shared_ptr<google::protobuf::Message> *mfrom =
705 static_cast<std::shared_ptr<google::protobuf::Message> *
>(value->u.u_pointer);
706 Message *mut_msg = refl->AddMessage(m->get(), field);
707 mut_msg->CopyFrom(**mfrom);
710 throw std::logic_error(std::string(
"Invalid type, required user pointer for ") +
711 (*m)->GetTypeName() + field_name);
715 case FieldDescriptor::TYPE_BYTES:
break;
717 case FieldDescriptor::TYPE_FIXED32:
718 case FieldDescriptor::TYPE_UINT32:
719 if (value->type == INTEGER) {
720 refl->AddUInt32(m->get(), field, value->u.intval);
721 }
else if (value->type == LONG_LONG) {
722 refl->AddUInt32(m->get(), field, value->u.llintval);
724 throw std::logic_error(std::string(
"Invalid type, required integer or long long for ") +
725 (*m)->GetTypeName() + field_name);
729 case FieldDescriptor::TYPE_ENUM:
731 const char *sym_name = NULL;
732 if (value->type == TT_ATOM) {
733 sym_name = value->u.id;
734 }
else if (value->type == STRING) {
735 sym_name = value->u.string;
737 throw std::logic_error(std::string(
"Invalid type, required symbol or string for ") +
738 (*m)->GetTypeName() + field_name);
740 const EnumDescriptor *enumdesc = field->enum_type();
741 const EnumValueDescriptor *enumval = enumdesc->FindValueByName(sym_name);
743 refl->AddEnum(m->get(), field, enumval);
752 throw std::logic_error(
"Unknown protobuf field type encountered");
754 }
catch (std::logic_error &e) {
772 if (port <= 0)
return build_nil();
779 client_id = ++next_client_id_;
780 clients_[client_id] = client;
784 boost::bind(&OpenPRSProtobuf::handle_client_connected,
this, client_id));
786 boost::bind(&OpenPRSProtobuf::handle_client_disconnected,
787 this, client_id, boost::asio::placeholders::error));
789 boost::bind(&OpenPRSProtobuf::handle_client_msg,
this, client_id, _1, _2, _3));
791 boost::bind(&OpenPRSProtobuf::handle_client_receive_fail,
this, client_id, _1, _2, _3));
794 return build_long_long(client_id);
806 std::shared_ptr<google::protobuf::Message> *m =
807 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
816 if (server_ && server_clients_.find(client_id) != server_clients_.end()) {
818 server_->
send(server_clients_[client_id], *m);
819 sig_server_sent_(server_clients_[client_id], *m);
820 }
else if (clients_.find(client_id) != clients_.end()) {
822 clients_[client_id]->send(*m);
823 std::pair<std::string, unsigned short> &client_endpoint = client_endpoints_[client_id];
824 sig_client_sent_(client_endpoint.first, client_endpoint.second, *m);
825 }
else if (peers_.find(client_id) != peers_.end()) {
827 peers_[client_id]->send(*m);
828 sig_peer_sent_(client_id, *m);
833 }
catch (google::protobuf::FatalException &e) {
836 }
catch (std::runtime_error &e) {
850 std::shared_ptr<google::protobuf::Message> *m =
851 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
853 fprintf(stderr,
"Cannot send broadcast: invalid message");
858 if (peers_.find(peer_id) == peers_.end())
return;
861 peers_[peer_id]->send(*m);
862 }
catch (google::protobuf::FatalException &e) {
863 fprintf(stderr,
"pb-broadcast: failed to broadcast message of type %s: %s\n",
864 (*m)->GetTypeName().c_str(), e.what());
867 sig_peer_sent_(peer_id, *m);
882 if (server_clients_.find(client_id) != server_clients_.end()) {
885 server_clients_.erase(client_id);
886 rev_server_clients_.erase(srv_client);
887 }
else if (clients_.find(client_id) != clients_.end()) {
888 delete clients_[client_id];
889 clients_.erase(client_id);
891 }
catch (std::runtime_error &e) {
892 throw fawkes::Exception(
"Failed to disconnect from client %li: %s", client_id, e.what());
906 std::shared_ptr<google::protobuf::Message> *m =
907 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
908 if (!(m || *m))
return build_id(declare_atom(
"INVALID-MESSAGE"));
910 const Descriptor *desc = (*m)->GetDescriptor();
911 const FieldDescriptor *field = desc->FindFieldByName(field_name);
912 if (! field)
return build_id(declare_atom(
"DOES-NOT-EXIST"));
914 TermList tl = sl_make_slist();
916 if (field->label() == FieldDescriptor::LABEL_REQUIRED ||
917 field->label() == FieldDescriptor::LABEL_OPTIONAL)
920 return build_term_l_list_from_c_list(tl);
923 const Reflection *refl = (*m)->GetReflection();
924 int field_size = refl->FieldSize(**m, field);
925 for (
int i = 0; i < field_size; ++i) {
926 switch (field->type()) {
927 case FieldDescriptor::TYPE_DOUBLE:
928 tl = build_term_list(tl, build_float(refl->GetRepeatedDouble(**m, field, i)));
930 case FieldDescriptor::TYPE_FLOAT:
931 tl = build_term_list(tl, build_float(refl->GetRepeatedFloat(**m, field, i)));
934 case FieldDescriptor::TYPE_UINT64:
935 case FieldDescriptor::TYPE_FIXED64:
936 tl = build_term_list(tl, build_long_long(refl->GetRepeatedUInt64(**m, field, i)));
938 case FieldDescriptor::TYPE_UINT32:
939 case FieldDescriptor::TYPE_FIXED32:
940 tl = build_term_list(tl, build_long_long(refl->GetRepeatedUInt32(**m, field, i)));
942 case FieldDescriptor::TYPE_BOOL:
943 tl = build_term_list(tl, refl->GetRepeatedBool(**m, field, i) ? build_t() : build_nil());
945 case FieldDescriptor::TYPE_STRING:
946 tl = build_term_list(tl, build_string(refl->GetRepeatedString(**m, field, i).c_str()));
948 case FieldDescriptor::TYPE_MESSAGE:
950 const google::protobuf::Message &msg = refl->GetRepeatedMessage(**m, field, i);
951 google::protobuf::Message *mcopy = msg.New();
952 mcopy->CopyFrom(msg);
953 void *ptr =
new std::shared_ptr<google::protobuf::Message>(mcopy);
954 tl = build_term_list(tl, build_pointer(ptr));
957 case FieldDescriptor::TYPE_BYTES:
958 tl = build_term_list(tl, build_string((
char *)
"bytes"));
960 case FieldDescriptor::TYPE_ENUM:
961 tl = build_term_list(tl, build_id(declare_atom(refl->GetRepeatedEnum(**m, field, i)->name().c_str())));
963 case FieldDescriptor::TYPE_SFIXED32:
964 case FieldDescriptor::TYPE_INT32:
965 case FieldDescriptor::TYPE_SINT32:
966 tl = build_term_list(tl, build_integer(refl->GetRepeatedInt32(**m, field, i)));
968 case FieldDescriptor::TYPE_SFIXED64:
969 case FieldDescriptor::TYPE_SINT64:
970 case FieldDescriptor::TYPE_INT64:
971 tl = build_term_list(tl, build_long_long(refl->GetRepeatedInt64(**m, field, i)));
974 throw std::logic_error(
"Unknown protobuf field type encountered");
978 return build_term_l_list_from_c_list(tl);
990 std::shared_ptr<google::protobuf::Message> *m =
991 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
992 if (!(m || *m))
return false;
994 const Descriptor *desc = (*m)->GetDescriptor();
995 const FieldDescriptor *field = desc->FindFieldByName(field_name);
999 return (field->label() == FieldDescriptor::LABEL_REPEATED);
1010 while (! q_server_client_.empty()) {
1011 auto &sc = q_server_client_.front();
1012 oprs_assert_server_client_event(std::get<0>(sc), std::get<1>(sc),
1013 std::get<2>(sc), std::get<3>(sc));
1014 q_server_client_.pop();
1019 while (! q_client_.empty()) {
1020 auto &c = q_client_.front();
1021 oprs_assert_client_event(std::get<0>(c), std::get<1>(c));
1027 while (! q_msgs_.empty()) {
1028 auto &m = q_msgs_.front();
1029 oprs_assert_message(std::get<0>(m), std::get<1>(m), std::get<2>(m),
1030 std::get<3>(m), std::get<4>(m), std::get<5>(m), std::get<6>(m));
1047 return (! (q_server_client_.empty() && q_client_.empty() && q_msgs_.empty()));
1052 OpenPRSProtobuf::oprs_assert_server_client_event(
long int client_id,
1053 std::string &host,
unsigned short port,
bool connect)
1055 TermList tl = sl_make_slist();
1056 tl = build_term_list(tl, build_long_long(client_id));
1058 tl = build_term_list(tl, build_string(host.c_str()));
1059 tl = build_term_list(tl, build_integer(port));
1060 add_external_fact((
char *)
"protobuf-server-client-connected", tl);
1062 add_external_fact((
char *)
"protobuf-server-client-disconnected", tl);
1068 OpenPRSProtobuf::oprs_assert_client_event(
long int client_id,
bool connect)
1070 TermList tl = sl_make_slist();
1071 tl = build_term_list(tl, build_long_long(client_id));
1073 add_external_fact((
char *)
"protobuf-client-connected", tl);
1075 add_external_fact((
char *)
"protobuf-client-disconnected", tl);
1080 OpenPRSProtobuf::oprs_assert_message(std::string &endpoint_host,
unsigned short endpoint_port,
1081 uint16_t comp_id, uint16_t msg_type,
1082 std::shared_ptr<google::protobuf::Message> &msg,
1083 OpenPRSProtobuf::ClientType ct,
1084 unsigned int client_id)
1086 TermList tl = sl_make_slist();
1089 gettimeofday(&tv, 0);
1090 void *ptr =
new std::shared_ptr<google::protobuf::Message>(msg);
1092 tl = build_term_list(tl, build_string(msg->GetTypeName().c_str()));
1094 tl = build_term_list(tl, build_integer(comp_id));
1096 tl = build_term_list(tl, build_integer(msg_type));
1098 tl = build_term_list(tl, build_string((client_id == 0) ?
"BROADCAST" :
"STREAM"));
1100 tl = build_term_list(tl, build_long_long(tv.tv_sec));
1101 tl = build_term_list(tl, build_long_long(tv.tv_usec));
1103 tl = build_term_list(tl, build_string(endpoint_host.c_str()));
1104 tl = build_term_list(tl, build_integer(endpoint_port));
1106 tl = build_term_list(tl, build_string(ct == CT_CLIENT ?
"CLIENT" :
1107 (ct == CT_SERVER ?
"SERVER" :
"PEER")));
1109 tl = build_term_list(tl, build_integer(client_id));
1111 tl = build_term_list(tl, build_pointer(ptr));
1113 add_external_fact((
char *)
"protobuf-msg", tl);
1118 boost::asio::ip::tcp::endpoint &endpoint)
1121 long int client_id = -1;
1124 client_id = ++next_client_id_;
1125 client_endpoints_[client_id] =
1126 std::make_pair(endpoint.address().to_string(), endpoint.port());
1127 server_clients_[client_id] = client;
1128 rev_server_clients_[client] = client_id;
1132 std::make_tuple(client_id, endpoint.address().to_string(), endpoint.port(),
true));
1138 const boost::system::error_code &error)
1140 long int client_id = -1;
1143 RevServerClientMap::iterator c;
1144 if ((c = rev_server_clients_.find(client)) != rev_server_clients_.end()) {
1145 client_id = c->second;
1146 rev_server_clients_.erase(c);
1147 server_clients_.erase(client_id);
1151 if (client_id >= 0) {
1152 q_server_client_.
push_locked(std::make_tuple(client_id,
"", 0,
false));
1165 uint16_t component_id, uint16_t msg_type,
1166 std::shared_ptr<google::protobuf::Message> msg)
1169 RevServerClientMap::iterator c;
1170 if ((c = rev_server_clients_.find(client)) != rev_server_clients_.end()) {
1171 q_msgs_.
push_locked(std::make_tuple(client_endpoints_[c->second].first,
1172 client_endpoints_[c->second].second,
1173 component_id, msg_type, msg, CT_SERVER, c->second));
1185 uint16_t component_id, uint16_t msg_type,
1189 RevServerClientMap::iterator c;
1190 if ((c = rev_server_clients_.find(client)) != rev_server_clients_.end()) {
1211 OpenPRSProtobuf::handle_peer_msg(
long int peer_id,
1212 boost::asio::ip::udp::endpoint &endpoint,
1213 uint16_t component_id, uint16_t msg_type,
1214 std::shared_ptr<google::protobuf::Message> msg)
1216 q_msgs_.
push_locked(std::make_tuple(endpoint.address().to_string(), endpoint.port(),
1217 component_id, msg_type, msg, CT_PEER, peer_id));
1226 OpenPRSProtobuf::handle_peer_recv_error(
long int peer_id,
1227 boost::asio::ip::udp::endpoint &endpoint, std::string msg)
1229 fprintf(stderr,
"Failed to receive peer message from %s:%u: %s\n",
1230 endpoint.address().to_string().c_str(), endpoint.port(), msg.c_str());
1237 OpenPRSProtobuf::handle_peer_send_error(
long int peer_id, std::string msg)
1244 OpenPRSProtobuf::handle_client_connected(
long int client_id)
1246 q_client_.
push_locked(std::make_tuple(client_id,
true));
1250 OpenPRSProtobuf::handle_client_disconnected(
long int client_id,
1251 const boost::system::error_code &error)
1253 q_client_.
push_locked(std::make_tuple(client_id,
false));
1257 OpenPRSProtobuf::handle_client_msg(
long int client_id,
1258 uint16_t comp_id, uint16_t msg_type,
1259 std::shared_ptr<google::protobuf::Message> msg)
1261 q_msgs_.
push_locked(std::make_tuple(
"", 0, comp_id, msg_type, msg, CT_CLIENT, client_id));
1266 OpenPRSProtobuf::handle_client_receive_fail(
long int client_id,
1267 uint16_t comp_id, uint16_t msg_type, std::string msg)
std::shared_ptr< google::protobuf::Message > * oprs_create_msg(std::string full_name)
Create a new message of given type.
void oprs_pb_peer_destroy(long int peer_id)
Disable peer.
bool oprs_pb_has_field(void *msgptr, std::string field_name)
Check if message has a specific field.
void oprs_pb_set_field(void *msgptr, std::string field_name, Term *value)
Set a field.
Term * oprs_pb_field_label(void *msgptr, std::string field_name)
Get a field's label.
Term * oprs_pb_field_value(void *msgptr, std::string field_name)
Get properly typed field value.
unsigned int ClientID
ID to identify connected clients.
Register to map msg type numbers to Protobuf messages.
void oprs_pb_broadcast(long int peer_id, void *msgptr)
Broadcast a message through a peer.
~OpenPRSProtobuf()
Destructor.
boost::signals2::signal< void(ClientID, const boost::system::error_code &)> & signal_disconnected()
Signal that is invoked when a client has disconnected.
Term * oprs_pb_peer_create(std::string host, int port)
Enable protobuf peer.
void oprs_pb_process()
Process all pending events.
Term * oprs_pb_client_connect(std::string host, int port)
Connect as a client to the given server.
void disconnect(ClientID client)
Disconnect specific client.
Term * oprs_pb_destroy(void *msgptr)
Destroy given message (reference).
Term * oprs_pb_peer_create_crypto(std::string host, int port, std::string crypto_key="", std::string cipher="")
Enable protobuf peer.
Term * oprs_pb_field_type(void *msgptr, std::string field_name)
Get type of a specific field.
void oprs_pb_enable_server(int port)
Enable protobuf stream server.
Communicate by broadcasting protobuf messages.
signal_send_error_type & signal_send_error()
Signal that is invoked when sending a message failed.
void oprs_pb_disconnect(long int client_id)
Disconnect a given client.
Term * oprs_pb_field_names(void *msgptr)
Get field names of message.
Term * oprs_pb_peer_create_local_crypto(std::string host, int send_port, int recv_port, std::string crypto_key="", std::string cipher="")
Enable protobuf peer.
Base class for exceptions in Fawkes.
RefPtr< Mutex > mutex() const
Get access to the internal mutex.
boost::signals2::signal< void(uint16_t, uint16_t, std::shared_ptr< google::protobuf::Message >)> & signal_received()
Signal that is invoked when a message has been received.
signal_received_type & signal_received()
Signal that is invoked when a message has been received.
std::shared_ptr< google::protobuf::Message > new_message_for(uint16_t component_id, uint16_t msg_type)
Create a new message instance.
boost::signals2::signal< void(uint16_t, uint16_t, std::string)> & signal_receive_failed()
Signal that is invoked when receiving a message failed.
void async_connect(const char *host, unsigned short port)
Asynchronous connect.
void send(ClientID client, uint16_t component_id, uint16_t msg_type, google::protobuf::Message &m)
Send a message to the given client.
bool oprs_pb_register_type(std::string full_name)
Register a new message type.
Term * oprs_pb_ref(void *msgptr)
Create new reference to message.
bool oprs_pb_field_is_list(void *msgptr, std::string field_name)
Check if a given field is a list (repeated field).
Stream server for protobuf message transmission.
boost::signals2::signal< void()> & signal_connected()
Signal that is invoked when the connection has been established.
boost::signals2::signal< void(ClientID, boost::asio::ip::tcp::endpoint &)> & signal_connected()
Signal that is invoked when a new client has connected.
signal_recv_error_type & signal_recv_error()
Signal that is invoked when receiving a message failed.
void push_locked(const Type &x)
Push element to queue with lock protection.
boost::signals2::signal< void(ClientID, uint16_t, uint16_t, std::string)> & signal_receive_failed()
Signal that is invoked when receiving a message failed.
Term * oprs_pb_peer_create_local(std::string host, int send_port, int recv_port)
Enable protobuf peer.
void oprs_pb_disable_server()
Disable protobuf stream server.
void oprs_pb_add_list(void *msgptr, std::string field_name, Term *value)
Add value to a repeated field.
boost::signals2::signal< void(const boost::system::error_code &)> & signal_disconnected()
Signal that is invoked when the connection is closed.
Stream client for protobuf message transmission.
bool oprs_pb_events_pending()
Check if there are pending events.
boost::signals2::signal< void(ClientID, uint16_t, uint16_t, std::shared_ptr< google::protobuf::Message >)> & signal_received()
Signal that is invoked when a message has been received.
void oprs_pb_send(long int client_id, void *msgptr)
Send message to a specific client.
void add_message_type(std::string msg_type)
Add a message type from generated pool.
void oprs_pb_peer_setup_crypto(long int peer_id, std::string crypto_key, std::string cipher)
Setup crypto for peer.
Term * oprs_pb_field_list(void *msgptr, std::string field_name)
Get list of values of a given message field.