Fawkes API  Fawkes Development Version
base_thread.cpp
1 
2 /***************************************************************************
3  * base_thread.cpp - FireVision Base Thread
4  *
5  * Created: Tue May 29 16:41:50 2007
6  * Copyright 2006-2009 Tim Niemueller [www.niemueller.de]
7  *
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU Library General Public License for more details.
19  *
20  * Read the full text in the LICENSE.GPL file in the doc directory.
21  */
22 
23 #include "base_thread.h"
24 #include "acquisition_thread.h"
25 #include "aqt_vision_threads.h"
26 
27 #include <core/threading/thread.h>
28 #include <core/threading/mutex.h>
29 #include <core/threading/mutex_locker.h>
30 #include <core/threading/barrier.h>
31 #include <logging/logger.h>
32 
33 #include <fvutils/system/camargp.h>
34 #include <fvutils/ipc/shm_image.h>
35 #include <fvutils/ipc/shm_lut.h>
36 #include <fvcams/factory.h>
37 #include <fvcams/cam_exceptions.h>
38 #include <fvcams/control/factory.h>
39 #include <core/exceptions/software.h>
40 
41 #include <aspect/vision.h>
42 
43 #include <algorithm>
44 #include <unistd.h>
45 
46 using namespace fawkes;
47 using namespace firevision;
48 
49 /** @class FvBaseThread "base_thread.h"
50  * FireVision base thread.
51  * This implements the functionality of the FvBasePlugin.
52  * @author Tim Niemueller
53  */
54 
55 /** Constructor. */
57  : Thread("FvBaseThread", Thread::OPMODE_WAITFORWAKEUP),
58  BlockedTimingAspect(BlockedTimingAspect::WAKEUP_HOOK_SENSOR_ACQUIRE),
59  VisionMasterAspect(this)
60 {
61  // default to 30 seconds
62  __aqt_timeout = 30;
63  __aqt_barrier = new Barrier(1);
64 }
65 
66 
67 /** Destructor. */
69 {
70  delete __aqt_barrier;
71 }
72 
73 
74 void
76 {
77  // wipe all previously existing FireVision shared memory segments
78  // that are orphaned
79  SharedMemoryImageBuffer::cleanup(/* use lister */ false);
80  SharedMemoryLookupTable::cleanup(/* use lister */ false);
81 }
82 
83 
84 void
86 {
87  __aqts.lock();
88  for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
89  thread_collector->remove(__ait->second);
90  delete __ait->second;
91  }
92  __aqts.clear();
93  __aqts.unlock();
94  __owned_controls.lock();
96  for (i = __owned_controls.begin(); i != __owned_controls.end(); ++i) {
97  delete *i;
98  }
99  __owned_controls.clear();
100  __owned_controls.unlock();
101 }
102 
103 
104 /** Thread loop. */
105 void
107 {
108  __aqts.lock();
109 
110  try {
111  for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
112  __ait->second->set_vt_prepfin_hold(true);
113  }
114  } catch (Exception &e) {
115  logger->log_warn(name(), "Cannot get prepfin hold status, skipping this loop");
116  for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
117  __ait->second->set_vt_prepfin_hold(false);
118  }
119  __aqts.unlock();
120  return;
121  }
122 
123  // Wakeup all cyclic acquisition threads and wait for them
124  for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
125  if ( __ait->second->aqtmode() == FvAcquisitionThread::AqtCyclic ) {
126  //logger->log_debug(name(), "Waking Thread %s", __ait->second->name());
127  __ait->second->wakeup(__aqt_barrier);
128  }
129  }
130 
131  __aqt_barrier->wait();
132 
133  // Check for aqt timeouts
134  for (__ait = __aqts.begin(); __ait != __aqts.end();) {
135  if ( __ait->second->vision_threads->empty() &&
136  (__ait->second->vision_threads->empty_time() > __aqt_timeout) ) {
137 
138  logger->log_info(name(), "Acquisition thread %s timed out, destroying",
139  __ait->second->name());
140 
141 
142  thread_collector->remove(__ait->second);
143  delete __ait->second;
144  __aqts.erase(__ait++);
145  } else {
146  ++__ait;
147  }
148  }
149 
150  __started_threads.lock();
151  fawkes::LockMap<Thread *, FvAcquisitionThread *>::iterator stit = __started_threads.begin();
152  while (stit != __started_threads.end()) {
153 
154  logger->log_info(name(), "Thread %s has been started, %zu",
155  stit->second->name(), __started_threads.size());
156 
157  // if the thread is registered in that aqt mark it running
158  stit->second->vision_threads->set_thread_running(stit->first);
159 
160  if ( stit->second->vision_threads->has_cyclic_thread() ) {
161  // Make thread actually capture data
162  stit->second->set_enabled(true);
163 
164  if (stit->second->aqtmode() != FvAcquisitionThread::AqtCyclic ) {
165  logger->log_info(name(), "Switching acquisition thread %s to cyclic mode",
166  stit->second->name());
167 
168  stit->second->prepare_finalize();
169  stit->second->cancel();
170  stit->second->join();
171  stit->second->set_aqtmode(FvAcquisitionThread::AqtCyclic);
172  stit->second->start();
173  stit->second->cancel_finalize();
174  }
175  } else if ( stit->second->vision_threads->has_cont_thread() ) {
176  // Make thread actually capture data
177  stit->second->set_enabled(true);
178 
179  if (stit->second->aqtmode() != FvAcquisitionThread::AqtContinuous ) {
180  logger->log_info(name(), "Switching acquisition thread %s to continuous mode",
181  stit->second->name());
182  stit->second->prepare_finalize();
183  stit->second->cancel();
184  stit->second->join();
185  stit->second->set_aqtmode(FvAcquisitionThread::AqtContinuous);
186  stit->second->start();
187  stit->second->cancel_finalize();
188  }
189  } else {
190  logger->log_warn(name(), "Acquisition thread %s has no threads while we expected some",
191  stit->second->name());
192  // Make thread stop capturing data
193  stit->second->set_enabled(false);
194  }
195 
197  ++stit;
198  __started_threads.erase( stittmp );
199  }
200  __started_threads.unlock();
201 
202  // Re-create barrier as necessary after _adding_ threads
203  unsigned int num_cyclic_threads = 0;
204  for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
205  if ( __ait->second->vision_threads->has_cyclic_thread() ) {
206  ++num_cyclic_threads;
207  }
208  }
209  cond_recreate_barrier(num_cyclic_threads);
210 
211  for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
212  __ait->second->set_vt_prepfin_hold(false);
213  }
214 
215  __aqts.unlock();
216 }
217 
218 
219 /** Get vision master.
220  * @return vision master
221  */
222 VisionMaster *
224 {
225  return this;
226 }
227 
228 
229 Camera *
230 FvBaseThread::register_for_camera(const char *camera_string, Thread *thread,
231  colorspace_t cspace)
232 {
233  Camera *c = NULL;
234  __aqts.lock();
235 
236  logger->log_info(name(), "Thread '%s' registers for camera '%s'", thread->name(), camera_string);
237 
238  VisionAspect *vision_thread = dynamic_cast<VisionAspect *>(thread);
239  if ( vision_thread == NULL ) {
240  throw TypeMismatchException("Thread is not a vision thread");
241  }
242 
243  CameraArgumentParser *cap = new CameraArgumentParser(camera_string);
244  try {
245  std::string id = cap->cam_type() + "." + cap->cam_id();
246  if ( __aqts.find(id) != __aqts.end() ) {
247  // this camera has already been loaded
248  c = __aqts[id]->camera_instance(cspace,
249  (vision_thread->vision_thread_mode() ==
250  VisionAspect::CONTINUOUS));
251 
252  __aqts[id]->vision_threads->add_waiting_thread(thread);
253 
254  } else {
255  Camera *cam = NULL;
256  try {
257  cam = CameraFactory::instance(cap);
258  cam->open();
259  } catch (Exception &e) {
260  delete cam;
261  e.append("Could not open camera");
262  throw;
263  }
264 
265  FvAcquisitionThread *aqt = new FvAcquisitionThread(id.c_str(), cam, logger, clock);
266 
267  c = aqt->camera_instance(cspace, (vision_thread->vision_thread_mode() ==
268  VisionAspect::CONTINUOUS));
269 
270  aqt->vision_threads->add_waiting_thread(thread);
271 
272  __aqts[id] = aqt;
273  thread_collector->add(aqt);
274 
275  // no need to recreate barrier, by default aqts operate in continuous mode
276 
277  logger->log_info(name(), "Acquisition thread '%s' started for thread '%s' and camera '%s'",
278  aqt->name(), thread->name(), id.c_str());
279 
280  }
281 
282  thread->add_notification_listener(this);
283 
284  } catch (UnknownCameraTypeException &e) {
285  delete cap;
286  e.append("FvBaseVisionMaster: could not instantiate camera");
287  __aqts.unlock();
288  throw;
289  } catch (Exception &e) {
290  delete cap;
291  e.append("FvBaseVisionMaster: could not open or start camera");
292  __aqts.unlock();
293  throw;
294  }
295 
296  delete cap;
297 
298  __aqts.unlock();
299  return c;
300 }
301 
302 
303 Camera *
304 FvBaseThread::register_for_raw_camera(const char *camera_string, Thread *thread)
305 {
306  Camera *camera = register_for_camera(camera_string, thread, CS_UNKNOWN);
307  CameraArgumentParser cap(camera_string);
308  try {
309  std::string id = cap.cam_type() + "." + cap.cam_id();
310  __aqts.lock();
311  if ( __aqts.find(id) != __aqts.end() ) {
312  __aqts[id]->raw_subscriber_thread = thread;
313  }
314  __aqts.unlock();
315  } catch (Exception &e) {
316  __aqts.unlock();
317  throw;
318  }
319  return camera;
320 }
321 
323 FvBaseThread::create_camctrl(const char *camera_string)
324 {
325  CameraControl *cc = CameraControlFactory::instance(camera_string);
326  if (cc) {
327  __owned_controls.lock();
328  __owned_controls.push_back(cc);
329  __owned_controls.sort();
330  __owned_controls.unique();
331  __owned_controls.unlock();
332  return cc;
333  } else {
334  throw Exception("Cannot create camera control of desired type");
335  }
336 }
337 
339 FvBaseThread::acquire_camctrl(const char *cam_string)
340 {
341  CameraArgumentParser cap(cam_string);
342  std::string id = cap.cam_type() + "." + cap.cam_id();
343 
344  // Has this camera been loaded?
345  MutexLocker lock(__aqts.mutex());
346  if (__aqts.find(id) != __aqts.end()) {
347  return CameraControlFactory::instance(__aqts[id]->get_camera());
348  } else {
349  return create_camctrl(cam_string);
350  }
351 }
352 
353 
355 FvBaseThread::acquire_camctrl(const char *cam_string,
356  const std::type_info &typeinf)
357 {
358  CameraArgumentParser cap(cam_string);
359  std::string id = cap.cam_type() + "." + cap.cam_id();
360 
361  // Has this camera been loaded?
362  MutexLocker lock(__aqts.mutex());
363  if (__aqts.find(id) != __aqts.end()) {
364  return CameraControlFactory::instance(typeinf, __aqts[id]->get_camera());
365  } else {
366  return create_camctrl(cam_string);
367  }
368 }
369 
370 
371 void
373 {
374  __owned_controls.lock();
376  if ((f = std::find(__owned_controls.begin(), __owned_controls.end(), cc)) != __owned_controls.end()) {
377  delete *f;
378  __owned_controls.erase(f);
379  }
380  __owned_controls.unlock();
381 }
382 
383 
384 /** Conditionally re-create barriers.
385  * Re-create barriers if the number of cyclic threads has changed.
386  * @param num_cyclic_threads new number of cyclic threads
387  */
388 void
389 FvBaseThread::cond_recreate_barrier(unsigned int num_cyclic_threads)
390 {
391  if ( (num_cyclic_threads + 1) != __aqt_barrier->count() ) {
392  delete __aqt_barrier;
393  __aqt_barrier = new Barrier( num_cyclic_threads + 1 ); // +1 for base thread
394  }
395 }
396 
397 
398 void
400 {
401  __aqts.lock();
402  unsigned int num_cyclic_threads = 0;
403 
404  for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
405 
406  // Remove thread from all aqts
407  __ait->second->vision_threads->remove_thread(thread);
408 
409  if (__ait->second->raw_subscriber_thread == thread) {
410  __ait->second->raw_subscriber_thread = NULL;
411  }
412 
413  if ( __ait->second->vision_threads->has_cyclic_thread() ) {
414  ++num_cyclic_threads;
415 
416  } else if (__ait->second->aqtmode() != FvAcquisitionThread::AqtContinuous ) {
417  logger->log_info(name(), "Switching acquisition thread %s to continuous mode "
418  "on unregister", __ait->second->name());
419 
420  __ait->second->prepare_finalize();
421  __ait->second->cancel();
422  __ait->second->join();
423  __ait->second->set_aqtmode(FvAcquisitionThread::AqtContinuous);
424  __ait->second->start();
425  __ait->second->cancel_finalize();
426  }
427 
428  if (__ait->second->vision_threads->empty()) {
429  // Make thread stop capturing data
430  logger->log_info(name(), "Disabling capturing on thread %s (no more threads)",
431  __ait->second->name());
432  __ait->second->set_enabled(false);
433  }
434  }
435  // Recreate as necessary after _removing_ threads
436  cond_recreate_barrier(num_cyclic_threads);
437 
438  __aqts.unlock();
439 }
440 
441 
442 bool
444 {
445  __aqts.lock();
446  for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
447  if (__ait->second->vision_threads->has_waiting_thread(thread)) {
448  __started_threads.lock();
449  __started_threads[thread] = __ait->second;
450  __started_threads.unlock();
451  }
452  }
453  __aqts.unlock();
454 
455  return false;
456 }
457 
458 
459 bool
461 {
462  __aqts.lock();
463  for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
464  __ait->second->vision_threads->remove_waiting_thread(thread);
465  }
466  __aqts.unlock();
467 
468  return false;
469 }
void add_notification_listener(ThreadNotificationListener *notification_listener)
Add notification listener.
Definition: thread.cpp:1170
virtual bool thread_started(fawkes::Thread *thread)
Thread started successfully.
virtual firevision::Camera * register_for_raw_camera(const char *camera_string, fawkes::Thread *thread)
Register thread for camera.
unsigned int count()
Get number of threads this barrier will wait for.
Definition: barrier.cpp:181
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Camera interface for image aquiring devices in FireVision.
Definition: camera.h:35
void lock() const
Lock list.
Definition: lock_map.h:100
virtual void lock() const
Lock list.
Definition: lock_list.h:128
virtual void remove(ThreadList &tl)=0
Remove multiple threads.
Fawkes library namespace.
virtual void wait()
Wait for other threads.
Definition: barrier.cpp:157
Mutex locking helper.
Definition: mutex_locker.h:33
virtual firevision::Camera * register_for_camera(const char *camera_string, fawkes::Thread *thread, firevision::colorspace_t cspace=firevision::YUV422_PLANAR)
Register thread for camera.
Unknown camera type exception.
FvAqtVisionThreads * vision_threads
Vision threads assigned to this acquisition thread.
virtual void finalize()
Finalize the thread.
Definition: base_thread.cpp:85
Camera argument parser.
Definition: camargp.h:38
Thread class encapsulation of pthreads.
Definition: thread.h:42
FireVision base application acquisition thread.
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:44
Vision Master Aspect.
Definition: vision_master.h:39
std::string cam_id() const
Get camera ID.
Definition: camargp.cpp:139
virtual firevision::VisionMaster * vision_master()
Get vision master.
Map with a lock.
Definition: lock_map.h:37
ThreadCollector * thread_collector
Thread collector.
Clock * clock
By means of this member access to the clock is given.
Definition: clock.h:45
Thread aspect to use blocked timing.
virtual void release_camctrl(firevision::CameraControl *cc)
Release a camera control.
virtual firevision::CameraControl * acquire_camctrl(const char *cam_string)
Retrieve a CameraControl for the specified camera string.
Camera control interface base class.
Definition: control.h:33
Base class for exceptions in Fawkes.
Definition: exception.h:36
FvBaseThread()
Constructor.
Definition: base_thread.cpp:56
Thread aspect to use in FireVision apps.
Definition: vision.h:35
VisionThreadMode vision_thread_mode()
Get the vision thread mode of this thread.
Definition: vision.cpp:85
List with a lock.
Definition: thread.h:40
virtual void unregister_thread(fawkes::Thread *thread)
Unregister a thread.
virtual void unlock() const
Unlock list.
Definition: lock_list.h:144
void unlock() const
Unlock list.
Definition: lock_map.h:120
const char * name() const
Get name of thread.
Definition: thread.h:95
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
firevision::Camera * camera_instance(firevision::colorspace_t cspace, bool deep_copy)
Get a camera instance.
virtual void add(ThreadList &tl)=0
Add multiple threads.
virtual ~FvBaseThread()
Destructor.
Definition: base_thread.cpp:68
virtual void open()=0
Open the camera.
void add_waiting_thread(fawkes::Thread *thread)
Add a thread in waiting state.
cyclic mode, use if there is at least one cyclic thread for this acquisition thread.
std::string cam_type() const
Get camera type.
Definition: camargp.cpp:127
virtual void loop()
Thread loop.
virtual bool thread_init_failed(fawkes::Thread *thread)
Thread initialization failed.
virtual void init()
Initialize the thread.
Definition: base_thread.cpp:75
RefPtr< Mutex > mutex() const
Get access to the internal mutex.
Definition: lock_map.h:146
continuous mode, use if there are only continuous threads for this acquisition thread.
A barrier is a synchronization tool which blocks until a given number of threads have reached the bar...
Definition: barrier.h:32
void append(const char *format,...)
Append messages to the message list.
Definition: exception.cpp:341