• Main Page
  • Classes
  • Files
  • File List
  • File Members

circular_buffer.h

Go to the documentation of this file.
00001 /* -*- c++ -*- */
00002 /*
00003  * Copyright 2006 Free Software Foundation, Inc.
00004  * 
00005  * This file is part of GNU Radio.
00006  *
00007  * GNU Radio is free software; you can redistribute it and/or modify
00008  * it under the terms of the GNU General Public License as published by
00009  * the Free Software Foundation; either version 3, or (at your option)
00010  * any later version.
00011  * 
00012  * GNU Radio is distributed in the hope that it will be useful,
00013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015  * GNU General Public License for more details.
00016  * 
00017  * You should have received a copy of the GNU General Public License
00018  * along with GNU Radio; see the file COPYING.  If not, write to
00019  * the Free Software Foundation, Inc., 51 Franklin Street,
00020  * Boston, MA 02110-1301, USA.
00021  */
00022 
00023 #ifndef _CIRCULAR_BUFFER_H_
00024 #define _CIRCULAR_BUFFER_H_
00025 
00026 #include "mld_threads.h"
00027 #include <stdexcept>
00028 
00029 #ifndef DO_DEBUG
00030 #define DO_DEBUG 0
00031 #endif
00032 
00033 #if DO_DEBUG
00034 #define DEBUG(X) do{X} while(0);
00035 #else
00036 #define DEBUG(X) do{} while(0);
00037 #endif
00038 
00039 template <class T> class circular_buffer
00040 {
00041 private:
00042 // the buffer to use
00043   T* d_buffer;
00044 
00045 // the following are in Items (type T)
00046   UInt32 d_bufLen_I, d_readNdx_I, d_writeNdx_I;
00047   UInt32 d_n_avail_write_I, d_n_avail_read_I;
00048 
00049 // stuff to control access to class internals
00050   mld_mutex_ptr d_internal;
00051   mld_condition_ptr d_readBlock, d_writeBlock;
00052 
00053 // booleans to decide how to control reading, writing, and aborting
00054   bool d_doWriteBlock, d_doFullRead, d_doAbort;
00055 
00056   void delete_mutex_cond () {
00057     if (d_internal) {
00058       delete d_internal;
00059       d_internal = NULL;
00060     }
00061     if (d_readBlock) {
00062       delete d_readBlock;
00063       d_readBlock = NULL;
00064     }
00065     if (d_writeBlock) {
00066       delete d_writeBlock;
00067       d_writeBlock = NULL;
00068     }
00069   };
00070 
00071 public:
00072   circular_buffer (UInt32 bufLen_I,
00073                    bool doWriteBlock = true, bool doFullRead = false) {
00074     if (bufLen_I == 0)
00075       throw std::runtime_error ("circular_buffer(): "
00076                                 "Number of items to buffer must be > 0.\n");
00077     d_bufLen_I = bufLen_I;
00078     d_buffer = (T*) new T[d_bufLen_I];
00079     d_doWriteBlock = doWriteBlock;
00080     d_doFullRead = doFullRead;
00081     d_internal = NULL;
00082     d_readBlock = d_writeBlock = NULL;
00083     reset ();
00084     DEBUG (fprintf (stderr, "c_b(): buf len (items) = %ld, "
00085                     "doWriteBlock = %s, doFullRead = %s\n", d_bufLen_I,
00086                     (d_doWriteBlock ? "true" : "false"),
00087                     (d_doFullRead ? "true" : "false")););
00088   };
00089 
00090   ~circular_buffer () {
00091     delete_mutex_cond ();
00092     delete [] d_buffer;
00093   };
00094 
00095   inline UInt32 n_avail_write_items () {
00096     d_internal->lock ();
00097     UInt32 retVal = d_n_avail_write_I;
00098     d_internal->unlock ();
00099     return (retVal);
00100   };
00101 
00102   inline UInt32 n_avail_read_items () {
00103     d_internal->lock ();
00104     UInt32 retVal = d_n_avail_read_I;
00105     d_internal->unlock ();
00106     return (retVal);
00107   };
00108 
00109   inline UInt32 buffer_length_items () {return (d_bufLen_I);};
00110   inline bool do_write_block () {return (d_doWriteBlock);};
00111   inline bool do_full_read () {return (d_doFullRead);};
00112 
00113   void reset () {
00114     d_doAbort = false;
00115     bzero (d_buffer, d_bufLen_I * sizeof (T));
00116     d_readNdx_I = d_writeNdx_I = d_n_avail_read_I = 0;
00117     d_n_avail_write_I = d_bufLen_I;
00118     delete_mutex_cond ();
00119     // create a mutex to handle contention of shared resources;
00120     // any routine needed access to shared resources uses lock()
00121     // before doing anything, then unlock() when finished.
00122     d_internal = new mld_mutex ();
00123     // link the internal mutex to the read and write conditions;
00124     // when wait() is called, the internal mutex will automatically
00125     // be unlock()'ed.  Upon return (from a signal() to the condition),
00126     // the internal mutex will be lock()'ed.
00127     d_readBlock = new mld_condition (d_internal);
00128     d_writeBlock = new mld_condition (d_internal);
00129   };
00130 
00131 /*
00132  * enqueue: add the given buffer of item-length to the queue,
00133  *     first-in-first-out (FIFO).
00134  *
00135  * inputs:
00136  *     buf: a pointer to the buffer holding the data
00137  *
00138  *     bufLen_I: the buffer length in items (of the instantiated type)
00139  *
00140  * returns:
00141  *    -1: on overflow (write is not blocking, and data is being
00142  *                     written faster than it is being read)
00143  *     0: if nothing to do (0 length buffer)
00144  *     1: if success
00145  *     2: in the process of aborting, do doing nothing
00146  *
00147  * will throw runtime errors if inputs are improper:
00148  *     buffer pointer is NULL
00149  *     buffer length is larger than the instantiated buffer length
00150  */
00151 
00152   int enqueue (T* buf, UInt32 bufLen_I) {
00153     DEBUG (fprintf (stderr, "enqueue: buf = %X, bufLen = %ld, #av_wr = %ld, "
00154                     "#av_rd = %ld.\n", (unsigned int)buf, bufLen_I,
00155                     d_n_avail_write_I, d_n_avail_read_I););
00156     if (bufLen_I > d_bufLen_I) {
00157       fprintf (stderr, "cannot add buffer longer (%ld"
00158                ") than instantiated length (%ld"
00159                ").\n", bufLen_I, d_bufLen_I);
00160       throw std::runtime_error ("circular_buffer::enqueue()");
00161     }
00162 
00163     if (bufLen_I == 0)
00164       return (0);
00165     if (!buf)
00166       throw std::runtime_error ("circular_buffer::enqueue(): "
00167                                 "input buffer is NULL.\n");
00168     d_internal->lock ();
00169     if (d_doAbort) {
00170       d_internal->unlock ();
00171       return (2);
00172     }
00173     // set the return value to 1: success; change if needed
00174     int retval = 1;
00175     if (bufLen_I > d_n_avail_write_I) {
00176       if (d_doWriteBlock) {
00177         while (bufLen_I > d_n_avail_write_I) {
00178           DEBUG (fprintf (stderr, "enqueue: #len > #a, waiting.\n"););
00179           // wait will automatically unlock() the internal mutex
00180           d_writeBlock->wait ();
00181           // and lock() it here.
00182           if (d_doAbort) {
00183             d_internal->unlock ();
00184             DEBUG (fprintf (stderr, "enqueue: #len > #a, aborting.\n"););
00185             return (2);
00186           }
00187           DEBUG (fprintf (stderr, "enqueue: #len > #a, done waiting.\n"););
00188         }
00189       } else {
00190         d_n_avail_read_I = d_bufLen_I - bufLen_I;
00191         d_n_avail_write_I = bufLen_I;
00192         DEBUG (fprintf (stderr, "circular_buffer::enqueue: overflow\n"););
00193         retval = -1;
00194       }
00195     }
00196     UInt32 n_now_I = d_bufLen_I - d_writeNdx_I, n_start_I = 0;
00197     if (n_now_I > bufLen_I)
00198       n_now_I = bufLen_I;
00199     else if (n_now_I < bufLen_I)
00200       n_start_I = bufLen_I - n_now_I;
00201     bcopy (buf, &(d_buffer[d_writeNdx_I]), n_now_I * sizeof (T));
00202     if (n_start_I) {
00203       bcopy (&(buf[n_now_I]), d_buffer, n_start_I * sizeof (T));
00204       d_writeNdx_I = n_start_I;
00205     } else
00206       d_writeNdx_I += n_now_I;
00207     d_n_avail_read_I += bufLen_I;
00208     d_n_avail_write_I -= bufLen_I;
00209     d_readBlock->signal ();
00210     d_internal->unlock ();
00211     return (retval);
00212   };
00213 
00214 /*
00215  * dequeue: removes from the queue the number of items requested, or
00216  *     available, into the given buffer on a FIFO basis.
00217  *
00218  * inputs:
00219  *     buf: a pointer to the buffer into which to copy the data
00220  *
00221  *     bufLen_I: pointer to the number of items to remove in items
00222  *         (of the instantiated type)
00223  *
00224  * returns:
00225  *     0: if nothing to do (0 length buffer)
00226  *     1: if success
00227  *     2: in the process of aborting, do doing nothing
00228  *
00229  * will throw runtime errors if inputs are improper:
00230  *     buffer pointer is NULL
00231  *     buffer length pointer is NULL
00232  *     buffer length is larger than the instantiated buffer length
00233  */
00234 
00235   int dequeue (T* buf, UInt32* bufLen_I) {
00236     DEBUG (fprintf (stderr, "dequeue: buf = %X, *bufLen = %ld, #av_wr = %ld, "
00237                     "#av_rd = %ld.\n", (unsigned int)buf, *bufLen_I,
00238                     d_n_avail_write_I, d_n_avail_read_I););
00239     if (!bufLen_I)
00240       throw std::runtime_error ("circular_buffer::dequeue(): "
00241                                 "input bufLen pointer is NULL.\n");
00242     if (!buf)
00243       throw std::runtime_error ("circular_buffer::dequeue(): "
00244                                 "input buffer pointer is NULL.\n");
00245     UInt32 l_bufLen_I = *bufLen_I;
00246     if (l_bufLen_I == 0)
00247       return (0);
00248     if (l_bufLen_I > d_bufLen_I) {
00249       fprintf (stderr, "cannot remove buffer longer (%ld"
00250                ") than instantiated length (%ld"
00251                ").\n", l_bufLen_I, d_bufLen_I);
00252       throw std::runtime_error ("circular_buffer::dequeue()");
00253     }
00254 
00255     d_internal->lock ();
00256     if (d_doAbort) {
00257       d_internal->unlock ();
00258       return (2);
00259     }
00260     if (d_doFullRead) {
00261       while (d_n_avail_read_I < l_bufLen_I) {
00262         DEBUG (fprintf (stderr, "dequeue: #a < #len, waiting.\n"););
00263         // wait will automatically unlock() the internal mutex
00264         d_readBlock->wait ();
00265         // and lock() it here.
00266         if (d_doAbort) {
00267           d_internal->unlock ();
00268           DEBUG (fprintf (stderr, "dequeue: #a < #len, aborting.\n"););
00269           return (2);
00270         }
00271         DEBUG (fprintf (stderr, "dequeue: #a < #len, done waiting.\n"););
00272      }
00273     } else {
00274       while (d_n_avail_read_I == 0) {
00275         DEBUG (fprintf (stderr, "dequeue: #a == 0, waiting.\n"););
00276         // wait will automatically unlock() the internal mutex
00277         d_readBlock->wait ();
00278         // and lock() it here.
00279         if (d_doAbort) {
00280           d_internal->unlock ();
00281           DEBUG (fprintf (stderr, "dequeue: #a == 0, aborting.\n"););
00282           return (2);
00283         }
00284         DEBUG (fprintf (stderr, "dequeue: #a == 0, done waiting.\n"););
00285       }
00286     }
00287     if (l_bufLen_I > d_n_avail_read_I)
00288       l_bufLen_I = d_n_avail_read_I;
00289     UInt32 n_now_I = d_bufLen_I - d_readNdx_I, n_start_I = 0;
00290     if (n_now_I > l_bufLen_I)
00291       n_now_I = l_bufLen_I;
00292     else if (n_now_I < l_bufLen_I)
00293       n_start_I = l_bufLen_I - n_now_I;
00294     bcopy (&(d_buffer[d_readNdx_I]), buf, n_now_I * sizeof (T));
00295     if (n_start_I) {
00296       bcopy (d_buffer, &(buf[n_now_I]), n_start_I * sizeof (T));
00297       d_readNdx_I = n_start_I;
00298     } else
00299       d_readNdx_I += n_now_I;
00300     *bufLen_I = l_bufLen_I;
00301     d_n_avail_read_I -= l_bufLen_I;
00302     d_n_avail_write_I += l_bufLen_I;
00303     d_writeBlock->signal ();
00304     d_internal->unlock ();
00305     return (1);
00306   };
00307 
00308   void abort () {
00309     d_internal->lock ();
00310     d_doAbort = true;
00311     d_writeBlock->signal ();
00312     d_readBlock->signal ();
00313     d_internal->unlock ();
00314   };
00315 };
00316 
00317 #endif /* _CIRCULAR_BUFFER_H_ */

Generated on Thu Aug 19 2010 06:11:59 for Universal Software Radio Peripheral by  doxygen 1.7.1