Fawkes API  Fawkes Development Version
gazsim_comm_thread.cpp
/***************************************************************************
 *  gazsim_comm_thread.cpp - Plugin simulates peer-to-peer communication over
 *                           a network with configurable instability and manages
 *                           the forwarding of messages to different ports on
 *                           the same machine.
 *
 *  Created: Thu Sep 12 11:09:48 2013
 *  Copyright  2013  Frederik Zwilling
 *
 ****************************************************************************/
11 
12 /* This program is free software; you can redistribute it and/or modify
13  * it under the terms of the GNU General Public License as published by
14  * the Free Software Foundation; either version 2 of the License, or
15  * (at your option) any later version.
16  *
17  * This program is distributed in the hope that it will be useful,
18  * but WITHOUT ANY WARRANTY; without even the implied warranty of
19  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20  * GNU Library General Public License for more details.
21  *
22  * Read the full text in the LICENSE.GPL file in the doc directory.
23  */
24 
25 #include <aspect/blocked_timing.h>
26 #include <protobuf_comm/peer.h>
27 #include <protobuf_comm/message_register.h>
28 #include <stdlib.h>
29 #include "gazsim_comm_thread.h"
30 #include <algorithm>
31 
32 using namespace fawkes;
33 using namespace protobuf_comm;
34 
/** @class GazsimCommThread "gazsim_comm_thread.h"
 * Plugin that simulates and manages communication for the simulation in Gazebo.
 * @author Frederik Zwilling
 */
39 
40 GazsimCommThread::GazsimCommThread()
41  : Thread("GazsimCommThread", Thread::OPMODE_WAITFORWAKEUP),
42  BlockedTimingAspect(BlockedTimingAspect::WAKEUP_HOOK_WORLDSTATE)
43 {
44 }
45 
46 GazsimCommThread::~GazsimCommThread()
47 {
48 }
49 
50 
51 void
53 {
54  //logger->log_info(name(), "GazsimComm initializing");
55  initialized_ = false;
56 
57  //read config values
58  proto_dirs_ = config->get_strings("/gazsim/proto-dirs");
59  package_loss_ = config->get_float("/gazsim/comm/package-loss");
60  addresses_ = config->get_strings("/gazsim/comm/addresses");
61  send_ports_ = config->get_uints("/gazsim/comm/send-ports");
62  recv_ports_ = config->get_uints("/gazsim/comm/recv-ports");
63  use_crypto1_ = config->get_bool("/gazsim/comm/use-crypto1");
64  use_crypto2_ = config->get_bool("/gazsim/comm/use-crypto1");
65  send_ports_crypto1_ = config->get_uints("/gazsim/comm/send-ports-crypto1");
66  recv_ports_crypto1_ = config->get_uints("/gazsim/comm/recv-ports-crypto1");
67  send_ports_crypto2_ = config->get_uints("/gazsim/comm/send-ports-crypto2");
68  recv_ports_crypto2_ = config->get_uints("/gazsim/comm/recv-ports-crypto2");
69  if(addresses_.size() != send_ports_.size() || addresses_.size() != recv_ports_.size()
70  || ( use_crypto1_ && addresses_.size() != send_ports_crypto1_.size())
71  || ( use_crypto1_ && addresses_.size() != recv_ports_crypto1_.size())
72  || ( use_crypto2_ && addresses_.size() != send_ports_crypto2_.size())
73  || ( use_crypto2_ && addresses_.size() != recv_ports_crypto2_.size()))
74  {
75  logger->log_warn(name(), "/gazsim/comm/ has an invalid configuration!");
76  }
77 
78 
79 
80  //resolve proto paths
81  try {
82  proto_dirs_ = config->get_strings("/clips-protobuf/proto-dirs");
83  for (size_t i = 0; i < proto_dirs_.size(); ++i) {
84  std::string::size_type pos;
85  if ((pos = proto_dirs_[i].find("@BASEDIR@")) != std::string::npos) {
86  proto_dirs_[i].replace(pos, 9, BASEDIR);
87  }
88  if ((pos = proto_dirs_[i].find("@FAWKES_BASEDIR@")) != std::string::npos) {
89  proto_dirs_[i].replace(pos, 16, FAWKES_BASEDIR);
90  }
91  if ((pos = proto_dirs_[i].find("@RESDIR@")) != std::string::npos) {
92  proto_dirs_[i].replace(pos, 8, RESDIR);
93  }
94  if ((pos = proto_dirs_[i].find("@CONFDIR@")) != std::string::npos) {
95  proto_dirs_[i].replace(pos, 9, CONFDIR);
96  }
97  if (proto_dirs_[i][proto_dirs_.size()-1] != '/') {
98  proto_dirs_[i] += "/";
99  }
100  }
101  } catch (Exception &e) {
102  logger->log_warn(name(), "Failed to load proto paths from config, exception follows");
103  logger->log_warn(name(), e);
104  }
105 
106  //create peer connections
107  peers_.resize(addresses_.size());
108  peers_crypto1_.resize(addresses_.size());
109  peers_crypto2_.resize(addresses_.size());
110  for(unsigned int i = 0; i < addresses_.size(); i++)
111  {
112  peers_[i] = new ProtobufBroadcastPeer(addresses_[i], send_ports_[i],
113  recv_ports_[i], proto_dirs_);
114  peers_[i]->signal_received_raw().connect(boost::bind(&GazsimCommThread::receive_raw_msg, this, _1, _2, _3, _4));
115  if(use_crypto1_)
116  {
117  peers_crypto1_[i] = new ProtobufBroadcastPeer(addresses_[i], send_ports_crypto1_[i],
118  recv_ports_crypto1_[i], proto_dirs_);
119  peers_crypto1_[i]->signal_received_raw().connect(boost::bind(&GazsimCommThread::receive_raw_msg, this, _1, _2, _3, _4));
120  }
121  if(use_crypto2_)
122  {
123  peers_crypto2_[i] = new ProtobufBroadcastPeer(addresses_[i], send_ports_crypto2_[i],
124  recv_ports_crypto2_[i], proto_dirs_);
125  peers_crypto2_[i]->signal_received_raw().connect(boost::bind(&GazsimCommThread::receive_raw_msg, this, _1, _2, _3, _4));
126  }
127  }
128  initialized_ = true;
129 }
130 
131 
132 void
134 {
135  for(unsigned int i = 0; i < peers_.size(); i++)
136  {
137  delete peers_[i];
138  }
139 }
140 
141 
142 void
144 {
145 }
146 
147 /**
148  * Receive and forward msg
149  * @param endpoint port msg received from
150  * @param component_id message_component_id
151  * @param msg_type msg_type
152  * @param msg Message
153  */
154 void
155 GazsimCommThread::receive_msg(boost::asio::ip::udp::endpoint &endpoint,
156  uint16_t component_id, uint16_t msg_type,
157  std::shared_ptr<google::protobuf::Message> msg)
158 {
159  //logger->log_info(name(), "Got Peer Message from port %d", endpoint.port());
160  unsigned int incoming_peer_port = endpoint.port(); //this is suprisingly the send-port
161 
162  if(!initialized_)
163  {
164  return;
165  }
166 
167  //simulate package loss
168  double rnd = ((double) rand()) / ((double) RAND_MAX); //0.0 <= rnd <= 1.0
169  if(rnd < package_loss_)
170  {
171  return;
172  }
173  //send message to all other peers
174  for(unsigned int i = 0; i < peers_.size(); i++)
175  {
176  if(send_ports_[i] != incoming_peer_port)
177  {
178  peers_[i]->send(msg);
179  }
180  }
181 }
182 
183 /**
184  * Receive and forward raw msg
185  * @param endpoint port msg received from
186  * @param header header of the msg
187  * @param data data stream
188  * @param length length of the data stream
189  */
190 void
191 GazsimCommThread::receive_raw_msg(boost::asio::ip::udp::endpoint &endpoint,
192  protobuf_comm::frame_header_t &header, void * data,
193  size_t length)
194 {
195  //logger->log_info(name(), "Got raw Message from port %d", endpoint.port());
196  unsigned int incoming_peer_port = endpoint.port(); //this is suprisingly the send-port
197 
198  if(!initialized_)
199  {
200  return;
201  }
202 
203  //simulate package loss
204  double rnd = ((double) rand()) / ((double) RAND_MAX); //0.0 <= rnd <= 1.0
205  if(rnd < package_loss_)
206  {
207  return;
208  }
209 
210  //check which set of peers the message comes from
211  std::vector<protobuf_comm::ProtobufBroadcastPeer*> peers;
212  std::vector<unsigned int> send_ports;
213  if(std::find(send_ports_.begin(), send_ports_.end(), incoming_peer_port) != send_ports_.end())
214  {
215  peers = peers_;
216  send_ports = send_ports_;
217  }
218  else if(use_crypto1_ && std::find(send_ports_crypto1_.begin(), send_ports_crypto1_.end(), incoming_peer_port) != send_ports_crypto1_.end())
219  {
220  peers = peers_crypto1_;
221  send_ports = send_ports_crypto1_;
222  }
223  else if(use_crypto2_ && std::find(send_ports_crypto2_.begin(), send_ports_crypto2_.end(), incoming_peer_port) != send_ports_crypto1_.end())
224  {
225  peers = peers_crypto2_;
226  send_ports = send_ports_crypto2_;
227  }
228 
229  //send message to all other peers
230  for(unsigned int i = 0; i < peers.size(); i++)
231  {
232  if(send_ports[i] != incoming_peer_port)
233  {
234  peers[i]->send_raw(header, data, length);
235  }
236  }
237 }
void receive_raw_msg(boost::asio::ip::udp::endpoint &endpoint, protobuf_comm::frame_header_t &header, void *data, size_t length)
Receive and forward raw msg.
Fawkes library namespace.
virtual bool get_bool(const char *path)=0
Get value from configuration which is of type bool.
Thread class encapsulation of pthreads.
Definition: thread.h:42
const char * name() const
Get the name of the plugin.
Definition: plugin.cpp:142
Network framing header.
Definition: frame_header.h:74
Thread aspect to use blocked timing.
Communicate by broadcasting protobuf messages.
Definition: peer.h:60
Base class for exceptions in Fawkes.
Definition: exception.h:36
virtual std::vector< unsigned int > get_uints(const char *path)=0
Get list of values from configuration which is of type unsigned int.
virtual void log_warn(const char *component, const char *format,...)
Log warning message.
Definition: multi.cpp:227
virtual void finalize()
Finalize the thread.
Configuration * config
Fawkes configuration.
Definition: plugin.h:58
virtual void init()
Initialize the thread.
virtual std::vector< std::string > get_strings(const char *path)=0
Get list of values from configuration which is of type string.
void receive_msg(boost::asio::ip::udp::endpoint &endpoint, uint16_t component_id, uint16_t msg_type, std::shared_ptr< google::protobuf::Message > msg)
Receive and forward msg.
virtual float get_float(const char *path)=0
Get value from configuration which is of type float.
virtual void loop()
Code to execute in the thread.