ThreadedSocketConnection.cpp
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifdef _MSC_VER
00021 #include "stdafx.h"
00022 #else
00023 #include "config.h"
00024 #endif
00025 #include "CallStack.h"
00026
00027 #include "ThreadedSocketConnection.h"
00028 #include "ThreadedSocketAcceptor.h"
00029 #include "ThreadedSocketInitiator.h"
00030 #include "Session.h"
00031 #include "Utility.h"
00032
00033 namespace FIX
00034 {
00035 ThreadedSocketConnection::ThreadedSocketConnection
00036 ( int s, Sessions sessions, Application& application, Log* pLog )
00037 : m_socket( s ), m_application( application ), m_pLog( pLog ),
00038 m_sessions( sessions ), m_pSession( 0 ),
00039 m_disconnect( false )
00040 {
00041 FD_ZERO( &m_fds );
00042 FD_SET( m_socket, &m_fds );
00043 }
00044
00045 ThreadedSocketConnection::ThreadedSocketConnection
00046 ( const SessionID& sessionID, int s,
00047 const std::string& address, short port,
00048 Application& application, Log* pLog )
00049 : m_socket( s ), m_address( address ), m_port( port ),
00050 m_application( application ), m_pLog( m_pLog ),
00051 m_pSession( Session::lookupSession( sessionID ) ),
00052 m_disconnect( false )
00053 {
00054 FD_ZERO( &m_fds );
00055 FD_SET( m_socket, &m_fds );
00056 if ( m_pSession ) m_pSession->setResponder( this );
00057 }
00058
00059 ThreadedSocketConnection::~ThreadedSocketConnection()
00060 {
00061 if ( m_pSession )
00062 {
00063 m_pSession->setResponder( 0 );
00064 Session::unregisterSession( m_pSession->getSessionID() );
00065 }
00066 }
00067
00068 bool ThreadedSocketConnection::send( const std::string& msg )
00069 { QF_STACK_PUSH(ThreadedSocketConnection::send)
00070 return socket_send( m_socket, msg.c_str(), msg.length() ) >= 0;
00071 QF_STACK_POP
00072 }
00073
00074 bool ThreadedSocketConnection::connect()
00075 { QF_STACK_PUSH(ThreadedSocketConnection::connect)
00076 return socket_connect(getSocket(), m_address.c_str(), m_port) >= 0;
00077 QF_STACK_POP
00078 }
00079
00080 void ThreadedSocketConnection::disconnect()
00081 { QF_STACK_PUSH(ThreadedSocketConnection::disconnect)
00082
00083 m_disconnect = true;
00084 socket_close( m_socket );
00085
00086 QF_STACK_POP
00087 }
00088
00089 bool ThreadedSocketConnection::read()
00090 { QF_STACK_PUSH(ThreadedSocketConnection::read)
00091
00092 struct timeval timeout = { 1, 0 };
00093 fd_set readset = m_fds;
00094
00095 try
00096 {
00097
00098 int result = select( 1 + m_socket, &readset, 0, 0, &timeout );
00099
00100 if( result > 0 )
00101 {
00102
00103 int size = recv( m_socket, m_buffer, sizeof(m_buffer), 0 );
00104 if ( size <= 0 ) { throw SocketRecvFailed( size ); }
00105 m_parser.addToStream( m_buffer, size );
00106 }
00107 else if( result == 0 && m_pSession )
00108 {
00109 m_pSession->next();
00110 }
00111 else if( result < 0 )
00112 {
00113 throw SocketRecvFailed( result );
00114 }
00115
00116 processStream();
00117 return true;
00118 }
00119 catch ( SocketRecvFailed& e )
00120 {
00121 if( m_disconnect )
00122 return false;
00123
00124 if( m_pSession )
00125 {
00126 m_pSession->getLog()->onEvent( e.what() );
00127 m_pSession->disconnect();
00128 }
00129 else
00130 {
00131 disconnect();
00132 }
00133
00134 return false;
00135 }
00136
00137 QF_STACK_POP
00138 }
00139
00140 bool ThreadedSocketConnection::readMessage( std::string& msg )
00141 throw( SocketRecvFailed )
00142 { QF_STACK_PUSH(ThreadedSocketConnection::readMessage)
00143
00144 try
00145 {
00146 return m_parser.readFixMessage( msg );
00147 }
00148 catch ( MessageParseError& ) {}
00149 return true;
00150
00151 QF_STACK_POP
00152 }
00153
00154 void ThreadedSocketConnection::processStream()
00155 { QF_STACK_PUSH(ThreadedSocketConnection::processStream)
00156
00157 std::string msg;
00158 while( readMessage(msg) )
00159 {
00160 if ( !m_pSession )
00161 {
00162 if ( !setSession( msg ) )
00163 { disconnect(); continue; }
00164 }
00165 try
00166 {
00167 m_pSession->next( msg, UtcTimeStamp() );
00168 }
00169 catch( InvalidMessage& )
00170 {
00171 if( !m_pSession->isLoggedOn() )
00172 {
00173 disconnect();
00174 return;
00175 }
00176 }
00177 }
00178
00179 QF_STACK_POP
00180 }
00181
00182 bool ThreadedSocketConnection::setSession( const std::string& msg )
00183 { QF_STACK_PUSH(ThreadedSocketConnection::setSession)
00184
00185 m_pSession = Session::lookupSession( msg, true );
00186 if ( !m_pSession )
00187 {
00188 if( m_pLog )
00189 {
00190 m_pLog->onEvent( "Session not found for incoming message: " + msg );
00191 m_pLog->onIncoming( msg );
00192 }
00193 return false;
00194 }
00195
00196 SessionID sessionID = m_pSession->getSessionID();
00197 m_pSession = 0;
00198
00199
00200 for( int i = 1; i <= 5; i++ )
00201 {
00202 if( !Session::isSessionRegistered( sessionID ) )
00203 m_pSession = Session::registerSession( sessionID );
00204 if( m_pSession ) break;
00205 process_sleep( 1 );
00206 }
00207
00208 if ( !m_pSession )
00209 return false;
00210 if ( m_sessions.find(m_pSession->getSessionID()) == m_sessions.end() )
00211 return false;
00212
00213 m_pSession->setResponder( this );
00214 return true;
00215
00216 QF_STACK_POP
00217 }
00218
00219 }