vrpn  07.33
Virtual Reality Peripheral Network
vrpn_RedundantTransmission.C
Go to the documentation of this file.
1 #include <stdio.h> // for fprintf, stderr, fclose, etc
2 #include <string.h> // for NULL, memcpy
3 
5 
6 struct timeval;
7 
9  : d_connection(c)
10  , d_messageList(NULL)
11  , d_numMessagesQueued(0)
12  , d_numTransmissions(0)
13  , d_isEnabled(VRPN_FALSE)
14 {
15 
16  d_transmissionInterval.tv_sec = 0L;
17  d_transmissionInterval.tv_usec = 0L;
18 
19  if (d_connection) {
21  }
22 }
23 
25 {
26 
27  if (d_connection) {
29  }
30 }
31 
33 {
34  return d_numTransmissions;
35 }
36 
38 {
40 }
41 
43 {
44  return d_isEnabled;
45 }
46 
47 // virtual
49 {
50 
51  queuedMessage *qm;
52  queuedMessage **snitch;
53  timeval now;
54 
55  if (!d_connection) {
56  return;
57  }
58 
59  // fprintf(stderr, "mainloop: %d messages queued.\n", d_numMessagesQueued);
60 
61  vrpn_gettimeofday(&now, NULL);
62  for (qm = d_messageList; qm; qm = qm->next) {
63  if ((qm->remainingTransmissions > 0) &&
66  qm->p.type, qm->p.sender, qm->p.buffer,
70  // fprintf(stderr, "Sending message; "
71  //"%d transmissions remaining at %d.%d int.\n",
72  // qm->remainingTransmissions, qm->transmissionInterval.tv_sec,
73  // qm->transmissionInterval.tv_usec);
74  }
75  }
76 
77  snitch = &d_messageList;
78  qm = *snitch;
79 
80  while (qm) {
81  if (!qm->remainingTransmissions) {
82  *snitch = qm->next;
83  delete[](char *)qm -> p.buffer;
84  delete qm;
85  qm = *snitch;
87  }
88  else {
89  snitch = &qm->next;
90  qm = *snitch;
91  }
92  }
93 
96  fprintf(stderr, "vrpn_RedundantTransmission::mainloop(): "
97  "serious internal error.\n");
99  d_messageList = NULL;
100  }
101 }
102 
104 {
105  d_isEnabled = on;
106 
107  // fprintf(stderr, "vrpn_RedundantTransmission::enable(%s)\n",
108  // on ? "on" : "off");
109 }
110 
111 void vrpn_RedundantTransmission::setDefaults(vrpn_uint32 numTransmissions,
112  timeval transmissionInterval)
113 {
114 
115  // fprintf(stderr, "vrpn_RedundantTransmission::setDefaults: "
116  //"to %d, %d.%d\n", numTransmissions, transmissionInterval.tv_sec,
117  // transmissionInterval.tv_usec);
118 
119  d_numTransmissions = numTransmissions;
120  d_transmissionInterval = transmissionInterval;
121 }
122 
123 // virtual
125  vrpn_uint32 len, timeval time, vrpn_uint32 type, vrpn_uint32 sender,
126  const char *buffer, vrpn_uint32 class_of_service,
127  vrpn_int32 numTransmissions, timeval *transmissionInterval)
128 {
129  queuedMessage *qm;
130  int ret;
131  int i;
132 
133  if (!d_connection) {
134  fprintf(stderr, "vrpn_RedundantTransmission::pack_message: "
135  "Connection not defined!\n");
136  return -1;
137  }
138 
139  if (!d_isEnabled) {
140  return d_connection->pack_message(len, time, type, sender, buffer,
141  class_of_service);
142  }
143 
144  ret = d_connection->pack_message(len, time, type, sender, buffer,
146 
147  // TODO: check ret
148 
149  // use defaults?
150  if (numTransmissions < 0) {
151  numTransmissions = d_numTransmissions;
152  }
153  if (!transmissionInterval) {
154  transmissionInterval = &d_transmissionInterval;
155  }
156 
157  // fprintf(stderr, "In pack message with %d xmit at %d.%d\n",
158  // numTransmissions, transmissionInterval->tv_sec,
159  // transmissionInterval->tv_usec);
160 
161  if (!numTransmissions) {
162  return ret;
163  }
164 
165  // Special case - if transmissionInterval is 0, we send them all right
166  // away, but force VRPN to use separate network packets.
167  if (!transmissionInterval->tv_sec && !transmissionInterval->tv_usec) {
168  for (i = 0; i < numTransmissions; i++) {
170  ret = d_connection->pack_message(len, time, type, sender, buffer,
172  // TODO: check ret
173  }
175  return 0;
176  }
177 
178  qm = new queuedMessage;
179  if (!qm) {
180  fprintf(stderr,
181  "vrpn_RedundantTransmission::pack_message: "
182  "Out of memory; can't queue message for retransmission.\n");
183  return ret;
184  }
185 
186  qm->p.payload_len = len;
187  qm->p.msg_time = time;
188  qm->p.type = type;
189  qm->p.sender = sender;
190  qm->p.buffer = new char[len];
191  if (!qm->p.buffer) {
192  fprintf(stderr,
193  "vrpn_RedundantTransmission::pack_message: "
194  "Out of memory; can't queue message for retransmission.\n");
195  return ret;
196  }
197  memcpy((char *)qm->p.buffer, buffer, len);
198 
199  qm->remainingTransmissions = numTransmissions;
200  qm->transmissionInterval = *transmissionInterval;
201  qm->nextValidTime = vrpn_TimevalSum(time, *transmissionInterval);
202  qm->next = d_messageList;
203 
205 
206  // timeval now;
207  // vrpn_gettimeofday(&now, NULL);
208  // fprintf(stderr, " Queued message to go at %d.%d (now is %d.%d)\n",
209  // qm->nextValidTime.tv_sec, qm->nextValidTime.tv_usec,
210  // now.tv_sec, now.tv_usec);
211 
212  d_messageList = qm;
213 
214  return ret;
215 }
216 
217 char *vrpn_RedundantController_Protocol::encode_set(int *len, vrpn_uint32 num,
218  timeval interval)
219 {
220  char *buffer;
221  char *bp;
222  int buflen;
223 
224  buflen = sizeof(vrpn_uint32) + sizeof(timeval);
225  *len = buflen;
226  buffer = new char[buflen];
227  if (!buffer) {
228  fprintf(stderr, "vrpn_RedundantController_Protocol::encode_set: "
229  "Out of memory.\n");
230  return NULL;
231  }
232 
233  bp = buffer;
234  vrpn_buffer(&bp, &buflen, num);
235  vrpn_buffer(&bp, &buflen, interval);
236 
237  return buffer;
238 }
239 
241  vrpn_uint32 *num,
242  timeval *interval)
243 {
244  vrpn_unbuffer(buf, num);
245  vrpn_unbuffer(buf, interval);
246 }
247 
249 {
250  char *buffer;
251  char *bp;
252  int buflen;
253 
254  buflen = sizeof(vrpn_bool);
255  *len = buflen;
256  buffer = new char[buflen];
257  if (!buffer) {
258  fprintf(stderr, "vrpn_RedundantController_Protocol::encode_enable: "
259  "Out of memory.\n");
260  return NULL;
261  }
262 
263  bp = buffer;
264  vrpn_buffer(&bp, &buflen, on);
265 
266  return buffer;
267 }
268 
270  vrpn_bool *on)
271 {
272  vrpn_unbuffer(buf, on);
273 }
274 
276 {
277  d_set_type = c->register_message_type("vrpn_Red_Xmit_Ctrl set");
278  d_enable_type = c->register_message_type("vrpn_Red_Xmit_Ctrl enable");
279 }
280 
283  : vrpn_BaseClass("vrpn Redundant Transmission Controller", c)
284  , d_object(r)
285 {
286 
288 
289  // fprintf(stderr, "Registering set handler with type %d.\n",
290  // d_protocol.d_set_type);
291 
294 }
295 
297 
299 {
300  server_mainloop();
301 
302  // do nothing
303 }
304 
306 {
308 
309  return 0;
310 }
311 
312 // static
314 {
316  const char **bp = &p.buffer;
317  vrpn_uint32 num;
318  timeval interval;
319 
320  me->d_protocol.decode_set(bp, &num, &interval);
321  // fprintf(stderr, "vrpn_RedundantController::handle_set: %d, %d.%d\n",
322  // num, interval.tv_sec, interval.tv_usec);
323  me->d_object->setDefaults(num, interval);
324 
325  return 0;
326 }
327 
329 {
331  const char **bp = &p.buffer;
332  vrpn_bool on;
333 
334  me->d_protocol.decode_enable(bp, &on);
335  me->d_object->enable(on);
336 
337  return 0;
338 }
339 
341  : vrpn_BaseClass("vrpn Redundant Transmission Controller", c)
342 {
343 
345 }
346 
348 
350 {
351  client_mainloop();
352 
353  // do nothing
354 }
355 
356 void vrpn_RedundantRemote::set(int num, timeval interval)
357 {
358  char *buf;
359  int len = 0;
360  timeval now;
361 
362  buf = d_protocol.encode_set(&len, num, interval);
363  if (!buf) {
364  return;
365  }
366  // fprintf(stderr, "vrpn_RedundantRemote::set: %d, %d.%d\n",
367  // num, interval.tv_sec, interval.tv_usec);
368 
369  vrpn_gettimeofday(&now, NULL);
372 }
373 
375 {
376  char *buf;
377  int len = 0;
378  timeval now;
379 
380  buf = d_protocol.encode_enable(&len, on);
381  if (!buf) {
382  return;
383  }
384 
385  vrpn_gettimeofday(&now, NULL);
388 }
389 
391 {
393 
394  return 0;
395 }
396 
398  : nextTimestampToReplace(0)
399  , cb(NULL)
400  , handlerIsRegistered(vrpn_FALSE)
401 {
402  timeval zero;
403  int i;
404 
405  zero.tv_sec = 0;
406  zero.tv_usec = 0;
407 
408  for (i = 0; i < VRPN_RR_LENGTH; i++) {
409  timestampSeen[i] = zero;
410  numSeen[i] = 0;
411  }
412 }
413 
415  : d_connection(c)
416  , d_memory(NULL)
417  , d_lastMemory(NULL)
418  , d_record(VRPN_FALSE)
419 {
420 
421  if (d_connection) {
423  }
424 }
425 
427 {
428  vrpnMsgCallbackEntry *pVMCB, *pVMCB_Del;
429  int i;
430 
431  for (i = 0; i < vrpn_CONNECTION_MAX_TYPES; i++) {
432  pVMCB = d_records[i].cb;
433  while (pVMCB) {
434  pVMCB_Del = pVMCB;
435  pVMCB = pVMCB_Del->next;
436  delete pVMCB_Del;
437  }
438  }
439 
440  pVMCB = d_generic.cb;
441 
442  while (pVMCB) {
443  pVMCB_Del = pVMCB;
444  pVMCB = pVMCB_Del->next;
445  delete pVMCB_Del;
446  }
447 
448  if (d_connection) {
450  }
451 }
452 
453 // virtual
455  vrpn_MESSAGEHANDLER handler,
456  void *userdata, vrpn_int32 sender)
457 {
459  if (!ce) {
460  fprintf(stderr, "vrpn_RedundantReceiver::register_handler: "
461  "Out of memory.\n");
462  return -1;
463  }
464 
465  ce->handler = handler;
466  ce->userdata = userdata;
467  ce->sender = sender;
468 
469  if (type == vrpn_ANY_TYPE) {
470  ce->next = d_generic.cb;
471  d_generic.cb = ce;
472  return 0;
473  }
474  else if (type < 0) {
475  fprintf(stderr, "vrpn_RedundantReceiver::register_handler: "
476  "Negative type passed in.\n");
477  return -1;
478  }
479  else {
480  ce->next = d_records[type].cb;
481  d_records[type].cb = ce;
482  }
483 
484  if (!d_records[type].handlerIsRegistered) {
486  this, sender);
487  d_records[type].handlerIsRegistered = VRPN_TRUE;
488  }
489 
490  return 0;
491 }
492 
493 // virtual
495  vrpn_MESSAGEHANDLER handler,
496  void *userdata,
497  vrpn_int32 sender)
498 {
499  // The pointer at *snitch points to victim
500  vrpnMsgCallbackEntry *victim, **snitch;
501 
502  // Find a handler with this registry in the list (any one will do,
503  // since all duplicates are the same).
504  if (type == vrpn_ANY_TYPE) {
505  snitch = &d_generic.cb;
506  }
507  else {
508  snitch = &(d_records[type].cb);
509  }
510  victim = *snitch;
511  while ((victim != NULL) &&
512  ((victim->handler != handler) || (victim->userdata != userdata) ||
513  (victim->sender != sender))) {
514  snitch = &((*snitch)->next);
515  victim = victim->next;
516  }
517 
518  // Make sure we found one
519  if (victim == NULL) {
520  fprintf(stderr,
521  "vrpn_TypeDispatcher::removeHandler: No such handler\n");
522  return -1;
523  }
524 
525  // Remove the entry from the list
526  *snitch = victim->next;
527  delete victim;
528 
529  return 0;
530 }
531 
532 void vrpn_RedundantReceiver::record(vrpn_bool on) { d_record = on; }
533 
534 void vrpn_RedundantReceiver::writeMemory(const char *filename)
535 {
536  FILE *fp;
537  RRMemory *mp;
538 
539  if (!d_memory) {
540  fprintf(stderr, "vrpn_RedundantReceiver::writeMemory: "
541  "Memory is empty.\n");
542  return;
543  }
544 
545  // BUG: This doesn't write out the records of the things that
546  // are still in the active list! As long as our network tests
547  // are recording a few seconds beyond the end of the interesting
548  // period & trimming them back that's OK, but if this code is ever
549  // used for precise timings there will be slight undersampling.
550 
551  fp = fopen(filename, "wb");
552  if (!fp) {
553  fprintf(stderr, "vrpn_RedundantReceiver::writeMemory: "
554  "Couldn't open %s for writing.\n",
555  filename);
556  return;
557  }
558 
559  for (mp = d_memory; mp; mp = mp->next) {
560  fprintf(fp, "%ld.%ld %d\n", mp->timestamp.tv_sec,
561  static_cast<long>(mp->timestamp.tv_usec), mp->numSeen);
562  }
563 
564  fclose(fp);
565 }
566 
568 {
569  RRMemory *mp;
570 
571  for (mp = d_memory; d_memory; mp = d_memory) {
572  d_memory = mp->next;
573  delete mp;
574  }
575 
576  d_lastMemory = NULL;
577 }
578 
579 // static
582 {
585  RRMemory *memory;
586  int ntr;
587  int i;
588 
589  for (i = 0; i < VRPN_RR_LENGTH; i++) {
590  if ((p.msg_time.tv_sec ==
591  me->d_records[p.type].timestampSeen[i].tv_sec) &&
592  (p.msg_time.tv_usec ==
593  me->d_records[p.type].timestampSeen[i].tv_usec)) {
594 
595  // We've seen this one (recently); it's a redundant send whose
596  // predecessor didn't get lost, so this one is unnecessary FEC.
597  // Discard it.
598 
599  // fprintf(stderr, "Threw out redundant copy of type %d.\n",
600  // p.type);
601 
602  me->d_records[p.type].numSeen[i]++;
603 
604  return 0;
605  }
606  }
607 
609 
610  // Store a record of how many copies of this message we received so
611  // we can compute loss later (knowing what level of redundancy was being
612  // used).
613 
614  if (me->d_record) {
615  if (me->d_records[p.type].numSeen[ntr]) {
616  memory = new RRMemory;
617  if (!memory) {
618  fprintf(stderr,
619  "vrpn_RedundantReceiver::"
620  "handle_possiblyRedundantMessage: Out of memory.\n");
621  return -1;
622  }
623 
624  memory->timestamp = me->d_records[p.type].timestampSeen[ntr];
625  memory->numSeen = me->d_records[p.type].numSeen[ntr];
626  memory->next = NULL;
627 
628  if (me->d_lastMemory) {
629  me->d_lastMemory->next = memory;
630  }
631  else {
632  me->d_memory = memory;
633  }
634  me->d_lastMemory = memory;
635  }
636  }
637 
638  me->d_records[p.type].timestampSeen[ntr] = p.msg_time;
639  me->d_records[p.type].numSeen[ntr] = 1;
641 
642  for (who = me->d_generic.cb; who; who = who->next) {
643  if ((who->sender == vrpn_ANY_SENDER) || (who->sender == p.sender)) {
644  if (who->handler(who->userdata, p)) {
645  fprintf(stderr, "vrpn_RedundantReceiver::"
646  "handle_possiblyRedundantMessage: "
647  "Nonzero user generic handler return.\n");
648  return -1;
649  }
650  }
651  }
652 
653  for (who = me->d_records[p.type].cb; who; who = who->next) {
654  // Verify that the sender is ANY or matches
655  if ((who->sender == vrpn_ANY_SENDER) || (who->sender == p.sender)) {
656  if (who->handler(who->userdata, p)) {
657  fprintf(stderr, "vrpn_RedundantReceiver::"
658  "handle_possiblyRedundantMessage: "
659  "Nonzero user handler return.\n");
660  return -1;
661  }
662  }
663  }
664 
665  return 0;
666 }
void clearMemory(void)
Throws away / resets statistics.
const vrpn_uint32 vrpn_CONNECTION_LOW_LATENCY
const int vrpn_ANY_TYPE
vrpn_ANY_TYPE can be used to register callbacks for any USER type of message from a given sender....
void server_mainloop(void)
Handles functions that all servers should provide in their mainloop() (ping/pong, for example) Should...
struct timeval msg_time
VRPN_API int vrpn_unbuffer(const char **buffer, timeval *t)
Utility routine for taking a struct timeval from a buffer that was sent as a message.
Definition: vrpn_Shared.C:312
static int VRPN_CALLBACK handle_enable(void *, vrpn_HANDLERPARAM)
void record(vrpn_bool)
Turns "memory" (tracking statistics of redundant reception) on and off.
vrpn_RedundantController_Protocol d_protocol
const vrpn_uint32 vrpn_CONNECTION_RELIABLE
Classes of service for messages, specify multiple by ORing them together Priority of satisfying these...
vrpn_int32 payload_len
Description of a callback entry for a user type.
void client_mainloop(void)
Handles functions that all clients should provide in their mainloop() (warning of no server,...
virtual int register_handler(vrpn_int32 type, vrpn_MESSAGEHANDLER handler, void *userdata, vrpn_int32 sender=vrpn_ANY_SENDER)
virtual int unregister_handler(vrpn_int32 type, vrpn_MESSAGEHANDLER handler, void *userdata, vrpn_int32 sender=vrpn_ANY_SENDER)
int register_types(void)
Register the types of messages this device sends/receives. Return 0 on success, -1 on fail.
static int VRPN_CALLBACK handle_set(void *, vrpn_HANDLERPARAM)
const char * buffer
virtual int send_pending_reports(void)=0
send pending report, clear the buffer. This function was protected, now is public,...
virtual int pack_message(vrpn_uint32 len, timeval time, vrpn_uint32 type, vrpn_uint32 sender, const char *buffer, vrpn_uint32 class_of_service, vrpn_int32 numRetransmissions=-1, timeval *transmissionInterval=NULL)
If !isEnabled(), does a normal pack_message(), but if isEnabled() ignores class_of_service and sends ...
Generic connection class not specific to the transport mechanism.
virtual int register_types(void)
Register the types of messages this device sends/receives. Return 0 on success, -1 on fail.
vrpn_MESSAGEHANDLER handler
Routine to call.
vrpn_RedundantTransmission(vrpn_Connection *c)
virtual int register_handler(vrpn_int32 type, vrpn_MESSAGEHANDLER handler, void *userdata, vrpn_int32 sender=vrpn_ANY_SENDER)
Set up (or remove) a handler for a message of a given type. Optionally, specify which sender to handl...
int(VRPN_CALLBACK * vrpn_MESSAGEHANDLER)(void *userdata, vrpn_HANDLERPARAM p)
Type of a message handler for vrpn_Connection messages.
Accepts commands over a connection to control a local vrpn_RedundantTransmission's default parameters...
void decode_enable(const char **buf, vrpn_bool *)
Helper class that eliminates duplicates; only the first instance of a message is delivered....
RRRecord d_records[vrpn_CONNECTION_MAX_TYPES]
virtual int init(void)
Initialize things that the constructor can't. Returns 0 on success, -1 on failure.
int register_autodeleted_handler(vrpn_int32 type, vrpn_MESSAGEHANDLER handler, void *userdata, vrpn_int32 sender=vrpn_ANY_SENDER)
Registers a handler with the connection, and remembers to delete at destruction.
void set(int numRetransmissions, timeval transmissionInterval)
static int VRPN_CALLBACK handle_possiblyRedundantMessage(void *, vrpn_HANDLERPARAM)
vrpn_Connection * d_connection
Connection that this object talks to.
This structure is what is passed to a vrpn_Connection message callback.
vrpn_uint32 defaultRetransmissions(void) const
virtual int pack_message(vrpn_uint32 len, struct timeval time, vrpn_int32 type, vrpn_int32 sender, const char *buffer, vrpn_uint32 class_of_service)
Pack a message that will be sent the next time mainloop() is called. Turn off the RELIABLE flag if yo...
void * userdata
Passed along.
vrpn_int32 sender
Only if from sender.
bool vrpn_TimevalGreater(const timeval &tv1, const timeval &tv2)
Definition: vrpn_Shared.C:113
virtual void mainloop(void)
Determines which messages need to be resent and queues them up on the connection for transmission.
vrpn_RedundantController_Protocol d_protocol
void mainloop(void)
Called once through each main loop iteration to handle updates. Remote object mainloop() should call ...
vrpn_RedundantController(vrpn_RedundantTransmission *, vrpn_Connection *)
vrpn_RedundantRemote(vrpn_Connection *)
const int vrpn_CONNECTION_MAX_TYPES
#define VRPN_RR_LENGTH
virtual void setDefaults(vrpn_uint32 numRetransmissions, timeval transmissionInterval)
Set default values for future calls to pack_message().
#define vrpn_gettimeofday
Definition: vrpn_Shared.h:89
void decode_set(const char **buf, vrpn_uint32 *num, timeval *interval)
VRPN_API int vrpn_buffer(char **insertPt, vrpn_int32 *buflen, const timeval t)
Utility routine for placing a timeval struct into a buffer that is to be sent as a message.
Definition: vrpn_Shared.C:241
vrpn_RedundantReceiver(vrpn_Connection *)
void writeMemory(const char *filename)
Writes statistics to the named file: timestamp of every message received and number of copies of that...
Class from which all user-level (and other) classes that communicate with vrpn_Connections should der...
vrpn_RedundantTransmission * d_object
vrpnMsgCallbackEntry * next
Next handler.
vrpn_int32 d_sender_id
Sender ID registered with the connection.
char * encode_set(int *len, vrpn_uint32 num, timeval interval)
void mainloop(void)
Called once through each main loop iteration to handle updates. Remote object mainloop() should call ...
void addReference()
Counting references to this connection.
timeval vrpn_TimevalSum(const timeval &tv1, const timeval &tv2)
Definition: vrpn_Shared.C:45
Helper class for vrpn_Connection that automates redundant transmission for unreliable (low-latency) m...
vrpn_uint32 d_numMessagesQueued
For debugging, mostly.
virtual vrpn_int32 register_message_type(const char *name)
const int vrpn_ANY_SENDER
vrpn_ANY_SENDER can be used to register callbacks on a given message type from any sender.