24 #include <netcomm/fawkes/client.h>
25 #include <netcomm/fawkes/client_handler.h>
26 #include <netcomm/fawkes/message_queue.h>
27 #include <netcomm/fawkes/transceiver.h>
28 #include <netcomm/socket/stream.h>
29 #include <netcomm/utils/exceptions.h>
31 #include <core/threading/thread.h>
32 #include <core/threading/mutex.h>
33 #include <core/threading/mutex_locker.h>
34 #include <core/threading/wait_condition.h>
35 #include <core/exceptions/system.h>
54 :
Exception(
"A handler for this component has already been registered")
78 __outbound_mutex =
new Mutex();
81 __outbound_active = 0;
82 __outbound_msgq = __outbound_msgqs[0];
88 for (
unsigned int i = 0; i < 2; ++i) {
89 while ( ! __outbound_msgqs[i]->empty() ) {
92 __outbound_msgqs[i]->pop();
95 delete __outbound_msgqs[0];
96 delete __outbound_msgqs[1];
97 delete __outbound_mutex;
102 __parent->set_send_slave_alive();
109 while ( __outbound_havemore ) {
110 __outbound_mutex->
lock();
111 __outbound_havemore =
false;
113 __outbound_active = 1 - __outbound_active;
114 __outbound_msgq = __outbound_msgqs[__outbound_active];
115 __outbound_mutex->
unlock();
117 if ( ! q->empty() ) {
121 __parent->connection_died();
152 __outbound_mutex->
lock();
153 __outbound_msgq->push(message);
154 __outbound_havemore =
true;
155 __outbound_mutex->
unlock();
165 Mutex *__outbound_mutex;
166 unsigned int __outbound_active;
167 bool __outbound_havemore;
190 :
Thread(
"FawkesNetworkClientRecvThread")
195 __recv_mutex = recv_mutex;
201 while ( ! __inbound_msgq->empty() ) {
204 __inbound_msgq->pop();
206 delete __inbound_msgq;
212 std::list<unsigned int> wakeup_list;
219 __inbound_msgq->
lock();
220 while ( ! __inbound_msgq->empty() ) {
222 wakeup_list.push_back(m->
cid());
223 __parent->dispatch_message(m);
225 __inbound_msgq->pop();
232 wakeup_list.unique();
233 for (std::list<unsigned int>::iterator i = wakeup_list.begin(); i != wakeup_list.end(); ++i) {
234 __parent->wake_handlers(*i);
243 __parent->set_recv_slave_alive();
261 __parent->connection_died();
268 __parent->connection_died();
300 __hostname = strdup(hostname);
301 __ip = ip ? strdup(ip) : NULL;
308 connection_died_recently =
false;
309 __send_slave_alive =
false;
310 __recv_slave_alive =
false;
312 slave_status_mutex =
new Mutex();
317 __recv_mutex =
new Mutex();
319 __connest_mutex =
new Mutex();
322 __connest_interrupted =
false;
340 connection_died_recently =
false;
341 __send_slave_alive =
false;
342 __recv_slave_alive =
false;
344 slave_status_mutex =
new Mutex();
349 __recv_mutex =
new Mutex();
351 __connest_mutex =
new Mutex();
354 __connest_interrupted =
false;
365 unsigned short int port,
const char *ip)
367 __hostname = strdup(hostname);
368 __ip = ip ? strdup(ip) : NULL;
375 connection_died_recently =
false;
376 __send_slave_alive =
false;
377 __recv_slave_alive =
false;
379 slave_status_mutex =
new Mutex();
384 __recv_mutex =
new Mutex();
386 __connest_mutex =
new Mutex();
389 __connest_interrupted =
false;
399 if (__hostname) free(__hostname);
400 if (__ip) free(__ip);
401 delete slave_status_mutex;
403 delete __connest_waitcond;
404 delete __connest_mutex;
405 delete __recv_waitcond;
417 if ( __hostname == NULL && __ip == NULL) {
426 connection_died_recently =
false;
430 s->
connect(__ip ? __ip : __hostname, __port);
432 __send_slave->
start();
434 __recv_slave->
start();
436 connection_died_recently =
true;
437 if ( __send_slave ) {
439 __send_slave->
join();
443 if ( __recv_slave ) {
445 __recv_slave->
join();
449 __send_slave_alive =
false;
450 __recv_slave_alive =
false;
456 __connest_mutex->
lock();
457 while ( ! __connest && ! __connest_interrupted ) {
458 __connest_waitcond->
wait();
460 bool interrupted = __connest_interrupted;
461 __connest_interrupted =
false;
462 __connest_mutex->
unlock();
467 notify_of_connection_established();
493 if (__hostname) free(__hostname);
494 if (__ip) free(__ip);
495 __hostname = strdup(hostname);
496 __ip = ip ? strdup(ip) : NULL;
505 if ( s == NULL )
return;
507 if ( __send_slave_alive ) {
508 if ( ! connection_died_recently ) {
514 __send_slave->
join();
518 if ( __recv_slave_alive ) {
520 __recv_slave->
join();
524 __send_slave_alive =
false;
525 __recv_slave_alive =
false;
529 if (! connection_died_recently) {
542 __connest_mutex->
lock();
543 __connest_interrupted =
true;
545 __connest_mutex->
unlock();
563 if (__send_slave) __send_slave->
enqueue(message);
580 unsigned int timeout_sec)
582 if (__send_slave && __recv_slave) {
583 __recv_mutex->
lock();
584 if ( __recv_received.find(message->
cid()) != __recv_received.end()) {
586 unsigned int cid = message->
cid();
587 throw Exception(
"There is already a thread waiting for messages of "
588 "component id %u", cid);
590 __send_slave->
enqueue(message);
591 unsigned int cid = message->
cid();
592 __recv_received[cid] =
false;
593 while (!__recv_received[cid] && ! connection_died_recently) {
595 __recv_received.erase(cid);
597 throw TimeoutException(
"Timeout reached while waiting for incoming message "
598 "(outgoing was %u:%u)", message->
cid(), message->
msgid());
601 __recv_received.erase(cid);
604 unsigned int cid = message->
cid();
605 unsigned int msgid = message->
msgid();
606 throw Exception(
"Cannot enqueue given message %u:%u, sender or "
607 "receiver missing", cid, msgid);
621 unsigned int component_id)
624 if ( handlers.find(component_id) != handlers.end() ) {
628 handlers[component_id] = handler;
642 if ( handlers.find(component_id) != handlers.end() ) {
643 handlers[component_id]->deregistered(_id);
644 handlers.erase(component_id);
647 __recv_mutex->
lock();
648 if (__recv_received.find(component_id) != __recv_received.end()) {
649 __recv_received[component_id] =
true;
659 unsigned int cid = m->
cid();
661 if (handlers.find(cid) != handlers.end()) {
662 handlers[cid]->inbound_received(m, _id);
669 FawkesNetworkClient::wake_handlers(
unsigned int cid)
671 __recv_mutex->
lock();
672 if (__recv_received.find(cid) != __recv_received.end()) {
673 __recv_received[cid] =
true;
680 FawkesNetworkClient::notify_of_connection_dead()
682 __connest_mutex->
lock();
684 __connest_mutex->
unlock();
687 for ( HandlerMap::iterator i = handlers.begin(); i != handlers.end(); ++i ) {
688 i->second->connection_died(_id);
692 __recv_mutex->
lock();
698 FawkesNetworkClient::notify_of_connection_established()
701 for ( HandlerMap::iterator i = handlers.begin(); i != handlers.end(); ++i ) {
702 i->second->connection_established(_id);
709 FawkesNetworkClient::connection_died()
711 connection_died_recently =
true;
712 notify_of_connection_dead();
717 FawkesNetworkClient::set_send_slave_alive()
719 slave_status_mutex->
lock();
720 __send_slave_alive =
true;
721 if ( __send_slave_alive && __recv_slave_alive ) {
722 __connest_mutex->
lock();
725 __connest_mutex->
unlock();
727 slave_status_mutex->
unlock();
732 FawkesNetworkClient::set_recv_slave_alive()
734 slave_status_mutex->
lock();
735 __recv_slave_alive =
true;
736 if ( __send_slave_alive && __recv_slave_alive ) {
737 __connest_mutex->
lock();
740 __connest_mutex->
unlock();
742 slave_status_mutex->
unlock();
756 __recv_mutex->
lock();
757 if ( __recv_received.find(component_id) != __recv_received.end()) {
759 throw Exception(
"There is already a thread waiting for messages of "
760 "component id %u", component_id);
762 __recv_received[component_id] =
false;
763 while (! __recv_received[component_id] && ! connection_died_recently) {
765 __recv_received.erase(component_id);
767 throw TimeoutException(
"Timeout reached while waiting for incoming message "
768 "(component %u)", component_id);
771 __recv_received.erase(component_id);
784 __recv_mutex->
lock();
785 if ( __recv_received.find(component_id) != __recv_received.end()) {
786 __recv_received[component_id] =
true;
799 return (! connection_died_recently && (s != NULL));
820 throw Exception(
"Trying to get the ID of a client that has no ID");
virtual void connect(const char *hostname, const unsigned short int port)
Connect socket.
~FawkesNetworkClient()
Destructor.
void unlock() const
Unlock list.
Message handler for FawkesNetworkClient.
static const short POLL_ERR
Error condition.
void wake(unsigned int component_id)
Wake a waiting thread.
unsigned int id() const
Get the client's ID.
void interrupt_connect()
Interrupt connect().
bool has_id() const
Check whether the client has an id.
const char * get_hostname() const
Get the client's hostname.
Wait until a given condition holds.
void enqueue(FawkesNetworkMessage *message)
Enqueue message to send and take ownership.
A LockQueue of FawkesNetworkMessage to hold messages in inbound and outbound queues.
void enqueue_and_wait(FawkesNetworkMessage *message, unsigned int timeout_sec=15)
Enqueue message to send and wait for answer.
Simple Fawkes network client.
void unref()
Decrement reference count and conditionally delete this instance.
bool reltimed_wait(unsigned int sec, unsigned int nanosec)
Wait with relative timeout.
const char * get_ip() const
Get the client's ip.
void unlock() const
Unlock list.
Fawkes library namespace.
void unlock()
Unlock the mutex.
static void recv(StreamSocket *s, FawkesNetworkMessageQueue *msgq, unsigned int max_num_msgs=8)
Receive data.
virtual void run()
Stub to see name in backtrace for easier debugging.
void wake_all()
Wake up all waiting threads.
virtual void run()
Code to execute in the thread.
void disconnect()
Disconnect socket.
void register_handler(FawkesNetworkClientHandler *handler, unsigned int component_id)
Register handler.
static const short POLL_IN
Data can be read.
void enqueue(FawkesNetworkMessage *message)
Enqueue message to send.
void wait(unsigned int component_id, unsigned int timeout_sec=15)
Wait for messages for component ID.
Representation of a message that is sent over the network.
void connect()
Connect to remote.
The current system call has timed out before completion.
unsigned short int msgid() const
Get message type ID.
Fawkes network client send thread.
A NULL pointer was supplied where not allowed.
Thread class encapsulation of pthreads.
~FawkesNetworkClientRecvThread()
Destructor.
virtual void loop()
Code to execute in the thread.
TCP stream socket over IP.
Mutex * loop_mutex
Mutex that is used to protect a call to loop().
void unlock()
Unlock the mutex.
HandlerAlreadyRegisteredException()
Constructor.
virtual void loop()
Code to execute in the thread.
void wakeup()
Wake up thread.
Base class for exceptions in Fawkes.
void lock() const
Lock list.
Client handler has already been registered.
void recv()
Receive and process messages.
virtual void once()
Execute an action exactly once.
bool connected() const
Check if connection is alive.
The current system call has been interrupted (for instance by a signal).
FawkesNetworkClient()
Constructor.
bool try_lock()
Tries to lock the mutex.
void wait()
Wait for the condition forever.
Thrown if the connection died during an operation.
void cancel()
Cancel a thread.
static const short POLL_RDHUP
Stream socket peer closed connection, or shut down writing half of connection.
FawkesNetworkClientRecvThread(StreamSocket *s, FawkesNetworkClient *parent, Mutex *recv_mutex)
Constructor.
~FawkesNetworkClientSendThread()
Destructor.
void lock() const
Lock queue.
virtual short poll(int timeout=-1, short what=POLL_IN|POLL_HUP|POLL_PRI|POLL_RDHUP)
Wait for some event on socket.
void deregister_handler(unsigned int component_id)
Deregister handler.
static void send(StreamSocket *s, FawkesNetworkMessageQueue *msgq)
Send messages.
static const short POLL_HUP
Hang up.
void join()
Join the thread.
void lock()
Lock this mutex.
unsigned short int cid() const
Get component ID.
virtual void run()
Stub to see name in backtrace for easier debugging.
operate in wait-for-wakeup mode
Mutex mutual exclusion lock.
FawkesNetworkClientSendThread(StreamSocket *s, FawkesNetworkClient *parent)
Constructor.
Fawkes network client receive thread.
void force_send()
Force sending of messages.
void exit()
Exit the thread.
void start(bool wait=true)
Call this method to start the thread.
virtual void once()
Execute an action exactly once.