Fawkes API  Fawkes Development Version
fuse_transceiver.cpp
1 
2 /***************************************************************************
3  * fuse_transceiver.cpp - Fuse transceiver
4  *
5  * Created: Wed Nov 14 13:30:34 2007
6  * Copyright 2006-2007 Tim Niemueller [www.niemueller.de]
7  *
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version. A runtime exception applies to
14  * this software (see LICENSE.GPL_WRE file mentioned below for details).
15  *
16  * This program is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19  * GNU Library General Public License for more details.
20  *
21  * Read the full text in the LICENSE.GPL_WRE file in the doc directory.
22  */
23 
24 #include <fvutils/net/fuse_transceiver.h>
25 #include <fvutils/net/fuse_message_queue.h>
26 #include <fvutils/net/fuse_message.h>
27 #include <netcomm/socket/stream.h>
28 #include <netcomm/utils/exceptions.h>
29 
30 #include <netinet/in.h>
31 #include <cstdlib>
32 
33 using namespace fawkes;
34 
35 namespace firevision {
36 #if 0 /* just to make Emacs auto-indent happy */
37 }
38 #endif
39 
40 /** @class FuseNetworkTransceiver <fvutils/net/fuse_transceiver.h>
41  * FUSE Network Transceiver.
42  * Utility class that provides methods to send and receive messages via
43  * the network. Operates on message queues and a given socket.
44  *
45  * @ingroup FUSE
46  * @ingroup FireVision
47  * @author Tim Niemueller
48  */
49 
50 /** Send messages.
51  * @param s socket over which the data shall be transmitted.
52  * @param msgq message queue that contains the messages that have to be sent
53  * @exception ConnectionDiedException Thrown if any error occurs during the
54  * operation since for any error the conncetion is considered dead.
55  */
56 void
57 FuseNetworkTransceiver::send(StreamSocket *s, FuseNetworkMessageQueue *msgq)
58 {
59  msgq->lock();
60  try {
61  while ( ! msgq->empty() ) {
62  FuseNetworkMessage *m = msgq->front();
63  m->pack();
64  const FUSE_message_t &f = m->fmsg();
65  unsigned int payload_size = m->payload_size();
66  s->write(&(f.header), sizeof(f.header));
67  s->write(f.payload, payload_size);
68  m->unref();
69  msgq->pop();
70  }
71  } catch (SocketException &e) {
72  msgq->unlock();
73  throw ConnectionDiedException("Write failed");
74  }
75  msgq->unlock();
76 }
77 
78 
79 /** Receive data.
80  * This method receives all messages currently available from the network, or
81  * a limited number depending on max_num_msgs. If max_num_msgs is 0 then all
82  * messages are read. Note that on a busy connection this may cause recv() to
83  * never return! The default is to return after 8 messages.
84  * The messages are stored in the supplied message queue.
85  * @param s socket to gather messages from
86  * @param msgq message queue to store received messages in
87  * @param max_num_msgs maximum number of messages to read from stream in one go.
88  * @exception ConnectionDiedException Thrown if any error occurs during the
89  * operation since for any error the conncetion is considered dead.
90  */
91 void
92 FuseNetworkTransceiver::recv(StreamSocket *s, FuseNetworkMessageQueue *msgq,
93  unsigned int max_num_msgs)
94 {
95  msgq->lock();
96 
97  try {
98  unsigned int num_msgs = 0;
99  do {
100  FUSE_message_t msg;
101  s->read(&(msg.header), sizeof(msg.header));
102 
103  unsigned int payload_size = ntohl(msg.header.payload_size);
104 
105  if ( payload_size > 0 ) {
106 
107  msg.payload = malloc(payload_size);
108  s->read(msg.payload, payload_size);
109  } else {
110  msg.payload = NULL;
111  }
112 
113  FuseNetworkMessage *m = new FuseNetworkMessage(&msg);
114  msgq->push(m);
115 
116  ++num_msgs;
117  } while ( s->available() && (num_msgs < max_num_msgs) );
118  } catch (SocketException &e) {
119  msgq->unlock();
120  throw ConnectionDiedException("Read failed");
121  }
122  msgq->unlock();
123 }
124 
125 } // end namespace firevision
void * payload
payload
Definition: fuse.h:94
void unref()
Decrement reference count and conditionally delete this instance.
Definition: refcount.cpp:99
virtual void write(const void *buf, size_t count)
Write to the socket.
Definition: socket.cpp:681
void unlock() const
Unlock list.
Definition: lock_queue.h:131
Fawkes library namespace.
TCP stream socket over IP.
Definition: stream.h:31
virtual bool available()
Check if data is available.
Definition: socket.cpp:611
FUSE Network Message.
Definition: fuse_message.h:41
void pack()
Pack data for sending.
A LockQueue of FuseNetworkMessage to hold messages in inbound and outbound queues.
virtual size_t read(void *buf, size_t count, bool read_all=true)
Read from socket.
Definition: socket.cpp:729
Thrown if the connection died during an operation.
Definition: exceptions.h:31
FUSE message.
Definition: fuse.h:92
void lock() const
Lock queue.
Definition: lock_queue.h:115
uint32_t payload_size
payload size
Definition: fuse.h:88
const FUSE_message_t & fmsg() const
Get plain message.
FUSE_header_t header
header
Definition: fuse.h:93
size_t payload_size() const
Get payload size.
Socket exception.
Definition: socket.h:58