sockagg.h

Go to the documentation of this file.
00001 /*
00002  * sockagg.h
00003  *
00004  * Generalised Socket Aggregation functions
00005  *
00006  * Portable Windows Library
00007  *
00008  * Copyright (C) 2005 Post Increment
00009  *
00010  * The contents of this file are subject to the Mozilla Public License
00011  * Version 1.0 (the "License"); you may not use this file except in
00012  * compliance with the License. You may obtain a copy of the License at
00013  * http://www.mozilla.org/MPL/
00014  *
00015  * Software distributed under the License is distributed on an "AS IS"
00016  * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
00017  * the License for the specific language governing rights and limitations
00018  * under the License.
00019  *
00020  * The Original Code is Portable Windows Library.
00021  *
00022  * The Initial Developer of the Original Code is Post Increment
00023  *
00024  * Portions of this code were written with the financial assistance of 
00025  * Metreos Corporation (http://www.metros.com).
00026  *
00027  * Contributor(s): ______________________________________.
00028  *
00029  * $Log: sockagg.h,v $
00030  * Revision 1.4  2006/01/03 04:23:32  csoutheren
00031  * Fixed Unix implementation
00032  *
00033  * Revision 1.3  2005/12/23 06:44:30  csoutheren
00034  * Working implementation
00035  *
00036  * Revision 1.2  2005/12/22 07:27:36  csoutheren
00037  * More implementation
00038  *
00039  * Revision 1.1  2005/12/22 03:55:52  csoutheren
00040  * Added initial version of socket aggregation classes
00041  *
00042  *
00043  */
00044 
00045 
00046 #ifndef _SOCKAGG_H
00047 #define _SOCKAGG_H
00048 
00049 #ifdef P_USE_PRAGMA
00050 #pragma interface
00051 #endif
00052 
00053 #include <ptlib.h>
00054 #include <ptlib/sockets.h>
00055 
00056 /*
00057 
00058 These classes implements a generalised method for aggregating sockets so that they can be handled by a single thread. It is
00059 intended to provide a backwards-compatible mechanism to supplant the "one socket - one thread" model used by OpenH323
00060 and OPAL with a model that provides a better thread to call ratio. A more complete explanation of this model can be
00061 found in the white paper "Increasing the Maximum Call Density of OpenH323 and OPAL" which can be at:
00062 
00063          http://www.voxgratia.org/docs/call%20thread%20handling%20model%201.0.pdf
00064 
00065 There are two assumptions made by this code:
00066 
00067   1) The most efficient way to handle I/O is for a thread to be blocked on I/O. Any sort of timer or other
00068      polling mechanism is less efficient
00069 
00070   2) The time taken to handle a received PDU is relatively small, and will not interfere with the handling of
00071      other calls that are handled in the same thread
00072 
00073 UDP and TCP sockets are aggregated in different ways. UDP sockets are aggregated on the basis of a simple loop that looks
00074 for received datagrams and then processes them. TCP sockets are more complex because there is no intrinsic record-marking 
00075 protocol, so it is difficult to know when a complete PDU has been received. This problem is solved by having the loop collect
00076 received data into a buffer until the handling routine decides that a full PDU has been received.
00077 
00078 At the heart of each socket aggregator is a select statement that contains all of the file descriptors that are managed
00079 by the thread. One extra handle for a pipe (or on Windows, a local socket) is added to each handle list so that the thread can
00080 be woken up in order to allow the addition or removal of sockets to the list
00081 
00082 */
00083 
00084 #include <ptlib.h>
00085 #include <functional>
00086 #include <vector>
00087 
00089 //
00090 // this class encapsulates the system specific handle needed to specifiy a socket.
00091 // On Unix systems, this is a simple file handle. This file handle is used to uniquely
00092 // identify the socket and used in the "select" system call
00093 // On Windows systems the SOCKET handle is used to identify the socket, but a seperate WSAEVENT
00094 // handle is needed for the WSWaitForMultpleEvents call.
00095 // This is further complicated by the fact that we need to treat some pairs of sockets as a single
00096 // entity (i.e. RTP sockets) to avoid rewriting the RTP handler code.
00097 //
00098 
00099 class PAggregatedHandle;
00100 
00101 class PAggregatorFD 
00102 {
00103   public:
00104 #ifdef _WIN32
00105     typedef WSAEVENT FD;
00106     typedef SOCKET FDType;
00107     SOCKET socket;
00108 #else
00109     typedef int FD;
00110     typedef int FDType;
00111 #endif
00112 
00113     PAggregatorFD(FDType fd);
00114 
00115     FD fd;
00116 
00117     ~PAggregatorFD();
00118     bool IsValid();
00119 };
00120 
00121 typedef std::vector<PAggregatorFD *> PAggregatorFDList_t;
00122 
00124 //
00125 // This class defines an abstract class used to define a handle that can be aggregated
00126 //
00127 // Normally this will correspond directly to a socket, but for RTP this actually corresponds to two sockets
00128 // which greatly complicates things
00129 //
00130 
00131 #ifdef _MSC_VER
00132 #pragma warning(push)
00133 #pragma warning(disable:4127)
00134 #endif
00135 
00136 class PAggregatedHandle : public PObject
00137 {
00138   PCLASSINFO(PAggregatedHandle, PObject);
00139   public:
00140     PAggregatedHandle(BOOL _autoDelete = FALSE)
00141       : autoDelete(_autoDelete), preReadDone(FALSE)
00142     { }
00143 
00144     virtual PAggregatorFDList_t GetFDs() = 0;
00145 
00146     virtual PTimeInterval GetTimeout()
00147     { return PMaxTimeInterval; }
00148 
00149     virtual BOOL Init()      { return TRUE; }
00150     virtual BOOL PreRead()   { return TRUE; }
00151     virtual BOOL OnRead() = 0;
00152     virtual void DeInit()    { }
00153 
00154     virtual BOOL IsPreReadDone() const
00155     { return preReadDone; }
00156 
00157     virtual void SetPreReadDone(BOOL v = TRUE)
00158     { preReadDone = v; }
00159 
00160     BOOL autoDelete;
00161 
00162   protected:
00163     BOOL preReadDone;
00164 };
00165 
00166 #ifdef _MSC_VER
00167 #pragma warning(pop)
00168 #endif
00169 
00170 
00172 //
00173 // This class is the actual socket aggregator
00174 //
00175 
00176 class PHandleAggregator : public PObject
00177 {
00178   PCLASSINFO(PHandleAggregator, PObject)
00179   public:
00180     class EventBase
00181     {
00182       public:
00183         virtual PAggregatorFD::FD GetHandle() = 0;
00184         virtual void Set() = 0;
00185         virtual void Reset() = 0;
00186     };
00187 
00188     typedef std::vector<PAggregatedHandle *> HandleContextList_t;
00189 
00190     class WorkerThreadBase : public PThread
00191     {
00192       public:
00193         WorkerThreadBase(EventBase & _event)
00194           : PThread(100, NoAutoDeleteThread), event(_event), listChanged(TRUE)
00195         { }
00196 
00197         virtual void Trigger() = 0;
00198         void Main();
00199 
00200         EventBase & event;
00201         PMutex mutex;
00202         BOOL listChanged;
00203 
00204         HandleContextList_t handleList;
00205     };
00206 
00207     typedef std::vector<WorkerThreadBase *> WorkerList_t;
00208 
00209     PHandleAggregator(unsigned _max = 10);
00210 
00211     BOOL AddHandle(PAggregatedHandle * handle);
00212 
00213     BOOL RemoveHandle(PAggregatedHandle * handle);
00214 
00215     PMutex mutex;
00216     WorkerList_t workers;
00217     unsigned maxWorkerSize;
00218     unsigned minWorkerSize;
00219 };
00220 
00221 
00223 //
00224 // This template class allows the creation of aggregators for sockets that are
00225 // descendants of PIPSocket
00226 //
00227 
00228 template <class PSocketType>
00229 class PSocketAggregator : public PHandleAggregator
00230 {
00231   PCLASSINFO(PSocketAggregator, PHandleAggregator)
00232   public:
00233     class AggregatedPSocket : public PAggregatedHandle
00234     {
00235       public:
00236         AggregatedPSocket(PSocketType * _s)
00237           : psocket(_s), fd(_s->GetHandle()) { }
00238 
00239         BOOL OnRead()
00240         { return psocket->OnRead(); }
00241 
00242         PAggregatorFDList_t GetFDs()
00243         { PAggregatorFDList_t list; list.push_back(&fd); return list; }
00244 
00245       protected:
00246         PSocketType * psocket;
00247         PAggregatorFD fd;
00248     };
00249 
00250     typedef std::map<PSocketType *, AggregatedPSocket *> SocketList_t;
00251     SocketList_t socketList;
00252 
00253     BOOL AddSocket(PSocketType * sock)
00254     { 
00255       PWaitAndSignal m(mutex);
00256 
00257       AggregatedPSocket * handle = new AggregatedPSocket(sock);
00258       if (AddHandle(handle)) {
00259         socketList.insert(SocketList_t::value_type(sock, handle));
00260         return true;
00261       }
00262 
00263       delete handle;
00264       return false;
00265     }
00266 
00267     BOOL RemoveSocket(PSocketType * sock)
00268     { 
00269       PWaitAndSignal m(mutex);
00270 
00271       typename SocketList_t::iterator r = socketList.find(sock);
00272       if (r == socketList.end()) 
00273         return FALSE;
00274 
00275       AggregatedPSocket * handle = r->second;
00276       RemoveHandle(handle);
00277       delete handle;
00278       socketList.erase(r);
00279       return TRUE;
00280     }
00281 };
00282 
00283 #endif

Generated on Fri Sep 21 14:40:11 2007 for PWLib by  doxygen 1.5.3