24 #include <netcomm/fawkes/server_thread.h> 25 #include <netcomm/fawkes/server_client_thread.h> 26 #include <netcomm/utils/acceptor_thread.h> 27 #include <netcomm/fawkes/message.h> 28 #include <netcomm/fawkes/handler.h> 29 #include <netcomm/fawkes/message_queue.h> 30 #include <netcomm/fawkes/message_content.h> 31 #include <core/threading/thread_collector.h> 32 #include <core/threading/mutex.h> 33 #include <core/threading/mutex_locker.h> 34 #include <core/exception.h> 60 const std::string &listen_ipv4,
const std::string &listen_ipv6,
61 unsigned int fawkes_port,
63 :
Thread(
"FawkesNetworkServerThread",
Thread::OPMODE_WAITFORWAKEUP)
65 this->thread_collector = thread_collector;
72 "FawkesNetworkAcceptorThread"));
76 "FawkesNetworkAcceptorThread"));
79 if ( thread_collector ) {
80 for (
size_t i = 0; i < acceptor_threads.size(); ++i) {
81 thread_collector->
add(acceptor_threads[i]);
84 for (
size_t i = 0; i < acceptor_threads.size(); ++i) {
85 acceptor_threads[i]->start();
94 for (cit = clients.begin(); cit != clients.end(); ++cit) {
95 if ( thread_collector ) {
96 thread_collector->
remove((*cit).second);
98 (*cit).second->cancel();
99 (*cit).second->join();
101 delete (*cit).second;
103 for (
size_t i = 0; i < acceptor_threads.size(); ++i) {
104 if ( thread_collector ) {
105 thread_collector->
remove(acceptor_threads[i]);
107 acceptor_threads[i]->cancel();
108 acceptor_threads[i]->join();
110 delete acceptor_threads[i];
112 acceptor_threads.clear();
114 delete inbound_messages;
129 if ( thread_collector ) {
130 thread_collector->
add(client);
134 unsigned int cid = next_client_id++;
135 clients[cid] = client;
139 for (hit = handlers.begin(); hit != handlers.end(); ++hit) {
140 (*hit).second->client_connected(cid);
155 if ( handlers.find(handler->
id()) != handlers.end()) {
156 throw Exception(
"Handler already registered");
158 handlers[handler->
id()] = handler;
169 if( handlers.find(handler->
id()) != handlers.end() ) {
170 handlers.erase(handler->
id());
184 std::list<unsigned int> dead_clients;
187 for (cit = clients.begin(); cit != clients.end(); ++cit) {
188 if ( ! cit->second->alive() ) {
189 dead_clients.push_back(cit->first);
194 std::list<unsigned int>::iterator dci;
195 for (dci = dead_clients.begin(); dci != dead_clients.end(); ++dci) {
196 const unsigned int clid = *dci;
200 for (hit = handlers.begin(); hit != handlers.end(); ++hit) {
201 (*hit).second->client_disconnected(clid);
207 if ( thread_collector ) {
208 thread_collector->
remove(clients[clid]);
210 clients[clid]->cancel();
211 clients[clid]->join();
214 delete clients[clid];
220 inbound_messages->
lock();
221 while ( ! inbound_messages->empty() ) {
225 if ( handlers.find(m->
cid()) != handlers.end()) {
226 handlers[m->
cid()]->handle_network_message(m);
230 inbound_messages->pop();
232 inbound_messages->
unlock();
241 for (cit = clients.begin(); cit != clients.end(); ++cit) {
242 (*cit).second->force_send();
258 for (cit = clients.begin(); cit != clients.end(); ++cit) {
259 if ( (*cit).second->alive() ) {
261 (*cit).second->enqueue(msg);
279 unsigned short int msg_id,
280 void *payload,
unsigned int payload_size)
283 payload, payload_size);
315 unsigned int clid = msg->
clid();
316 if ( clients.find(clid) != clients.end() ) {
317 if ( clients[clid]->alive() ) {
318 clients[clid]->enqueue(msg);
320 throw Exception(
"Client %u not alive", clid);
323 throw Exception(
"Client %u not found", clid);
339 unsigned short int component_id,
unsigned short int msg_id,
340 void *payload,
unsigned int payload_size)
343 payload, payload_size);
358 unsigned short int component_id,
unsigned short int msg_id,
377 unsigned short int component_id,
unsigned short int msg_id)
Fawkes Network Client Thread for server.
A LockQueue of FawkesNetworkMessage to hold messages in inbound and outbound queues.
void unref()
Decrement reference count and conditionally delete this instance.
virtual void broadcast(FawkesNetworkMessage *msg)
Broadcast a message.
void dispatch(FawkesNetworkMessage *msg)
Dispatch messages.
void unlock() const
Unlock list.
unsigned short int cid() const
Get component ID.
virtual void remove(ThreadList &tl)=0
Remove multiple threads.
Fawkes library namespace.
unsigned int clid() const
Get client ID.
Representation of a message that is sent over the network.
Thread class encapsulation of pthreads.
virtual void send(FawkesNetworkMessage *msg)
Send a message.
unsigned short int id() const
Get the component ID for this handler.
TCP stream socket over IP.
Fawkes network message content.
void add_connection(StreamSocket *s)
Add a new connection.
void unlock()
Unlock the mutex.
virtual ~FawkesNetworkServerThread()
Destructor.
void set_clid(unsigned int client_id)
Set client ID.
void wakeup()
Wake up thread.
Base class for exceptions in Fawkes.
virtual void remove_handler(FawkesNetworkHandler *handler)
Remove handler.
virtual void add_handler(FawkesNetworkHandler *handler)
Add a handler.
void ref()
Increment reference count.
Network handler abstract base class.
virtual void add(ThreadList &tl)=0
Add multiple threads.
FawkesNetworkServerThread(bool enable_ipv4, bool enable_ipv6, const std::string &listen_ipv4, const std::string &listen_ipv6, unsigned int fawkes_port, ThreadCollector *thread_collector=0)
Constructor.
virtual void loop()
Fawkes network thread loop.
void push_locked(const Type &x)
Push element to queue with lock protection.
void lock() const
Lock queue.
void force_send()
Force sending of all pending messages.
void start(bool wait=true)
Call this method to start the thread.