Index  Source Files  Annotated Class List  Alphabetical Class List  Class Hierarchy  Graphical Class Hierarchy 

PostgreSQLStore.cpp

Go to the documentation of this file.
00001 /****************************************************************************
00002 ** Copyright (c) quickfixengine.org  All rights reserved.
00003 **
00004 ** This file is part of the QuickFIX FIX Engine
00005 **
00006 ** This file may be distributed under the terms of the quickfixengine.org
00007 ** license as defined by quickfixengine.org and appearing in the file
00008 ** LICENSE included in the packaging of this file.
00009 **
00010 ** This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE
00011 ** WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
00012 **
00013 ** See http://www.quickfixengine.org/LICENSE for licensing information.
00014 **
00015 ** Contact ask@quickfixengine.org if any conditions of this licensing are
00016 ** not clear to you.
00017 **
00018 ****************************************************************************/
00019 
00020 #ifdef _MSC_VER
00021 #include "stdafx.h"
00022 #else
00023 #include "config.h"
00024 #endif
00025 #include "CallStack.h"
00026 
00027 #ifdef HAVE_POSTGRESQL
00028 
00029 #include "PostgreSQLStore.h"
00030 #include "SessionID.h"
00031 #include "SessionSettings.h"
00032 #include "FieldConvertors.h"
00033 #include "Parser.h"
00034 #include "Utility.h"
00035 #include "strptime.h"
00036 #include <fstream>
00037 
00038 namespace FIX
00039 {
00040 
00041 const std::string PostgreSQLStoreFactory::DEFAULT_DATABASE = "quickfix";
00042 const std::string PostgreSQLStoreFactory::DEFAULT_USER = "postgres";
00043 const std::string PostgreSQLStoreFactory::DEFAULT_PASSWORD = "";
00044 const std::string PostgreSQLStoreFactory::DEFAULT_HOST = "localhost";
00045 const short PostgreSQLStoreFactory::DEFAULT_PORT = 0;
00046 
00047 PostgreSQLStore::PostgreSQLStore
00048 ( const SessionID& s, const DatabaseConnectionID& d, PostgreSQLConnectionPool* p )
00049 : m_pConnectionPool( p ), m_sessionID( s )
00050 {
00051   m_pConnection = m_pConnectionPool->create( d );
00052   populateCache();
00053 }
00054 
00055 PostgreSQLStore::PostgreSQLStore
00056 ( const SessionID& s, const std::string& database, const std::string& user,
00057   const std::string& password, const std::string& host, short port )
00058   : m_pConnectionPool( 0 ), m_sessionID( s )
00059 {
00060   m_pConnection = new PostgreSQLConnection( database, user, password, host, port );
00061   populateCache();
00062 }
00063 
00064 PostgreSQLStore::~PostgreSQLStore()
00065 {
00066   if( m_pConnectionPool )
00067     m_pConnectionPool->destroy( m_pConnection );
00068   else
00069     delete m_pConnection;
00070 }
00071 
00072 void PostgreSQLStore::populateCache()
00073 { QF_STACK_PUSH(PostgreSQLStore::populateCache)
00074 
00075   std::stringstream queryString;
00076 
00077   queryString << "SELECT creation_time, incoming_seqnum, outgoing_seqnum FROM sessions WHERE "
00078   << "beginstring=" << "'" << m_sessionID.getBeginString().getValue() << "' and "
00079   << "sendercompid=" << "'" << m_sessionID.getSenderCompID().getValue() << "' and "
00080   << "targetcompid=" << "'" << m_sessionID.getTargetCompID().getValue() << "' and "
00081   << "session_qualifier=" << "'" << m_sessionID.getSessionQualifier() << "'";
00082 
00083   PostgreSQLQuery query( queryString.str() );
00084   if( !m_pConnection->execute(query) )
00085     throw ConfigError( "No entries found for session in database" );
00086 
00087   int rows = query.rows();
00088   if( rows > 1 )
00089     throw ConfigError( "Multiple entries found for session in database" );
00090 
00091   if( rows == 1 )
00092   {
00093     struct tm time;
00094     std::string sqlTime = query.getValue( 0, 0 );
00095     strptime( sqlTime.c_str(), "%Y-%m-%d %H:%M:%S", &time );
00096     m_cache.setCreationTime (UtcTimeStamp (&time));
00097     m_cache.setNextTargetMsgSeqNum( atol( query.getValue( 0, 1 ) ) );
00098     m_cache.setNextSenderMsgSeqNum( atol( query.getValue( 0, 2 ) ) );
00099   }
00100   else
00101   {
00102     UtcTimeStamp time = m_cache.getCreationTime();
00103     char sqlTime[ 20 ];
00104     int year, month, day, hour, minute, second, millis;
00105     time.getYMD (year, month, day);
00106     time.getHMS (hour, minute, second, millis);
00107     STRING_SPRINTF( sqlTime, "%d-%02d-%02d %02d:%02d:%02d",
00108              year, month, day, hour, minute, second );
00109     std::stringstream queryString2;
00110     queryString2 << "INSERT INTO sessions (beginstring, sendercompid, targetcompid, session_qualifier,"
00111     << "creation_time, incoming_seqnum, outgoing_seqnum) VALUES("
00112     << "'" << m_sessionID.getBeginString().getValue() << "',"
00113     << "'" << m_sessionID.getSenderCompID().getValue() << "',"
00114     << "'" << m_sessionID.getTargetCompID().getValue() << "',"
00115     << "'" << m_sessionID.getSessionQualifier() << "',"
00116     << "'" << sqlTime << "',"
00117     << m_cache.getNextTargetMsgSeqNum() << ","
00118     << m_cache.getNextSenderMsgSeqNum() << ")";
00119 
00120     PostgreSQLQuery query2( queryString2.str() );
00121     if( !m_pConnection->execute(query2) )
00122       throw ConfigError( "Unable to create session in database" );
00123   }
00124 
00125   QF_STACK_POP
00126 }
00127 
00128 MessageStore* PostgreSQLStoreFactory::create( const SessionID& s )
00129 { QF_STACK_PUSH(PostgreSQLStoreFactory::create)
00130 
00131   if( m_useSettings )
00132     return create( s, m_settings.get(s) );
00133   else if( m_useDictionary )
00134     return create( s, m_dictionary );
00135   else
00136   {
00137     DatabaseConnectionID id( m_database, m_user, m_password, m_host, m_port );
00138     return new PostgreSQLStore( s, id, m_connectionPoolPtr.get() );
00139   }
00140 
00141   QF_STACK_POP
00142 }
00143 
00144 MessageStore* PostgreSQLStoreFactory::create( const SessionID& s, const Dictionary& settings )
00145 { QF_STACK_PUSH(PostgreSQLStoreFactory::create)
00146 
00147   std::string database = DEFAULT_DATABASE;
00148   std::string user = DEFAULT_USER;
00149   std::string password = DEFAULT_PASSWORD;
00150   std::string host = DEFAULT_HOST;
00151   short port = DEFAULT_PORT;
00152 
00153   try { database = settings.getString( POSTGRESQL_STORE_DATABASE ); }
00154   catch( ConfigError& ) {}
00155 
00156   try { user = settings.getString( POSTGRESQL_STORE_USER ); }
00157   catch( ConfigError& ) {}
00158 
00159   try { password = settings.getString( POSTGRESQL_STORE_PASSWORD ); }
00160   catch( ConfigError& ) {}
00161 
00162   try { host = settings.getString( POSTGRESQL_STORE_HOST ); }
00163   catch( ConfigError& ) {}
00164 
00165   try { port = ( short ) settings.getLong( POSTGRESQL_STORE_PORT ); }
00166   catch( ConfigError& ) {}
00167 
00168   DatabaseConnectionID id( database, user, password, host, port );
00169   return new PostgreSQLStore( s, id, m_connectionPoolPtr.get() );
00170 
00171   QF_STACK_POP
00172 }
00173 
00174 void PostgreSQLStoreFactory::destroy( MessageStore* pStore )
00175 { QF_STACK_PUSH(PostgreSQLStoreFactory::destroy)
00176   delete pStore;
00177   QF_STACK_POP
00178 }
00179 
00180 bool PostgreSQLStore::set( int msgSeqNum, const std::string& msg )
00181 throw ( IOException )
00182 { QF_STACK_PUSH(PostgreSQLStore::set)
00183 
00184   char* msgCopy = new char[ (msg.size() * 2) + 1 ];
00185   PQescapeString( msgCopy, msg.c_str(), msg.size() );
00186 
00187   std::stringstream queryString;
00188   queryString << "INSERT INTO messages "
00189   << "(beginstring, sendercompid, targetcompid, session_qualifier, msgseqnum, message) "
00190   << "VALUES ("
00191   << "'" << m_sessionID.getBeginString().getValue() << "',"
00192   << "'" << m_sessionID.getSenderCompID().getValue() << "',"
00193   << "'" << m_sessionID.getTargetCompID().getValue() << "',"
00194   << "'" << m_sessionID.getSessionQualifier() << "',"
00195   << msgSeqNum << ","
00196   << "'" << msgCopy << "')";
00197 
00198   delete [] msgCopy;
00199 
00200   PostgreSQLQuery query( queryString.str() );
00201   if( !m_pConnection->execute(query) )
00202   {
00203     std::stringstream queryString2;
00204     queryString2 << "UPDATE messages SET message='" << msg << "' WHERE "
00205     << "beginstring=" << "'" << m_sessionID.getBeginString().getValue() << "' and "
00206     << "sendercompid=" << "'" << m_sessionID.getSenderCompID().getValue() << "' and "
00207     << "targetcompid=" << "'" << m_sessionID.getTargetCompID().getValue() << "' and "
00208     << "session_qualifier=" << "'" << m_sessionID.getSessionQualifier() << "' and "
00209     << "msgseqnum=" << msgSeqNum;
00210     PostgreSQLQuery query2( queryString2.str() );
00211     if( !m_pConnection->execute(query2) )
00212       query2.throwException();
00213   }
00214 
00215   return true;
00216 
00217   QF_STACK_POP
00218 }
00219 
00220 void PostgreSQLStore::get( int begin, int end,
00221                       std::vector < std::string > & result ) const
00222 throw ( IOException )
00223 { QF_STACK_PUSH(PostgreSQLStore::get)
00224 
00225   result.clear();
00226   std::stringstream queryString;
00227   queryString << "SELECT message FROM messages WHERE "
00228   << "beginstring=" << "'" << m_sessionID.getBeginString().getValue() << "' and "
00229   << "sendercompid=" << "'" << m_sessionID.getSenderCompID().getValue() << "' and "
00230   << "targetcompid=" << "'" << m_sessionID.getTargetCompID().getValue() << "' and "
00231   << "session_qualifier=" << "'" << m_sessionID.getSessionQualifier() << "' and "
00232   << "msgseqnum>=" << begin << " and " << "msgseqnum<=" << end << " "
00233   << "ORDER BY msgseqnum";
00234 
00235   PostgreSQLQuery query( queryString.str() );
00236   if( !m_pConnection->execute(query) )
00237     query.throwException();
00238 
00239   int rows = query.rows();
00240   for( int row = 0; row < rows; row++ )
00241     result.push_back( query.getValue( row, 0 ) );
00242 
00243   QF_STACK_POP
00244 }
00245 
00246 int PostgreSQLStore::getNextSenderMsgSeqNum() const throw ( IOException )
00247 { QF_STACK_PUSH(PostgreSQLStore::getNextSenderMsgSeqNum)
00248   return m_cache.getNextSenderMsgSeqNum();
00249   QF_STACK_POP
00250 }
00251 
00252 int PostgreSQLStore::getNextTargetMsgSeqNum() const throw ( IOException )
00253 { QF_STACK_PUSH(PostgreSQLStore::getNextTargetMsgSeqNum)
00254   return m_cache.getNextTargetMsgSeqNum();
00255   QF_STACK_POP
00256 }
00257 
00258 void PostgreSQLStore::setNextSenderMsgSeqNum( int value ) throw ( IOException )
00259 { QF_STACK_PUSH(PostgreSQLStore::setNextSenderMsgSeqNum)
00260 
00261   std::stringstream queryString;
00262   queryString << "UPDATE sessions SET outgoing_seqnum=" << value << " WHERE "
00263   << "beginstring=" << "'" << m_sessionID.getBeginString().getValue() << "' and "
00264   << "sendercompid=" << "'" << m_sessionID.getSenderCompID().getValue() << "' and "
00265   << "targetcompid=" << "'" << m_sessionID.getTargetCompID().getValue() << "' and "
00266   << "session_qualifier=" << "'" << m_sessionID.getSessionQualifier() << "'";
00267 
00268   PostgreSQLQuery query( queryString.str() );
00269   if( !m_pConnection->execute(query) )
00270     query.throwException();
00271 
00272   m_cache.setNextSenderMsgSeqNum( value );
00273 
00274   QF_STACK_POP
00275 }
00276 
00277 void PostgreSQLStore::setNextTargetMsgSeqNum( int value ) throw ( IOException )
00278 { QF_STACK_PUSH(PostgreSQLStore::setNextTargetMsgSeqNum)
00279 
00280   std::stringstream queryString;
00281   queryString << "UPDATE sessions SET incoming_seqnum=" << value << " WHERE "
00282   << "beginstring=" << "'" << m_sessionID.getBeginString().getValue() << "' and "
00283   << "sendercompid=" << "'" << m_sessionID.getSenderCompID().getValue() << "' and "
00284   << "targetcompid=" << "'" << m_sessionID.getTargetCompID().getValue() << "' and "
00285   << "session_qualifier=" << "'" << m_sessionID.getSessionQualifier() << "'";
00286 
00287   PostgreSQLQuery query( queryString.str() );
00288   if( !m_pConnection->execute(query) )
00289     query.throwException();
00290 
00291   m_cache.setNextTargetMsgSeqNum( value );
00292 
00293   QF_STACK_POP
00294 }
00295 
00296 void PostgreSQLStore::incrNextSenderMsgSeqNum() throw ( IOException )
00297 { QF_STACK_PUSH(PostgreSQLStore::incrNextSenderMsgSeqNum)
00298   m_cache.incrNextSenderMsgSeqNum();
00299   setNextSenderMsgSeqNum( m_cache.getNextSenderMsgSeqNum() );
00300   QF_STACK_POP
00301 }
00302 
00303 void PostgreSQLStore::incrNextTargetMsgSeqNum() throw ( IOException )
00304 { QF_STACK_PUSH(PostgreSQLStore::incrNextTargetMsgSeqNum)
00305   m_cache.incrNextTargetMsgSeqNum();
00306   setNextTargetMsgSeqNum( m_cache.getNextTargetMsgSeqNum() );
00307   QF_STACK_POP
00308 }
00309 
00310 UtcTimeStamp PostgreSQLStore::getCreationTime() const throw ( IOException )
00311 { QF_STACK_PUSH(PostgreSQLStore::getCreationTime)
00312   return m_cache.getCreationTime();
00313   QF_STACK_POP
00314 }
00315 
00316 void PostgreSQLStore::reset() throw ( IOException )
00317 { QF_STACK_PUSH(PostgreSQLStore::reset)
00318 
00319   std::stringstream queryString;
00320   queryString << "DELETE FROM messages WHERE "
00321   << "beginstring=" << "'" << m_sessionID.getBeginString().getValue() << "' and "
00322   << "sendercompid=" << "'" << m_sessionID.getSenderCompID().getValue() << "' and "
00323   << "targetcompid=" << "'" << m_sessionID.getTargetCompID().getValue() << "' and "
00324   << "session_qualifier=" << "'" << m_sessionID.getSessionQualifier() << "'";
00325 
00326   PostgreSQLQuery query( queryString.str() );
00327   if( !m_pConnection->execute(query) )
00328     query.throwException();
00329 
00330   m_cache.reset();
00331   UtcTimeStamp time = m_cache.getCreationTime();
00332 
00333   int year, month, day, hour, minute, second, millis;
00334   time.getYMD( year, month, day );
00335   time.getHMS( hour, minute, second, millis );
00336 
00337   char sqlTime[ 20 ];
00338   STRING_SPRINTF( sqlTime, "%d-%02d-%02d %02d:%02d:%02d",
00339            year, month, day, hour, minute, second );
00340 
00341   std::stringstream queryString2;
00342   queryString2 << "UPDATE sessions SET creation_time='" << sqlTime << "', "
00343   << "incoming_seqnum=" << m_cache.getNextTargetMsgSeqNum() << ", "
00344   << "outgoing_seqnum=" << m_cache.getNextSenderMsgSeqNum() << " WHERE "
00345   << "beginstring=" << "'" << m_sessionID.getBeginString().getValue() << "' and "
00346   << "sendercompid=" << "'" << m_sessionID.getSenderCompID().getValue() << "' and "
00347   << "targetcompid=" << "'" << m_sessionID.getTargetCompID().getValue() << "' and "
00348   << "session_qualifier=" << "'" << m_sessionID.getSessionQualifier() << "'";
00349 
00350   PostgreSQLQuery query2( queryString2.str() );
00351   if( !m_pConnection->execute(query2) )
00352     query2.throwException();
00353 
00354   QF_STACK_POP
00355 }
00356 
00357 void PostgreSQLStore::refresh() throw ( IOException )
00358 { QF_STACK_PUSH(PostgreSQLStore::refresh)
00359 
00360   m_cache.reset();
00361   populateCache(); 
00362 
00363   QF_STACK_POP
00364 }
00365 
00366 }
00367 
00368 #endif

Generated on Mon Apr 5 20:59:51 2010 for QuickFIX by doxygen 1.6.1 written by Dimitri van Heesch, © 1997-2001