circular_buffer.h

Go to the documentation of this file.
00001 /* -*- c++ -*- */
00002 /*
00003  * Copyright 2006 Free Software Foundation, Inc.
00004  * 
00005  * This file is part of GNU Radio.
00006  *
00007  * GNU Radio is free software; you can redistribute it and/or modify
00008  * it under the terms of the GNU General Public License as published by
00009  * the Free Software Foundation; either version 2, or (at your option)
00010  * any later version.
00011  * 
00012  * GNU Radio is distributed in the hope that it will be useful,
00013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015  * GNU General Public License for more details.
00016  * 
00017  * You should have received a copy of the GNU General Public License
00018  * along with GNU Radio; see the file COPYING.  If not, write to
00019  * the Free Software Foundation, Inc., 51 Franklin Street,
00020  * Boston, MA 02110-1301, USA.
00021  */
00022 
00023 #ifndef _CIRCULAR_BUFFER_H_
00024 #define _CIRCULAR_BUFFER_H_
00025 
00026 #include "mld_threads.h"
00027 #include <stdexcept>
00028 
00029 #define DO_DEBUG 0
00030 
00031 template <class T> class circular_buffer
00032 {
00033 private:
00034 // the buffer to use
00035   T* d_buffer;
00036 
00037 // the following are in Items (type T)
00038   UInt32 d_bufLen_I, d_readNdx_I, d_writeNdx_I;
00039   UInt32 d_n_avail_write_I, d_n_avail_read_I;
00040 
00041 // stuff to control access to class internals
00042   mld_mutex_ptr d_internal;
00043   mld_condition_ptr d_readBlock, d_writeBlock;
00044 
00045 // booleans to decide how to control reading, writing, and aborting
00046   bool d_doWriteBlock, d_doFullRead, d_doAbort;
00047 
00048   void delete_mutex_cond () {
00049     if (d_internal) {
00050       delete d_internal;
00051       d_internal = NULL;
00052     }
00053     if (d_readBlock) {
00054       delete d_readBlock;
00055       d_readBlock = NULL;
00056     }
00057     if (d_writeBlock) {
00058       delete d_writeBlock;
00059       d_writeBlock = NULL;
00060     }
00061   };
00062 
00063 public:
00064   circular_buffer (UInt32 bufLen_I,
00065                    bool doWriteBlock = true, bool doFullRead = false) {
00066     if (bufLen_I == 0)
00067       throw std::runtime_error ("circular_buffer(): "
00068                                 "Number of items to buffer must be > 0.\n");
00069     d_bufLen_I = bufLen_I;
00070     d_buffer = (T*) new T[d_bufLen_I];
00071     d_doWriteBlock = doWriteBlock;
00072     d_doFullRead = doFullRead;
00073     d_internal = NULL;
00074     d_readBlock = d_writeBlock = NULL;
00075     reset ();
00076 #if DO_DEBUG
00077     fprintf (stderr, "c_b(): buf len (items) = %ld, "
00078              "doWriteBlock = %s, doFullRead = %s\n", d_bufLen_I,
00079              (d_doWriteBlock ? "true" : "false"),
00080              (d_doFullRead ? "true" : "false"));
00081 #endif
00082   };
00083 
00084   ~circular_buffer () {
00085     delete_mutex_cond ();
00086     delete [] d_buffer;
00087   };
00088 
00089   inline UInt32 n_avail_write_items () {
00090     d_internal->lock ();
00091     UInt32 retVal = d_n_avail_write_I;
00092     d_internal->unlock ();
00093     return (retVal);
00094   };
00095 
00096   inline UInt32 n_avail_read_items () {
00097     d_internal->lock ();
00098     UInt32 retVal = d_n_avail_read_I;
00099     d_internal->unlock ();
00100     return (retVal);
00101   };
00102 
00103   inline UInt32 buffer_length_items () {return (d_bufLen_I);};
00104   inline bool do_write_block () {return (d_doWriteBlock);};
00105   inline bool do_full_read () {return (d_doFullRead);};
00106 
00107   void reset () {
00108     d_doAbort = false;
00109     bzero (d_buffer, d_bufLen_I * sizeof (T));
00110     d_readNdx_I = d_writeNdx_I = d_n_avail_read_I = 0;
00111     d_n_avail_write_I = d_bufLen_I;
00112     delete_mutex_cond ();
00113     d_internal = new mld_mutex ();
00114     d_readBlock = new mld_condition ();
00115     d_writeBlock = new mld_condition ();
00116   };
00117 
00118 /*
00119  * enqueue: add the given buffer of item-length to the queue,
00120  *     first-in-first-out (FIFO).
00121  *
00122  * inputs:
00123  *     buf: a pointer to the buffer holding the data
00124  *
00125  *     bufLen_I: the buffer length in items (of the instantiated type)
00126  *
00127  * returns:
00128  *    -1: on overflow (write is not blocking, and data is being
00129  *                     written faster than it is being read)
00130  *     0: if nothing to do (0 length buffer)
00131  *     1: if success
00132  *     2: in the process of aborting, do doing nothing
00133  *
00134  * will throw runtime errors if inputs are improper:
00135  *     buffer pointer is NULL
00136  *     buffer length is larger than the instantiated buffer length
00137  */
00138 
00139   int enqueue (T* buf, UInt32 bufLen_I) {
00140 #if DO_DEBUG
00141     fprintf (stderr, "enqueue: buf = %X, bufLen = %ld, #av_wr = %ld, "
00142              "#av_rd = %ld.\n", (unsigned int)buf, bufLen_I,
00143              d_n_avail_write_I, d_n_avail_read_I);
00144 #endif
00145     if (bufLen_I > d_bufLen_I) {
00146       fprintf (stderr, "cannot add buffer longer (%ld"
00147                ") than instantiated length (%ld"
00148                ").\n", bufLen_I, d_bufLen_I);
00149       throw std::runtime_error ("circular_buffer::enqueue()");
00150     }
00151 
00152     if (bufLen_I == 0)
00153       return (0);
00154     if (!buf)
00155       throw std::runtime_error ("circular_buffer::enqueue(): "
00156                                 "input buffer is NULL.\n");
00157     d_internal->lock ();
00158     if (d_doAbort) {
00159       d_internal->unlock ();
00160       return (2);
00161     }
00162     if (bufLen_I > d_n_avail_write_I) {
00163       if (d_doWriteBlock) {
00164         while (bufLen_I > d_n_avail_write_I) {
00165 #if DO_DEBUG
00166           fprintf (stderr, "enqueue: #len > #a, waiting.\n");
00167 #endif
00168           d_internal->unlock ();
00169           d_writeBlock->wait ();
00170           d_internal->lock ();
00171           if (d_doAbort) {
00172             d_internal->unlock ();
00173 #if DO_DEBUG
00174             fprintf (stderr, "enqueue: #len > #a, aborting.\n");
00175 #endif
00176             return (2);
00177           }
00178 #if DO_DEBUG
00179           fprintf (stderr, "enqueue: #len > #a, done waiting.\n");
00180 #endif
00181         }
00182       } else {
00183         d_n_avail_read_I = d_bufLen_I - bufLen_I;
00184         d_n_avail_write_I = bufLen_I;
00185 #if DO_DEBUG
00186         fprintf (stderr, "circular_buffer::enqueue: overflow\n");
00187 #endif
00188         return (-1);
00189       }
00190     }
00191     UInt32 n_now_I = d_bufLen_I - d_writeNdx_I, n_start_I = 0;
00192     if (n_now_I > bufLen_I)
00193       n_now_I = bufLen_I;
00194     else if (n_now_I < bufLen_I)
00195       n_start_I = bufLen_I - n_now_I;
00196     bcopy (buf, &(d_buffer[d_writeNdx_I]), n_now_I * sizeof (T));
00197     if (n_start_I) {
00198       bcopy (&(buf[n_now_I]), d_buffer, n_start_I * sizeof (T));
00199       d_writeNdx_I = n_start_I;
00200     } else
00201       d_writeNdx_I += n_now_I;
00202     d_n_avail_read_I += bufLen_I;
00203     d_n_avail_write_I -= bufLen_I;
00204     d_readBlock->signal ();
00205     d_internal->unlock ();
00206     return (1);
00207   };
00208 
00209 /*
00210  * dequeue: removes from the queue the number of items requested, or
00211  *     available, into the given buffer on a FIFO basis.
00212  *
00213  * inputs:
00214  *     buf: a pointer to the buffer into which to copy the data
00215  *
00216  *     bufLen_I: pointer to the number of items to remove in items
00217  *         (of the instantiated type)
00218  *
00219  * returns:
00220  *     0: if nothing to do (0 length buffer)
00221  *     1: if success
00222  *     2: in the process of aborting, do doing nothing
00223  *
00224  * will throw runtime errors if inputs are improper:
00225  *     buffer pointer is NULL
00226  *     buffer length pointer is NULL
00227  *     buffer length is larger than the instantiated buffer length
00228  */
00229 
00230   int dequeue (T* buf, UInt32* bufLen_I) {
00231 #if DO_DEBUG
00232     fprintf (stderr, "dequeue: buf = %X, *bufLen = %ld, #av_wr = %ld, "
00233              "#av_rd = %ld.\n", (unsigned int)buf, *bufLen_I,
00234              d_n_avail_write_I, d_n_avail_read_I);
00235 #endif
00236     if (!bufLen_I)
00237       throw std::runtime_error ("circular_buffer::dequeue(): "
00238                                 "input bufLen pointer is NULL.\n");
00239     if (!buf)
00240       throw std::runtime_error ("circular_buffer::dequeue(): "
00241                                 "input buffer pointer is NULL.\n");
00242     UInt32 l_bufLen_I = *bufLen_I;
00243     if (l_bufLen_I == 0)
00244       return (0);
00245     if (l_bufLen_I > d_bufLen_I) {
00246       fprintf (stderr, "cannot remove buffer longer (%ld"
00247                ") than instantiated length (%ld"
00248                ").\n", l_bufLen_I, d_bufLen_I);
00249       throw std::runtime_error ("circular_buffer::dequeue()");
00250     }
00251 
00252     d_internal->lock ();
00253     if (d_doAbort) {
00254       d_internal->unlock ();
00255       return (2);
00256     }
00257     if (d_doFullRead) {
00258       while (d_n_avail_read_I < l_bufLen_I) {
00259 #if DO_DEBUG
00260         fprintf (stderr, "dequeue: #a < #len, waiting.\n");
00261 #endif
00262         d_internal->unlock ();
00263         d_readBlock->wait ();
00264         d_internal->lock ();
00265         if (d_doAbort) {
00266           d_internal->unlock ();
00267 #if DO_DEBUG
00268           fprintf (stderr, "dequeue: #a < #len, aborting.\n");
00269 #endif
00270           return (2);
00271         }
00272 #if DO_DEBUG
00273         fprintf (stderr, "dequeue: #a < #len, done waiting.\n");
00274 #endif
00275      }
00276     } else {
00277       while (d_n_avail_read_I == 0) {
00278 #if DO_DEBUG
00279         fprintf (stderr, "dequeue: #a == 0, waiting.\n");
00280 #endif
00281         d_internal->unlock ();
00282         d_readBlock->wait ();
00283         d_internal->lock ();
00284         if (d_doAbort) {
00285           d_internal->unlock ();
00286 #if DO_DEBUG
00287           fprintf (stderr, "dequeue: #a == 0, aborting.\n");
00288 #endif
00289           return (2);
00290         }
00291 #if DO_DEBUG
00292         fprintf (stderr, "dequeue: #a == 0, done waiting.\n");
00293 #endif
00294       }
00295     }
00296     if (l_bufLen_I > d_n_avail_read_I)
00297       l_bufLen_I = d_n_avail_read_I;
00298     UInt32 n_now_I = d_bufLen_I - d_readNdx_I, n_start_I = 0;
00299     if (n_now_I > l_bufLen_I)
00300       n_now_I = l_bufLen_I;
00301     else if (n_now_I < l_bufLen_I)
00302       n_start_I = l_bufLen_I - n_now_I;
00303     bcopy (&(d_buffer[d_readNdx_I]), buf, n_now_I * sizeof (T));
00304     if (n_start_I) {
00305       bcopy (d_buffer, &(buf[n_now_I]), n_start_I * sizeof (T));
00306       d_readNdx_I = n_start_I;
00307     } else
00308       d_readNdx_I += n_now_I;
00309     *bufLen_I = l_bufLen_I;
00310     d_n_avail_read_I -= l_bufLen_I;
00311     d_n_avail_write_I += l_bufLen_I;
00312     d_writeBlock->signal ();
00313     d_internal->unlock ();
00314     return (1);
00315   };
00316 
00317   void abort () {
00318     d_internal->lock ();
00319     d_doAbort = true;
00320     d_writeBlock->signal ();
00321     d_readBlock->signal ();
00322     d_internal->unlock ();
00323   };
00324 };
00325 
00326 #endif /* _CIRCULAR_BUFFER_H_ */

Generated on Tue May 1 10:45:46 2007 for GNU Radio 3.0.3 by  doxygen 1.5.1