Fawkes API  Fawkes Development Version
fuse_client.cpp
1 
2 /***************************************************************************
3  * fuse_client.cpp - FUSE network transport client
4  *
5  * Created: Thu Mar 29 00:47:24 2007
6  * Copyright 2005-2007 Tim Niemueller [www.niemueller.de]
7  *
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version. A runtime exception applies to
14  * this software (see LICENSE.GPL_WRE file mentioned below for details).
15  *
16  * This program is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19  * GNU Library General Public License for more details.
20  *
21  * Read the full text in the LICENSE.GPL_WRE file in the doc directory.
22  */
23 
24 #include <fvutils/net/fuse_client.h>
25 
26 #include <fvutils/net/fuse_transceiver.h>
27 #include <fvutils/net/fuse_message_queue.h>
28 #include <fvutils/net/fuse_message.h>
29 #include <fvutils/net/fuse_client_handler.h>
30 
31 #include <core/threading/mutex.h>
32 #include <core/threading/wait_condition.h>
33 #include <core/exceptions/software.h>
34 #include <netcomm/socket/stream.h>
35 #include <netcomm/utils/exceptions.h>
36 
37 #include <cstring>
38 #include <netinet/in.h>
39 #include <cstdlib>
40 #include <unistd.h>
41 
42 using namespace fawkes;
43 
44 namespace firevision {
45 #if 0 /* just to make Emacs auto-indent happy */
46 }
47 #endif
48 
49 /** @class FuseClient <fvutils/net/fuse_client.h>
50  * FUSE client.
51  * FUSE is the FireVision protocol to retrieve information, images and lookup
52  * tables from vision processes and to send control commands to these systems.
53  * The client is used in the retrieving or controlling process.
54  * @ingroup FUSE
55  * @ingroup FireVision
56  * @author Tim Niemueller
57  */
58 
59 /** Constructor.
60  * @param hostname host to connect to
61  * @param port port to connect to
62  * @param handler client handler to handle incoming data
63  */
64 FuseClient::FuseClient(const char *hostname, unsigned short int port,
65  FuseClientHandler *handler)
66  : Thread("FuseClient")
67 {
68  __hostname = strdup(hostname);
69  __port = port;
70  __handler = handler;
71 
72  __wait_timeout = 10;
73 
74  __inbound_msgq = new FuseNetworkMessageQueue();
75  __outbound_msgq = new FuseNetworkMessageQueue();
76 
77  __mutex = new Mutex();
78  __recv_mutex = new Mutex();
79  __recv_waitcond = new WaitCondition(__recv_mutex);
80  __socket = new StreamSocket();
81  __greeting_mutex = new Mutex();
82  __greeting_waitcond = new WaitCondition(__greeting_mutex);
83 
84  __alive = true;
85  __greeting_received = false;
86 }
87 
88 
89 /** Destructor. */
91 {
92  free(__hostname);
93 
94  while ( ! __inbound_msgq->empty() ) {
95  FuseNetworkMessage *m = __inbound_msgq->front();
96  m->unref();
97  __inbound_msgq->pop();
98  }
99  delete __inbound_msgq;
100 
101  while ( ! __outbound_msgq->empty() ) {
102  FuseNetworkMessage *m = __outbound_msgq->front();
103  m->unref();
104  __outbound_msgq->pop();
105  }
106  delete __outbound_msgq;
107 
108  delete __mutex;
109  delete __recv_mutex;
110  delete __recv_waitcond;
111  delete __socket;
112  delete __greeting_mutex;
113  delete __greeting_waitcond;
114 }
115 
116 
117 /** Connect. */
118 void
120 {
121  __socket->connect(__hostname, __port);
122 
124  greetmsg->version = htonl(FUSE_CURRENT_VERSION);
125  __outbound_msgq->push(new FuseNetworkMessage(FUSE_MT_GREETING,
126  greetmsg, sizeof(FUSE_greeting_message_t)));
127 }
128 
129 
130 /** Disconnect. */
131 void
133 {
134  __mutex->lock();
135  delete __socket;
136  __socket = new StreamSocket();
137  __alive = false;
138  __mutex->unlock();
139 }
140 
141 
142 /** Send queued messages. */
143 void
144 FuseClient::send()
145 {
146  try {
147  FuseNetworkTransceiver::send(__socket, __outbound_msgq);
148  } catch (ConnectionDiedException &e) {
149  e.print_trace();
150  __socket->close();
151  __alive = false;
152  __handler->fuse_connection_died();
153  __recv_waitcond->wake_all();
154  }
155 }
156 
157 
158 /** Receive messages. */
159 void
160 FuseClient::recv()
161 {
162  __recv_mutex->lock();
163  try {
164  while ( __socket->available() ) {
165  FuseNetworkTransceiver::recv(__socket, __inbound_msgq);
166  }
167  } catch (ConnectionDiedException &e) {
168  e.print_trace();
169  __socket->close();
170  __alive = false;
171  __handler->fuse_connection_died();
172  __recv_waitcond->wake_all();
173  }
174  __recv_mutex->unlock();
175 }
176 
177 
178 /** Enqueue message.
179  * This method takes ownership of the passed message. You must explicitly
180  * reference it before enqueing if you want to use it afterwards.
181  * @param m message to enqueue
182  */
183 void
185 {
186  __outbound_msgq->push_locked(m);
187 }
188 
189 
190 /** Enqueue message.
191  * @param type type of message
192  * @param payload payload of message
193  * @param payload_size size of payload
194  */
195 void
196 FuseClient::enqueue(FUSE_message_type_t type, void *payload, size_t payload_size)
197 {
198  FuseNetworkMessage *m = new FuseNetworkMessage(type, payload, payload_size);
199  __outbound_msgq->push_locked(m);
200 }
201 
202 
203 /** Enqueue message without payload.
204  * @param type type of message
205  */
206 void
207 FuseClient::enqueue(FUSE_message_type_t type)
208 {
209  FuseNetworkMessage *m = new FuseNetworkMessage(type);
210  __outbound_msgq->push_locked(m);
211 }
212 
213 
214 /** Enqueue message and wait for reply.
215  * The wait happens atomically, use this to avoid race conditions. This method
216  * takes ownership of the passed message. You must explicitly reference it
217  * before enqueing if you want to use it afterwards.
218  * @param m message to enqueue
219  */
220 void
222 {
223  __recv_mutex->lock();
224  __outbound_msgq->push_locked(m);
225  __recv_waitcond->wait();
226  __recv_mutex->unlock();
227 }
228 
229 
230 /** Enqueue message and wait for reply.
231  * The wait happens atomically, use this to avoid race conditions.
232  * @param type type of message
233  * @param payload payload of message
234  * @param payload_size size of payload
235  */
236 void
237 FuseClient::enqueue_and_wait(FUSE_message_type_t type, void *payload, size_t payload_size)
238 {
239  FuseNetworkMessage *m = new FuseNetworkMessage(type, payload, payload_size);
240  __recv_mutex->lock();
241  __outbound_msgq->push_locked(m);
242  __recv_waitcond->wait();
243  __recv_mutex->unlock();
244 }
245 
246 
247 /** Enqueue message without payload and wait for reply.
248  * The wait happens atomically, use this to avoid race conditions.
249  * @param type type of message
250  */
251 void
252 FuseClient::enqueue_and_wait(FUSE_message_type_t type)
253 {
254  FuseNetworkMessage *m = new FuseNetworkMessage(type);
255  __recv_mutex->lock();
256  __outbound_msgq->push_locked(m);
257  __recv_waitcond->wait();
258  __recv_mutex->unlock();
259 }
260 
261 
262 
263 /** Sleep for some time.
264  * Wait until inbound messages have been receive, the connection dies or the
265  * timeout has been reached, whatever comes first. So you sleep at most timeout ms,
266  * but short under some circumstances (incoming data or lost connection).
267  */
268 void
269 FuseClient::sleep()
270 {
271  try {
272  __socket->poll(__wait_timeout /* ms timeout */, Socket::POLL_IN);
273  } catch (Exception &e) {
274  }
275 }
276 
277 
278 /** Thread loop.
279  * Sends enqueued messages and reads incoming messages off the network.
280  */
281 void
283 {
284  __mutex->lock();
285 
286  if ( ! __alive ) {
287  __mutex->unlock();
288  usleep(10000);
289  return;
290  }
291 
292  bool wake = false;
293 
294  send();
295  sleep();
296  recv();
297 
298  //process_inbound();
299 
300  __inbound_msgq->lock();
301  while ( ! __inbound_msgq->empty() ) {
302  FuseNetworkMessage *m = __inbound_msgq->front();
303 
304  if ( m->type() == FUSE_MT_GREETING ) {
306  if ( ntohl(gm->version) != FUSE_CURRENT_VERSION ) {
307  __handler->fuse_invalid_server_version(FUSE_CURRENT_VERSION, ntohl(gm->version));
308  __alive = false;
309  } else {
310  __greeting_mutex->lock();
311  __greeting_received = true;
312  __greeting_waitcond->wake_all();
313  __greeting_mutex->unlock();
314  __handler->fuse_connection_established();
315  }
316  } else {
317  __handler->fuse_inbound_received(m);
318  wake = true;
319  }
320 
321  m->unref();
322  __inbound_msgq->pop();
323  }
324  __inbound_msgq->unlock();
325 
326  if ( wake ) {
327  __recv_waitcond->wake_all();
328  }
329  __mutex->unlock();
330 }
331 
332 
333 /** Wait for messages.
334  * This will wait for messages to arrive. The calling
335  * thread is blocked until messages are available.
336  */
337 void
339 {
340  __recv_mutex->lock();
341  __recv_waitcond->wait();
342  __recv_mutex->unlock();
343 }
344 
345 
346 /** Wait for greeting message.
347  * This method will wait for the greeting message to arrive. Make sure that you called
348  * connect() before waiting or call it concurrently in another thread. The calling thread
349  * will be blocked until the message has been received. If the message has already been
350  * received this method will return immediately. Thus it is safe to call this at any time
351  * without risking a race condition.
352  */
353 void
355 {
356  __greeting_mutex->lock();
357  while (! __greeting_received) {
358  __greeting_waitcond->wait();
359  }
360  __greeting_mutex->unlock();
361 }
362 
363 } // end namespace firevision
virtual void connect(const char *hostname, const unsigned short int port)
Connect socket.
Definition: socket.cpp:340
static void recv(fawkes::StreamSocket *s, FuseNetworkMessageQueue *msgq, unsigned int max_num_msgs=8)
Receive data.
virtual void close()
Close socket.
Definition: socket.cpp:274
void connect()
Connect.
Wait until a given condition holds.
void enqueue_and_wait(FuseNetworkMessage *message)
Enqueue message and wait for reply.
void disconnect()
Disconnect.
void unref()
Decrement reference count and conditionally delete this instance.
Definition: refcount.cpp:99
void unlock() const
Unlock list.
Definition: lock_queue.h:131
Fawkes library namespace.
void unlock()
Unlock the mutex.
Definition: mutex.cpp:135
void wake_all()
Wake up all waiting threads.
void enqueue(FuseNetworkMessage *m)
Enqueue message.
Thread class encapsulation of pthreads.
Definition: thread.h:42
static void send(fawkes::StreamSocket *s, FuseNetworkMessageQueue *msgq)
Send messages.
TCP stream socket over IP.
Definition: stream.h:31
virtual bool available()
Check if data is available.
Definition: socket.cpp:611
uint32_t version
version from FUSE_version_t
Definition: fuse.h:99
void wait()
Wait for messages.
FUSE Network Message.
Definition: fuse_message.h:41
uint32_t type() const
Get message type.
Base class for exceptions in Fawkes.
Definition: exception.h:36
version packet, bi-directional
Definition: fuse.h:98
A LockQueue of FuseNetworkMessage to hold messages in inbound and outbound queues.
virtual void fuse_invalid_server_version(uint32_t local_version, uint32_t remote_version)=0
Invalid version string received.
void wait()
Wait for the condition forever.
virtual ~FuseClient()
Destructor.
Definition: fuse_client.cpp:90
Thrown if the connection died during an operation.
Definition: exceptions.h:31
void print_trace()
Prints trace to stderr.
Definition: exception.cpp:619
virtual void loop()
Thread loop.
virtual short poll(int timeout=-1, short what=POLL_IN|POLL_HUP|POLL_PRI|POLL_RDHUP)
Wait for some event on socket.
Definition: socket.cpp:652
void wait_greeting()
Wait for greeting message.
void push_locked(const Type &x)
Push element to queue with lock protection.
Definition: lock_queue.h:139
void lock() const
Lock queue.
Definition: lock_queue.h:115
void lock()
Lock this mutex.
Definition: mutex.cpp:89
virtual void fuse_connection_died()=0
Connection died.
Mutex mutual exclusion lock.
Definition: mutex.h:32
virtual void fuse_connection_established()=0
Connection has been established.
MT * msg() const
Get correctly casted payload.
Definition: fuse_message.h:67
virtual void fuse_inbound_received(FuseNetworkMessage *m)=0
Message received.