Fawkes API  Fawkes Development Version
server_thread.cpp
00001 
00002 /***************************************************************************
00003  *  server_thread.cpp - Fawkes Network Protocol (server part)
00004  *
00005  *  Created: Sun Nov 19 15:08:30 2006
00006  *  Copyright  2006-2009  Tim Niemueller [www.niemueller.de]
00007  *
00008  ****************************************************************************/
00009 
00010 /*  This program is free software; you can redistribute it and/or modify
00011  *  it under the terms of the GNU General Public License as published by
00012  *  the Free Software Foundation; either version 2 of the License, or
00013  *  (at your option) any later version. A runtime exception applies to
00014  *  this software (see LICENSE.GPL_WRE file mentioned below for details).
00015  *
00016  *  This program is distributed in the hope that it will be useful,
00017  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00018  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00019  *  GNU Library General Public License for more details.
00020  *
00021  *  Read the full text in the LICENSE.GPL_WRE file in the doc directory.
00022  */
00023 
00024 #include <netcomm/fawkes/server_thread.h>
00025 #include <netcomm/fawkes/server_client_thread.h>
00026 #include <netcomm/utils/acceptor_thread.h>
00027 #include <netcomm/fawkes/message.h>
00028 #include <netcomm/fawkes/handler.h>
00029 #include <netcomm/fawkes/message_queue.h>
00030 #include <netcomm/fawkes/message_content.h>
00031 #include <core/threading/thread_collector.h>
00032 #include <core/threading/mutex.h>
00033 #include <core/exception.h>
00034 
00035 #include <unistd.h>
00036 
00037 namespace fawkes {
00038 
00039 /** @class FawkesNetworkServerThread <netcomm/fawkes/server_thread.h>
00040  * Fawkes Network Thread.
00041  * Maintains a list of clients and reacts to events triggered by the clients.
00042  * Also runs the acceptor thread.
00043  *
00044  * @ingroup NetComm
00045  * @author Tim Niemueller
00046  */
00047 
00048 /** Constructor.
00049  * @param thread_collector thread collector to register new threads with
00050  * @param fawkes_port port for Fawkes network protocol
00051  */
00052 FawkesNetworkServerThread::FawkesNetworkServerThread(unsigned int fawkes_port,
00053                                                      ThreadCollector *thread_collector)
00054   : Thread("FawkesNetworkServerThread", Thread::OPMODE_WAITFORWAKEUP)
00055 {
00056   this->thread_collector = thread_collector;
00057   clients.clear();
00058   next_client_id = 1;
00059   inbound_messages = new FawkesNetworkMessageQueue();
00060 
00061   acceptor_thread = new NetworkAcceptorThread(this, fawkes_port,
00062                                               "FawkesNetworkAcceptorThread");
00063   if ( thread_collector ) {
00064     thread_collector->add(acceptor_thread);
00065   } else {
00066     acceptor_thread->start();
00067   }
00068 }
00069 
00070 
00071 /** Destructor. */
00072 FawkesNetworkServerThread::~FawkesNetworkServerThread()
00073 {
00074   for (cit = clients.begin(); cit != clients.end(); ++cit) {
00075     if ( thread_collector ) {
00076       thread_collector->remove((*cit).second);
00077     } else {
00078       (*cit).second->cancel();
00079       (*cit).second->join();
00080     }
00081     delete (*cit).second;
00082   }
00083   if ( thread_collector ) {
00084     thread_collector->remove(acceptor_thread);
00085   } else {
00086     acceptor_thread->cancel();
00087     acceptor_thread->join();
00088   }
00089   delete acceptor_thread;
00090 
00091   delete inbound_messages;
00092 }
00093 
00094 
00095 /** Add a new connection.
00096  * Called by the NetworkAcceptorThread if a new client connected.
00097  * @param s socket for new client
00098  */
00099 void
00100 FawkesNetworkServerThread::add_connection(StreamSocket *s) throw()
00101 {
00102   FawkesNetworkServerClientThread *client = new FawkesNetworkServerClientThread(s, this);
00103 
00104   clients.lock();
00105   client->set_clid(next_client_id);
00106   if ( thread_collector ) {
00107     thread_collector->add(client);
00108   } else {
00109     client->start();
00110   }
00111   clients[next_client_id] = client;
00112   for (hit = handlers.begin(); hit != handlers.end(); ++hit) {
00113     (*hit).second->client_connected(next_client_id);
00114   }
00115   ++next_client_id;
00116   clients.unlock();
00117 
00118   wakeup();
00119 }
00120 
00121 
00122 /** Add a handler.
00123  * @param handler to add.
00124  */
00125 void
00126 FawkesNetworkServerThread::add_handler(FawkesNetworkHandler *handler)
00127 {
00128   handlers.lock();
00129   if ( handlers.find(handler->id()) != handlers.end()) {
00130     handlers.unlock();
00131     throw Exception("Handler already registered");
00132   }
00133   handlers[handler->id()] = handler;
00134   handlers.unlock();
00135 }
00136 
00137 
00138 /** Remove handler.
00139  * @param handler handler to remove
00140  */
00141 void
00142 FawkesNetworkServerThread::remove_handler(FawkesNetworkHandler *handler)
00143 {
00144   handlers.lock();
00145   if( handlers.find(handler->id()) != handlers.end() ) {
00146     handlers.erase(handler->id());
00147   }
00148   handlers.unlock();
00149 }
00150 
00151 
00152 /** Fawkes network thread loop.
00153  * The thread loop will check all clients for their alivness and dead
00154  * clients are removed. Then inbound messages are processed and dispatched
00155  * properly to registered handlers. Then the thread waits for a new event
00156  * to happen (event emitting threads need to wakeup this thread!).
00157  */
00158 void
00159 FawkesNetworkServerThread::loop()
00160 {
00161   clients.lock();
00162 
00163   // check for dead clients
00164   cit = clients.begin();
00165   while (cit != clients.end()) {
00166     if ( ! cit->second->alive() ) {
00167       if ( thread_collector ) {
00168         thread_collector->remove((*cit).second);
00169       } else {
00170         cit->second->cancel();
00171         cit->second->join();
00172       }
00173       usleep(5000);
00174       delete cit->second;
00175       unsigned int clid = (*cit).first;
00176       ++cit;
00177       clients.erase(clid);
00178       for (hit = handlers.begin(); hit != handlers.end(); ++hit) {
00179         (*hit).second->client_disconnected(clid);
00180       }
00181     } else {
00182       ++cit;
00183     }
00184   }
00185 
00186   // dispatch messages
00187   inbound_messages->lock();
00188   while ( ! inbound_messages->empty() ) {
00189     FawkesNetworkMessage *m = inbound_messages->front();
00190     if ( handlers.find(m->cid()) != handlers.end()) {
00191       handlers[m->cid()]->handle_network_message(m);
00192     }
00193     m->unref();
00194     inbound_messages->pop();
00195   }
00196   inbound_messages->unlock();
00197 
00198   clients.unlock();
00199 }
00200 
00201 
00202 /** Force sending of all pending messages. */
00203 void
00204 FawkesNetworkServerThread::force_send()
00205 {
00206   clients.lock();
00207   for (cit = clients.begin(); cit != clients.end(); ++cit) {
00208     (*cit).second->force_send();
00209   }
00210   clients.unlock();
00211 }
00212 
00213 
00214 /** Broadcast a message.
00215  * Method to broadcast a message to all connected clients. This method will take
00216  * ownership of the passed message. If you want to use if after enqueing it you
00217  * must reference it explicitly before calling this method.
00218  * @param msg Message to broadcast
00219  */
00220 void
00221 FawkesNetworkServerThread::broadcast(FawkesNetworkMessage *msg)
00222 {
00223   for (cit = clients.begin(); cit != clients.end(); ++cit) {
00224     if ( (*cit).second->alive() ) {
00225       msg->ref();
00226       (*cit).second->enqueue(msg);
00227     }
00228   }
00229   msg->unref();
00230 }
00231 
00232 
00233 /** Broadcast a message.
00234  * A FawkesNetworkMessage is created and broacasted via the emitter.
00235  * @param component_id component ID
00236  * @param msg_id message type id
00237  * @param payload payload buffer
00238  * @param payload_size size of payload buffer
00239  * @see FawkesNetworkEmitter::broadcast()
00240  */
00241 void
00242 FawkesNetworkServerThread::broadcast(unsigned short int component_id,
00243                                      unsigned short int msg_id,
00244                                      void *payload, unsigned int payload_size)
00245 {
00246   FawkesNetworkMessage *m = new FawkesNetworkMessage(component_id, msg_id,
00247                                                      payload, payload_size);
00248   broadcast(m);
00249 }
00250 
00251 
00252 /** Broadcast message without payload.
00253  * @param component_id component ID
00254  * @param msg_id message type ID
00255  */
00256 void
00257 FawkesNetworkServerThread::broadcast(unsigned short int component_id, unsigned short int msg_id)
00258 {
00259   FawkesNetworkMessage *m = new FawkesNetworkMessage(component_id, msg_id);
00260   broadcast(m);
00261 }
00262 
00263 
00264 /** Send a message.
00265  * Method to send a message to a specific client.
00266  * The client ID provided in the message is used to determine the correct
00267  * recipient. If no client is connected for the given client ID the message
00268  * shall be silently ignored.
00269  * This method will take ownership of the passed message. If you want to use
00270  * if after enqueing it you must reference it explicitly before calling this
00271  * method.
00272  * Implemented Emitter interface message.
00273  * @param msg Message to send
00274  */
00275 void
00276 FawkesNetworkServerThread::send(FawkesNetworkMessage *msg)
00277 {
00278   unsigned int clid = msg->clid();
00279   if ( clients.find(clid) != clients.end() ) {
00280     if ( clients[clid]->alive() ) {
00281       clients[clid]->enqueue(msg);
00282     } else {
00283       throw Exception("Client %u not alive", clid);
00284     }
00285   } else {
00286     throw Exception("Client %u not found", clid);
00287   }
00288 }
00289 
00290 
00291 /** Send a message.
00292  * A FawkesNetworkMessage is created and sent via the emitter.
00293  * @param to_clid client ID of recipient
00294  * @param component_id component ID
00295  * @param msg_id message type id
00296  * @param payload payload buffer
00297  * @param payload_size size of payload buffer
00298  * @see FawkesNetworkEmitter::broadcast()
00299  */
00300 void
00301 FawkesNetworkServerThread::send(unsigned int to_clid,
00302                           unsigned short int component_id, unsigned short int msg_id,
00303                            void *payload, unsigned int payload_size)
00304 {
00305   FawkesNetworkMessage *m = new FawkesNetworkMessage(to_clid, component_id, msg_id,
00306                                                      payload, payload_size);
00307   send(m);
00308 }
00309 
00310 
00311 /** Send a message.
00312  * A FawkesNetworkMessage is created and sent via the emitter.
00313  * @param to_clid client ID of recipient
00314  * @param component_id component ID
00315  * @param msg_id message type id
00316  * @param content Fawkes complex network message content
00317  * @see FawkesNetworkEmitter::broadcast()
00318  */
00319 void
00320 FawkesNetworkServerThread::send(unsigned int to_clid,
00321                           unsigned short int component_id, unsigned short int msg_id,
00322                           FawkesNetworkMessageContent *content)
00323 {
00324   FawkesNetworkMessage *m = new FawkesNetworkMessage(to_clid, component_id, msg_id,
00325                                                      content);
00326   send(m);
00327 }
00328 
00329 
00330 /** Send a message without payload.
00331  * A FawkesNetworkMessage with empty payload is created and sent via the emitter.
00332  * This is particularly useful for simple status messages that you want to send.
00333  * @param to_clid client ID of recipient
00334  * @param component_id component ID
00335  * @param msg_id message type id
00336  * @see FawkesNetworkEmitter::broadcast()
00337  */
00338 void
00339 FawkesNetworkServerThread::send(unsigned int to_clid,
00340                           unsigned short int component_id, unsigned short int msg_id)
00341 {
00342   FawkesNetworkMessage *m = new FawkesNetworkMessage(to_clid, component_id, msg_id);
00343   send(m);
00344 }
00345 
00346 
00347 /** Dispatch messages.
00348  * Actually messages are just put into the inbound message queue and dispatched
00349  * during the next loop iteration. So after adding all the messages you have
00350  * to wakeup the thread to get them actually dispatched.
00351  * @param msg message to dispatch
00352  */
00353 void
00354 FawkesNetworkServerThread::dispatch(FawkesNetworkMessage *msg)
00355 {
00356   msg->ref();
00357   inbound_messages->push_locked(msg);
00358 }
00359 
00360 } // end namespace fawkes