Index  Source Files  Annotated Class List  Alphabetical Class List  Class Hierarchy  Graphical Class Hierarchy 

Initiator.cpp

Go to the documentation of this file.
00001 /****************************************************************************
00002 ** Copyright (c) quickfixengine.org  All rights reserved.
00003 **
00004 ** This file is part of the QuickFIX FIX Engine
00005 **
00006 ** This file may be distributed under the terms of the quickfixengine.org
00007 ** license as defined by quickfixengine.org and appearing in the file
00008 ** LICENSE included in the packaging of this file.
00009 **
00010 ** This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE
00011 ** WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
00012 **
00013 ** See http://www.quickfixengine.org/LICENSE for licensing information.
00014 **
00015 ** Contact ask@quickfixengine.org if any conditions of this licensing are
00016 ** not clear to you.
00017 **
00018 ****************************************************************************/
00019 
00020 #ifdef _MSC_VER
00021 #include "stdafx.h"
00022 #else
00023 #include "config.h"
00024 #endif
00025 #include "CallStack.h"
00026 
00027 #include "Initiator.h"
00028 #include "Utility.h"
00029 #include "Session.h"
00030 #include "SessionFactory.h"
00031 #include "HttpServer.h"
00032 #include <algorithm>
00033 #include <fstream>
00034 
00035 namespace FIX
00036 {
00037 Initiator::Initiator( Application& application,
00038                       MessageStoreFactory& messageStoreFactory,
00039                       const SessionSettings& settings ) throw( ConfigError )
00040 : m_threadid( 0 ),
00041   m_application( application ),
00042   m_messageStoreFactory( messageStoreFactory ),
00043   m_settings( settings ),
00044   m_pLogFactory( 0 ),
00045   m_pLog( 0 ),
00046   m_stop( true )
00047 { initialize(); }
00048 
00049 Initiator::Initiator( Application& application,
00050                       MessageStoreFactory& messageStoreFactory,
00051                       const SessionSettings& settings,
00052                       LogFactory& logFactory ) throw( ConfigError )
00053 : m_threadid( 0 ),
00054   m_application( application ),
00055   m_messageStoreFactory( messageStoreFactory ),
00056   m_settings( settings ),
00057   m_pLogFactory( &logFactory ),
00058   m_pLog( logFactory.create() ),
00059   m_stop( true )
00060 { initialize(); }
00061 
00062 void Initiator::initialize() throw ( ConfigError )
00063 { QF_STACK_PUSH(Initiator::initialize)
00064 
00065   std::set < SessionID > sessions = m_settings.getSessions();
00066   std::set < SessionID > ::iterator i;
00067 
00068   if ( !sessions.size() )
00069     throw ConfigError( "No sessions defined" );
00070 
00071   SessionFactory factory( m_application, m_messageStoreFactory,
00072                           m_pLogFactory );
00073 
00074   for ( i = sessions.begin(); i != sessions.end(); ++i )
00075   {
00076     if ( m_settings.get( *i ).getString( "ConnectionType" ) == "initiator" )
00077     {
00078       m_sessionIDs.insert( *i );
00079       m_sessions[ *i ] = factory.create( *i, m_settings.get( *i ) );
00080       setDisconnected( *i );
00081     }
00082   }
00083 
00084   if ( !m_sessions.size() )
00085     throw ConfigError( "No sessions defined for initiator" );
00086 
00087   QF_STACK_POP
00088 }
00089 
00090 Initiator::~Initiator()
00091 { QF_STACK_IGNORE_BEGIN
00092 
00093   Sessions::iterator i;
00094   for ( i = m_sessions.begin(); i != m_sessions.end(); ++i )
00095     delete i->second;
00096 
00097   if( m_pLogFactory && m_pLog )
00098     m_pLogFactory->destroy( m_pLog );
00099 
00100   QF_STACK_IGNORE_END
00101 }
00102 
00103 Session* Initiator::getSession( const SessionID& sessionID,
00104                                 Responder& responder )
00105 { QF_STACK_PUSH(Initiator::getSession)
00106 
00107   Sessions::iterator i = m_sessions.find( sessionID );
00108   if ( i != m_sessions.end() )
00109   {
00110     i->second->setResponder( &responder );
00111     return i->second;
00112   }
00113   return 0;
00114 
00115   QF_STACK_POP
00116 }
00117 
00118 Session* Initiator::getSession( const SessionID& sessionID ) const
00119 { QF_STACK_PUSH(Initiator::getSession)
00120 
00121   Sessions::const_iterator i = m_sessions.find( sessionID );
00122   if( i != m_sessions.end() )
00123     return i->second;
00124   else
00125     return 0;
00126 
00127   QF_STACK_POP
00128 }
00129 
00130 const Dictionary* const Initiator::getSessionSettings( const SessionID& sessionID ) const
00131 { QF_STACK_PUSH(Initiator::getSessionSettings)
00132 
00133   try
00134   {
00135     return &m_settings.get( sessionID );
00136   }
00137   catch( ConfigError& )
00138   {
00139     return 0;
00140   }
00141 
00142   QF_STACK_POP
00143 }
00144 
00145 void Initiator::connect()
00146 { QF_STACK_PUSH(Initiator::connect)
00147 
00148   Locker l(m_mutex);
00149 
00150   SessionIDs disconnected = m_disconnected;
00151   SessionIDs::iterator i = disconnected.begin();
00152   for ( ; i != disconnected.end(); ++i )
00153   {
00154     Session* pSession = Session::lookupSession( *i );
00155     if ( pSession->isEnabled() && pSession->isSessionTime(UtcTimeStamp()) )
00156       doConnect( *i, m_settings.get( *i ));
00157   }
00158 
00159   QF_STACK_POP
00160 }
00161 
00162 void Initiator::setPending( const SessionID& sessionID )
00163 { QF_STACK_PUSH(Initiator::setPending)
00164 
00165   Locker l(m_mutex);
00166 
00167   m_pending.insert( sessionID );
00168   m_connected.erase( sessionID );
00169   m_disconnected.erase( sessionID );
00170 
00171   QF_STACK_POP
00172 }
00173 
00174 void Initiator::setConnected( const SessionID& sessionID )
00175 { QF_STACK_PUSH(Initiator::setConnected)
00176 
00177   Locker l(m_mutex);
00178 
00179   m_pending.erase( sessionID );
00180   m_connected.insert( sessionID );
00181   m_disconnected.erase( sessionID );
00182 
00183   QF_STACK_POP
00184 }
00185 
00186 void Initiator::setDisconnected( const SessionID& sessionID )
00187 { QF_STACK_PUSH(Initiator::setDisconnected)
00188 
00189   Locker l(m_mutex);
00190 
00191   m_pending.erase( sessionID );
00192   m_connected.erase( sessionID );
00193   m_disconnected.insert( sessionID );
00194 
00195   QF_STACK_POP
00196 }
00197 
00198 bool Initiator::isPending( const SessionID& sessionID )
00199 { QF_STACK_PUSH(Initiator::isPending)
00200 
00201   Locker l(m_mutex);
00202   return m_pending.find( sessionID ) != m_pending.end();
00203 
00204   QF_STACK_POP
00205 }
00206 
00207 bool Initiator::isConnected( const SessionID& sessionID )
00208 { QF_STACK_PUSH(Initiator::isConnected)
00209 
00210   Locker l(m_mutex);
00211   return m_connected.find( sessionID ) != m_connected.end();
00212 
00213   QF_STACK_POP
00214 }
00215 
00216 bool Initiator::isDisconnected( const SessionID& sessionID )
00217 { QF_STACK_PUSH(Initiator::isDisconnected)
00218 
00219   Locker l(m_mutex);
00220   return m_disconnected.find( sessionID ) != m_disconnected.end();
00221 
00222   QF_STACK_POP
00223 }
00224 
00225 void Initiator::start() throw ( ConfigError, RuntimeError )
00226 { QF_STACK_PUSH(Initiator::start)
00227 
00228   m_stop = false;
00229   onConfigure( m_settings );
00230   onInitialize( m_settings );
00231 
00232   HttpServer::startGlobal( m_settings );
00233 
00234   if( !thread_spawn( &startThread, this, m_threadid ) )
00235     throw RuntimeError("Unable to spawn thread");
00236 
00237   QF_STACK_POP
00238 }
00239 
00240 
00241 void Initiator::block() throw ( ConfigError, RuntimeError )
00242 { QF_STACK_PUSH(Initiator::block)
00243 
00244   m_stop = false;
00245   onConfigure( m_settings );
00246   onInitialize( m_settings );
00247 
00248   startThread(this);
00249 
00250   QF_STACK_POP
00251 }
00252 
00253 bool Initiator::poll( double timeout ) throw ( ConfigError, RuntimeError )
00254 { QF_STACK_PUSH(Initiator::poll)
00255 
00256   if( m_firstPoll )
00257   {
00258     onConfigure( m_settings );
00259     onInitialize( m_settings );
00260     m_firstPoll = false;
00261   }
00262 
00263   return onPoll( timeout );
00264 
00265   QF_STACK_POP
00266 }
00267 
00268 void Initiator::stop( bool force )
00269 { QF_STACK_PUSH(Initiator::stop)
00270 
00271   if( isStopped() ) return;
00272 
00273   HttpServer::stopGlobal();
00274 
00275   std::vector<Session*> enabledSessions;
00276 
00277   SessionIDs connected = m_connected;
00278   SessionIDs::iterator i = connected.begin();
00279   for ( ; i != connected.end(); ++i )
00280   {
00281     Session* pSession = Session::lookupSession(*i);
00282     if( pSession->isEnabled() )
00283     {
00284       enabledSessions.push_back( pSession );
00285       pSession->logout();
00286     }
00287   }
00288 
00289   if( !force )
00290   {
00291     for ( int second = 1; second <= 10 && isLoggedOn(); ++second )
00292       process_sleep( 1 );
00293   }
00294 
00295   {
00296     Locker l(m_mutex);
00297     for ( i = connected.begin(); i != connected.end(); ++i )
00298       setDisconnected( Session::lookupSession(*i)->getSessionID() );
00299   }
00300 
00301   m_stop = true;
00302   onStop();
00303   if( m_threadid )
00304     thread_join( m_threadid );
00305   m_threadid = 0;
00306 
00307   std::vector<Session*>::iterator session = enabledSessions.begin();
00308   for( ; session != enabledSessions.end(); ++session )
00309     (*session)->logon();
00310 
00311   QF_STACK_POP
00312 }
00313 
00314 bool Initiator::isLoggedOn()
00315 { QF_STACK_PUSH(Initiator::isLoggedOn)
00316 
00317   Locker l(m_mutex);
00318 
00319   SessionIDs connected = m_connected;
00320   SessionIDs::iterator i = connected.begin();
00321   for ( ; i != connected.end(); ++i )
00322   {
00323     if( Session::lookupSession(*i)->isLoggedOn() )
00324       return true;
00325   }
00326   return false;
00327 
00328   QF_STACK_POP
00329 }
00330 
00331 THREAD_PROC Initiator::startThread( void* p )
00332 { QF_STACK_TRY
00333   QF_STACK_PUSH(Initiator::startThread)
00334 
00335   Initiator * pInitiator = static_cast < Initiator* > ( p );
00336   pInitiator->onStart();
00337   return 0;
00338 
00339   QF_STACK_POP
00340   QF_STACK_CATCH
00341 }
00342 }

Generated on Mon Apr 5 20:59:50 2010 for QuickFIX by doxygen 1.6.1 written by Dimitri van Heesch, © 1997-2001