Initiator.cpp
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifdef _MSC_VER
00021 #include "stdafx.h"
00022 #else
00023 #include "config.h"
00024 #endif
00025 #include "CallStack.h"
00026
00027 #include "Initiator.h"
00028 #include "Utility.h"
00029 #include "Session.h"
00030 #include "SessionFactory.h"
00031 #include "HttpServer.h"
00032 #include <algorithm>
00033 #include <fstream>
00034
00035 namespace FIX
00036 {
00037 Initiator::Initiator( Application& application,
00038 MessageStoreFactory& messageStoreFactory,
00039 const SessionSettings& settings ) throw( ConfigError )
00040 : m_threadid( 0 ),
00041 m_application( application ),
00042 m_messageStoreFactory( messageStoreFactory ),
00043 m_settings( settings ),
00044 m_pLogFactory( 0 ),
00045 m_pLog( 0 ),
00046 m_stop( true )
00047 { initialize(); }
00048
00049 Initiator::Initiator( Application& application,
00050 MessageStoreFactory& messageStoreFactory,
00051 const SessionSettings& settings,
00052 LogFactory& logFactory ) throw( ConfigError )
00053 : m_threadid( 0 ),
00054 m_application( application ),
00055 m_messageStoreFactory( messageStoreFactory ),
00056 m_settings( settings ),
00057 m_pLogFactory( &logFactory ),
00058 m_pLog( logFactory.create() ),
00059 m_stop( true )
00060 { initialize(); }
00061
00062 void Initiator::initialize() throw ( ConfigError )
00063 { QF_STACK_PUSH(Initiator::initialize)
00064
00065 std::set < SessionID > sessions = m_settings.getSessions();
00066 std::set < SessionID > ::iterator i;
00067
00068 if ( !sessions.size() )
00069 throw ConfigError( "No sessions defined" );
00070
00071 SessionFactory factory( m_application, m_messageStoreFactory,
00072 m_pLogFactory );
00073
00074 for ( i = sessions.begin(); i != sessions.end(); ++i )
00075 {
00076 if ( m_settings.get( *i ).getString( "ConnectionType" ) == "initiator" )
00077 {
00078 m_sessionIDs.insert( *i );
00079 m_sessions[ *i ] = factory.create( *i, m_settings.get( *i ) );
00080 setDisconnected( *i );
00081 }
00082 }
00083
00084 if ( !m_sessions.size() )
00085 throw ConfigError( "No sessions defined for initiator" );
00086
00087 QF_STACK_POP
00088 }
00089
00090 Initiator::~Initiator()
00091 { QF_STACK_IGNORE_BEGIN
00092
00093 Sessions::iterator i;
00094 for ( i = m_sessions.begin(); i != m_sessions.end(); ++i )
00095 delete i->second;
00096
00097 if( m_pLogFactory && m_pLog )
00098 m_pLogFactory->destroy( m_pLog );
00099
00100 QF_STACK_IGNORE_END
00101 }
00102
00103 Session* Initiator::getSession( const SessionID& sessionID,
00104 Responder& responder )
00105 { QF_STACK_PUSH(Initiator::getSession)
00106
00107 Sessions::iterator i = m_sessions.find( sessionID );
00108 if ( i != m_sessions.end() )
00109 {
00110 i->second->setResponder( &responder );
00111 return i->second;
00112 }
00113 return 0;
00114
00115 QF_STACK_POP
00116 }
00117
00118 Session* Initiator::getSession( const SessionID& sessionID ) const
00119 { QF_STACK_PUSH(Initiator::getSession)
00120
00121 Sessions::const_iterator i = m_sessions.find( sessionID );
00122 if( i != m_sessions.end() )
00123 return i->second;
00124 else
00125 return 0;
00126
00127 QF_STACK_POP
00128 }
00129
00130 const Dictionary* const Initiator::getSessionSettings( const SessionID& sessionID ) const
00131 { QF_STACK_PUSH(Initiator::getSessionSettings)
00132
00133 try
00134 {
00135 return &m_settings.get( sessionID );
00136 }
00137 catch( ConfigError& )
00138 {
00139 return 0;
00140 }
00141
00142 QF_STACK_POP
00143 }
00144
00145 void Initiator::connect()
00146 { QF_STACK_PUSH(Initiator::connect)
00147
00148 Locker l(m_mutex);
00149
00150 SessionIDs disconnected = m_disconnected;
00151 SessionIDs::iterator i = disconnected.begin();
00152 for ( ; i != disconnected.end(); ++i )
00153 {
00154 Session* pSession = Session::lookupSession( *i );
00155 if ( pSession->isEnabled() && pSession->isSessionTime(UtcTimeStamp()) )
00156 doConnect( *i, m_settings.get( *i ));
00157 }
00158
00159 QF_STACK_POP
00160 }
00161
00162 void Initiator::setPending( const SessionID& sessionID )
00163 { QF_STACK_PUSH(Initiator::setPending)
00164
00165 Locker l(m_mutex);
00166
00167 m_pending.insert( sessionID );
00168 m_connected.erase( sessionID );
00169 m_disconnected.erase( sessionID );
00170
00171 QF_STACK_POP
00172 }
00173
00174 void Initiator::setConnected( const SessionID& sessionID )
00175 { QF_STACK_PUSH(Initiator::setConnected)
00176
00177 Locker l(m_mutex);
00178
00179 m_pending.erase( sessionID );
00180 m_connected.insert( sessionID );
00181 m_disconnected.erase( sessionID );
00182
00183 QF_STACK_POP
00184 }
00185
00186 void Initiator::setDisconnected( const SessionID& sessionID )
00187 { QF_STACK_PUSH(Initiator::setDisconnected)
00188
00189 Locker l(m_mutex);
00190
00191 m_pending.erase( sessionID );
00192 m_connected.erase( sessionID );
00193 m_disconnected.insert( sessionID );
00194
00195 QF_STACK_POP
00196 }
00197
00198 bool Initiator::isPending( const SessionID& sessionID )
00199 { QF_STACK_PUSH(Initiator::isPending)
00200
00201 Locker l(m_mutex);
00202 return m_pending.find( sessionID ) != m_pending.end();
00203
00204 QF_STACK_POP
00205 }
00206
00207 bool Initiator::isConnected( const SessionID& sessionID )
00208 { QF_STACK_PUSH(Initiator::isConnected)
00209
00210 Locker l(m_mutex);
00211 return m_connected.find( sessionID ) != m_connected.end();
00212
00213 QF_STACK_POP
00214 }
00215
00216 bool Initiator::isDisconnected( const SessionID& sessionID )
00217 { QF_STACK_PUSH(Initiator::isDisconnected)
00218
00219 Locker l(m_mutex);
00220 return m_disconnected.find( sessionID ) != m_disconnected.end();
00221
00222 QF_STACK_POP
00223 }
00224
00225 void Initiator::start() throw ( ConfigError, RuntimeError )
00226 { QF_STACK_PUSH(Initiator::start)
00227
00228 m_stop = false;
00229 onConfigure( m_settings );
00230 onInitialize( m_settings );
00231
00232 HttpServer::startGlobal( m_settings );
00233
00234 if( !thread_spawn( &startThread, this, m_threadid ) )
00235 throw RuntimeError("Unable to spawn thread");
00236
00237 QF_STACK_POP
00238 }
00239
00240
00241 void Initiator::block() throw ( ConfigError, RuntimeError )
00242 { QF_STACK_PUSH(Initiator::block)
00243
00244 m_stop = false;
00245 onConfigure( m_settings );
00246 onInitialize( m_settings );
00247
00248 startThread(this);
00249
00250 QF_STACK_POP
00251 }
00252
00253 bool Initiator::poll( double timeout ) throw ( ConfigError, RuntimeError )
00254 { QF_STACK_PUSH(Initiator::poll)
00255
00256 if( m_firstPoll )
00257 {
00258 onConfigure( m_settings );
00259 onInitialize( m_settings );
00260 m_firstPoll = false;
00261 }
00262
00263 return onPoll( timeout );
00264
00265 QF_STACK_POP
00266 }
00267
00268 void Initiator::stop( bool force )
00269 { QF_STACK_PUSH(Initiator::stop)
00270
00271 if( isStopped() ) return;
00272
00273 HttpServer::stopGlobal();
00274
00275 std::vector<Session*> enabledSessions;
00276
00277 SessionIDs connected = m_connected;
00278 SessionIDs::iterator i = connected.begin();
00279 for ( ; i != connected.end(); ++i )
00280 {
00281 Session* pSession = Session::lookupSession(*i);
00282 if( pSession->isEnabled() )
00283 {
00284 enabledSessions.push_back( pSession );
00285 pSession->logout();
00286 }
00287 }
00288
00289 if( !force )
00290 {
00291 for ( int second = 1; second <= 10 && isLoggedOn(); ++second )
00292 process_sleep( 1 );
00293 }
00294
00295 {
00296 Locker l(m_mutex);
00297 for ( i = connected.begin(); i != connected.end(); ++i )
00298 setDisconnected( Session::lookupSession(*i)->getSessionID() );
00299 }
00300
00301 m_stop = true;
00302 onStop();
00303 if( m_threadid )
00304 thread_join( m_threadid );
00305 m_threadid = 0;
00306
00307 std::vector<Session*>::iterator session = enabledSessions.begin();
00308 for( ; session != enabledSessions.end(); ++session )
00309 (*session)->logon();
00310
00311 QF_STACK_POP
00312 }
00313
00314 bool Initiator::isLoggedOn()
00315 { QF_STACK_PUSH(Initiator::isLoggedOn)
00316
00317 Locker l(m_mutex);
00318
00319 SessionIDs connected = m_connected;
00320 SessionIDs::iterator i = connected.begin();
00321 for ( ; i != connected.end(); ++i )
00322 {
00323 if( Session::lookupSession(*i)->isLoggedOn() )
00324 return true;
00325 }
00326 return false;
00327
00328 QF_STACK_POP
00329 }
00330
00331 THREAD_PROC Initiator::startThread( void* p )
00332 { QF_STACK_TRY
00333 QF_STACK_PUSH(Initiator::startThread)
00334
00335 Initiator * pInitiator = static_cast < Initiator* > ( p );
00336 pInitiator->onStart();
00337 return 0;
00338
00339 QF_STACK_POP
00340 QF_STACK_CATCH
00341 }
00342 }