Fawkes API
Fawkes Development Version
|
00001 00002 /*************************************************************************** 00003 * qa_bb_remote.cpp - BlackBoard remote access QA 00004 * 00005 * Created: Mon Mar 03 17:31:18 2008 00006 * Copyright 2006-2008 Tim Niemueller [www.niemueller.de] 00007 * 00008 ****************************************************************************/ 00009 00010 /* This program is free software; you can redistribute it and/or modify 00011 * it under the terms of the GNU General Public License as published by 00012 * the Free Software Foundation; either version 2 of the License, or 00013 * (at your option) any later version. A runtime exception applies to 00014 * this software (see LICENSE.GPL_WRE file mentioned below for details). 00015 * 00016 * This program is distributed in the hope that it will be useful, 00017 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00018 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00019 * GNU Library General Public License for more details. 00020 * 00021 * Read the full text in the LICENSE.GPL_WRE file in the doc directory. 00022 */ 00023 00024 00025 /// @cond QA 00026 00027 #include <blackboard/local.h> 00028 #include <blackboard/remote.h> 00029 #include <blackboard/exceptions.h> 00030 #include <blackboard/bbconfig.h> 00031 #include <blackboard/interface_listener.h> 00032 00033 #include <interfaces/TestInterface.h> 00034 00035 #include <interface/interface_info.h> 00036 #include <core/exceptions/system.h> 00037 #include <netcomm/fawkes/client.h> 00038 #include <netcomm/fawkes/server_thread.h> 00039 #include <utils/time/time.h> 00040 00041 #include <signal.h> 00042 #include <cstdlib> 00043 #include <cstring> 00044 #include <cstdio> 00045 00046 #include <iostream> 00047 #include <vector> 00048 00049 using namespace std; 00050 using namespace fawkes; 00051 00052 00053 bool quit = false; 00054 00055 void 00056 signal_handler(int signum) 00057 { 00058 quit = true; 00059 } 00060 00061 00062 #define NUM_CHUNKS 5 00063 00064 void 00065 test_messaging(TestInterface *ti_reader, TestInterface *ti_writer) 00066 { 00067 while (! quit) { 00068 int expval = ti_reader->test_int() + 1; 00069 TestInterface::SetTestIntMessage *m = new TestInterface::SetTestIntMessage(expval); 00070 unsigned int msgid = ti_reader->msgq_enqueue(m); 00071 printf("Sent with message ID %u\n", msgid); 00072 00073 if ( ti_writer->msgq_size() > 1 ) { 00074 cout << "Error, more than one message! flushing." << endl; 00075 ti_writer->msgq_flush(); 00076 } 00077 00078 usleep(100000); 00079 00080 if ( ti_writer->msgq_first() != NULL ) { 00081 if ( ti_writer->msgq_first_is<TestInterface::SetTestStringMessage>() ) { 00082 TestInterface::SetTestStringMessage *msg = ti_writer->msgq_first(msg); 00083 printf("Received message of ID %u, Message improperly detected to be a SetTestStringMessage\n", msg->id()); 00084 } 00085 if ( ti_writer->msgq_first_is<TestInterface::SetTestIntMessage>() ) { 00086 TestInterface::SetTestIntMessage *m2 = ti_writer->msgq_first<TestInterface::SetTestIntMessage>(); 00087 printf("Received message with ID %u (enqueue time: %s)\n", m2->id(), m2->time_enqueued()->str()); 00088 ti_writer->set_test_int( m2->test_int() ); 00089 try { 00090 ti_writer->write(); 00091 } catch (InterfaceWriteDeniedException &e) { 00092 cout << "BUG: caught write denied exception" << endl; 00093 e.print_trace(); 00094 } 00095 ti_writer->msgq_pop(); 00096 } else { 00097 cout << "Illegal message '" << ti_writer->msgq_first()->type() << "' type received" << endl; 00098 } 00099 00100 usleep(100000); 00101 00102 //cout << "Reading value from reader interface.. " << flush; 00103 ti_reader->read(); 00104 int val = ti_reader->test_int(); 00105 if ( val == expval ) { 00106 //cout << " success, value is " << ti_reader->test_int() << " as expected" << endl; 00107 } else { 00108 cout << " failure, value is " << ti_reader->test_int() << ", expected " 00109 << expval << endl; 00110 } 00111 } else { 00112 printf("No message in queue, if network test this means the message was dropped\n"); 00113 } 00114 00115 usleep(10); 00116 } 00117 } 00118 00119 class SyncInterfaceListener : public fawkes::BlackBoardInterfaceListener 00120 { 00121 public: 00122 SyncInterfaceListener(fawkes::Interface *reader, 00123 fawkes::Interface *writer, 00124 fawkes::BlackBoard *reader_bb, 00125 fawkes::BlackBoard *writer_bb) 00126 : BlackBoardInterfaceListener("SyncInterfaceListener(%s-%s)", writer->uid(), reader->id()) 00127 { 00128 __reader = reader; 00129 __writer = writer; 00130 __reader_bb = reader_bb; 00131 __writer_bb = writer_bb; 00132 00133 bbil_add_data_interface(__reader); 00134 bbil_add_message_interface(__writer); 00135 00136 __reader_bb->register_listener(this); 00137 __writer_bb->register_listener(this); 00138 } 00139 00140 00141 /** Destructor. */ 00142 ~SyncInterfaceListener() 00143 { 00144 __reader_bb->unregister_listener(this); 00145 __writer_bb->unregister_listener(this); 00146 } 00147 00148 00149 bool 00150 bb_interface_message_received(Interface *interface, 00151 Message *message) throw() 00152 { 00153 try { 00154 if ( interface == __writer ) { 00155 printf("%s: Forwarding message\n", bbil_name()); 00156 Message *m = message->clone(); 00157 m->set_hops(message->hops()); 00158 m->ref(); 00159 __reader->msgq_enqueue(m); 00160 message->set_id(m->id()); 00161 m->unref(); 00162 return false; 00163 } else { 00164 // Don't know why we were called, let 'em enqueue 00165 printf("%s: Message received for unknown interface\n", bbil_name()); 00166 return true; 00167 } 00168 } catch (Exception &e) { 00169 printf("%s: Exception when message received\n", bbil_name()); 00170 e.print_trace(); 00171 return false; 00172 } 00173 } 00174 00175 00176 void 00177 bb_interface_data_changed(Interface *interface) throw() 00178 { 00179 try { 00180 if ( interface == __reader ) { 00181 //__logger->log_debug(bbil_name(), "Copying data"); 00182 __reader->read(); 00183 __writer->copy_values(__reader); 00184 __writer->write(); 00185 } else { 00186 // Don't know why we were called, let 'em enqueue 00187 printf("%s: Data changed for unknown interface", bbil_name()); 00188 } 00189 } catch (Exception &e) { 00190 printf("%s: Exception when data changed\n", bbil_name()); 00191 e.print_trace(); 00192 } 00193 } 00194 00195 private: 00196 fawkes::Interface *__writer; 00197 fawkes::Interface *__reader; 00198 00199 fawkes::BlackBoard *__writer_bb; 00200 fawkes::BlackBoard *__reader_bb; 00201 00202 }; 00203 00204 00205 int 00206 main(int argc, char **argv) 00207 { 00208 signal(SIGINT, signal_handler); 00209 00210 LocalBlackBoard *llbb = new LocalBlackBoard(BLACKBOARD_MEMSIZE); 00211 BlackBoard *lbb = llbb; 00212 00213 FawkesNetworkServerThread *fns = new FawkesNetworkServerThread(1910); 00214 fns->start(); 00215 00216 llbb->start_nethandler(fns); 00217 00218 BlackBoard *rbb = new RemoteBlackBoard("localhost", 1910); 00219 00220 InterfaceInfoList *infl = rbb->list_all(); 00221 for (InterfaceInfoList::iterator i = infl->begin(); i != infl->end(); ++i) { 00222 const unsigned char *hash = (*i).hash(); 00223 char phash[__INTERFACE_HASH_SIZE * 2 + 1]; 00224 memset(phash, 0, sizeof(phash)); 00225 for (unsigned int j = 0; j < __INTERFACE_HASH_SIZE; ++j) { 00226 sprintf(&phash[j * 2], "%02x", hash[j]); 00227 } 00228 printf("%s::%s (%s), w:%i r:%u s:%u\n", 00229 (*i).type(), (*i).id(), phash, (*i).has_writer(), 00230 (*i).num_readers(), (*i).serial()); 00231 } 00232 delete infl; 00233 00234 //TestInterface *ti_writer; 00235 TestInterface *ti_reader; 00236 TestInterface *ti_writer; 00237 try { 00238 cout << "Opening interfaces.. " << flush; 00239 ti_writer = rbb->open_for_writing<TestInterface>("SomeID"); 00240 ti_reader = rbb->open_for_reading<TestInterface>("SomeID"); 00241 cout << "success, " 00242 << "writer hash=" << ti_writer->hash_printable() 00243 << " reader hash=" << ti_reader->hash_printable() 00244 << endl; 00245 } catch (Exception &e) { 00246 cout << "failed! Aborting" << endl; 00247 e.print_trace(); 00248 exit(1); 00249 } 00250 00251 try { 00252 cout << "Trying to open second writer.. " << flush; 00253 TestInterface *ti_writer_two; 00254 ti_writer_two = rbb->open_for_writing<TestInterface>("SomeID"); 00255 rbb->close(ti_writer_two); 00256 cout << "BUG: Detection of second writer did NOT work!" << endl; 00257 exit(2); 00258 } catch (BlackBoardWriterActiveException &e) { 00259 cout << "exception caught as expected, detected and prevented second writer!" << endl; 00260 } 00261 00262 try { 00263 cout << "Trying to open third writer.. " << flush; 00264 TestInterface *ti_writer_three; 00265 ti_writer_three = rbb->open_for_writing<TestInterface>("AnotherID"); 00266 cout << "No exception as expected, different ID ok!" << endl; 00267 rbb->close(ti_writer_three); 00268 } catch (BlackBoardWriterActiveException &e) { 00269 cout << "BUG: Third writer with different ID detected as another writer!" << endl; 00270 exit(3); 00271 } 00272 00273 cout << endl << endl 00274 << "Running data tests ==================================================" << endl; 00275 00276 cout << "Writing initial value (" 00277 << TestInterface::TEST_CONSTANT << ") into interface as TestInt" << endl; 00278 ti_writer->set_test_int( TestInterface::TEST_CONSTANT ); 00279 try { 00280 ti_writer->write(); 00281 } catch (InterfaceWriteDeniedException &e) { 00282 cout << "BUG: caught write denied exception" << endl; 00283 e.print_trace(); 00284 } 00285 00286 cout << "Giving some time to have value processed" << endl; 00287 usleep(100000); 00288 00289 cout << "Reading value from reader interface.. " << flush; 00290 ti_reader->read(); 00291 int val = ti_reader->test_int(); 00292 if ( val == TestInterface::TEST_CONSTANT ) { 00293 cout << " success, value is " << ti_reader->test_int() << " as expected" << endl; 00294 } else { 00295 cout << " failure, value is " << ti_reader->test_int() << ", expected " 00296 << TestInterface::TEST_CONSTANT << endl; 00297 } 00298 00299 cout << "Closing interfaces.. " << flush; 00300 try { 00301 rbb->close(ti_reader); 00302 rbb->close(ti_writer); 00303 cout << "done" << endl; 00304 } catch (Exception &e) { 00305 cout << "failed" << endl; 00306 e.print_trace(); 00307 } 00308 00309 cout << endl << endl << "Starting MESSAGING tests" << endl 00310 << "Press Ctrl-C to continue with next test" << endl << endl; 00311 00312 ti_writer = lbb->open_for_writing<TestInterface>("Messaging"); 00313 ti_reader = rbb->open_for_reading<TestInterface>("Messaging"); 00314 00315 printf("Writer serial: %u shifted: %u\n", ti_writer->serial(), ti_writer->serial() << 16); 00316 printf("Reader serial: %u shifted: %u\n", ti_reader->serial(), ti_reader->serial() << 16); 00317 00318 test_messaging(ti_reader, ti_writer); 00319 00320 rbb->close(ti_reader); 00321 lbb->close(ti_writer); 00322 00323 cout << endl << endl << "Starting MESSAGING tests, doing repeater scenario" << endl 00324 << "Press Ctrl-C to continue with next test" << endl << endl; 00325 quit = false; 00326 00327 delete rbb; 00328 00329 LocalBlackBoard *repllbb = new LocalBlackBoard(BLACKBOARD_MEMSIZE); 00330 00331 FawkesNetworkServerThread *repfns = new FawkesNetworkServerThread(1911); 00332 repfns->start(); 00333 00334 repllbb->start_nethandler(repfns); 00335 00336 BlackBoard *rep_rbb = new RemoteBlackBoard("localhost", 1911); 00337 rbb = new RemoteBlackBoard("localhost", 1911); 00338 00339 TestInterface *rep_reader; 00340 TestInterface *rep_writer; 00341 00342 ti_writer = rbb->open_for_writing<TestInterface>("Messaging"); 00343 ti_reader = lbb->open_for_reading<TestInterface>("Messaging"); 00344 00345 rep_reader = rep_rbb->open_for_reading<TestInterface>("Messaging"); 00346 rep_writer = lbb->open_for_writing<TestInterface>("Messaging"); 00347 00348 printf("Writer serial: %u shifted: %u\n", ti_writer->serial(), ti_writer->serial() << 16); 00349 printf("Reader serial: %u shifted: %u\n", ti_reader->serial(), ti_reader->serial() << 16); 00350 00351 SyncInterfaceListener *sil = new SyncInterfaceListener(rep_reader, rep_writer, rep_rbb, lbb); 00352 00353 test_messaging(ti_reader, ti_writer); 00354 00355 delete sil; 00356 lbb->close(ti_reader); 00357 rbb->close(ti_writer); 00358 rep_rbb->close(rep_reader); 00359 lbb->close(rep_writer); 00360 delete repllbb; 00361 delete rep_rbb; 00362 00363 cout << "Tests done" << endl; 00364 00365 delete rbb; 00366 delete llbb; 00367 delete fns; 00368 } 00369 00370 00371 /// @endcond