24 #include <netcomm/fawkes/client.h> 25 #include <netcomm/fawkes/client_handler.h> 26 #include <netcomm/fawkes/message_queue.h> 27 #include <netcomm/fawkes/transceiver.h> 28 #include <netcomm/socket/stream.h> 29 #include <netcomm/utils/exceptions.h> 31 #include <core/threading/thread.h> 32 #include <core/threading/mutex.h> 33 #include <core/threading/mutex_locker.h> 34 #include <core/threading/wait_condition.h> 35 #include <core/exceptions/system.h> 54 :
Exception(
"A handler for this component has already been registered")
74 :
Thread(
"FawkesNetworkClientSendThread",
Thread::OPMODE_WAITFORWAKEUP)
78 __outbound_mutex =
new Mutex();
81 __outbound_active = 0;
82 __outbound_msgq = __outbound_msgqs[0];
83 __outbound_havemore =
false;
89 for (
unsigned int i = 0; i < 2; ++i) {
90 while ( ! __outbound_msgqs[i]->empty() ) {
93 __outbound_msgqs[i]->pop();
96 delete __outbound_msgqs[0];
97 delete __outbound_msgqs[1];
98 delete __outbound_mutex;
103 __parent->set_send_slave_alive();
108 if ( ! __parent->connected() )
return;
110 while ( __outbound_havemore ) {
111 __outbound_mutex->lock();
112 __outbound_havemore =
false;
114 __outbound_active = 1 - __outbound_active;
115 __outbound_msgq = __outbound_msgqs[__outbound_active];
116 __outbound_mutex->
unlock();
118 if ( ! q->empty() ) {
122 __parent->connection_died();
134 if ( loop_mutex->try_lock() ) {
136 loop_mutex->unlock();
153 __outbound_mutex->lock();
154 __outbound_msgq->push(message);
155 __outbound_havemore =
true;
156 __outbound_mutex->unlock();
166 Mutex *__outbound_mutex;
167 unsigned int __outbound_active;
168 bool __outbound_havemore;
191 :
Thread(
"FawkesNetworkClientRecvThread")
196 __recv_mutex = recv_mutex;
202 while ( ! __inbound_msgq->empty() ) {
205 __inbound_msgq->pop();
207 delete __inbound_msgq;
213 std::list<unsigned int> wakeup_list;
220 __inbound_msgq->lock();
221 while ( ! __inbound_msgq->empty() ) {
223 wakeup_list.push_back(m->
cid());
224 __parent->dispatch_message(m);
226 __inbound_msgq->pop();
228 __inbound_msgq->unlock();
233 wakeup_list.unique();
234 for (std::list<unsigned int>::iterator i = wakeup_list.begin(); i != wakeup_list.end(); ++i) {
235 __parent->wake_handlers(*i);
244 __parent->set_recv_slave_alive();
262 __parent->connection_died();
269 __parent->connection_died();
300 __host = strdup(host);
309 connection_died_recently =
false;
310 __send_slave_alive =
false;
311 __recv_slave_alive =
false;
313 slave_status_mutex =
new Mutex();
318 __recv_mutex =
new Mutex();
320 __connest_mutex =
new Mutex();
323 __connest_interrupted =
false;
342 connection_died_recently =
false;
343 __send_slave_alive =
false;
344 __recv_slave_alive =
false;
346 slave_status_mutex =
new Mutex();
351 __recv_mutex =
new Mutex();
353 __connest_mutex =
new Mutex();
356 __connest_interrupted =
false;
366 unsigned short int port)
368 __host = strdup(host);
377 connection_died_recently =
false;
378 __send_slave_alive =
false;
379 __recv_slave_alive =
false;
381 slave_status_mutex =
new Mutex();
386 __recv_mutex =
new Mutex();
388 __connest_mutex =
new Mutex();
391 __connest_interrupted =
false;
401 if (__host) free(__host);
402 if (addr_) free(addr_);
403 delete slave_status_mutex;
405 delete __connest_waitcond;
406 delete __connest_mutex;
407 delete __recv_waitcond;
419 if ( __host == NULL && addr_ == NULL) {
428 connection_died_recently =
false;
433 s->connect(addr_, addr_len_);
435 s->connect(__host, __port);
440 __send_slave->start();
442 __recv_slave->start();
444 connection_died_recently =
true;
445 if ( __send_slave ) {
446 __send_slave->cancel();
447 __send_slave->join();
451 if ( __recv_slave ) {
452 __recv_slave->cancel();
453 __recv_slave->join();
457 __send_slave_alive =
false;
458 __recv_slave_alive =
false;
464 __connest_mutex->lock();
465 while ( ! __connest && ! __connest_interrupted ) {
466 __connest_waitcond->wait();
468 bool interrupted = __connest_interrupted;
469 __connest_interrupted =
false;
470 __connest_mutex->unlock();
475 notify_of_connection_established();
488 if (__host) free(__host);
489 __host = strdup(host);
502 if (__host) free(__host);
503 if (addr_) free(addr_);
504 addr_ = (
struct sockaddr *)malloc(addr_len);
505 addr_len_ = addr_len;
506 memcpy(addr_, addr, addr_len);
507 __host = strdup(hostname);
518 if (__host) free(__host);
519 if (addr_) free(addr_);
520 addr_ = (
struct sockaddr *)malloc(
sizeof(sockaddr_storage));
521 addr_len_ =
sizeof(sockaddr_storage);
522 memcpy(addr_, &addr, addr_len_);
523 __host = strdup(hostname);
531 if ( s == NULL )
return;
533 if ( __send_slave_alive ) {
534 if ( ! connection_died_recently ) {
535 __send_slave->force_send();
539 __send_slave->cancel();
540 __send_slave->join();
544 if ( __recv_slave_alive ) {
545 __recv_slave->cancel();
546 __recv_slave->join();
550 __send_slave_alive =
false;
551 __recv_slave_alive =
false;
555 if (! connection_died_recently) {
568 __connest_mutex->lock();
569 __connest_interrupted =
true;
570 __connest_waitcond->wake_all();
571 __connest_mutex->unlock();
589 if (__send_slave) __send_slave->enqueue(message);
606 unsigned int timeout_sec)
608 if (__send_slave && __recv_slave) {
609 __recv_mutex->lock();
610 if ( __recv_received.find(message->
cid()) != __recv_received.end()) {
611 __recv_mutex->unlock();
612 unsigned int cid = message->
cid();
613 throw Exception(
"There is already a thread waiting for messages of " 614 "component id %u", cid);
616 __send_slave->enqueue(message);
617 unsigned int cid = message->
cid();
618 __recv_received[cid] =
false;
619 while (!__recv_received[cid] && ! connection_died_recently) {
620 if (!__recv_waitcond->reltimed_wait(timeout_sec, 0)) {
621 __recv_received.erase(cid);
622 __recv_mutex->unlock();
623 throw TimeoutException(
"Timeout reached while waiting for incoming message " 624 "(outgoing was %u:%u)", message->
cid(), message->
msgid());
627 __recv_received.erase(cid);
628 __recv_mutex->unlock();
630 unsigned int cid = message->
cid();
631 unsigned int msgid = message->
msgid();
632 throw Exception(
"Cannot enqueue given message %u:%u, sender or " 633 "receiver missing", cid, msgid);
647 unsigned int component_id)
650 if ( handlers.find(component_id) != handlers.end() ) {
654 handlers[component_id] = handler;
668 if ( handlers.find(component_id) != handlers.end() ) {
669 handlers[component_id]->deregistered(_id);
670 handlers.erase(component_id);
673 __recv_mutex->lock();
674 if (__recv_received.find(component_id) != __recv_received.end()) {
675 __recv_received[component_id] =
true;
676 __recv_waitcond->wake_all();
678 __recv_mutex->unlock();
685 unsigned int cid = m->
cid();
687 if (handlers.find(cid) != handlers.end()) {
688 handlers[cid]->inbound_received(m, _id);
695 FawkesNetworkClient::wake_handlers(
unsigned int cid)
697 __recv_mutex->lock();
698 if (__recv_received.find(cid) != __recv_received.end()) {
699 __recv_received[cid] =
true;
701 __recv_waitcond->wake_all();
702 __recv_mutex->unlock();
706 FawkesNetworkClient::notify_of_connection_dead()
708 __connest_mutex->lock();
710 __connest_mutex->unlock();
713 for ( HandlerMap::iterator i = handlers.begin(); i != handlers.end(); ++i ) {
714 i->second->connection_died(_id);
718 __recv_mutex->lock();
719 __recv_waitcond->wake_all();
720 __recv_mutex->unlock();
724 FawkesNetworkClient::notify_of_connection_established()
727 for ( HandlerMap::iterator i = handlers.begin(); i != handlers.end(); ++i ) {
728 i->second->connection_established(_id);
735 FawkesNetworkClient::connection_died()
737 connection_died_recently =
true;
738 notify_of_connection_dead();
743 FawkesNetworkClient::set_send_slave_alive()
745 slave_status_mutex->lock();
746 __send_slave_alive =
true;
747 if ( __send_slave_alive && __recv_slave_alive ) {
748 __connest_mutex->lock();
750 __connest_waitcond->wake_all();
751 __connest_mutex->unlock();
753 slave_status_mutex->unlock();
758 FawkesNetworkClient::set_recv_slave_alive()
760 slave_status_mutex->lock();
761 __recv_slave_alive =
true;
762 if ( __send_slave_alive && __recv_slave_alive ) {
763 __connest_mutex->lock();
765 __connest_waitcond->wake_all();
766 __connest_mutex->unlock();
768 slave_status_mutex->unlock();
782 __recv_mutex->lock();
783 if ( __recv_received.find(component_id) != __recv_received.end()) {
784 __recv_mutex->unlock();
785 throw Exception(
"There is already a thread waiting for messages of " 786 "component id %u", component_id);
788 __recv_received[component_id] =
false;
789 while (! __recv_received[component_id] && ! connection_died_recently) {
790 if (!__recv_waitcond->reltimed_wait(timeout_sec, 0)) {
791 __recv_received.erase(component_id);
792 __recv_mutex->unlock();
793 throw TimeoutException(
"Timeout reached while waiting for incoming message " 794 "(component %u)", component_id);
797 __recv_received.erase(component_id);
798 __recv_mutex->unlock();
810 __recv_mutex->lock();
811 if ( __recv_received.find(component_id) != __recv_received.end()) {
812 __recv_received[component_id] =
true;
814 __recv_waitcond->wake_all();
815 __recv_mutex->unlock();
825 return (! connection_died_recently && (s != NULL));
846 throw Exception(
"Trying to get the ID of a client that has no ID");
~FawkesNetworkClient()
Destructor.
Message handler for FawkesNetworkClient.
static const short POLL_ERR
Error condition.
void wake(unsigned int component_id)
Wake a waiting thread.
void interrupt_connect()
Interrupt connect().
Wait until a given condition holds.
void enqueue(FawkesNetworkMessage *message)
Enqueue message to send and take ownership.
A LockQueue of FawkesNetworkMessage to hold messages in inbound and outbound queues.
void enqueue_and_wait(FawkesNetworkMessage *message, unsigned int timeout_sec=15)
Enqueue message to send and wait for answer.
Simple Fawkes network client.
void unref()
Decrement reference count and conditionally delete this instance.
void unlock() const
Unlock list.
unsigned short int cid() const
Get component ID.
Fawkes library namespace.
static void recv(StreamSocket *s, FawkesNetworkMessageQueue *msgq, unsigned int max_num_msgs=8)
Receive data.
virtual void run()
Stub to see name in backtrace for easier debugging.
Exception()
Constructor for subclasses.
virtual void run()
Code to execute in the thread.
void disconnect()
Disconnect socket.
void register_handler(FawkesNetworkClientHandler *handler, unsigned int component_id)
Register handler.
static const short POLL_IN
Data can be read.
void enqueue(FawkesNetworkMessage *message)
Enqueue message to send.
void wait(unsigned int component_id, unsigned int timeout_sec=15)
Wait for messages for component ID.
Representation of a message that is sent over the network.
void connect()
Connect to remote.
The current system call has timed out before completion.
bool has_id() const
Check whether the client has an id.
Fawkes network client send thread.
unsigned int id() const
Get the client's ID.
A NULL pointer was supplied where not allowed.
Thread class encapsulation of pthreads.
~FawkesNetworkClientRecvThread()
Destructor.
virtual void loop()
Code to execute in the thread.
TCP stream socket over IP.
void unlock()
Unlock the mutex.
HandlerAlreadyRegisteredException()
Constructor.
virtual void loop()
Code to execute in the thread.
Base class for exceptions in Fawkes.
void recv()
Receive and process messages.
virtual void once()
Execute an action exactly once.
The current system call has been interrupted (for instance by a signal).
FawkesNetworkClient()
Constructor.
Thrown if the connection died during an operation.
static const short POLL_RDHUP
Stream socket peer closed connection, or shut down writing half of connection.
FawkesNetworkClientRecvThread(StreamSocket *s, FawkesNetworkClient *parent, Mutex *recv_mutex)
Constructor.
~FawkesNetworkClientSendThread()
Destructor.
unsigned short int msgid() const
Get message type ID.
void deregister_handler(unsigned int component_id)
Deregister handler.
static void send(StreamSocket *s, FawkesNetworkMessageQueue *msgq)
Send messages.
static const short POLL_HUP
Hang up.
const char * get_hostname() const
Get the client's hostname.
virtual void run()
Stub to see name in backtrace for easier debugging.
bool connected() const
Check if connection is alive.
Mutex mutual exclusion lock.
FawkesNetworkClientSendThread(StreamSocket *s, FawkesNetworkClient *parent)
Constructor.
Fawkes network client receive thread.
void force_send()
Force sending of messages.
virtual void once()
Execute an action exactly once.