Fawkes API  Fawkes Development Version
fuse_server_client_thread.cpp
00001 
00002 /***************************************************************************
00003  *  fuse_server_client_thread.cpp - client thread for FuseServer
00004  *
00005  *  Created: Tue Nov 13 20:00:55 2007
00006  *  Copyright  2005-2007  Tim Niemueller [www.niemueller.de]
00007  *
00008  ****************************************************************************/
00009 
00010 /*  This program is free software; you can redistribute it and/or modify
00011  *  it under the terms of the GNU General Public License as published by
00012  *  the Free Software Foundation; either version 2 of the License, or
00013  *  (at your option) any later version. A runtime exception applies to
00014  *  this software (see LICENSE.GPL_WRE file mentioned below for details).
00015  *
00016  *  This program is distributed in the hope that it will be useful,
00017  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00018  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00019  *  GNU Library General Public License for more details.
00020  *
00021  *  Read the full text in the LICENSE.GPL_WRE file in the doc directory.
00022  */
00023 
00024 #include <fvutils/net/fuse_server_client_thread.h>
00025 
00026 #include <fvutils/net/fuse_server.h>
00027 #include <fvutils/net/fuse_server.h>
00028 #include <fvutils/net/fuse_transceiver.h>
00029 #include <fvutils/net/fuse_message_queue.h>
00030 #include <fvutils/net/fuse_image_content.h>
00031 #include <fvutils/net/fuse_lut_content.h>
00032 #include <fvutils/net/fuse_imagelist_content.h>
00033 #include <fvutils/net/fuse_lutlist_content.h>
00034 #include <fvutils/ipc/shm_image.h>
00035 #include <fvutils/ipc/shm_lut.h>
00036 #include <fvutils/compression/jpeg_compressor.h>
00037 
00038 #include <core/exceptions/system.h>
00039 #include <netcomm/socket/stream.h>
00040 #include <netcomm/utils/exceptions.h>
00041 #include <logging/liblogger.h>
00042 
00043 #include <netinet/in.h>
00044 #include <cstring>
00045 #include <cstdlib>
00046 #include <unistd.h>
00047 
00048 using namespace fawkes;
00049 
00050 namespace firevision {
00051 #if 0 /* just to make Emacs auto-indent happy */
00052 }
00053 #endif
00054 
00055 /** @class FuseServerClientThread <fvutils/net/fuse_server_client_thread.h>
00056  * FUSE Server Client Thread.
00057  * This thread is instantiated and started for each client that connects to a
00058  * FuseServer.
00059  * @ingroup FUSE
00060  * @ingroup FireVision
00061  * @author Tim Niemueller
00062  */
00063 
00064 /** Constructor.
00065  * @param fuse_server parent FUSE server
00066  * @param s socket to client
00067  */
00068 FuseServerClientThread::FuseServerClientThread(FuseServer *fuse_server, StreamSocket *s)
00069   : Thread("FuseServerClientThread")
00070 {
00071   __fuse_server = fuse_server;
00072   __socket = s;
00073   __jpeg_compressor = NULL;
00074 
00075   __inbound_queue  = new FuseNetworkMessageQueue();
00076   __outbound_queue  = new FuseNetworkMessageQueue();
00077 
00078   FUSE_greeting_message_t *greetmsg = (FUSE_greeting_message_t *)malloc(sizeof(FUSE_greeting_message_t));
00079   greetmsg->version = htonl(FUSE_CURRENT_VERSION);
00080   __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_GREETING,
00081                                                 greetmsg, sizeof(FUSE_greeting_message_t)));
00082 
00083   __alive = true;
00084 }
00085 
00086 
00087 /** Destructor. */
00088 FuseServerClientThread::~FuseServerClientThread()
00089 {
00090   delete __socket;
00091   delete __jpeg_compressor;
00092 
00093   for (__bit = __buffers.begin(); __bit != __buffers.end(); ++__bit) {
00094     delete __bit->second;
00095   }
00096   __buffers.clear();
00097 
00098   for (__lit = __luts.begin(); __lit != __luts.end(); ++__lit ) {
00099     delete __lit->second;
00100   }
00101   __luts.clear();
00102 
00103   while ( ! __inbound_queue->empty() ) {
00104     FuseNetworkMessage *m = __inbound_queue->front();
00105     m->unref();
00106     __inbound_queue->pop();
00107   }
00108 
00109   while ( ! __outbound_queue->empty() ) {
00110     FuseNetworkMessage *m = __outbound_queue->front();
00111     m->unref();
00112     __outbound_queue->pop();
00113   }
00114 
00115   delete __inbound_queue;
00116   delete __outbound_queue;
00117 }
00118 
00119 
00120 /** Send all messages in outbound queue. */
00121 void
00122 FuseServerClientThread::send()
00123 {
00124   if ( ! __outbound_queue->empty() ) {
00125     try {
00126       FuseNetworkTransceiver::send(__socket, __outbound_queue);
00127     } catch (Exception &e) {
00128       __fuse_server->connection_died(this);
00129       __alive = false;
00130     }
00131   }
00132 }
00133 
00134 
00135 /** Receive data.
00136  * Receives data from the network if there is any and then processes all
00137  * inbound messages.
00138  */
00139 void
00140 FuseServerClientThread::recv()
00141 {
00142   try {
00143     FuseNetworkTransceiver::recv(__socket, __inbound_queue);
00144   } catch (ConnectionDiedException &e) {
00145     __socket->close();
00146     __fuse_server->connection_died(this);
00147     __alive = false;
00148   }
00149 }
00150 
00151 
00152 /** Process greeting message.
00153  * @param m received message
00154  */
00155 void
00156 FuseServerClientThread::process_greeting_message(FuseNetworkMessage *m)
00157 {
00158   FUSE_greeting_message_t *gm = m->msg<FUSE_greeting_message_t>();
00159   if ( ntohl(gm->version) != FUSE_CURRENT_VERSION ) {
00160     throw Exception("Invalid version on other side");
00161   }
00162 }
00163 
00164 
00165 SharedMemoryImageBuffer *
00166 FuseServerClientThread::get_shmimgbuf(const char *id)
00167 {
00168   char tmp_image_id[IMAGE_ID_MAX_LENGTH + 1];
00169   tmp_image_id[IMAGE_ID_MAX_LENGTH] = 0;
00170   strncpy(tmp_image_id, id, IMAGE_ID_MAX_LENGTH);
00171 
00172   if ( (__bit = __buffers.find( tmp_image_id )) == __buffers.end() ) {
00173     // the buffer has not yet been opened
00174     try {
00175       SharedMemoryImageBuffer *b = new SharedMemoryImageBuffer(tmp_image_id);
00176       __buffers[tmp_image_id] = b;
00177       return b;
00178     } catch (Exception &e) {
00179       throw;
00180     }
00181   } else {
00182     return __bit->second;
00183   }
00184 }
00185 
00186 
00187 /** Process image request message.
00188  * @param m received message
00189  */
00190 void
00191 FuseServerClientThread::process_getimage_message(FuseNetworkMessage *m)
00192 {
00193   FUSE_imagereq_message_t *irm = m->msg<FUSE_imagereq_message_t>();
00194 
00195   SharedMemoryImageBuffer *b;
00196   try {
00197     b = get_shmimgbuf(irm->image_id);
00198   } catch (Exception &e) {
00199     FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_GET_IMAGE_FAILED,
00200                                                     m->payload(), m->payload_size(),
00201                                                     /* copy payload */ true);
00202     __outbound_queue->push(nm);
00203     return;
00204   }
00205 
00206   if ( irm->format == FUSE_IF_RAW ) {
00207     FuseImageContent *im = new FuseImageContent(b);
00208     __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_IMAGE, im));
00209   } else if ( irm->format == FUSE_IF_JPEG ) {
00210     if ( ! __jpeg_compressor) {
00211       __jpeg_compressor = new JpegImageCompressor();
00212       __jpeg_compressor->set_compression_destination(ImageCompressor::COMP_DEST_MEM);
00213     }
00214     b->lock_for_read();
00215     __jpeg_compressor->set_image_dimensions(b->width(), b->height());
00216     __jpeg_compressor->set_image_buffer(b->colorspace(), b->buffer());
00217     unsigned char *compressed_buffer = (unsigned char *)malloc(__jpeg_compressor->recommended_compressed_buffer_size());
00218     __jpeg_compressor->set_destination_buffer(compressed_buffer, __jpeg_compressor->recommended_compressed_buffer_size());
00219     __jpeg_compressor->compress();
00220     b->unlock();
00221     size_t compressed_buffer_size = __jpeg_compressor->compressed_size();
00222     long int sec = 0, usec = 0;
00223     b->capture_time(&sec, &usec);
00224     FuseImageContent *im = new FuseImageContent(FUSE_IF_JPEG, b->image_id(),
00225                                                 compressed_buffer, compressed_buffer_size,
00226                                                 CS_UNKNOWN, b->width(), b->height(),
00227                                                 sec, usec);
00228     __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_IMAGE, im));
00229     free(compressed_buffer);
00230   } else {
00231     FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_GET_IMAGE_FAILED,
00232                                                     m->payload(), m->payload_size(),
00233                                                     /* copy payload */ true);
00234     __outbound_queue->push(nm);
00235   }
00236 }
00237 
00238 /** Process image info request message.
00239  * @param m received message
00240  */
00241 void
00242 FuseServerClientThread::process_getimageinfo_message(FuseNetworkMessage *m)
00243 {
00244   FUSE_imagedesc_message_t *idm = m->msg<FUSE_imagedesc_message_t>();
00245 
00246   SharedMemoryImageBuffer *b;
00247   try {
00248     b = get_shmimgbuf(idm->image_id);
00249 
00250     FUSE_imageinfo_t *ii = (FUSE_imageinfo_t *)calloc(1, sizeof(FUSE_imageinfo_t));
00251     
00252     strncpy(ii->image_id, b->image_id(), IMAGE_ID_MAX_LENGTH);
00253     ii->colorspace = htons(b->colorspace());
00254     ii->width = htonl(b->width());
00255     ii->height = htonl(b->height());
00256     ii->buffer_size = colorspace_buffer_size(b->colorspace(), b->width(), b->height());
00257 
00258     FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_IMAGE_INFO,
00259                                                     ii, sizeof(FUSE_imageinfo_t));
00260     __outbound_queue->push(nm);
00261   } catch (Exception &e) {
00262     FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_GET_IMAGE_FAILED,
00263                                                     m->payload(), m->payload_size(),
00264                                                     /* copy payload */ true);
00265     __outbound_queue->push(nm);
00266   }
00267 }
00268 
00269 
00270 /** Process LUT request message.
00271  * @param m received message
00272  */
00273 void
00274 FuseServerClientThread::process_getlut_message(FuseNetworkMessage *m)
00275 {
00276   FUSE_lutdesc_message_t *idm = m->msg<FUSE_lutdesc_message_t>();
00277 
00278   char tmp_lut_id[LUT_ID_MAX_LENGTH + 1];
00279   tmp_lut_id[LUT_ID_MAX_LENGTH] = 0;
00280   strncpy(tmp_lut_id, idm->lut_id, LUT_ID_MAX_LENGTH);
00281 
00282   if ( (__lit = __luts.find( tmp_lut_id )) != __luts.end() ) {
00283     // the buffer had already be opened
00284     FuseLutContent *lm = new FuseLutContent(__lit->second);
00285     __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_LUT, lm));
00286   } else {
00287     try {
00288       SharedMemoryLookupTable *b = new SharedMemoryLookupTable(tmp_lut_id);
00289       __luts[tmp_lut_id] = b;
00290       FuseLutContent *lm = new FuseLutContent(b);
00291       __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_LUT, lm));
00292     } catch (Exception &e) {
00293       // could not open the shared memory segment for some reason, send failure
00294       FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_GET_LUT_FAILED,
00295                                                       m->payload(), m->payload_size(),
00296                                                       /* copy payload */ true);
00297       __outbound_queue->push(nm);
00298     }
00299   }
00300 }
00301 
00302 
00303 /** Process LUT setting.
00304  * @param m received message
00305  */
00306 void
00307 FuseServerClientThread::process_setlut_message(FuseNetworkMessage *m)
00308 {
00309   FuseLutContent *lc = m->msgc<FuseLutContent>();  
00310   FUSE_lutdesc_message_t *reply = (FUSE_lutdesc_message_t *)malloc(sizeof(FUSE_lutdesc_message_t));
00311   strncpy(reply->lut_id, lc->lut_id(), LUT_ID_MAX_LENGTH);
00312   // Currently we expect colormaps, so make sure we get sensible dimensions
00313 
00314   SharedMemoryLookupTable *b;
00315   if ( (__lit = __luts.find( lc->lut_id() )) != __luts.end() ) {
00316     // the buffer had already been opened
00317     b = __lit->second;
00318   } else {
00319     try {
00320       b = new SharedMemoryLookupTable(lc->lut_id(), /* read only */ false);
00321       __luts[lc->lut_id()] = b;
00322     } catch (Exception &e) {
00323       __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_SET_LUT_FAILED,
00324                                                     reply, sizeof(FUSE_lutdesc_message_t)));
00325       e.append("Cannot open shared memory lookup table %s", lc->lut_id());
00326       LibLogger::log_warn("FuseServerClientThread", e);
00327       delete lc;
00328       return;
00329     }
00330   }
00331 
00332   if ( (b->width() != lc->width())   ||
00333        (b->height() != lc->height()) ||
00334        (b->depth() != lc->depth())   ||
00335        (b->bytes_per_cell() != lc->bytes_per_cell()) ) {
00336     __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_SET_LUT_FAILED,
00337                                                   reply, sizeof(FUSE_lutdesc_message_t)));
00338     LibLogger::log_warn("FuseServerClientThread", "LUT upload: dimensions do not match. "
00339                         "Existing (%u,%u,%u,%u) != uploaded (%u,%u,%u,%u)",
00340                         b->width(), b->height(), b->depth(), b->bytes_per_cell(),
00341                         lc->width(), lc->height(), lc->depth(), lc->bytes_per_cell());
00342   } else {
00343     b->set(lc->buffer());
00344     __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_SET_LUT_SUCCEEDED,
00345                                                   reply, sizeof(FUSE_lutdesc_message_t)));
00346   }
00347 
00348   delete lc;
00349 }
00350 
00351 
00352 /** Process image list request message.
00353  * @param m received message
00354  */
00355 void
00356 FuseServerClientThread::process_getimagelist_message(FuseNetworkMessage *m)
00357 {
00358   FuseImageListContent *ilm = new FuseImageListContent();
00359 
00360   SharedMemoryImageBufferHeader *h      = new SharedMemoryImageBufferHeader();
00361   SharedMemory::SharedMemoryIterator i = SharedMemory::find(FIREVISION_SHM_IMAGE_MAGIC_TOKEN, h);
00362   SharedMemory::SharedMemoryIterator endi = SharedMemory::end();
00363                                                             
00364   while ( i != endi ) {
00365     const SharedMemoryImageBufferHeader *ih = dynamic_cast<const SharedMemoryImageBufferHeader *>(*i);
00366     if ( ih ) {
00367       ilm->add_imageinfo(ih->image_id(), ih->colorspace(), ih->width(), ih->height());
00368     }
00369 
00370     ++i;
00371   }
00372 
00373   delete h;
00374 
00375   __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_IMAGE_LIST, ilm));
00376 }
00377 
00378 
00379 /** Process LUT list request message.
00380  * @param m received message
00381  */
00382 void
00383 FuseServerClientThread::process_getlutlist_message(FuseNetworkMessage *m)
00384 {
00385   FuseLutListContent *llm = new FuseLutListContent();
00386 
00387   SharedMemoryLookupTableHeader *h = new SharedMemoryLookupTableHeader();
00388   SharedMemory::SharedMemoryIterator i = SharedMemory::find(FIREVISION_SHM_LUT_MAGIC_TOKEN, h);
00389   SharedMemory::SharedMemoryIterator endi = SharedMemory::end();
00390                                                             
00391   while ( i != endi ) {
00392     const SharedMemoryLookupTableHeader *lh = dynamic_cast<const SharedMemoryLookupTableHeader *>(*i);
00393     if ( lh ) {
00394       llm->add_lutinfo(lh->lut_id(), lh->width(), lh->height(), lh->depth(), lh->bytes_per_cell());
00395     }
00396 
00397     ++i;
00398   }
00399 
00400   delete h;
00401 
00402   __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_LUT_LIST, llm));
00403 }
00404 
00405 
00406 /** Process inbound messages. */
00407 void
00408 FuseServerClientThread::process_inbound()
00409 {
00410   __inbound_queue->lock();
00411   while ( ! __inbound_queue->empty() ) {
00412     FuseNetworkMessage *m = __inbound_queue->front();
00413 
00414     try {
00415       switch (m->type()) {
00416       case FUSE_MT_GREETING:
00417         process_greeting_message(m);
00418         break;
00419       case FUSE_MT_GET_IMAGE:
00420         process_getimage_message(m);
00421         break;
00422       case FUSE_MT_GET_IMAGE_INFO:
00423         process_getimageinfo_message(m);
00424         break;
00425       case FUSE_MT_GET_IMAGE_LIST:
00426         process_getimagelist_message(m);
00427         break;
00428       case FUSE_MT_GET_LUT_LIST:
00429         process_getlutlist_message(m);
00430         break;
00431       case FUSE_MT_GET_LUT:
00432         process_getlut_message(m);
00433         break;
00434       case FUSE_MT_SET_LUT:
00435         process_setlut_message(m);
00436         break;
00437       default:
00438         throw Exception("Unknown message type received\n");
00439       }
00440     } catch (Exception &e) {
00441       e.append("FUSE protocol error");
00442       LibLogger::log_warn("FuseServerClientThread", e);
00443       __fuse_server->connection_died(this);
00444       __alive = false;
00445     }
00446 
00447     m->unref();
00448     __inbound_queue->pop();
00449   }
00450   __inbound_queue->unlock();
00451 }
00452 
00453 
00454 void
00455 FuseServerClientThread::loop()
00456 {
00457   if ( ! __alive ) {
00458     usleep(10000);
00459     return;
00460   }
00461 
00462   short p = 0;
00463   try {
00464     p = __socket->poll(10); // block for up to 10 ms
00465   } catch (InterruptedException &e) {
00466     // we just ignore this and try it again
00467     return;
00468   }
00469 
00470   if ( (p & Socket::POLL_ERR) ||
00471        (p & Socket::POLL_HUP) ||
00472        (p & Socket::POLL_RDHUP)) {
00473     __fuse_server->connection_died(this);
00474     __alive = false;
00475   } else if ( p & Socket::POLL_IN ) {
00476     try {
00477       // Data can be read
00478       recv();
00479       process_inbound();
00480     }
00481     catch (...) {
00482       __fuse_server->connection_died(this);
00483       __alive = false;
00484     }
00485   }
00486 
00487   if ( __alive ) {
00488     send();
00489   }
00490 }
00491 
00492 } // end namespace firevision