Fawkes API
Fawkes Development Version
|
00001 00002 /*************************************************************************** 00003 * fuse_server_client_thread.cpp - client thread for FuseServer 00004 * 00005 * Created: Tue Nov 13 20:00:55 2007 00006 * Copyright 2005-2007 Tim Niemueller [www.niemueller.de] 00007 * 00008 ****************************************************************************/ 00009 00010 /* This program is free software; you can redistribute it and/or modify 00011 * it under the terms of the GNU General Public License as published by 00012 * the Free Software Foundation; either version 2 of the License, or 00013 * (at your option) any later version. A runtime exception applies to 00014 * this software (see LICENSE.GPL_WRE file mentioned below for details). 00015 * 00016 * This program is distributed in the hope that it will be useful, 00017 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00018 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00019 * GNU Library General Public License for more details. 00020 * 00021 * Read the full text in the LICENSE.GPL_WRE file in the doc directory. 00022 */ 00023 00024 #include <fvutils/net/fuse_server_client_thread.h> 00025 00026 #include <fvutils/net/fuse_server.h> 00027 #include <fvutils/net/fuse_server.h> 00028 #include <fvutils/net/fuse_transceiver.h> 00029 #include <fvutils/net/fuse_message_queue.h> 00030 #include <fvutils/net/fuse_image_content.h> 00031 #include <fvutils/net/fuse_lut_content.h> 00032 #include <fvutils/net/fuse_imagelist_content.h> 00033 #include <fvutils/net/fuse_lutlist_content.h> 00034 #include <fvutils/ipc/shm_image.h> 00035 #include <fvutils/ipc/shm_lut.h> 00036 #include <fvutils/compression/jpeg_compressor.h> 00037 00038 #include <core/exceptions/system.h> 00039 #include <netcomm/socket/stream.h> 00040 #include <netcomm/utils/exceptions.h> 00041 #include <logging/liblogger.h> 00042 00043 #include <netinet/in.h> 00044 #include <cstring> 00045 #include <cstdlib> 00046 #include <unistd.h> 00047 00048 using namespace fawkes; 00049 00050 namespace firevision { 00051 #if 0 /* just to make Emacs auto-indent happy */ 00052 } 00053 #endif 00054 00055 /** @class FuseServerClientThread <fvutils/net/fuse_server_client_thread.h> 00056 * FUSE Server Client Thread. 00057 * This thread is instantiated and started for each client that connects to a 00058 * FuseServer. 00059 * @ingroup FUSE 00060 * @ingroup FireVision 00061 * @author Tim Niemueller 00062 */ 00063 00064 /** Constructor. 00065 * @param fuse_server parent FUSE server 00066 * @param s socket to client 00067 */ 00068 FuseServerClientThread::FuseServerClientThread(FuseServer *fuse_server, StreamSocket *s) 00069 : Thread("FuseServerClientThread") 00070 { 00071 __fuse_server = fuse_server; 00072 __socket = s; 00073 __jpeg_compressor = NULL; 00074 00075 __inbound_queue = new FuseNetworkMessageQueue(); 00076 __outbound_queue = new FuseNetworkMessageQueue(); 00077 00078 FUSE_greeting_message_t *greetmsg = (FUSE_greeting_message_t *)malloc(sizeof(FUSE_greeting_message_t)); 00079 greetmsg->version = htonl(FUSE_CURRENT_VERSION); 00080 __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_GREETING, 00081 greetmsg, sizeof(FUSE_greeting_message_t))); 00082 00083 __alive = true; 00084 } 00085 00086 00087 /** Destructor. */ 00088 FuseServerClientThread::~FuseServerClientThread() 00089 { 00090 delete __socket; 00091 delete __jpeg_compressor; 00092 00093 for (__bit = __buffers.begin(); __bit != __buffers.end(); ++__bit) { 00094 delete __bit->second; 00095 } 00096 __buffers.clear(); 00097 00098 for (__lit = __luts.begin(); __lit != __luts.end(); ++__lit ) { 00099 delete __lit->second; 00100 } 00101 __luts.clear(); 00102 00103 while ( ! __inbound_queue->empty() ) { 00104 FuseNetworkMessage *m = __inbound_queue->front(); 00105 m->unref(); 00106 __inbound_queue->pop(); 00107 } 00108 00109 while ( ! __outbound_queue->empty() ) { 00110 FuseNetworkMessage *m = __outbound_queue->front(); 00111 m->unref(); 00112 __outbound_queue->pop(); 00113 } 00114 00115 delete __inbound_queue; 00116 delete __outbound_queue; 00117 } 00118 00119 00120 /** Send all messages in outbound queue. */ 00121 void 00122 FuseServerClientThread::send() 00123 { 00124 if ( ! __outbound_queue->empty() ) { 00125 try { 00126 FuseNetworkTransceiver::send(__socket, __outbound_queue); 00127 } catch (Exception &e) { 00128 __fuse_server->connection_died(this); 00129 __alive = false; 00130 } 00131 } 00132 } 00133 00134 00135 /** Receive data. 00136 * Receives data from the network if there is any and then processes all 00137 * inbound messages. 00138 */ 00139 void 00140 FuseServerClientThread::recv() 00141 { 00142 try { 00143 FuseNetworkTransceiver::recv(__socket, __inbound_queue); 00144 } catch (ConnectionDiedException &e) { 00145 __socket->close(); 00146 __fuse_server->connection_died(this); 00147 __alive = false; 00148 } 00149 } 00150 00151 00152 /** Process greeting message. 00153 * @param m received message 00154 */ 00155 void 00156 FuseServerClientThread::process_greeting_message(FuseNetworkMessage *m) 00157 { 00158 FUSE_greeting_message_t *gm = m->msg<FUSE_greeting_message_t>(); 00159 if ( ntohl(gm->version) != FUSE_CURRENT_VERSION ) { 00160 throw Exception("Invalid version on other side"); 00161 } 00162 } 00163 00164 00165 SharedMemoryImageBuffer * 00166 FuseServerClientThread::get_shmimgbuf(const char *id) 00167 { 00168 char tmp_image_id[IMAGE_ID_MAX_LENGTH + 1]; 00169 tmp_image_id[IMAGE_ID_MAX_LENGTH] = 0; 00170 strncpy(tmp_image_id, id, IMAGE_ID_MAX_LENGTH); 00171 00172 if ( (__bit = __buffers.find( tmp_image_id )) == __buffers.end() ) { 00173 // the buffer has not yet been opened 00174 try { 00175 SharedMemoryImageBuffer *b = new SharedMemoryImageBuffer(tmp_image_id); 00176 __buffers[tmp_image_id] = b; 00177 return b; 00178 } catch (Exception &e) { 00179 throw; 00180 } 00181 } else { 00182 return __bit->second; 00183 } 00184 } 00185 00186 00187 /** Process image request message. 00188 * @param m received message 00189 */ 00190 void 00191 FuseServerClientThread::process_getimage_message(FuseNetworkMessage *m) 00192 { 00193 FUSE_imagereq_message_t *irm = m->msg<FUSE_imagereq_message_t>(); 00194 00195 SharedMemoryImageBuffer *b; 00196 try { 00197 b = get_shmimgbuf(irm->image_id); 00198 } catch (Exception &e) { 00199 FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_GET_IMAGE_FAILED, 00200 m->payload(), m->payload_size(), 00201 /* copy payload */ true); 00202 __outbound_queue->push(nm); 00203 return; 00204 } 00205 00206 if ( irm->format == FUSE_IF_RAW ) { 00207 FuseImageContent *im = new FuseImageContent(b); 00208 __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_IMAGE, im)); 00209 } else if ( irm->format == FUSE_IF_JPEG ) { 00210 if ( ! __jpeg_compressor) { 00211 __jpeg_compressor = new JpegImageCompressor(); 00212 __jpeg_compressor->set_compression_destination(ImageCompressor::COMP_DEST_MEM); 00213 } 00214 b->lock_for_read(); 00215 __jpeg_compressor->set_image_dimensions(b->width(), b->height()); 00216 __jpeg_compressor->set_image_buffer(b->colorspace(), b->buffer()); 00217 unsigned char *compressed_buffer = (unsigned char *)malloc(__jpeg_compressor->recommended_compressed_buffer_size()); 00218 __jpeg_compressor->set_destination_buffer(compressed_buffer, __jpeg_compressor->recommended_compressed_buffer_size()); 00219 __jpeg_compressor->compress(); 00220 b->unlock(); 00221 size_t compressed_buffer_size = __jpeg_compressor->compressed_size(); 00222 long int sec = 0, usec = 0; 00223 b->capture_time(&sec, &usec); 00224 FuseImageContent *im = new FuseImageContent(FUSE_IF_JPEG, b->image_id(), 00225 compressed_buffer, compressed_buffer_size, 00226 CS_UNKNOWN, b->width(), b->height(), 00227 sec, usec); 00228 __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_IMAGE, im)); 00229 free(compressed_buffer); 00230 } else { 00231 FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_GET_IMAGE_FAILED, 00232 m->payload(), m->payload_size(), 00233 /* copy payload */ true); 00234 __outbound_queue->push(nm); 00235 } 00236 } 00237 00238 /** Process image info request message. 00239 * @param m received message 00240 */ 00241 void 00242 FuseServerClientThread::process_getimageinfo_message(FuseNetworkMessage *m) 00243 { 00244 FUSE_imagedesc_message_t *idm = m->msg<FUSE_imagedesc_message_t>(); 00245 00246 SharedMemoryImageBuffer *b; 00247 try { 00248 b = get_shmimgbuf(idm->image_id); 00249 00250 FUSE_imageinfo_t *ii = (FUSE_imageinfo_t *)calloc(1, sizeof(FUSE_imageinfo_t)); 00251 00252 strncpy(ii->image_id, b->image_id(), IMAGE_ID_MAX_LENGTH); 00253 ii->colorspace = htons(b->colorspace()); 00254 ii->width = htonl(b->width()); 00255 ii->height = htonl(b->height()); 00256 ii->buffer_size = colorspace_buffer_size(b->colorspace(), b->width(), b->height()); 00257 00258 FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_IMAGE_INFO, 00259 ii, sizeof(FUSE_imageinfo_t)); 00260 __outbound_queue->push(nm); 00261 } catch (Exception &e) { 00262 FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_GET_IMAGE_FAILED, 00263 m->payload(), m->payload_size(), 00264 /* copy payload */ true); 00265 __outbound_queue->push(nm); 00266 } 00267 } 00268 00269 00270 /** Process LUT request message. 00271 * @param m received message 00272 */ 00273 void 00274 FuseServerClientThread::process_getlut_message(FuseNetworkMessage *m) 00275 { 00276 FUSE_lutdesc_message_t *idm = m->msg<FUSE_lutdesc_message_t>(); 00277 00278 char tmp_lut_id[LUT_ID_MAX_LENGTH + 1]; 00279 tmp_lut_id[LUT_ID_MAX_LENGTH] = 0; 00280 strncpy(tmp_lut_id, idm->lut_id, LUT_ID_MAX_LENGTH); 00281 00282 if ( (__lit = __luts.find( tmp_lut_id )) != __luts.end() ) { 00283 // the buffer had already be opened 00284 FuseLutContent *lm = new FuseLutContent(__lit->second); 00285 __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_LUT, lm)); 00286 } else { 00287 try { 00288 SharedMemoryLookupTable *b = new SharedMemoryLookupTable(tmp_lut_id); 00289 __luts[tmp_lut_id] = b; 00290 FuseLutContent *lm = new FuseLutContent(b); 00291 __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_LUT, lm)); 00292 } catch (Exception &e) { 00293 // could not open the shared memory segment for some reason, send failure 00294 FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_GET_LUT_FAILED, 00295 m->payload(), m->payload_size(), 00296 /* copy payload */ true); 00297 __outbound_queue->push(nm); 00298 } 00299 } 00300 } 00301 00302 00303 /** Process LUT setting. 00304 * @param m received message 00305 */ 00306 void 00307 FuseServerClientThread::process_setlut_message(FuseNetworkMessage *m) 00308 { 00309 FuseLutContent *lc = m->msgc<FuseLutContent>(); 00310 FUSE_lutdesc_message_t *reply = (FUSE_lutdesc_message_t *)malloc(sizeof(FUSE_lutdesc_message_t)); 00311 strncpy(reply->lut_id, lc->lut_id(), LUT_ID_MAX_LENGTH); 00312 // Currently we expect colormaps, so make sure we get sensible dimensions 00313 00314 SharedMemoryLookupTable *b; 00315 if ( (__lit = __luts.find( lc->lut_id() )) != __luts.end() ) { 00316 // the buffer had already been opened 00317 b = __lit->second; 00318 } else { 00319 try { 00320 b = new SharedMemoryLookupTable(lc->lut_id(), /* read only */ false); 00321 __luts[lc->lut_id()] = b; 00322 } catch (Exception &e) { 00323 __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_SET_LUT_FAILED, 00324 reply, sizeof(FUSE_lutdesc_message_t))); 00325 e.append("Cannot open shared memory lookup table %s", lc->lut_id()); 00326 LibLogger::log_warn("FuseServerClientThread", e); 00327 delete lc; 00328 return; 00329 } 00330 } 00331 00332 if ( (b->width() != lc->width()) || 00333 (b->height() != lc->height()) || 00334 (b->depth() != lc->depth()) || 00335 (b->bytes_per_cell() != lc->bytes_per_cell()) ) { 00336 __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_SET_LUT_FAILED, 00337 reply, sizeof(FUSE_lutdesc_message_t))); 00338 LibLogger::log_warn("FuseServerClientThread", "LUT upload: dimensions do not match. " 00339 "Existing (%u,%u,%u,%u) != uploaded (%u,%u,%u,%u)", 00340 b->width(), b->height(), b->depth(), b->bytes_per_cell(), 00341 lc->width(), lc->height(), lc->depth(), lc->bytes_per_cell()); 00342 } else { 00343 b->set(lc->buffer()); 00344 __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_SET_LUT_SUCCEEDED, 00345 reply, sizeof(FUSE_lutdesc_message_t))); 00346 } 00347 00348 delete lc; 00349 } 00350 00351 00352 /** Process image list request message. 00353 * @param m received message 00354 */ 00355 void 00356 FuseServerClientThread::process_getimagelist_message(FuseNetworkMessage *m) 00357 { 00358 FuseImageListContent *ilm = new FuseImageListContent(); 00359 00360 SharedMemoryImageBufferHeader *h = new SharedMemoryImageBufferHeader(); 00361 SharedMemory::SharedMemoryIterator i = SharedMemory::find(FIREVISION_SHM_IMAGE_MAGIC_TOKEN, h); 00362 SharedMemory::SharedMemoryIterator endi = SharedMemory::end(); 00363 00364 while ( i != endi ) { 00365 const SharedMemoryImageBufferHeader *ih = dynamic_cast<const SharedMemoryImageBufferHeader *>(*i); 00366 if ( ih ) { 00367 ilm->add_imageinfo(ih->image_id(), ih->colorspace(), ih->width(), ih->height()); 00368 } 00369 00370 ++i; 00371 } 00372 00373 delete h; 00374 00375 __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_IMAGE_LIST, ilm)); 00376 } 00377 00378 00379 /** Process LUT list request message. 00380 * @param m received message 00381 */ 00382 void 00383 FuseServerClientThread::process_getlutlist_message(FuseNetworkMessage *m) 00384 { 00385 FuseLutListContent *llm = new FuseLutListContent(); 00386 00387 SharedMemoryLookupTableHeader *h = new SharedMemoryLookupTableHeader(); 00388 SharedMemory::SharedMemoryIterator i = SharedMemory::find(FIREVISION_SHM_LUT_MAGIC_TOKEN, h); 00389 SharedMemory::SharedMemoryIterator endi = SharedMemory::end(); 00390 00391 while ( i != endi ) { 00392 const SharedMemoryLookupTableHeader *lh = dynamic_cast<const SharedMemoryLookupTableHeader *>(*i); 00393 if ( lh ) { 00394 llm->add_lutinfo(lh->lut_id(), lh->width(), lh->height(), lh->depth(), lh->bytes_per_cell()); 00395 } 00396 00397 ++i; 00398 } 00399 00400 delete h; 00401 00402 __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_LUT_LIST, llm)); 00403 } 00404 00405 00406 /** Process inbound messages. */ 00407 void 00408 FuseServerClientThread::process_inbound() 00409 { 00410 __inbound_queue->lock(); 00411 while ( ! __inbound_queue->empty() ) { 00412 FuseNetworkMessage *m = __inbound_queue->front(); 00413 00414 try { 00415 switch (m->type()) { 00416 case FUSE_MT_GREETING: 00417 process_greeting_message(m); 00418 break; 00419 case FUSE_MT_GET_IMAGE: 00420 process_getimage_message(m); 00421 break; 00422 case FUSE_MT_GET_IMAGE_INFO: 00423 process_getimageinfo_message(m); 00424 break; 00425 case FUSE_MT_GET_IMAGE_LIST: 00426 process_getimagelist_message(m); 00427 break; 00428 case FUSE_MT_GET_LUT_LIST: 00429 process_getlutlist_message(m); 00430 break; 00431 case FUSE_MT_GET_LUT: 00432 process_getlut_message(m); 00433 break; 00434 case FUSE_MT_SET_LUT: 00435 process_setlut_message(m); 00436 break; 00437 default: 00438 throw Exception("Unknown message type received\n"); 00439 } 00440 } catch (Exception &e) { 00441 e.append("FUSE protocol error"); 00442 LibLogger::log_warn("FuseServerClientThread", e); 00443 __fuse_server->connection_died(this); 00444 __alive = false; 00445 } 00446 00447 m->unref(); 00448 __inbound_queue->pop(); 00449 } 00450 __inbound_queue->unlock(); 00451 } 00452 00453 00454 void 00455 FuseServerClientThread::loop() 00456 { 00457 if ( ! __alive ) { 00458 usleep(10000); 00459 return; 00460 } 00461 00462 short p = 0; 00463 try { 00464 p = __socket->poll(10); // block for up to 10 ms 00465 } catch (InterruptedException &e) { 00466 // we just ignore this and try it again 00467 return; 00468 } 00469 00470 if ( (p & Socket::POLL_ERR) || 00471 (p & Socket::POLL_HUP) || 00472 (p & Socket::POLL_RDHUP)) { 00473 __fuse_server->connection_died(this); 00474 __alive = false; 00475 } else if ( p & Socket::POLL_IN ) { 00476 try { 00477 // Data can be read 00478 recv(); 00479 process_inbound(); 00480 } 00481 catch (...) { 00482 __fuse_server->connection_died(this); 00483 __alive = false; 00484 } 00485 } 00486 00487 if ( __alive ) { 00488 send(); 00489 } 00490 } 00491 00492 } // end namespace firevision