23 #include "sync_thread.h" 25 #include <blackboard/remote.h> 26 #include <core/threading/mutex_locker.h> 27 #include <utils/time/wait.h> 46 std::string &peer_cfg_prefix,
50 set_name(
"BBSyncThread[%s]", peer.c_str());
53 __bbsync_cfg_prefix = bbsync_cfg_prefix;
54 __peer_cfg_prefix = peer_cfg_prefix;
70 unsigned int check_interval = 0;
75 check_interval =
config->
get_uint((__bbsync_cfg_prefix +
"check_interval").c_str());
77 e.
append(
"Host or port not specified for peer");
82 check_interval =
config->
get_uint((__peer_cfg_prefix +
"check_interval").c_str());
88 read_config_combos(__peer_cfg_prefix +
"reading/",
false);
89 read_config_combos(__peer_cfg_prefix +
"writing/",
true);
91 for (ComboMap::iterator i = __combos.begin(); i != __combos.end(); ++i) {
93 i->second.reader_id.c_str(), i->second.remote_writer ?
"local" :
"remote",
94 i->second.writer_id.c_str(), i->second.remote_writer ?
"remote" :
"local");
100 if (! check_connection()) {
134 BlackBoardSynchronizationThread::check_connection()
136 if (! __remote_bb || ! __remote_bb->
is_alive()) {
138 logger->
log_warn(
name(),
"Lost connection via remote BB to %s (%s:%u), will try to re-establish",
139 __peer.c_str(), __host.c_str(), __port);
150 __peer.c_str(), __host.c_str(), __port);
164 BlackBoardSynchronizationThread::read_config_combos(std::string prefix,
bool writing)
168 if (strcmp(i->
type(),
"string") != 0) {
170 "but found value of type %s",
171 prefix.c_str(), i->
type());
176 std::string varname = std::string(i->
path()).substr(prefix.length());
180 if ((sf = uid.find(
"::")) == std::string::npos) {
182 throw Exception(
"Interface UID '%s' at %s is not valid, missing double colon",
183 uid.c_str(), i->
path());
186 std::string type = uid.substr(0, sf);
187 std::string
id = uid.substr(sf + 2);
188 combo_t combo = { type, id, id, writing };
190 if ( (sf =
id.find(
"=")) != std::string::npos) {
192 combo.reader_id =
id.substr(0, sf);
193 combo.writer_id =
id.substr(sf + 1);
196 __combos[varname] = combo;
203 BlackBoardSynchronizationThread::open_interfaces()
208 ComboMap::iterator i;
209 for (i = __combos.begin(); i != __combos.end(); ++i) {
210 Interface *iface_reader = NULL, *iface_writer = NULL;
217 i->second.remote_writer ?
"locally" :
"remotely",
218 i->second.type.c_str(), i->second.reader_id.c_str());
220 i->second.reader_id.c_str());
224 i->second.remote_writer ?
"remotely" :
"locally",
225 i->second.type.c_str(), i->second.writer_id.c_str());
227 i->second.writer_id.c_str());
230 InterfaceInfo ii(&i->second, iface_writer, reader_bb, writer_bb);
231 __interfaces[iface_reader] = ii;
234 reader_bb->
close(iface_reader);
235 writer_bb->
close(iface_writer);
243 reader_bb, writer_bb);
245 __sync_listeners[iface_reader] = sync_listener;
247 if (i->second.remote_writer) {
257 BlackBoardSynchronizationThread::close_interfaces()
259 SyncListenerMap::iterator s;
260 for (s = __sync_listeners.begin(); s != __sync_listeners.end(); ++s) {
267 InterfaceMap::iterator i;
268 for (i = __interfaces.begin(); i != __interfaces.end(); ++i) {
270 i->second.combo->remote_writer ?
"local" :
"remote",
272 if (i->second.combo->remote_writer) {
277 __remote_bb->
close(i->first);
279 if (i->second.writer) {
281 i->second.combo->remote_writer ?
"remote" :
"local",
282 i->second.writer->uid());
283 if (i->second.combo->remote_writer) {
284 __remote_bb->
close(i->second.writer);
290 __interfaces.clear();
291 __sync_listeners.clear();
304 if (__interfaces[interface].writer) {
306 logger->
log_warn(
name(),
"Writer added for %s, but relay exists already. Bug?", interface->uid());
308 logger->
log_warn(
name(),
"Writer added for %s, opening relay writer", interface->uid());
312 InterfaceInfo &ii = __interfaces[interface];
314 iface = ii.writer_bb->open_for_writing(ii.combo->type.c_str(),
315 ii.combo->writer_id.c_str());
318 ii.combo->type.c_str(), ii.combo->reader_id.c_str(),
319 ii.combo->writer_id.c_str());
322 ii.reader_bb, ii.writer_bb);
324 __sync_listeners[interface] = sync_listener;
328 delete sync_listener;
329 ii.writer_bb->close(iface);
331 ii.combo->type.c_str(), ii.combo->reader_id.c_str(),
332 ii.combo->writer_id.c_str());
348 if (! __interfaces[interface].writer) {
350 logger->
log_warn(
name(),
"Writer removed for %s, but no relay exists. Bug?", interface->uid());
352 logger->
log_warn(
name(),
"Writer removed for %s, closing relay writer", interface->uid());
354 InterfaceInfo &ii = __interfaces[interface];
356 delete __sync_listeners[interface];
357 __sync_listeners[interface] = NULL;
359 ii.writer_bb->close(ii.writer);
364 ii.combo->type.c_str(), ii.combo->reader_id.c_str(),
365 ii.combo->writer_id.c_str());
virtual void finalize()
Finalize the thread.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
virtual ~BlackBoardSynchronizationThread()
Destructor.
virtual const char * type() const =0
Type of value.
Fawkes library namespace.
virtual void init()
Initialize the thread.
virtual ValueIterator * search(const char *path)=0
Iterator with search results.
virtual bool next()=0
Check if there is another element and advance to this if possible.
virtual void unregister_listener(BlackBoardInterfaceListener *listener)
Unregister BB interface listener.
Thread class encapsulation of pthreads.
void set_prepfin_conc_loop(bool concurrent=true)
Set concurrent execution of prepare_finalize() and loop().
Base class for all Fawkes BlackBoard interfaces.
Logger * logger
This is the Logger member used to access the logger.
void wait_systime()
Wait until minimum loop time has been reached in real time.
BlackBoardSynchronizationThread(std::string &bbsync_cfg_prefix, std::string &peer_cfg_prefix, std::string &peer)
Constructor.
Clock * clock
By means of this member access to the clock is given.
virtual void register_listener(BlackBoardInterfaceListener *listener, ListenerRegisterFlag flag=BBIL_FLAG_ALL)
Register BB event listener.
void set_name(const char *format,...)
Set name of thread.
Base class for exceptions in Fawkes.
void remove_interface(fawkes::Interface *interface)
Remove an interface to listen to.
virtual bool is_alive() const =0
Check if the BlackBoard is still alive.
bool has_writer() const
Check if there is a writer for the interface.
virtual std::string get_string() const =0
Get string value.
const char * name() const
Get name of thread.
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void log_error(const char *component, const char *format,...)=0
Log error message.
void add_interface(fawkes::Interface *interface)
Add an interface to listen to.
virtual const char * path() const =0
Path of value.
void print_trace()
Prints trace to stderr.
void mark_start()
Mark start of loop.
Listener for writer events in bbsync plugin.
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
Iterator interface to iterate over config values.
virtual Interface * open_for_reading(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for reading.
void writer_added(fawkes::Interface *interface)
A writer has been added for an interface.
void writer_removed(fawkes::Interface *interface)
A writer has been removed for an interface.
The BlackBoard abstract class.
virtual unsigned int get_uint(const char *path)=0
Get value from configuration which is of type unsigned int.
virtual void loop()
Code to execute in the thread.
Configuration * config
This is the Configuration member used to access the configuration.
virtual Interface * open_for_writing(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for writing.
RefPtr< Mutex > mutex() const
Get access to the internal mutex.
void append(const char *format,...)
Append messages to the message list.
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
Synchronize two interfaces.
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
virtual void close(Interface *interface)=0
Close interface.