00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #ifndef _CIRCULAR_BUFFER_H_
00024 #define _CIRCULAR_BUFFER_H_
00025
00026 #include "mld_threads.h"
00027 #include <stdexcept>
00028
00029 #define DO_DEBUG 0
00030
00031 template <class T> class circular_buffer
00032 {
00033 private:
00034
00035 T* d_buffer;
00036
00037
00038 UInt32 d_bufLen_I, d_readNdx_I, d_writeNdx_I;
00039 UInt32 d_n_avail_write_I, d_n_avail_read_I;
00040
00041
00042 mld_mutex_ptr d_internal;
00043 mld_condition_ptr d_readBlock, d_writeBlock;
00044
00045
00046 bool d_doWriteBlock, d_doFullRead, d_doAbort;
00047
00048 void delete_mutex_cond () {
00049 if (d_internal) {
00050 delete d_internal;
00051 d_internal = NULL;
00052 }
00053 if (d_readBlock) {
00054 delete d_readBlock;
00055 d_readBlock = NULL;
00056 }
00057 if (d_writeBlock) {
00058 delete d_writeBlock;
00059 d_writeBlock = NULL;
00060 }
00061 };
00062
00063 public:
00064 circular_buffer (UInt32 bufLen_I,
00065 bool doWriteBlock = true, bool doFullRead = false) {
00066 if (bufLen_I == 0)
00067 throw std::runtime_error ("circular_buffer(): "
00068 "Number of items to buffer must be > 0.\n");
00069 d_bufLen_I = bufLen_I;
00070 d_buffer = (T*) new T[d_bufLen_I];
00071 d_doWriteBlock = doWriteBlock;
00072 d_doFullRead = doFullRead;
00073 d_internal = NULL;
00074 d_readBlock = d_writeBlock = NULL;
00075 reset ();
00076 #if DO_DEBUG
00077 fprintf (stderr, "c_b(): buf len (items) = %ld, "
00078 "doWriteBlock = %s, doFullRead = %s\n", d_bufLen_I,
00079 (d_doWriteBlock ? "true" : "false"),
00080 (d_doFullRead ? "true" : "false"));
00081 #endif
00082 };
00083
00084 ~circular_buffer () {
00085 delete_mutex_cond ();
00086 delete [] d_buffer;
00087 };
00088
00089 inline UInt32 n_avail_write_items () {
00090 d_internal->lock ();
00091 UInt32 retVal = d_n_avail_write_I;
00092 d_internal->unlock ();
00093 return (retVal);
00094 };
00095
00096 inline UInt32 n_avail_read_items () {
00097 d_internal->lock ();
00098 UInt32 retVal = d_n_avail_read_I;
00099 d_internal->unlock ();
00100 return (retVal);
00101 };
00102
00103 inline UInt32 buffer_length_items () {return (d_bufLen_I);};
00104 inline bool do_write_block () {return (d_doWriteBlock);};
00105 inline bool do_full_read () {return (d_doFullRead);};
00106
00107 void reset () {
00108 d_doAbort = false;
00109 bzero (d_buffer, d_bufLen_I * sizeof (T));
00110 d_readNdx_I = d_writeNdx_I = d_n_avail_read_I = 0;
00111 d_n_avail_write_I = d_bufLen_I;
00112 delete_mutex_cond ();
00113 d_internal = new mld_mutex ();
00114 d_readBlock = new mld_condition ();
00115 d_writeBlock = new mld_condition ();
00116 };
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131
00132
00133
00134
00135
00136
00137
00138
00139 int enqueue (T* buf, UInt32 bufLen_I) {
00140 #if DO_DEBUG
00141 fprintf (stderr, "enqueue: buf = %X, bufLen = %ld, #av_wr = %ld, "
00142 "#av_rd = %ld.\n", (unsigned int)buf, bufLen_I,
00143 d_n_avail_write_I, d_n_avail_read_I);
00144 #endif
00145 if (bufLen_I > d_bufLen_I) {
00146 fprintf (stderr, "cannot add buffer longer (%ld"
00147 ") than instantiated length (%ld"
00148 ").\n", bufLen_I, d_bufLen_I);
00149 throw std::runtime_error ("circular_buffer::enqueue()");
00150 }
00151
00152 if (bufLen_I == 0)
00153 return (0);
00154 if (!buf)
00155 throw std::runtime_error ("circular_buffer::enqueue(): "
00156 "input buffer is NULL.\n");
00157 d_internal->lock ();
00158 if (d_doAbort) {
00159 d_internal->unlock ();
00160 return (2);
00161 }
00162 if (bufLen_I > d_n_avail_write_I) {
00163 if (d_doWriteBlock) {
00164 while (bufLen_I > d_n_avail_write_I) {
00165 #if DO_DEBUG
00166 fprintf (stderr, "enqueue: #len > #a, waiting.\n");
00167 #endif
00168 d_internal->unlock ();
00169 d_writeBlock->wait ();
00170 d_internal->lock ();
00171 if (d_doAbort) {
00172 d_internal->unlock ();
00173 #if DO_DEBUG
00174 fprintf (stderr, "enqueue: #len > #a, aborting.\n");
00175 #endif
00176 return (2);
00177 }
00178 #if DO_DEBUG
00179 fprintf (stderr, "enqueue: #len > #a, done waiting.\n");
00180 #endif
00181 }
00182 } else {
00183 d_n_avail_read_I = d_bufLen_I - bufLen_I;
00184 d_n_avail_write_I = bufLen_I;
00185 #if DO_DEBUG
00186 fprintf (stderr, "circular_buffer::enqueue: overflow\n");
00187 #endif
00188 return (-1);
00189 }
00190 }
00191 UInt32 n_now_I = d_bufLen_I - d_writeNdx_I, n_start_I = 0;
00192 if (n_now_I > bufLen_I)
00193 n_now_I = bufLen_I;
00194 else if (n_now_I < bufLen_I)
00195 n_start_I = bufLen_I - n_now_I;
00196 bcopy (buf, &(d_buffer[d_writeNdx_I]), n_now_I * sizeof (T));
00197 if (n_start_I) {
00198 bcopy (&(buf[n_now_I]), d_buffer, n_start_I * sizeof (T));
00199 d_writeNdx_I = n_start_I;
00200 } else
00201 d_writeNdx_I += n_now_I;
00202 d_n_avail_read_I += bufLen_I;
00203 d_n_avail_write_I -= bufLen_I;
00204 d_readBlock->signal ();
00205 d_internal->unlock ();
00206 return (1);
00207 };
00208
00209
00210
00211
00212
00213
00214
00215
00216
00217
00218
00219
00220
00221
00222
00223
00224
00225
00226
00227
00228
00229
00230 int dequeue (T* buf, UInt32* bufLen_I) {
00231 #if DO_DEBUG
00232 fprintf (stderr, "dequeue: buf = %X, *bufLen = %ld, #av_wr = %ld, "
00233 "#av_rd = %ld.\n", (unsigned int)buf, *bufLen_I,
00234 d_n_avail_write_I, d_n_avail_read_I);
00235 #endif
00236 if (!bufLen_I)
00237 throw std::runtime_error ("circular_buffer::dequeue(): "
00238 "input bufLen pointer is NULL.\n");
00239 if (!buf)
00240 throw std::runtime_error ("circular_buffer::dequeue(): "
00241 "input buffer pointer is NULL.\n");
00242 UInt32 l_bufLen_I = *bufLen_I;
00243 if (l_bufLen_I == 0)
00244 return (0);
00245 if (l_bufLen_I > d_bufLen_I) {
00246 fprintf (stderr, "cannot remove buffer longer (%ld"
00247 ") than instantiated length (%ld"
00248 ").\n", l_bufLen_I, d_bufLen_I);
00249 throw std::runtime_error ("circular_buffer::dequeue()");
00250 }
00251
00252 d_internal->lock ();
00253 if (d_doAbort) {
00254 d_internal->unlock ();
00255 return (2);
00256 }
00257 if (d_doFullRead) {
00258 while (d_n_avail_read_I < l_bufLen_I) {
00259 #if DO_DEBUG
00260 fprintf (stderr, "dequeue: #a < #len, waiting.\n");
00261 #endif
00262 d_internal->unlock ();
00263 d_readBlock->wait ();
00264 d_internal->lock ();
00265 if (d_doAbort) {
00266 d_internal->unlock ();
00267 #if DO_DEBUG
00268 fprintf (stderr, "dequeue: #a < #len, aborting.\n");
00269 #endif
00270 return (2);
00271 }
00272 #if DO_DEBUG
00273 fprintf (stderr, "dequeue: #a < #len, done waiting.\n");
00274 #endif
00275 }
00276 } else {
00277 while (d_n_avail_read_I == 0) {
00278 #if DO_DEBUG
00279 fprintf (stderr, "dequeue: #a == 0, waiting.\n");
00280 #endif
00281 d_internal->unlock ();
00282 d_readBlock->wait ();
00283 d_internal->lock ();
00284 if (d_doAbort) {
00285 d_internal->unlock ();
00286 #if DO_DEBUG
00287 fprintf (stderr, "dequeue: #a == 0, aborting.\n");
00288 #endif
00289 return (2);
00290 }
00291 #if DO_DEBUG
00292 fprintf (stderr, "dequeue: #a == 0, done waiting.\n");
00293 #endif
00294 }
00295 }
00296 if (l_bufLen_I > d_n_avail_read_I)
00297 l_bufLen_I = d_n_avail_read_I;
00298 UInt32 n_now_I = d_bufLen_I - d_readNdx_I, n_start_I = 0;
00299 if (n_now_I > l_bufLen_I)
00300 n_now_I = l_bufLen_I;
00301 else if (n_now_I < l_bufLen_I)
00302 n_start_I = l_bufLen_I - n_now_I;
00303 bcopy (&(d_buffer[d_readNdx_I]), buf, n_now_I * sizeof (T));
00304 if (n_start_I) {
00305 bcopy (d_buffer, &(buf[n_now_I]), n_start_I * sizeof (T));
00306 d_readNdx_I = n_start_I;
00307 } else
00308 d_readNdx_I += n_now_I;
00309 *bufLen_I = l_bufLen_I;
00310 d_n_avail_read_I -= l_bufLen_I;
00311 d_n_avail_write_I += l_bufLen_I;
00312 d_writeBlock->signal ();
00313 d_internal->unlock ();
00314 return (1);
00315 };
00316
00317 void abort () {
00318 d_internal->lock ();
00319 d_doAbort = true;
00320 d_writeBlock->signal ();
00321 d_readBlock->signal ();
00322 d_internal->unlock ();
00323 };
00324 };
00325
00326 #endif