Fawkes API  Fawkes Development Version
server_thread.cpp
1 
2 /***************************************************************************
3  * server_thread.cpp - Fawkes Network Protocol (server part)
4  *
5  * Created: Sun Nov 19 15:08:30 2006
6  * Copyright 2006-2009 Tim Niemueller [www.niemueller.de]
7  *
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version. A runtime exception applies to
14  * this software (see LICENSE.GPL_WRE file mentioned below for details).
15  *
16  * This program is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19  * GNU Library General Public License for more details.
20  *
21  * Read the full text in the LICENSE.GPL_WRE file in the doc directory.
22  */
23 
24 #include <netcomm/fawkes/server_thread.h>
25 #include <netcomm/fawkes/server_client_thread.h>
26 #include <netcomm/utils/acceptor_thread.h>
27 #include <netcomm/fawkes/message.h>
28 #include <netcomm/fawkes/handler.h>
29 #include <netcomm/fawkes/message_queue.h>
30 #include <netcomm/fawkes/message_content.h>
31 #include <core/threading/thread_collector.h>
32 #include <core/threading/mutex.h>
33 #include <core/threading/mutex_locker.h>
34 #include <core/exception.h>
35 
36 #include <unistd.h>
37 
38 namespace fawkes {
39 
40 /** @class FawkesNetworkServerThread <netcomm/fawkes/server_thread.h>
41  * Fawkes Network Thread.
42  * Maintains a list of clients and reacts on events triggered by the clients.
43  * Also runs the acceptor thread.
44  *
45  * @ingroup NetComm
46  * @author Tim Niemueller
47  */
48 
49 /** Constructor.
50  * @param enable_ipv4 true to listen on the IPv4 TCP port
51  * @param enable_ipv6 true to listen on the IPv6 TCP port
52  * @param listen_ipv4 IPv4 address to listen on for incoming connections,
53  * 0.0.0.0 to listen on any local address
54  * @param listen_ipv6 IPv6 address to listen on for incoming connections,
55  * :: to listen on any local address
56  * @param fawkes_port port for Fawkes network protocol
57  * @param thread_collector thread collector to register new threads with
58  */
59 FawkesNetworkServerThread::FawkesNetworkServerThread(bool enable_ipv4, bool enable_ipv6,
60  const std::string &listen_ipv4, const std::string &listen_ipv6,
61  unsigned int fawkes_port,
62  ThreadCollector *thread_collector)
63  : Thread("FawkesNetworkServerThread", Thread::OPMODE_WAITFORWAKEUP)
64 {
65  this->thread_collector = thread_collector;
66  clients.clear();
67  next_client_id = 1;
68  inbound_messages = new FawkesNetworkMessageQueue();
69 
70  if (enable_ipv4) {
71  acceptor_threads.push_back(new NetworkAcceptorThread(this, Socket::IPv4, listen_ipv4, fawkes_port,
72  "FawkesNetworkAcceptorThread"));
73  }
74  if (enable_ipv6) {
75  acceptor_threads.push_back(new NetworkAcceptorThread(this, Socket::IPv6, listen_ipv6, fawkes_port,
76  "FawkesNetworkAcceptorThread"));
77  }
78 
79  if ( thread_collector ) {
80  for (size_t i = 0; i < acceptor_threads.size(); ++i) {
81  thread_collector->add(acceptor_threads[i]);
82  }
83  } else {
84  for (size_t i = 0; i < acceptor_threads.size(); ++i) {
85  acceptor_threads[i]->start();
86  }
87  }
88 }
89 
90 
91 /** Destructor. */
93 {
94  for (cit = clients.begin(); cit != clients.end(); ++cit) {
95  if ( thread_collector ) {
96  thread_collector->remove((*cit).second);
97  } else {
98  (*cit).second->cancel();
99  (*cit).second->join();
100  }
101  delete (*cit).second;
102  }
103  for (size_t i = 0; i < acceptor_threads.size(); ++i) {
104  if ( thread_collector ) {
105  thread_collector->remove(acceptor_threads[i]);
106  } else {
107  acceptor_threads[i]->cancel();
108  acceptor_threads[i]->join();
109  }
110  delete acceptor_threads[i];
111  }
112  acceptor_threads.clear();
113 
114  delete inbound_messages;
115 }
116 
117 
118 /** Add a new connection.
119  * Called by the NetworkAcceptorThread if a new client connected.
120  * @param s socket for new client
121  */
122 void
124 {
126 
127  clients.lock();
128  client->set_clid(next_client_id);
129  if ( thread_collector ) {
130  thread_collector->add(client);
131  } else {
132  client->start();
133  }
134  unsigned int cid = next_client_id++;
135  clients[cid] = client;
136  clients.unlock();
137 
138  MutexLocker handlers_lock(handlers.mutex());
139  for (hit = handlers.begin(); hit != handlers.end(); ++hit) {
140  (*hit).second->client_connected(cid);
141  }
142  handlers_lock.unlock();
143 
144  wakeup();
145 }
146 
147 
148 /** Add a handler.
149  * @param handler to add.
150  */
151 void
153 {
154  MutexLocker handlers_lock(handlers.mutex());
155  if ( handlers.find(handler->id()) != handlers.end()) {
156  throw Exception("Handler already registered");
157  }
158  handlers[handler->id()] = handler;
159 }
160 
161 
162 /** Remove handler.
163  * @param handler handler to remove
164  */
165 void
167 {
168  MutexLocker handlers_lock(handlers.mutex());
169  if( handlers.find(handler->id()) != handlers.end() ) {
170  handlers.erase(handler->id());
171  }
172 }
173 
174 
175 /** Fawkes network thread loop.
176  * The thread loop will check all clients for their alivness and dead
177  * clients are removed. Then inbound messages are processed and dispatched
178  * properly to registered handlers. Then the thread waits for a new event
179  * to happen (event emitting threads need to wakeup this thread!).
180  */
181 void
183 {
184  std::list<unsigned int> dead_clients;
185  clients.lock();
186  // check for dead clients
187  for (cit = clients.begin(); cit != clients.end(); ++cit) {
188  if ( ! cit->second->alive() ) {
189  dead_clients.push_back(cit->first);
190  }
191  }
192  clients.unlock();
193 
194  std::list<unsigned int>::iterator dci;
195  for (dci = dead_clients.begin(); dci != dead_clients.end(); ++dci) {
196  const unsigned int clid = *dci;
197 
198  {
199  MutexLocker handlers_lock(handlers.mutex());
200  for (hit = handlers.begin(); hit != handlers.end(); ++hit) {
201  (*hit).second->client_disconnected(clid);
202  }
203  }
204 
205  {
206  MutexLocker clients_lock(clients.mutex());
207  if ( thread_collector ) {
208  thread_collector->remove(clients[clid]);
209  } else {
210  clients[clid]->cancel();
211  clients[clid]->join();
212  }
213  usleep(5000);
214  delete clients[clid];
215  clients.erase(clid);
216  }
217  }
218 
219  // dispatch messages
220  inbound_messages->lock();
221  while ( ! inbound_messages->empty() ) {
222  FawkesNetworkMessage *m = inbound_messages->front();
223  {
224  MutexLocker handlers_lock(handlers.mutex());
225  if ( handlers.find(m->cid()) != handlers.end()) {
226  handlers[m->cid()]->handle_network_message(m);
227  }
228  }
229  m->unref();
230  inbound_messages->pop();
231  }
232  inbound_messages->unlock();
233 }
234 
235 
236 /** Force sending of all pending messages. */
237 void
239 {
240  clients.lock();
241  for (cit = clients.begin(); cit != clients.end(); ++cit) {
242  (*cit).second->force_send();
243  }
244  clients.unlock();
245 }
246 
247 
248 /** Broadcast a message.
249  * Method to broadcast a message to all connected clients. This method will take
250  * ownership of the passed message. If you want to use if after enqueing it you
251  * must reference it explicitly before calling this method.
252  * @param msg Message to broadcast
253  */
254 void
256 {
257  clients.lock();
258  for (cit = clients.begin(); cit != clients.end(); ++cit) {
259  if ( (*cit).second->alive() ) {
260  msg->ref();
261  (*cit).second->enqueue(msg);
262  }
263  }
264  clients.unlock();
265  msg->unref();
266 }
267 
268 
269 /** Broadcast a message.
270  * A FawkesNetworkMessage is created and broacasted via the emitter.
271  * @param component_id component ID
272  * @param msg_id message type id
273  * @param payload payload buffer
274  * @param payload_size size of payload buffer
275  * @see FawkesNetworkEmitter::broadcast()
276  */
277 void
278 FawkesNetworkServerThread::broadcast(unsigned short int component_id,
279  unsigned short int msg_id,
280  void *payload, unsigned int payload_size)
281 {
282  FawkesNetworkMessage *m = new FawkesNetworkMessage(component_id, msg_id,
283  payload, payload_size);
284  broadcast(m);
285 }
286 
287 
288 /** Broadcast message without payload.
289  * @param component_id component ID
290  * @param msg_id message type ID
291  */
292 void
293 FawkesNetworkServerThread::broadcast(unsigned short int component_id, unsigned short int msg_id)
294 {
295  FawkesNetworkMessage *m = new FawkesNetworkMessage(component_id, msg_id);
296  broadcast(m);
297 }
298 
299 
300 /** Send a message.
301  * Method to send a message to a specific client.
302  * The client ID provided in the message is used to determine the correct
303  * recipient. If no client is connected for the given client ID the message
304  * shall be silently ignored.
305  * This method will take ownership of the passed message. If you want to use
306  * if after enqueing it you must reference it explicitly before calling this
307  * method.
308  * Implemented Emitter interface message.
309  * @param msg Message to send
310  */
311 void
313 {
314  MutexLocker lock(clients.mutex());
315  unsigned int clid = msg->clid();
316  if ( clients.find(clid) != clients.end() ) {
317  if ( clients[clid]->alive() ) {
318  clients[clid]->enqueue(msg);
319  } else {
320  throw Exception("Client %u not alive", clid);
321  }
322  } else {
323  throw Exception("Client %u not found", clid);
324  }
325 }
326 
327 
328 /** Send a message.
329  * A FawkesNetworkMessage is created and sent via the emitter.
330  * @param to_clid client ID of recipient
331  * @param component_id component ID
332  * @param msg_id message type id
333  * @param payload payload buffer
334  * @param payload_size size of payload buffer
335  * @see FawkesNetworkEmitter::broadcast()
336  */
337 void
338 FawkesNetworkServerThread::send(unsigned int to_clid,
339  unsigned short int component_id, unsigned short int msg_id,
340  void *payload, unsigned int payload_size)
341 {
342  FawkesNetworkMessage *m = new FawkesNetworkMessage(to_clid, component_id, msg_id,
343  payload, payload_size);
344  send(m);
345 }
346 
347 
348 /** Send a message.
349  * A FawkesNetworkMessage is created and sent via the emitter.
350  * @param to_clid client ID of recipient
351  * @param component_id component ID
352  * @param msg_id message type id
353  * @param content Fawkes complex network message content
354  * @see FawkesNetworkEmitter::broadcast()
355  */
356 void
357 FawkesNetworkServerThread::send(unsigned int to_clid,
358  unsigned short int component_id, unsigned short int msg_id,
360 {
361  FawkesNetworkMessage *m = new FawkesNetworkMessage(to_clid, component_id, msg_id,
362  content);
363  send(m);
364 }
365 
366 
367 /** Send a message without payload.
368  * A FawkesNetworkMessage with empty payload is created and sent via the emitter.
369  * This is particularly useful for simple status messages that you want to send.
370  * @param to_clid client ID of recipient
371  * @param component_id component ID
372  * @param msg_id message type id
373  * @see FawkesNetworkEmitter::broadcast()
374  */
375 void
376 FawkesNetworkServerThread::send(unsigned int to_clid,
377  unsigned short int component_id, unsigned short int msg_id)
378 {
379  FawkesNetworkMessage *m = new FawkesNetworkMessage(to_clid, component_id, msg_id);
380  send(m);
381 }
382 
383 
384 /** Dispatch messages.
385  * Actually messages are just put into the inbound message queue and dispatched
386  * during the next loop iteration. So after adding all the messages you have
387  * to wakeup the thread to get them actually dispatched.
388  * @param msg message to dispatch
389  */
390 void
392 {
393  msg->ref();
394  inbound_messages->push_locked(msg);
395 }
396 
397 } // end namespace fawkes
Fawkes Network Client Thread for server.
A LockQueue of FawkesNetworkMessage to hold messages in inbound and outbound queues.
Definition: message_queue.h:33
Network Acceptor Thread.
void unref()
Decrement reference count and conditionally delete this instance.
Definition: refcount.cpp:99
virtual void broadcast(FawkesNetworkMessage *msg)
Broadcast a message.
void dispatch(FawkesNetworkMessage *msg)
Dispatch messages.
void unlock() const
Unlock list.
Definition: lock_queue.h:131
unsigned short int cid() const
Get component ID.
Definition: message.cpp:291
virtual void remove(ThreadList &tl)=0
Remove multiple threads.
Fawkes library namespace.
unsigned int clid() const
Get client ID.
Definition: message.cpp:281
Mutex locking helper.
Definition: mutex_locker.h:33
Representation of a message that is sent over the network.
Definition: message.h:75
Thread collector.
Thread class encapsulation of pthreads.
Definition: thread.h:42
virtual void send(FawkesNetworkMessage *msg)
Send a message.
unsigned short int id() const
Get the component ID for this handler.
Definition: handler.cpp:78
TCP stream socket over IP.
Definition: stream.h:31
Fawkes network message content.
void add_connection(StreamSocket *s)
Add a new connection.
void unlock()
Unlock the mutex.
virtual ~FawkesNetworkServerThread()
Destructor.
void set_clid(unsigned int client_id)
Set client ID.
void wakeup()
Wake up thread.
Definition: thread.cpp:1000
Base class for exceptions in Fawkes.
Definition: exception.h:36
virtual void remove_handler(FawkesNetworkHandler *handler)
Remove handler.
virtual void add_handler(FawkesNetworkHandler *handler)
Add a handler.
void ref()
Increment reference count.
Definition: refcount.cpp:70
Network handler abstract base class.
Definition: handler.h:31
virtual void add(ThreadList &tl)=0
Add multiple threads.
FawkesNetworkServerThread(bool enable_ipv4, bool enable_ipv6, const std::string &listen_ipv4, const std::string &listen_ipv6, unsigned int fawkes_port, ThreadCollector *thread_collector=0)
Constructor.
virtual void loop()
Fawkes network thread loop.
void push_locked(const Type &x)
Push element to queue with lock protection.
Definition: lock_queue.h:139
void lock() const
Lock queue.
Definition: lock_queue.h:115
void force_send()
Force sending of all pending messages.
void start(bool wait=true)
Call this method to start the thread.
Definition: thread.cpp:511