dataqueue.cc

Go to the documentation of this file.
00001 ///
00002 /// \file       dataqueue.cc
00003 ///             FIFO queue of Data objects
00004 ///
00005 
00006 /*
00007     Copyright (C) 2008-2011, Net Direct Inc. (http://www.netdirect.ca/)
00008 
00009     This program is free software; you can redistribute it and/or modify
00010     it under the terms of the GNU General Public License as published by
00011     the Free Software Foundation; either version 2 of the License, or
00012     (at your option) any later version.
00013 
00014     This program is distributed in the hope that it will be useful,
00015     but WITHOUT ANY WARRANTY; without even the implied warranty of
00016     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
00017 
00018     See the GNU General Public License in the COPYING file at the
00019     root directory of this project for more details.
00020 */
00021 
00022 #include "dataqueue.h"
00023 #include "scoped_lock.h"
00024 #include "data.h"
00025 #include "time.h"
00026 
00027 namespace Barry {
00028 
00029 //////////////////////////////////////////////////////////////////////////////
00030 // DataQueue class
00031 
00032 DataQueue::DataQueue()
00033 {
00034         pthread_mutex_init(&m_waitMutex, NULL);
00035         pthread_cond_init(&m_waitCond, NULL);
00036 
00037         pthread_mutex_init(&m_accessMutex, NULL);
00038 }
00039 
00040 DataQueue::~DataQueue()
00041 {
00042         scoped_lock lock(m_accessMutex);        // FIXME - is this sane?
00043 
00044         while( m_queue.size() ) {
00045                 delete m_queue.front();
00046                 m_queue.pop();
00047         }
00048 }
00049 
00050 //
00051 // push
00052 //
00053 /// Pushes data into the end of the queue.
00054 ///
00055 /// The queue owns this pointer as soon as the function is
00056 /// called.  In the case of an exception, it will be freed.
00057 /// Performs a thread broadcast once new data has been added.
00058 ///
00059 void DataQueue::push(Data *data)
00060 {
00061         try {
00062 
00063                 {
00064                         scoped_lock lock(m_accessMutex);
00065                         m_queue.push(data);
00066                 }
00067 
00068                 scoped_lock wait(m_waitMutex);
00069                 pthread_cond_broadcast(&m_waitCond);
00070 
00071         }
00072         catch(...) {
00073                 delete data;
00074                 throw;
00075         }
00076 }
00077 
00078 //
00079 // pop
00080 //
00081 /// Pops the next element off the front of the queue.
00082 ///
00083 /// Returns 0 if empty.
00084 /// The queue no longer owns this pointer upon return.
00085 ///
00086 Data* DataQueue::pop()
00087 {
00088         scoped_lock lock(m_accessMutex);
00089 
00090         if( m_queue.size() == 0 )
00091                 return 0;
00092 
00093         Data *ret = m_queue.front();
00094         m_queue.pop();
00095         return ret;
00096 }
00097 
00098 //
00099 // wait_pop
00100 //
00101 /// Pops the next element off the front of the queue, and
00102 /// waits until one exists if empty.  If still no data
00103 /// on timeout, returns null.
00104 /// (unlock the access mutex while waiting!)
00105 ///
00106 /// Timeout specified in milliseconds.  Default is wait forever.
00107 ///
00108 Data* DataQueue::wait_pop(int timeout)
00109 {
00110         Data *ret = 0;
00111 
00112         // check if something's there already
00113         {
00114                 scoped_lock access(m_accessMutex);
00115                 if( m_queue.size() ) {
00116                         ret = m_queue.front();
00117                         m_queue.pop();
00118                         return ret;
00119                 }
00120         }
00121 
00122         // nothing there, so wait...
00123 
00124         if( timeout == -1 ) {
00125                 // no timeout
00126                 int size = 0;
00127                 do {
00128                         {
00129                                 scoped_lock wait(m_waitMutex);
00130                                 pthread_cond_wait(&m_waitCond, &m_waitMutex);
00131                         }
00132 
00133                         // anything there?
00134                         scoped_lock access(m_accessMutex);
00135                         size = m_queue.size();
00136                         if( size != 0 ) {
00137                                 // already have the lock, return now
00138                                 ret = m_queue.front();
00139                                 m_queue.pop();
00140                                 return ret;
00141                         }
00142 
00143                 } while( size == 0 );
00144         }
00145         else {
00146                 // timeout in conditional wait
00147                 struct timespec to;
00148                 scoped_lock wait(m_waitMutex);
00149                 pthread_cond_timedwait(&m_waitCond, &m_waitMutex,
00150                         ThreadTimeout(timeout, &to));
00151         }
00152 
00153         scoped_lock access(m_accessMutex);
00154         if( m_queue.size() == 0 )
00155                 return 0;
00156 
00157         ret = m_queue.front();
00158         m_queue.pop();
00159         return ret;
00160 }
00161 
00162 //
00163 // append_from
00164 //
00165 /// Pops all data from other and appends it to this.
00166 ///
00167 /// After calling this function, other will be empty, and
00168 /// this will contain all its data.
00169 ///
00170 /// In the case of an exception, any uncopied data will
00171 /// remain in other.
00172 ///
00173 /// This is a locking optimization, so all copying can happen
00174 /// inside one lock, instead of locking for each copy.
00175 ///
00176 void DataQueue::append_from(DataQueue &other)
00177 {
00178         scoped_lock us(m_accessMutex);
00179         scoped_lock them(other.m_accessMutex);
00180 
00181         while( other.m_queue.size() ) {
00182                 m_queue.push( other.m_queue.front() );
00183 
00184                 // only pop after the copy, since in the
00185                 // case of an exception we want to leave other intact
00186                 other.m_queue.pop();
00187         }
00188 }
00189 
00190 //
00191 // empty
00192 //
00193 /// Returns true if the queue is empty.
00194 ///
00195 bool DataQueue::empty() const
00196 {
00197         scoped_lock access(m_accessMutex);
00198         return m_queue.size() == 0;
00199 }
00200 
00201 //
00202 // size
00203 //
00204 /// Returns number of items in the queue.
00205 ///
00206 size_t DataQueue::size() const
00207 {
00208         scoped_lock access(m_accessMutex);
00209         return m_queue.size();
00210 }
00211 
00212 } // namespace Barry
00213 
00214 
00215 #ifdef __DQ_TEST_MODE__
00216 
00217 #include <iostream>
00218 
00219 using namespace std;
00220 using namespace Barry;
00221 
00222 void *WriteThread(void *userdata)
00223 {
00224         DataQueue *dq = (DataQueue*) userdata;
00225 
00226         dq->push( new Data );
00227         dq->push( new Data );
00228         sleep(5);
00229         dq->push( new Data );
00230 
00231         return 0;
00232 }
00233 
00234 void *ReadThread(void *userdata)
00235 {
00236         DataQueue *dq = (DataQueue*) userdata;
00237 
00238         sleep(1);
00239         if( Data *d = dq->pop() ) {
00240                 cout << "Received via pop: " << d << endl;
00241                 delete d;
00242         }
00243         else {
00244                 cout << "No data in the queue yet." << endl;
00245         }
00246 
00247         while( Data *d = dq->wait_pop(5010) ) {
00248                 cout << "Received: " << d << endl;
00249                 delete d;
00250         }
00251         return 0;
00252 }
00253 
00254 int main()
00255 {
00256         DataQueue from;
00257         from.push( new Data );
00258 
00259         DataQueue dq;
00260         dq.append_from(from);
00261 
00262         pthread_t t1, t2;
00263         pthread_create(&t1, NULL, &ReadThread, &dq);
00264         pthread_create(&t2, NULL, &WriteThread, &dq);
00265 
00266         pthread_join(t2, NULL);
00267         pthread_join(t1, NULL);
00268 }
00269 
00270 #endif
00271 

Generated on Tue Mar 1 17:50:15 2011 for Barry by  doxygen 1.5.6