00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "EventChannel.h"
00025 #include "ConsumerAdmin.h"
00026 #include "SupplierAdmin.h"
00027 #include "omniEventsLog.h"
00028 #include "Orb.h"
00029
00030 #include <list>
00031
00032 namespace OmniEvents {
00033
00034
00035 CosEventChannelAdmin::ConsumerAdmin_ptr EventChannel_i::for_consumers()
00036 {
00037 if(!_consumerAdmin || _shutdownRequested)
00038 throw CORBA::OBJECT_NOT_EXIST();
00039 return _consumerAdmin->_this();
00040 }
00041
00042
00043 CosEventChannelAdmin::SupplierAdmin_ptr EventChannel_i::for_suppliers()
00044 {
00045 if(!_supplierAdmin || _shutdownRequested)
00046 throw CORBA::OBJECT_NOT_EXIST();
00047 return _supplierAdmin->_this();
00048 }
00049
00050
00051 void EventChannel_i::destroy()
00052 {
00053 if(_shutdownRequested)
00054 throw CORBA::OBJECT_NOT_EXIST();
00055
00056
00057
00058 if(_consumerAdmin)
00059 _consumerAdmin->disconnect();
00060 if(_supplierAdmin)
00061 _supplierAdmin->disconnect();
00062 if(_mapper)
00063 _mapper->destroy();
00064
00065
00066 _shutdownRequested=true;
00067
00068
00069
00070 }
00071
00072
00073 EventChannel_i::EventChannel_i(EventChannelStore* store)
00074 : Servant(PortableServer::POA::_nil()),
00075 _eventChannelStore(store),
00076 _consumerAdmin(NULL),
00077 _supplierAdmin(NULL),
00078 _poaManager(),
00079 _shutdownRequested(false),
00080 _properties(),
00081 _mapper(NULL)
00082 {}
00083
00084
00085 void EventChannel_i::activate(
00086 const char* channelName,
00087 const PersistNode* node
00088 )
00089 {
00090
00091
00092
00093 createPoa(channelName);
00094
00095 if(node)
00096 _properties._attr=node->_attr;
00097
00098
00099 _consumerAdmin=new ConsumerAdmin_i(*this,_poa);
00100
00101
00102 _supplierAdmin=new SupplierAdmin_i(*this,_poa);
00103
00104 if(node)
00105 {
00106 PersistNode* saNode =node->child("SupplierAdmin");
00107 if(saNode)
00108 _supplierAdmin->reincarnate(*saNode);
00109
00110 PersistNode* caNode =node->child("ConsumerAdmin");
00111 if(caNode)
00112 _consumerAdmin->reincarnate(*caNode);
00113 }
00114
00115 activateObjectWithId("EventChannel");
00116
00117
00118 setInsName(_properties.attrString("InsName"));
00119 }
00120
00121
00122 EventChannel_i::~EventChannel_i()
00123 {
00124 if(CORBA::is_nil(_poa))
00125 {
00126 DB(20,"~EventChannel_i()")
00127 }
00128 else
00129 {
00130 DB(20,"~EventChannel_i() - destroying POA")
00131 try {
00132 _poa->destroy(
00133 CORBA::Boolean(1) ,
00134 CORBA::Boolean(0)
00135 );
00136 }
00137 catch(...) {
00138 DB(2,"~EventChannel_i() - ERROR destroying POA")
00139 }
00140 _poa=PortableServer::POA::_nil();
00141 }
00142 }
00143
00144
00145 void EventChannel_i::run(void* arg)
00146 {
00147
00148 assert(!CORBA::is_nil(_poa));
00149
00150 try
00151 {
00152
00153 if(_eventChannelStore)
00154 {
00155 _eventChannelStore->insert(this);
00156 output(WriteLock().os);
00157 }
00158
00159
00160 mainLoop();
00161
00162
00163 if(_eventChannelStore)
00164 {
00165 _eventChannelStore->erase(this);
00166 CORBA::String_var poaName =_poa->the_name();
00167 WriteLock log;
00168 log.os<<"-ecf/"<<poaName.in()<<'\n';
00169 }
00170
00171 _poa->destroy(
00172 CORBA::Boolean(1) ,
00173 CORBA::Boolean(1)
00174 );
00175 _poaManager->deactivate(
00176 CORBA::Boolean(1) ,
00177 CORBA::Boolean(1)
00178 );
00179 _poa=PortableServer::POA::_nil();
00180
00181 }
00182 catch(PortableServer::POAManager::AdapterInactive& ex)
00183 {
00184 DB(0,"EventChannel_i::run() - POA deactivated from the outside.")
00185 Orb::inst().reportObjectFailure(HERE,_this(),&ex);
00186 }
00187 catch (CORBA::Exception& ex) {
00188 Orb::inst().reportObjectFailure(HERE,_this(),&ex);
00189 }
00190 catch(...)
00191 {
00192 Orb::inst().reportObjectFailure(HERE,_this(),NULL);
00193 }
00194
00195
00196
00197 }
00198
00199
00200 void EventChannel_i::mainLoop()
00201 {
00202 _poaManager->activate();
00203 unsigned long localCyclePeriod_ns=cyclePeriod_ns();
00204 while(!_shutdownRequested)
00205 {
00206
00207
00208 _poaManager->hold_requests(CORBA::Boolean(1) );
00209
00210 list<CORBA::Any*> events;
00211 _supplierAdmin->collect(events);
00212 _consumerAdmin->send(events);
00213 assert(events.empty());
00214
00215 _poaManager->activate();
00216
00217
00218
00219
00220 omni_thread::sleep(0,localCyclePeriod_ns);
00221 }
00222 }
00223
00224
00225 void EventChannel_i::output(ostream& os)
00226 {
00227 CORBA::String_var poaName =_poa->the_name();
00228 string name =string("ecf/")+poaName.in();
00229 _properties.output(os,name);
00230 if(_supplierAdmin)
00231 _supplierAdmin->output(os);
00232 if(_consumerAdmin)
00233 _consumerAdmin->output(os);
00234 }
00235
00236
00237 void EventChannel_i::setInsName(const string v)
00238 {
00239 Mapper* newMapper =NULL;
00240 try
00241 {
00242
00243
00244
00245 if(!v.empty())
00246 {
00247
00248 newMapper=new Mapper(v.c_str(),_this());
00249 }
00250
00251 if(_mapper)
00252 _mapper->destroy();
00253 _mapper=newMapper;
00254
00255 }
00256 catch(...)
00257 {
00258
00259 delete newMapper;
00260 throw;
00261 }
00262 }
00263
00264
00265 void EventChannel_i::createPoa(const char* channelName)
00266 {
00267 using namespace PortableServer;
00268 POA_ptr p=Orb::inst()._RootPOA.in();
00269 try
00270 {
00271
00272
00273
00274
00275
00276
00277
00278
00279
00280 CORBA::PolicyList policies;
00281 policies.length(3);
00282 policies[0]=p->create_lifespan_policy(PERSISTENT);
00283 policies[1]=p->create_id_assignment_policy(USER_ID);
00284 policies[2]=p->create_thread_policy(SINGLE_THREAD_MODEL);
00285
00286
00287
00288 _poa=p->create_POA(channelName,POAManager::_nil(),policies);
00289 _poaManager=_poa->the_POAManager();
00290 }
00291 catch(POA::AdapterAlreadyExists& ex)
00292 {
00293 DB(0,"EventChannel_i::createPoa() - POA::AdapterAlreadyExists")
00294 throw;
00295 }
00296 catch(POA::InvalidPolicy& ex)
00297 {
00298 DB(0,"EventChannel_i::createPoa() - POA::InvalidPolicy: "<<ex.index)
00299 throw;
00300 }
00301 }
00302
00303
00304
00305
00306
00307
00308
00309 EventChannelStore::EventChannelStore()
00310 :_channels(),_lock()
00311 {}
00312
00313 EventChannelStore::~EventChannelStore()
00314 {
00315
00316 }
00317
00318 void EventChannelStore::insert(EventChannel_i* channel)
00319 {
00320 omni_mutex_lock l(_lock);
00321 bool insertOK =_channels.insert(channel).second;
00322 if(!insertOK)
00323 DB(2,"Attempted to store an EventChannel, when it is already stored.");
00324 }
00325
00326 void EventChannelStore::erase(EventChannel_i* channel)
00327 {
00328 omni_mutex_lock l(_lock);
00329 set<EventChannel_i*>::iterator pos =_channels.find(channel);
00330 if(pos==_channels.end())
00331 DB(2,"Failed to erase unknown EventChannel.")
00332 else
00333 _channels.erase(pos);
00334 }
00335
00336 void EventChannelStore::output(ostream &os)
00337 {
00338 omni_mutex_lock l(_lock);
00339 for(set<EventChannel_i*>::iterator i=_channels.begin();
00340 i!=_channels.end();
00341 ++i)
00342 {
00343 (*i)->output(os);
00344 }
00345 }
00346
00347
00348 };
00349