Fawkes API  Fawkes Development Version
server_client_thread.cpp
1 
2 /***************************************************************************
3  * server_client_thread.cpp - Thread handling Fawkes network client
4  *
5  * Created: Fri Nov 17 17:23:24 2006
6  * Copyright 2006-2007 Tim Niemueller [www.niemueller.de]
7  *
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version. A runtime exception applies to
14  * this software (see LICENSE.GPL_WRE file mentioned below for details).
15  *
16  * This program is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19  * GNU Library General Public License for more details.
20  *
21  * Read the full text in the LICENSE.GPL_WRE file in the doc directory.
22  */
23 
24 #include <core/exceptions/system.h>
25 
26 #include <netcomm/fawkes/server_client_thread.h>
27 #include <netcomm/fawkes/server_thread.h>
28 #include <netcomm/fawkes/message_queue.h>
29 #include <netcomm/fawkes/transceiver.h>
30 #include <netcomm/socket/stream.h>
31 #include <netcomm/utils/exceptions.h>
32 #include <core/threading/mutex.h>
33 #include <core/threading/wait_condition.h>
34 
35 #include <unistd.h>
36 
37 namespace fawkes {
38 
39 /** @class FawkesNetworkServerClientSendThread <netcomm/fawkes/server_client_thread.h>
40  * Sending thread for a Fawkes client connected to the server.
41  * This thread is spawned for each client connected to the server to handle the
42  * server-side sending
43  * @ingroup NetComm
44  * @author Tim Niemueller
45  */
46 
48  : public Thread
49 {
50  public:
51  /** Constructor.
52  * Creates the outbound mutex and both outbound queues, makes queue 0 the
53  * active one and clears the "have more" flag that loop() tests.
   * @param s client stream socket
   * @param parent parent FawkesNetworkServerClientThread instance
54  */
57  : Thread("FawkesNetworkServerClientSendThread", Thread::OPMODE_WAITFORWAKEUP)
58  {
59  __s = s;
60  __parent = parent;
61  __outbound_mutex = new Mutex();
62  __outbound_msgqs[0] = new FawkesNetworkMessageQueue();
63  __outbound_msgqs[1] = new FawkesNetworkMessageQueue();
64  __outbound_active = 0; // index into __outbound_msgqs of the queue enqueue() currently fills
65  __outbound_msgq = __outbound_msgqs[0];
   __outbound_havemore = false; // nothing queued yet; loop() reads this flag, so it must not be left indeterminate
66  }
67 
68  /** Destructor. */
70  {
71  for (unsigned int i = 0; i < 2; ++i) {
72  while ( ! __outbound_msgqs[i]->empty() ) {
73  FawkesNetworkMessage *m = __outbound_msgqs[i]->front();
74  m->unref();
75  __outbound_msgqs[i]->pop();
76  }
77  }
78  delete __outbound_msgqs[0];
79  delete __outbound_msgqs[1];
80  delete __outbound_mutex;
81  }
82 
83  virtual void loop() // one wake-up cycle: work through everything queued via enqueue()
84  {
85  if ( ! __parent->alive() ) return; // parent reports the connection as dead: nothing to do
86 
87  while ( __outbound_havemore ) { // enqueue() sets this flag after pushing a message
88  __outbound_mutex->lock();
89  __outbound_havemore = false;
90  FawkesNetworkMessageQueue *q = __outbound_msgq; // the queue filled so far; used below without holding the mutex
91  __outbound_active = 1 - __outbound_active; // toggle between index 0 and 1
92  __outbound_msgq = __outbound_msgqs[__outbound_active]; // subsequent enqueue() calls push to the other queue
93  __outbound_mutex->unlock();
94 
95  if ( ! q->empty() ) {
96  try { // NOTE(review): listing line 97 (the try body) is absent from this view - presumably the send of q over __s; confirm against the original file
98  } catch (ConnectionDiedException &e) {
99  __parent->connection_died(); // tell the parent client thread that the connection is gone
100  exit(); // and terminate this send thread
101  }
102  }
103  }
104  }
105 
106 
107  /** Enqueue message to outbound queue.
108  * This enqueues the given message to the outbound queue. The message will
109  * be sent in the next loop iteration. This method takes ownership of the
110  * transmitted message. If you want to use the message after enqueuing you
111  * must reference it explicitly.
112  * @param msg message to enqueue
113  */
115  {
116  __outbound_mutex->lock();
117  __outbound_msgq->push(msg);
118  __outbound_havemore = true;
119  __outbound_mutex->unlock();
120  wakeup();
121  }
122 
123 
124  /** Wait until all data has been sent. */
126  {
127  loop_mutex->lock();
128  loop_mutex->unlock();
129  }
130 
131  /** Stub to see name in backtrace for easier debugging. @see Thread::run() */
132  protected: virtual void run() { Thread::run(); } // forwards unchanged to the base class implementation
133 
134  private:
135  StreamSocket *__s;
137 
138  Mutex *__outbound_mutex;
139  unsigned int __outbound_active;
140  bool __outbound_havemore;
141  FawkesNetworkMessageQueue *__outbound_msgq;
142  FawkesNetworkMessageQueue *__outbound_msgqs[2];
143 
144 };
145 
146 
147 /** @class FawkesNetworkServerClientThread <netcomm/fawkes/server_client_thread.h>
148  * Fawkes Network Client Thread for server.
149  * The FawkesNetworkServerThread spawns an instance of this class for every incoming
150  * connection. It is then used to handle the client.
151  * The thread will start another thread, an instance of
152  * FawkesNetworkServerClientSendThread. This will be used to handle all outgoing
153  * traffic.
154  *
155  * @ingroup NetComm
156  * @author Tim Niemueller
157  */
158 
159 /** Constructor.
160  * @param s socket to client
161  * @param parent parent network thread
162  */
165  : Thread("FawkesNetworkServerClientThread")
166 {
167  _s = s;
168  _parent = parent;
169  _alive = true;
170  _clid = 0;
171  _inbound_queue = new FawkesNetworkMessageQueue();
172 
173  _send_slave = new FawkesNetworkServerClientSendThread(_s, this);
174 
175  set_prepfin_conc_loop(true);
176 }
177 
178 
179 /** Destructor. */
181 {
182  _send_slave->cancel();
183  _send_slave->join();
184  delete _send_slave;
185  delete _s;
186  delete _inbound_queue;
187 }
188 
189 
190 /** Get client ID.
191  * The client ID can be used to send replies.
192  * @return client ID
193  */
194 unsigned int
196 {
197  return _clid;
198 }
199 
200 
201 /** Set client ID.
202  * @param client_id new client ID
203  */
204 void
206 {
207  _clid = client_id;
208 }
209 
210 
211 /** Receive data.
212  * Receives data from the network if there is any, stamps every inbound message
213  * with this client's ID and dispatches it via the parent FawkesNetworkThread::dispatch().
214  */
215 void
216 FawkesNetworkServerClientThread::recv()
217 {
218  try {
219  FawkesNetworkTransceiver::recv(_s, _inbound_queue); // fills _inbound_queue from the socket
220 
221  _inbound_queue->lock(); // queue stays locked while it is drained
222  while ( ! _inbound_queue->empty() ) {
223  FawkesNetworkMessage *m = _inbound_queue->front();
224  m->set_client_id(_clid); // lets receivers address replies to this client
225  _parent->dispatch(m);
226  m->unref(); // drop the queue's reference before removing the entry
227  _inbound_queue->pop();
228  }
229  _parent->wakeup();
230  _inbound_queue->unlock();
231 
232  } catch (ConnectionDiedException &e) { // connection lost: mark dead, close the socket, notify the parent
233  _alive = false;
234  _s->close();
235  _parent->wakeup();
236  }
237 }
238 
239 
240 void
242 {
243  _send_slave->start();
244 }
245 
246 
247 /** Thread loop.
248  * The client thread loop polls on the socket, blocking until an event
249  * occurs (such as a closed connection or data that can be read). If any
250  * event occurs it is processed. If the connection died or any other
251  * error occurred the client is marked as not alive and the parent
252  * FawkesNetworkThread is woken up to carry out any action that is needed
253  * when a client dies. If data is available for reading the data is
254  * received and dispatched via recv(). Once the client is no longer alive
255  * each iteration only sleeps for one second.
256  * Outbound messages are not sent from this loop; enqueue() hands them to
257  * the send slave thread (POLL_OUT is not honored).
258  */
259 void
261 {
262  if ( ! _alive) {
263  usleep(1000000); // connection is dead: idle for one second (value is in microseconds) instead of polling
264  return;
265  }
266 
267  short p = 0;
268  try {
269  p = _s->poll(); // block until we got a message
270  } catch (InterruptedException &e) {
271  // we just ignore this and try it again
272  return;
273  }
274 
275  if ( (p & Socket::POLL_ERR) ||
276  (p & Socket::POLL_HUP) ||
277  (p & Socket::POLL_RDHUP)) {
278  _alive = false; // error or hang-up: mark the client dead and let the parent react
279  _parent->wakeup();
280  } else if ( p & Socket::POLL_IN ) {
281  // Data can be read
282  recv();
283  }
284 }
285 
286 /** Enqueue message to outbound queue.
287  * This enqueues the given message to the outbound queue. The message will be sent
288  * in the next loop iteration.
289  * @param msg message to enqueue
290  */
291 void
293 {
294  _send_slave->enqueue(msg);
295 }
296 
297 
298 /** Check aliveness of connection.
299  * @return true if connection is still alive, false otherwise.
300  */
301 bool
303 {
304  return _alive;
305 }
306 
307 
308 /** Force sending of all pending outbound messages.
309  * This is a blocking operation. It delegates to wait_for_all_sent() of the
310  * send slave thread and returns when that call returns. No signal is sent
311  * to this thread to interrupt its poll.
312  */
313 void
315 {
316  _send_slave->wait_for_all_sent();
317 }
318 
319 
320 /** Connection died notification.
321  * To be called only by the send slave thread.
322  */
323 void
325 {
326  _alive = false;
327  _parent->wakeup();
328 }
329 
330 } // end namespace fawkes
static const short POLL_ERR
Error condition.
Definition: socket.h:73
virtual void close()
Close socket.
Definition: socket.cpp:274
Fawkes Network Client Thread for server.
void set_client_id(unsigned int clid)
Set client ID.
Definition: message.cpp:341
A LockQueue of FawkesNetworkMessage to hold messages in inbound and outbound queues.
Definition: message_queue.h:33
void unref()
Decrement reference count and conditionally delete this instance.
Definition: refcount.cpp:99
void dispatch(FawkesNetworkMessage *msg)
Dispatch messages.
void wait_for_all_sent()
Wait until all data has been sent.
void enqueue(FawkesNetworkMessage *msg)
Enqueue message to outbound queue.
void unlock() const
Unlock list.
Definition: lock_queue.h:131
Fawkes library namespace.
void unlock()
Unlock the mutex.
Definition: mutex.cpp:135
void force_send()
Force sending of all pending outbound messages.
static void recv(StreamSocket *s, FawkesNetworkMessageQueue *msgq, unsigned int max_num_msgs=8)
Receive data.
Definition: transceiver.cpp:86
virtual void run()
Code to execute in the thread.
Definition: thread.cpp:939
static const short POLL_IN
Data can be read.
Definition: socket.h:69
Representation of a message that is sent over the network.
Definition: message.h:75
virtual void run()
Stub to see name in backtrace for easier debugging.
Thread class encapsulation of pthreads.
Definition: thread.h:42
Sending thread for a Fawkes client connected to the server.
void set_prepfin_conc_loop(bool concurrent=true)
Set concurrent execution of prepare_finalize() and loop().
Definition: thread.cpp:727
TCP stream socket over IP.
Definition: stream.h:31
Mutex * loop_mutex
Mutex that is used to protect a call to loop().
Definition: thread.h:139
FawkesNetworkServerClientThread(StreamSocket *s, FawkesNetworkServerThread *parent)
Constructor.
virtual void once()
Execute an action exactly once.
void set_clid(unsigned int client_id)
Set client ID.
void wakeup()
Wake up thread.
Definition: thread.cpp:1000
The current system call has been interrupted (for instance by a signal).
Definition: system.h:39
bool alive() const
Check aliveness of connection.
FawkesNetworkServerClientSendThread(StreamSocket *s, FawkesNetworkServerClientThread *parent)
Constructor.
Thrown if the connection died during an operation.
Definition: exceptions.h:31
void cancel()
Cancel a thread.
Definition: thread.cpp:651
static const short POLL_RDHUP
Stream socket peer closed connection, or shut down writing half of connection.
Definition: socket.h:72
virtual void loop()
Code to execute in the thread.
virtual short poll(int timeout=-1, short what=POLL_IN|POLL_HUP|POLL_PRI|POLL_RDHUP)
Wait for some event on socket.
Definition: socket.cpp:652
void lock() const
Lock queue.
Definition: lock_queue.h:115
static void send(StreamSocket *s, FawkesNetworkMessageQueue *msgq)
Send messages.
Definition: transceiver.cpp:51
static const short POLL_HUP
Hang up.
Definition: socket.h:74
void join()
Join the thread.
Definition: thread.cpp:610
void lock()
Lock this mutex.
Definition: mutex.cpp:89
void enqueue(FawkesNetworkMessage *msg)
Enqueue message to outbound queue.
operate in wait-for-wakeup mode
Definition: thread.h:54
Mutex mutual exclusion lock.
Definition: mutex.h:32
void connection_died()
Connection died notification.
Fawkes Network Thread.
Definition: server_thread.h:46
void exit()
Exit the thread.
Definition: thread.cpp:594
unsigned int clid() const
Get client ID.
void start(bool wait=true)
Call this method to start the thread.
Definition: thread.cpp:511