Fawkes API  Fawkes Development Version
fuse_server_client_thread.cpp
1 
2 /***************************************************************************
3  * fuse_server_client_thread.cpp - client thread for FuseServer
4  *
5  * Created: Tue Nov 13 20:00:55 2007
6  * Copyright 2005-2007 Tim Niemueller [www.niemueller.de]
7  *
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version. A runtime exception applies to
14  * this software (see LICENSE.GPL_WRE file mentioned below for details).
15  *
16  * This program is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19  * GNU Library General Public License for more details.
20  *
21  * Read the full text in the LICENSE.GPL_WRE file in the doc directory.
22  */
23 
24 #include <fvutils/net/fuse_server_client_thread.h>
25 
26 #include <fvutils/net/fuse_server.h>
27 #include <fvutils/net/fuse_server.h>
28 #include <fvutils/net/fuse_transceiver.h>
29 #include <fvutils/net/fuse_message_queue.h>
30 #include <fvutils/net/fuse_image_content.h>
31 #include <fvutils/net/fuse_lut_content.h>
32 #include <fvutils/net/fuse_imagelist_content.h>
33 #include <fvutils/net/fuse_lutlist_content.h>
34 #include <fvutils/ipc/shm_image.h>
35 #include <fvutils/ipc/shm_lut.h>
36 #include <fvutils/compression/jpeg_compressor.h>
37 
38 #include <core/exceptions/system.h>
39 #include <netcomm/socket/stream.h>
40 #include <netcomm/utils/exceptions.h>
41 #include <logging/liblogger.h>
42 
43 #include <netinet/in.h>
44 #include <cstring>
45 #include <cstdlib>
46 #include <unistd.h>
47 
48 using namespace fawkes;
49 
50 namespace firevision {
51 #if 0 /* just to make Emacs auto-indent happy */
52 }
53 #endif
54 
55 /** @class FuseServerClientThread <fvutils/net/fuse_server_client_thread.h>
56  * FUSE Server Client Thread.
57  * This thread is instantiated and started for each client that connects to a
58  * FuseServer.
59  * @ingroup FUSE
60  * @ingroup FireVision
61  * @author Tim Niemueller
62  */
63 
64 /** Constructor.
65  * @param fuse_server parent FUSE server
66  * @param s socket to client
67  */
68 FuseServerClientThread::FuseServerClientThread(FuseServer *fuse_server, StreamSocket *s)
69  : Thread("FuseServerClientThread")
70 {
71  __fuse_server = fuse_server;
72  __socket = s;
73  __jpeg_compressor = NULL;
74 
75  __inbound_queue = new FuseNetworkMessageQueue();
76  __outbound_queue = new FuseNetworkMessageQueue();
77 
79  greetmsg->version = htonl(FUSE_CURRENT_VERSION);
80  __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_GREETING,
81  greetmsg, sizeof(FUSE_greeting_message_t)));
82 
83  __alive = true;
84 }
85 
86 
87 /** Destructor. */
89 {
90  delete __socket;
91  delete __jpeg_compressor;
92 
93  for (__bit = __buffers.begin(); __bit != __buffers.end(); ++__bit) {
94  delete __bit->second;
95  }
96  __buffers.clear();
97 
98  for (__lit = __luts.begin(); __lit != __luts.end(); ++__lit ) {
99  delete __lit->second;
100  }
101  __luts.clear();
102 
103  while ( ! __inbound_queue->empty() ) {
104  FuseNetworkMessage *m = __inbound_queue->front();
105  m->unref();
106  __inbound_queue->pop();
107  }
108 
109  while ( ! __outbound_queue->empty() ) {
110  FuseNetworkMessage *m = __outbound_queue->front();
111  m->unref();
112  __outbound_queue->pop();
113  }
114 
115  delete __inbound_queue;
116  delete __outbound_queue;
117 }
118 
119 
120 /** Send all messages in outbound queue. */
121 void
123 {
124  if ( ! __outbound_queue->empty() ) {
125  try {
126  FuseNetworkTransceiver::send(__socket, __outbound_queue);
127  } catch (Exception &e) {
128  __fuse_server->connection_died(this);
129  __alive = false;
130  }
131  }
132 }
133 
134 
135 /** Receive data.
136  * Receives data from the network if there is any and then processes all
137  * inbound messages.
138  */
139 void
141 {
142  try {
143  FuseNetworkTransceiver::recv(__socket, __inbound_queue);
144  } catch (ConnectionDiedException &e) {
145  __socket->close();
146  __fuse_server->connection_died(this);
147  __alive = false;
148  }
149 }
150 
151 
152 /** Process greeting message.
153  * @param m received message
154  */
155 void
157 {
159  if ( ntohl(gm->version) != FUSE_CURRENT_VERSION ) {
160  throw Exception("Invalid version on other side");
161  }
162 }
163 
164 
166 FuseServerClientThread::get_shmimgbuf(const char *id)
167 {
168  char tmp_image_id[IMAGE_ID_MAX_LENGTH + 1];
169  tmp_image_id[IMAGE_ID_MAX_LENGTH] = 0;
170  strncpy(tmp_image_id, id, IMAGE_ID_MAX_LENGTH);
171 
172  if ( (__bit = __buffers.find( tmp_image_id )) == __buffers.end() ) {
173  // the buffer has not yet been opened
174  try {
175  SharedMemoryImageBuffer *b = new SharedMemoryImageBuffer(tmp_image_id);
176  __buffers[tmp_image_id] = b;
177  return b;
178  } catch (Exception &e) {
179  throw;
180  }
181  } else {
182  return __bit->second;
183  }
184 }
185 
186 
187 /** Process image request message.
188  * @param m received message
189  */
190 void
192 {
194 
196  try {
197  b = get_shmimgbuf(irm->image_id);
198  } catch (Exception &e) {
199  FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_GET_IMAGE_FAILED,
200  m->payload(), m->payload_size(),
201  /* copy payload */ true);
202  __outbound_queue->push(nm);
203  return;
204  }
205 
206  if ( irm->format == FUSE_IF_RAW ) {
207  FuseImageContent *im = new FuseImageContent(b);
208  __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_IMAGE, im));
209  } else if ( irm->format == FUSE_IF_JPEG ) {
210  if ( ! __jpeg_compressor) {
211  __jpeg_compressor = new JpegImageCompressor();
213  }
214  b->lock_for_read();
215  __jpeg_compressor->set_image_dimensions(b->width(), b->height());
216  __jpeg_compressor->set_image_buffer(b->colorspace(), b->buffer());
217  unsigned char *compressed_buffer = (unsigned char *)malloc(__jpeg_compressor->recommended_compressed_buffer_size());
218  __jpeg_compressor->set_destination_buffer(compressed_buffer, __jpeg_compressor->recommended_compressed_buffer_size());
219  __jpeg_compressor->compress();
220  b->unlock();
221  size_t compressed_buffer_size = __jpeg_compressor->compressed_size();
222  long int sec = 0, usec = 0;
223  b->capture_time(&sec, &usec);
224  FuseImageContent *im = new FuseImageContent(FUSE_IF_JPEG, b->image_id(),
225  compressed_buffer, compressed_buffer_size,
226  CS_UNKNOWN, b->width(), b->height(),
227  sec, usec);
228  __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_IMAGE, im));
229  free(compressed_buffer);
230  } else {
231  FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_GET_IMAGE_FAILED,
232  m->payload(), m->payload_size(),
233  /* copy payload */ true);
234  __outbound_queue->push(nm);
235  }
236 }
237 
238 /** Process image info request message.
239  * @param m received message
240  */
241 void
243 {
245 
247  try {
248  b = get_shmimgbuf(idm->image_id);
249 
250  FUSE_imageinfo_t *ii = (FUSE_imageinfo_t *)calloc(1, sizeof(FUSE_imageinfo_t));
251 
252  strncpy(ii->image_id, b->image_id(), IMAGE_ID_MAX_LENGTH);
253  ii->colorspace = htons(b->colorspace());
254  ii->width = htonl(b->width());
255  ii->height = htonl(b->height());
256  ii->buffer_size = colorspace_buffer_size(b->colorspace(), b->width(), b->height());
257 
258  FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_IMAGE_INFO,
259  ii, sizeof(FUSE_imageinfo_t));
260  __outbound_queue->push(nm);
261  } catch (Exception &e) {
262  FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_GET_IMAGE_FAILED,
263  m->payload(), m->payload_size(),
264  /* copy payload */ true);
265  __outbound_queue->push(nm);
266  }
267 }
268 
269 
270 /** Process LUT request message.
271  * @param m received message
272  */
273 void
275 {
277 
278  char tmp_lut_id[LUT_ID_MAX_LENGTH + 1];
279  tmp_lut_id[LUT_ID_MAX_LENGTH] = 0;
280  strncpy(tmp_lut_id, idm->lut_id, LUT_ID_MAX_LENGTH);
281 
282  if ( (__lit = __luts.find( tmp_lut_id )) != __luts.end() ) {
283  // the buffer had already be opened
284  FuseLutContent *lm = new FuseLutContent(__lit->second);
285  __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_LUT, lm));
286  } else {
287  try {
289  __luts[tmp_lut_id] = b;
290  FuseLutContent *lm = new FuseLutContent(b);
291  __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_LUT, lm));
292  } catch (Exception &e) {
293  // could not open the shared memory segment for some reason, send failure
294  FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_GET_LUT_FAILED,
295  m->payload(), m->payload_size(),
296  /* copy payload */ true);
297  __outbound_queue->push(nm);
298  }
299  }
300 }
301 
302 
303 /** Process LUT setting.
304  * @param m received message
305  */
306 void
308 {
309  FuseLutContent *lc = m->msgc<FuseLutContent>();
311  strncpy(reply->lut_id, lc->lut_id(), LUT_ID_MAX_LENGTH);
312  // Currently we expect colormaps, so make sure we get sensible dimensions
313 
315  if ( (__lit = __luts.find( lc->lut_id() )) != __luts.end() ) {
316  // the buffer had already been opened
317  b = __lit->second;
318  } else {
319  try {
320  b = new SharedMemoryLookupTable(lc->lut_id(), /* read only */ false);
321  __luts[lc->lut_id()] = b;
322  } catch (Exception &e) {
323  __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_SET_LUT_FAILED,
324  reply, sizeof(FUSE_lutdesc_message_t)));
325  e.append("Cannot open shared memory lookup table %s", lc->lut_id());
326  LibLogger::log_warn("FuseServerClientThread", e);
327  delete lc;
328  return;
329  }
330  }
331 
332  if ( (b->width() != lc->width()) ||
333  (b->height() != lc->height()) ||
334  (b->depth() != lc->depth()) ||
335  (b->bytes_per_cell() != lc->bytes_per_cell()) ) {
336  __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_SET_LUT_FAILED,
337  reply, sizeof(FUSE_lutdesc_message_t)));
338  LibLogger::log_warn("FuseServerClientThread", "LUT upload: dimensions do not match. "
339  "Existing (%u,%u,%u,%u) != uploaded (%u,%u,%u,%u)",
340  b->width(), b->height(), b->depth(), b->bytes_per_cell(),
341  lc->width(), lc->height(), lc->depth(), lc->bytes_per_cell());
342  } else {
343  b->set(lc->buffer());
344  __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_SET_LUT_SUCCEEDED,
345  reply, sizeof(FUSE_lutdesc_message_t)));
346  }
347 
348  delete lc;
349 }
350 
351 
352 /** Process image list request message.
353  * @param m received message
354  */
355 void
357 {
359 
361  SharedMemory::SharedMemoryIterator i = SharedMemory::find(FIREVISION_SHM_IMAGE_MAGIC_TOKEN, h);
362  SharedMemory::SharedMemoryIterator endi = SharedMemory::end();
363 
364  while ( i != endi ) {
365  const SharedMemoryImageBufferHeader *ih = dynamic_cast<const SharedMemoryImageBufferHeader *>(*i);
366  if ( ih ) {
367  ilm->add_imageinfo(ih->image_id(), ih->colorspace(), ih->width(), ih->height());
368  }
369 
370  ++i;
371  }
372 
373  delete h;
374 
375  __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_IMAGE_LIST, ilm));
376 }
377 
378 
379 /** Process LUT list request message.
380  * @param m received message
381  */
382 void
384 {
386 
388  SharedMemory::SharedMemoryIterator i = SharedMemory::find(FIREVISION_SHM_LUT_MAGIC_TOKEN, h);
389  SharedMemory::SharedMemoryIterator endi = SharedMemory::end();
390 
391  while ( i != endi ) {
392  const SharedMemoryLookupTableHeader *lh = dynamic_cast<const SharedMemoryLookupTableHeader *>(*i);
393  if ( lh ) {
394  llm->add_lutinfo(lh->lut_id(), lh->width(), lh->height(), lh->depth(), lh->bytes_per_cell());
395  }
396 
397  ++i;
398  }
399 
400  delete h;
401 
402  __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_LUT_LIST, llm));
403 }
404 
405 
406 /** Process inbound messages. */
407 void
408 FuseServerClientThread::process_inbound()
409 {
410  __inbound_queue->lock();
411  while ( ! __inbound_queue->empty() ) {
412  FuseNetworkMessage *m = __inbound_queue->front();
413 
414  try {
415  switch (m->type()) {
416  case FUSE_MT_GREETING:
418  break;
419  case FUSE_MT_GET_IMAGE:
421  break;
422  case FUSE_MT_GET_IMAGE_INFO:
424  break;
425  case FUSE_MT_GET_IMAGE_LIST:
427  break;
428  case FUSE_MT_GET_LUT_LIST:
430  break;
431  case FUSE_MT_GET_LUT:
433  break;
434  case FUSE_MT_SET_LUT:
436  break;
437  default:
438  throw Exception("Unknown message type received\n");
439  }
440  } catch (Exception &e) {
441  e.append("FUSE protocol error");
442  LibLogger::log_warn("FuseServerClientThread", e);
443  __fuse_server->connection_died(this);
444  __alive = false;
445  }
446 
447  m->unref();
448  __inbound_queue->pop();
449  }
450  __inbound_queue->unlock();
451 }
452 
453 
454 void
456 {
457  if ( ! __alive ) {
458  usleep(10000);
459  return;
460  }
461 
462  short p = 0;
463  try {
464  p = __socket->poll(10); // block for up to 10 ms
465  } catch (InterruptedException &e) {
466  // we just ignore this and try it again
467  return;
468  }
469 
470  if ( (p & Socket::POLL_ERR) ||
471  (p & Socket::POLL_HUP) ||
472  (p & Socket::POLL_RDHUP)) {
473  __fuse_server->connection_died(this);
474  __alive = false;
475  } else if ( p & Socket::POLL_IN ) {
476  try {
477  // Data can be read
478  recv();
479  process_inbound();
480  }
481  catch (...) {
482  __fuse_server->connection_died(this);
483  __alive = false;
484  }
485  }
486 
487  if ( __alive ) {
488  send();
489  }
490 }
491 
492 } // end namespace firevision
void process_setlut_message(FuseNetworkMessage *m)
Process LUT setting.
FireVision FUSE protocol server.
Definition: fuse_server.h:46
static void recv(fawkes::StreamSocket *s, FuseNetworkMessageQueue *msgq, unsigned int max_num_msgs=8)
Receive data.
char image_id[IMAGE_ID_MAX_LENGTH]
image ID
Definition: fuse.h:156
virtual void close()
Close socket.
Definition: socket.cpp:274
void process_getlut_message(FuseNetworkMessage *m)
Process LUT request message.
void connection_died(FuseServerClientThread *client)
Connection died.
Shared memory image buffer header.
Definition: shm_image.h:69
const char * lut_id() const
Get LUT ID.
Definition: shm_lut.cpp:531
FUSE lookup table content.
void process_greeting_message(FuseNetworkMessage *m)
Process greeting message.
unsigned int depth() const
Depth of LUT.
Image request message.
Definition: fuse.h:147
void unref()
Decrement reference count and conditionally delete this instance.
Definition: refcount.cpp:99
virtual void set_compression_destination(ImageCompressor::CompressionDestination cd)
Set compression destination.
void unlock() const
Unlock list.
Definition: lock_queue.h:131
Image info message.
Definition: fuse.h:165
Fawkes library namespace.
virtual size_t compressed_size()
Get compressed size.
Shared Memory iterator.
Definition: shm.h:114
unsigned int height() const
Get LUT height.
Definition: shm_lut.cpp:498
virtual void compress()
Compress image.
unsigned int width() const
Width of LUT.
Shared memory lookup table header.
Definition: shm_lut.h:50
void process_getimagelist_message(FuseNetworkMessage *m)
Process image list request message.
unsigned char * buffer() const
Get buffer.
Image description message.
Definition: fuse.h:155
Thread class encapsulation of pthreads.
Definition: thread.h:42
static void send(fawkes::StreamSocket *s, FuseNetworkMessageQueue *msgq)
Send messages.
TCP stream socket over IP.
Definition: stream.h:31
Jpeg image compressor.
uint32_t width
width in pixels
Definition: fuse.h:169
void add_lutinfo(const char *lut_id, unsigned int width, unsigned int height, unsigned int depth, unsigned int bytes_per_cell)
Add LUT info.
unsigned int height() const
Get height.
Definition: shm_image.cpp:870
const char * lut_id() const
Get LUT ID.
void * payload() const
Get pointer to payload.
uint32_t colorspace
color space
Definition: fuse.h:167
unsigned int width() const
Get LUT width.
Definition: shm_lut.cpp:487
virtual void set_image_buffer(colorspace_t cspace, unsigned char *buffer)
Set image buffer to compress.
MT * msgc() const
Get correctly parsed output.
Definition: fuse_message.h:107
uint32_t version
version from FUSE_version_t
Definition: fuse.h:99
char image_id[IMAGE_ID_MAX_LENGTH]
image ID
Definition: fuse.h:166
unsigned int height() const
Height of LUT.
FUSE Network Message.
Definition: fuse_message.h:41
uint32_t type() const
Get message type.
void process_getimageinfo_message(FuseNetworkMessage *m)
Process image info request message.
Base class for exceptions in Fawkes.
Definition: exception.h:36
unsigned int bytes_per_cell() const
Get bytes per cell.
Definition: shm_lut.cpp:520
const char * image_id() const
Get image number.
Definition: shm_image.cpp:881
FUSE lookup table list content.
version packet, bi-directional
Definition: fuse.h:98
unsigned int bytes_per_cell() const
Bytes per cell in LUT.
A LockQueue of FuseNetworkMessage to hold messages in inbound and outbound queues.
Shared memory image buffer.
Definition: shm_image.h:181
virtual void set_image_dimensions(unsigned int width, unsigned int height)
Set dimensions of image to compress.
virtual void set_destination_buffer(unsigned char *buf, unsigned int buf_size)
Set destination buffer (if compressing to memory).
void send()
Send all messages in outbound queue.
void process_getimage_message(FuseNetworkMessage *m)
Process image request message.
char image_id[IMAGE_ID_MAX_LENGTH]
image ID
Definition: fuse.h:148
The current system call has been interrupted (for instance by a signal).
Definition: system.h:39
write compressed image to buffer in memory
void add_imageinfo(const char *image_id, colorspace_t colorspace, unsigned int pixel_width, unsigned int pixel_height)
Add image info.
unsigned int width() const
Get width.
Definition: shm_image.cpp:859
void process_getlutlist_message(FuseNetworkMessage *m)
Process LUT list request message.
colorspace_t colorspace() const
Get colorspace.
Definition: shm_image.cpp:848
Thrown if the connection died during an operation.
Definition: exceptions.h:31
char lut_id[LUT_ID_MAX_LENGTH]
LUT ID.
Definition: fuse.h:161
uint32_t format
requested image format, see FUSE_image_format_t
Definition: fuse.h:149
uint32_t height
height in pixels
Definition: fuse.h:170
virtual short poll(int timeout=-1, short what=POLL_IN|POLL_HUP|POLL_PRI|POLL_RDHUP)
Wait for some event on socket.
Definition: socket.cpp:652
void lock() const
Lock queue.
Definition: lock_queue.h:115
uint32_t buffer_size
size of following image buffer in bytes
Definition: fuse.h:171
virtual size_t recommended_compressed_buffer_size()
Get the recommended size for the compressed buffer.
virtual void loop()
Code to execute in the thread.
MT * msg() const
Get correctly casted payload.
Definition: fuse_message.h:67
Shared memory lookup table.
Definition: shm_lut.h:113
void append(const char *format,...)
Append messages to the message list.
Definition: exception.cpp:341
LUT description message.
Definition: fuse.h:160
size_t payload_size() const
Get payload size.
unsigned int depth() const
Get LUT depth.
Definition: shm_lut.cpp:509