Index  Source Files  Annotated Class List  Alphabetical Class List  Class Hierarchy  Graphical Class Hierarchy 

SocketAcceptor.cpp

Go to the documentation of this file.
00001 /****************************************************************************
00002 ** Copyright (c) quickfixengine.org  All rights reserved.
00003 **
00004 ** This file is part of the QuickFIX FIX Engine
00005 **
00006 ** This file may be distributed under the terms of the quickfixengine.org
00007 ** license as defined by quickfixengine.org and appearing in the file
00008 ** LICENSE included in the packaging of this file.
00009 **
00010 ** This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE
00011 ** WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
00012 **
00013 ** See http://www.quickfixengine.org/LICENSE for licensing information.
00014 **
00015 ** Contact ask@quickfixengine.org if any conditions of this licensing are
00016 ** not clear to you.
00017 **
00018 ****************************************************************************/
00019 
00020 #ifdef _MSC_VER
00021 #include "stdafx.h"
00022 #else
00023 #include "config.h"
00024 #endif
00025 #include "CallStack.h"
00026 
00027 #include "SocketAcceptor.h"
00028 #include "Session.h"
00029 #include "Settings.h"
00030 #include "Utility.h"
00031 #include "Exceptions.h"
00032 
00033 namespace FIX
00034 {
00035 SocketAcceptor::SocketAcceptor( Application& application,
00036                                 MessageStoreFactory& factory,
00037                                 const SessionSettings& settings ) throw( ConfigError )
00038 : Acceptor( application, factory, settings ),
00039   m_pServer( 0 ) {}
00040 
00041 SocketAcceptor::SocketAcceptor( Application& application,
00042                                 MessageStoreFactory& factory,
00043                                 const SessionSettings& settings,
00044                                 LogFactory& logFactory ) throw( ConfigError )
00045 : Acceptor( application, factory, settings, logFactory ),
00046   m_pServer( 0 ) 
00047 {
00048 }
00049 
00050 SocketAcceptor::~SocketAcceptor()
00051 {
00052   SocketConnections::iterator iter;
00053   for ( iter = m_connections.begin(); iter != m_connections.end(); ++iter )
00054     delete iter->second;
00055 }
00056 
00057 void SocketAcceptor::onConfigure( const SessionSettings& s )
00058 throw ( ConfigError )
00059 { QF_STACK_PUSH(SocketAcceptor::onConfigure)
00060 
00061   std::set<SessionID> sessions = s.getSessions();
00062   std::set<SessionID>::iterator i;
00063   for( i = sessions.begin(); i != sessions.end(); ++i )
00064   {
00065     const Dictionary& settings = s.get( *i );
00066     settings.getLong( SOCKET_ACCEPT_PORT );
00067     if( settings.has(SOCKET_REUSE_ADDRESS) )
00068       settings.getBool( SOCKET_REUSE_ADDRESS );
00069     if( settings.has(SOCKET_NODELAY) )
00070       settings.getBool( SOCKET_NODELAY );
00071   }
00072 
00073   QF_STACK_POP
00074 }
00075 
00076 void SocketAcceptor::onInitialize( const SessionSettings& s )
00077 throw ( RuntimeError )
00078 { QF_STACK_PUSH(SocketAcceptor::onInitialize)
00079 
00080   short port = 0;
00081 
00082   try
00083   {
00084     m_pServer = new SocketServer( 1 );
00085 
00086     std::set<SessionID> sessions = s.getSessions();
00087     std::set<SessionID>::iterator i = sessions.begin();
00088     for( ; i != sessions.end(); ++i )
00089     {
00090       Dictionary settings = s.get( *i );
00091       short port = (short)settings.getLong( SOCKET_ACCEPT_PORT );
00092 
00093       const bool reuseAddress = settings.has( SOCKET_REUSE_ADDRESS ) ? 
00094         s.get().getBool( SOCKET_REUSE_ADDRESS ) : true;
00095 
00096       const bool noDelay = settings.has( SOCKET_NODELAY ) ? 
00097         s.get().getBool( SOCKET_NODELAY ) : false;
00098 
00099       const int sendBufSize = settings.has( SOCKET_SEND_BUFFER_SIZE ) ?
00100         s.get().getLong( SOCKET_SEND_BUFFER_SIZE ) : 0;
00101 
00102       const int rcvBufSize = settings.has( SOCKET_RECEIVE_BUFFER_SIZE ) ?
00103         s.get().getLong( SOCKET_RECEIVE_BUFFER_SIZE ) : 0;
00104 
00105       m_portToSessions[port].insert( *i );
00106       m_pServer->add( port, reuseAddress, noDelay, sendBufSize, rcvBufSize );      
00107     }    
00108   }
00109   catch( SocketException& e )
00110   {
00111     throw RuntimeError( "Unable to create, bind, or listen to port "
00112                        + IntConvertor::convert( (unsigned short)port ) + " (" + e.what() + ")" );
00113   }
00114 
00115   QF_STACK_POP
00116 }
00117 
00118 void SocketAcceptor::onStart()
00119 { QF_STACK_PUSH(SocketAcceptor::onStart)
00120 
00121   while ( !isStopped() && m_pServer && m_pServer->block( *this ) ) {}
00122 
00123   if( !m_pServer )
00124     return;
00125 
00126   time_t start = 0;
00127   time_t now = 0;
00128 
00129   ::time( &start );
00130   while ( isLoggedOn() )
00131   {
00132     m_pServer->block( *this );
00133     if( ::time(&now) -5 >= start )
00134       break;
00135   }
00136 
00137   m_pServer->close();
00138   delete m_pServer;
00139   m_pServer = 0;
00140 
00141   QF_STACK_POP
00142 }
00143 
00144 bool SocketAcceptor::onPoll( double timeout )
00145 { QF_STACK_PUSH(SocketAcceptor::onPoll)
00146 
00147   if( !m_pServer )
00148     return false;
00149 
00150   time_t start = 0;
00151   time_t now = 0;
00152 
00153   if( isStopped() )
00154   {
00155     if( start == 0 )
00156       ::time( &start );
00157     if( !isLoggedOn() )
00158     {
00159       start = 0;
00160       return false;
00161     }
00162     if( ::time(&now) - 5 >= start )
00163     {
00164       start = 0;
00165       return false;
00166     }
00167   }
00168 
00169   m_pServer->block( *this, true, timeout );
00170   return true;
00171 
00172   QF_STACK_POP
00173 }
00174 
00175 void SocketAcceptor::onStop()
00176 { QF_STACK_PUSH(SocketAcceptor::onStop)
00177   QF_STACK_POP
00178 }
00179 
00180 void SocketAcceptor::onConnect( SocketServer& server, int a, int s )
00181 { QF_STACK_PUSH(SocketAcceptor::onConnect)
00182 
00183   if ( !socket_isValid( s ) ) return;
00184   SocketConnections::iterator i = m_connections.find( s );
00185   if ( i != m_connections.end() ) return;
00186   int port = server.socketToPort( a );
00187   Sessions sessions = m_portToSessions[port];
00188   m_connections[ s ] = new SocketConnection( s, sessions, &server.getMonitor() );
00189 
00190   std::stringstream stream;
00191   stream << "Accepted connection from " << socket_peername( s ) << " on port " << port;
00192 
00193   if( getLog() )
00194     getLog()->onEvent( stream.str() );
00195 
00196   QF_STACK_POP
00197 }
00198 
00199 void SocketAcceptor::onWrite( SocketServer& server, int s )
00200 { QF_STACK_PUSH(SocketAcceptor::onWrite)
00201 
00202   SocketConnections::iterator i = m_connections.find( s );
00203   if ( i == m_connections.end() ) return ;
00204   SocketConnection* pSocketConnection = i->second;
00205   if( pSocketConnection->processQueue() )
00206     pSocketConnection->unsignal();
00207 
00208   QF_STACK_POP
00209 }
00210 
00211 bool SocketAcceptor::onData( SocketServer& server, int s )
00212 { QF_STACK_PUSH(SocketAcceptor::onData)
00213 
00214   SocketConnections::iterator i = m_connections.find( s );
00215   if ( i == m_connections.end() ) return false;
00216   SocketConnection* pSocketConnection = i->second;
00217   return pSocketConnection->read( *this, server );
00218 
00219   QF_STACK_POP
00220 }
00221 
00222 void SocketAcceptor::onDisconnect( SocketServer&, int s )
00223 { QF_STACK_PUSH(SocketAcceptor::onDisconnect)
00224 
00225   SocketConnections::iterator i = m_connections.find( s );
00226   if ( i == m_connections.end() ) return ;
00227   SocketConnection* pSocketConnection = i->second;
00228 
00229   Session* pSession = pSocketConnection->getSession();
00230   if ( pSession ) pSession->disconnect();
00231 
00232   delete pSocketConnection;
00233   m_connections.erase( s );
00234 
00235   QF_STACK_POP
00236 }
00237 
00238 void SocketAcceptor::onError( SocketServer& ) {}
00239 
00240 void SocketAcceptor::onTimeout( SocketServer& )
00241 { QF_STACK_PUSH(SocketAcceptor::onInitialize)
00242 
00243   SocketConnections::iterator i;
00244   for ( i = m_connections.begin(); i != m_connections.end(); ++i )
00245     i->second->onTimeout();
00246 
00247   QF_STACK_POP
00248 }
00249 }

Generated on Mon Apr 5 20:59:51 2010 for QuickFIX by doxygen 1.6.1 written by Dimitri van Heesch, © 1997-2001