25 #include <aspect/blocked_timing.h> 26 #include <protobuf_comm/peer.h> 27 #include <protobuf_comm/message_register.h> 29 #include "gazsim_comm_thread.h" 40 GazsimCommThread::GazsimCommThread()
41 :
Thread(
"GazsimCommThread",
Thread::OPMODE_WAITFORWAKEUP),
46 GazsimCommThread::~GazsimCommThread()
65 send_ports_crypto1_ =
config->
get_uints(
"/gazsim/comm/send-ports-crypto1");
66 recv_ports_crypto1_ =
config->
get_uints(
"/gazsim/comm/recv-ports-crypto1");
67 send_ports_crypto2_ =
config->
get_uints(
"/gazsim/comm/send-ports-crypto2");
68 recv_ports_crypto2_ =
config->
get_uints(
"/gazsim/comm/recv-ports-crypto2");
69 if(addresses_.size() != send_ports_.size() || addresses_.size() != recv_ports_.size()
70 || ( use_crypto1_ && addresses_.size() != send_ports_crypto1_.size())
71 || ( use_crypto1_ && addresses_.size() != recv_ports_crypto1_.size())
72 || ( use_crypto2_ && addresses_.size() != send_ports_crypto2_.size())
73 || ( use_crypto2_ && addresses_.size() != recv_ports_crypto2_.size()))
75 logger->
log_warn(
name(),
"/gazsim/comm/ has an invalid configuration!");
83 for (
size_t i = 0; i < proto_dirs_.size(); ++i) {
84 std::string::size_type pos;
85 if ((pos = proto_dirs_[i].find(
"@BASEDIR@")) != std::string::npos) {
86 proto_dirs_[i].replace(pos, 9, BASEDIR);
88 if ((pos = proto_dirs_[i].find(
"@FAWKES_BASEDIR@")) != std::string::npos) {
89 proto_dirs_[i].replace(pos, 16, FAWKES_BASEDIR);
91 if ((pos = proto_dirs_[i].find(
"@RESDIR@")) != std::string::npos) {
92 proto_dirs_[i].replace(pos, 8, RESDIR);
94 if ((pos = proto_dirs_[i].find(
"@CONFDIR@")) != std::string::npos) {
95 proto_dirs_[i].replace(pos, 9, CONFDIR);
97 if (proto_dirs_[i][proto_dirs_.size()-1] !=
'/') {
98 proto_dirs_[i] +=
"/";
102 logger->
log_warn(
name(),
"Failed to load proto paths from config, exception follows");
107 peers_.resize(addresses_.size());
108 peers_crypto1_.resize(addresses_.size());
109 peers_crypto2_.resize(addresses_.size());
110 for(
unsigned int i = 0; i < addresses_.size(); i++)
113 recv_ports_[i], proto_dirs_);
118 recv_ports_crypto1_[i], proto_dirs_);
124 recv_ports_crypto2_[i], proto_dirs_);
135 for(
unsigned int i = 0; i < peers_.size(); i++)
156 uint16_t component_id, uint16_t msg_type,
157 std::shared_ptr<google::protobuf::Message> msg)
160 unsigned int incoming_peer_port = endpoint.port();
168 double rnd = ((double) rand()) / ((
double) RAND_MAX);
169 if(rnd < package_loss_)
174 for(
unsigned int i = 0; i < peers_.size(); i++)
176 if(send_ports_[i] != incoming_peer_port)
178 peers_[i]->send(msg);
196 unsigned int incoming_peer_port = endpoint.port();
204 double rnd = ((double) rand()) / ((
double) RAND_MAX);
205 if(rnd < package_loss_)
211 std::vector<protobuf_comm::ProtobufBroadcastPeer*> peers;
212 std::vector<unsigned int> send_ports;
213 if(std::find(send_ports_.begin(), send_ports_.end(), incoming_peer_port) != send_ports_.end())
216 send_ports = send_ports_;
218 else if(use_crypto1_ && std::find(send_ports_crypto1_.begin(), send_ports_crypto1_.end(), incoming_peer_port) != send_ports_crypto1_.end())
220 peers = peers_crypto1_;
221 send_ports = send_ports_crypto1_;
223 else if(use_crypto2_ && std::find(send_ports_crypto2_.begin(), send_ports_crypto2_.end(), incoming_peer_port) != send_ports_crypto1_.end())
225 peers = peers_crypto2_;
226 send_ports = send_ports_crypto2_;
230 for(
unsigned int i = 0; i < peers.size(); i++)
232 if(send_ports[i] != incoming_peer_port)
234 peers[i]->send_raw(header, data, length);
void receive_raw_msg(boost::asio::ip::udp::endpoint &endpoint, protobuf_comm::frame_header_t &header, void *data, size_t length)
Receive and forward raw msg.
Fawkes library namespace.
virtual bool get_bool(const char *path)=0
Get value from configuration which is of type bool.
Thread class encapsulation of pthreads.
const char * name() const
Get the name of the plugin.
Thread aspect to use blocked timing.
Communicate by broadcasting protobuf messages.
Base class for exceptions in Fawkes.
virtual std::vector< unsigned int > get_uints(const char *path)=0
Get list of values from configuration which is of type unsigned int.
virtual void log_warn(const char *component, const char *format,...)
Log warning message.
virtual void finalize()
Finalize the thread.
Configuration * config
Fawkes configuration.
virtual void init()
Initialize the thread.
virtual std::vector< std::string > get_strings(const char *path)=0
Get list of values from configuration which is of type string.
void receive_msg(boost::asio::ip::udp::endpoint &endpoint, uint16_t component_id, uint16_t msg_type, std::shared_ptr< google::protobuf::Message > msg)
Receive and forward msg.
virtual float get_float(const char *path)=0
Get value from configuration which is of type float.
virtual void loop()
Code to execute in the thread.