Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "dataqueue.h"
00023 #include "scoped_lock.h"
00024 #include "data.h"
00025 #include "time.h"
00026
00027 namespace Barry {
00028
00029
00030
00031
00032 DataQueue::DataQueue()
00033 {
00034 pthread_mutex_init(&m_waitMutex, NULL);
00035 pthread_cond_init(&m_waitCond, NULL);
00036
00037 pthread_mutex_init(&m_accessMutex, NULL);
00038 }
00039
00040 DataQueue::~DataQueue()
00041 {
00042 scoped_lock lock(m_accessMutex);
00043
00044 while( m_queue.size() ) {
00045 delete m_queue.front();
00046 m_queue.pop();
00047 }
00048 }
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059 void DataQueue::push(Data *data)
00060 {
00061 try {
00062
00063 {
00064 scoped_lock lock(m_accessMutex);
00065 m_queue.push(data);
00066 }
00067
00068 scoped_lock wait(m_waitMutex);
00069 pthread_cond_broadcast(&m_waitCond);
00070
00071 }
00072 catch(...) {
00073 delete data;
00074 throw;
00075 }
00076 }
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086 Data* DataQueue::pop()
00087 {
00088 scoped_lock lock(m_accessMutex);
00089
00090 if( m_queue.size() == 0 )
00091 return 0;
00092
00093 Data *ret = m_queue.front();
00094 m_queue.pop();
00095 return ret;
00096 }
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108 Data* DataQueue::wait_pop(int timeout)
00109 {
00110 Data *ret = 0;
00111
00112
00113 {
00114 scoped_lock access(m_accessMutex);
00115 if( m_queue.size() ) {
00116 ret = m_queue.front();
00117 m_queue.pop();
00118 return ret;
00119 }
00120 }
00121
00122
00123
00124 if( timeout == -1 ) {
00125
00126 int size = 0;
00127 do {
00128 {
00129 scoped_lock wait(m_waitMutex);
00130 pthread_cond_wait(&m_waitCond, &m_waitMutex);
00131 }
00132
00133
00134 scoped_lock access(m_accessMutex);
00135 size = m_queue.size();
00136 if( size != 0 ) {
00137
00138 ret = m_queue.front();
00139 m_queue.pop();
00140 return ret;
00141 }
00142
00143 } while( size == 0 );
00144 }
00145 else {
00146
00147 struct timespec to;
00148 scoped_lock wait(m_waitMutex);
00149 pthread_cond_timedwait(&m_waitCond, &m_waitMutex,
00150 ThreadTimeout(timeout, &to));
00151 }
00152
00153 scoped_lock access(m_accessMutex);
00154 if( m_queue.size() == 0 )
00155 return 0;
00156
00157 ret = m_queue.front();
00158 m_queue.pop();
00159 return ret;
00160 }
00161
00162
00163
00164
00165
00166
00167
00168
00169
00170
00171
00172
00173
00174
00175
00176 void DataQueue::append_from(DataQueue &other)
00177 {
00178 scoped_lock us(m_accessMutex);
00179 scoped_lock them(other.m_accessMutex);
00180
00181 while( other.m_queue.size() ) {
00182 m_queue.push( other.m_queue.front() );
00183
00184
00185
00186 other.m_queue.pop();
00187 }
00188 }
00189
00190
00191
00192
00193
00194
00195 bool DataQueue::empty() const
00196 {
00197 scoped_lock access(m_accessMutex);
00198 return m_queue.size() == 0;
00199 }
00200
00201
00202
00203
00204
00205
00206 size_t DataQueue::size() const
00207 {
00208 scoped_lock access(m_accessMutex);
00209 return m_queue.size();
00210 }
00211
00212 }
00213
00214
00215 #ifdef __DQ_TEST_MODE__
00216
00217 #include <iostream>
00218
00219 using namespace std;
00220 using namespace Barry;
00221
00222 void *WriteThread(void *userdata)
00223 {
00224 DataQueue *dq = (DataQueue*) userdata;
00225
00226 dq->push( new Data );
00227 dq->push( new Data );
00228 sleep(5);
00229 dq->push( new Data );
00230
00231 return 0;
00232 }
00233
00234 void *ReadThread(void *userdata)
00235 {
00236 DataQueue *dq = (DataQueue*) userdata;
00237
00238 sleep(1);
00239 if( Data *d = dq->pop() ) {
00240 cout << "Received via pop: " << d << endl;
00241 delete d;
00242 }
00243 else {
00244 cout << "No data in the queue yet." << endl;
00245 }
00246
00247 while( Data *d = dq->wait_pop(5010) ) {
00248 cout << "Received: " << d << endl;
00249 delete d;
00250 }
00251 return 0;
00252 }
00253
00254 int main()
00255 {
00256 DataQueue from;
00257 from.push( new Data );
00258
00259 DataQueue dq;
00260 dq.append_from(from);
00261
00262 pthread_t t1, t2;
00263 pthread_create(&t1, NULL, &ReadThread, &dq);
00264 pthread_create(&t2, NULL, &WriteThread, &dq);
00265
00266 pthread_join(t2, NULL);
00267 pthread_join(t1, NULL);
00268 }
00269
00270 #endif
00271