00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117 #ifdef HAVE_CONFIG_H
00118 # include "config.h"
00119 #endif
00120
00121 #ifdef HAVE_GETOPT
00122 # include <unistd.h>
00123 extern char* optarg;
00124 extern int optind;
00125 #else
00126 # include "getopt.h"
00127 #endif
00128
00129 #ifdef HAVE_IOSTREAM
00130 # include <iostream>
00131 #else
00132 # include <iostream.h>
00133 #endif
00134
00135 #ifdef HAVE_STD_IOSTREAM
00136 using namespace std;
00137 #endif
00138
00139 #ifdef HAVE_STDLIB_H
00140 # include <stdlib.h>
00141 #endif
00142
00143 #ifdef HAVE_SIGNAL_H
00144 # include <signal.h>
00145 #endif
00146
00147 #include <cstdio>
00148
00149 #include "CosEventComm.hh"
00150 #include "CosEventChannelAdmin.hh"
00151 #include "naming.h"
00152
00153 static omni_semaphore connect_cond(0);
00154 static void usage(int argc, char **argv);
00155
00156 class Supplier_i : virtual public POA_CosEventComm::PullSupplier {
00157 public:
00158 Supplier_i (long disconnect = 0) : i(0), _disconnect(disconnect), l(0) {};
00159 CORBA::Any *pull();
00160 CORBA::Any *try_pull(CORBA::Boolean &has_event);
00161 void disconnect_pull_supplier ();
00162
00163 private:
00164 long i;
00165 long _disconnect;
00166 CORBA::ULong l;
00167 };
00168
00169 void
00170 Supplier_i::disconnect_pull_supplier () {
00171 cout << "Pull Supplier: disconnected by channel." << endl;
00172 }
00173
00174 CORBA::Any *
00175 Supplier_i::pull() {
00176 cout << "Pull Supplier: pull() called. Data : ";
00177 CORBA::Any *any = new CORBA::Any();
00178 *any <<= l++;
00179 cout << l-1 << endl;
00180
00181
00182 if ((_disconnect > 0) && (i == _disconnect)) {
00183 i = 0;
00184
00185 connect_cond.post();
00186 }
00187 i++;
00188 return (any);
00189 }
00190
00191 CORBA::Any *
00192 Supplier_i::try_pull(CORBA::Boolean &has_event)
00193 {
00194 cout << "Pull Supplier: try_pull() called. Data : ";
00195 CORBA::Any *any = new CORBA::Any();
00196 *any <<= l++;
00197 cout << l-1 << endl;
00198 has_event = 1;
00199
00200
00201 if ((_disconnect > 0) && (i == _disconnect)) {
00202 i = 0;
00203
00204 connect_cond.post();
00205 }
00206 i++;
00207 return (any);
00208 }
00209
00210
00211 int
00212 main (int argc, char** argv)
00213 {
00214 #if defined(HAVE_OMNIORB4)
00215 CORBA::ORB_var orb =CORBA::ORB_init(argc,argv,"omniORB4");
00216 #else
00217 CORBA::ORB_var orb =CORBA::ORB_init(argc,argv,"omniORB3");
00218 #endif
00219
00220
00221 int discnum =0;
00222 int sleepInterval =0;
00223 const char* channelName ="EventChannel";
00224
00225 int c;
00226 while ((c = getopt(argc,argv,"d:s:n:h")) != EOF)
00227 {
00228 switch (c)
00229 {
00230 case 'd': discnum = atoi(optarg);
00231 break;
00232
00233 case 's': sleepInterval = atoi(optarg);
00234 break;
00235
00236 case 'n': channelName = optarg;
00237 break;
00238
00239 case 'h':
00240 default : usage(argc,argv);
00241 exit(-1);
00242 break;
00243 }
00244 }
00245
00246 #if defined(HAVE_SIGNAL_H) && defined(SIGPIPE)
00247
00248 signal(SIGPIPE, SIG_IGN);
00249 #endif
00250
00251 Supplier_i* supplier = new Supplier_i (discnum);
00252 CosEventChannelAdmin::EventChannel_var channel;
00253
00254 const char* action="";
00255 try {
00256 CORBA::Object_var obj;
00257
00258 action="resolve initial reference 'RootPOA'";
00259 obj=orb->resolve_initial_references("RootPOA");
00260 PortableServer::POA_var rootPoa =PortableServer::POA::_narrow(obj);
00261 if(CORBA::is_nil(rootPoa))
00262 throw CORBA::OBJECT_NOT_EXIST();
00263
00264 action="activate the RootPOA's POAManager";
00265 PortableServer::POAManager_var pman =rootPoa->the_POAManager();
00266 pman->activate();
00267
00268
00269
00270
00271 if(optind<argc)
00272 {
00273 action="convert URI from command line into object reference";
00274 obj=orb->string_to_object(argv[optind]);
00275 }
00276 else
00277 {
00278 action="resolve initial reference 'NameService'";
00279 obj=orb->resolve_initial_references("NameService");
00280 CosNaming::NamingContext_var rootContext=
00281 CosNaming::NamingContext::_narrow(obj);
00282 if(CORBA::is_nil(rootContext))
00283 throw CORBA::OBJECT_NOT_EXIST();
00284
00285 action="find EventChannel in NameService";
00286 cout << action << endl;
00287 obj=rootContext->resolve(str2name(channelName));
00288 }
00289
00290 action="narrow object reference to event channel";
00291 channel=CosEventChannelAdmin::EventChannel::_narrow(obj);
00292 if(CORBA::is_nil(channel))
00293 {
00294 cerr << "Failed to narrow Event Channel reference." << endl;
00295 exit(1);
00296 }
00297
00298 }
00299 catch(CORBA::ORB::InvalidName& ex) {
00300 cerr<<"Failed to "<<action<<". ORB::InvalidName"<<endl;
00301 exit(1);
00302 }
00303 catch(CosNaming::NamingContext::InvalidName& ex) {
00304 cerr<<"Failed to "<<action<<". NamingContext::InvalidName"<<endl;
00305 exit(1);
00306 }
00307 catch(CosNaming::NamingContext::NotFound& ex) {
00308 cerr<<"Failed to "<<action<<". NamingContext::NotFound"<<endl;
00309 exit(1);
00310 }
00311 catch(CosNaming::NamingContext::CannotProceed& ex) {
00312 cerr<<"Failed to "<<action<<". NamingContext::CannotProceed"<<endl;
00313 exit(1);
00314 }
00315 catch(CORBA::TRANSIENT& ex) {
00316 cerr<<"Failed to "<<action<<". TRANSIENT"<<endl;
00317 exit(1);
00318 }
00319 catch(CORBA::OBJECT_NOT_EXIST& ex) {
00320 cerr<<"Failed to "<<action<<". OBJECT_NOT_EXIST"<<endl;
00321 exit(1);
00322 }
00323 catch(CORBA::SystemException& ex) {
00324 cerr<<"Failed to "<<action<<".";
00325 #if defined(HAVE_OMNIORB4)
00326 cerr<<" "<<ex._name();
00327 if(ex.NP_minorString())
00328 cerr<<" ("<<ex.NP_minorString()<<")";
00329 #endif
00330 cerr<<endl;
00331 exit(1);
00332 }
00333 catch(CORBA::Exception& ex) {
00334 cerr<<"Failed to "<<action<<"."
00335 #if defined(HAVE_OMNIORB4)
00336 " "<<ex._name()
00337 #endif
00338 <<endl;
00339 exit(1);
00340 }
00341
00342
00343
00344 CosEventChannelAdmin::SupplierAdmin_var supplier_admin;
00345 while (1)
00346 {
00347 try {
00348 supplier_admin = channel->for_suppliers ();
00349 if (CORBA::is_nil(supplier_admin))
00350 {
00351 cerr << "Event Channel returned nil Supplier Admin!"
00352 << endl;
00353 exit(1);
00354 }
00355 break;
00356 }
00357 catch (CORBA::COMM_FAILURE& ex) {
00358 cerr << "Caught COMM_FAILURE exception "
00359 << "obtaining Supplier Admin! Retrying..."
00360 << endl;
00361 continue;
00362 }
00363 }
00364 cout << "Obtained SupplierAdmin." << endl;
00365
00366 while (1)
00367 {
00368
00369
00370 CosEventChannelAdmin::ProxyPullConsumer_var proxy_consumer;
00371 while (1)
00372 {
00373 try {
00374 proxy_consumer = supplier_admin->obtain_pull_consumer ();
00375 if (CORBA::is_nil(proxy_consumer))
00376 {
00377 cerr << "Supplier Admin returned nil proxy_consumer!"
00378 << endl;
00379 exit(1);
00380 }
00381 break;
00382 }
00383 catch (CORBA::COMM_FAILURE& ex) {
00384 cerr << "Caught COMM_FAILURE exception "
00385 << "obtaining Proxy Pull Consumer! Retrying..."
00386 << endl;
00387 continue;
00388 }
00389 }
00390 cout << "Obtained ProxyPullConsumer." << endl;
00391
00392
00393 CosEventComm::PullSupplier_var supplierRef =supplier->_this();
00394 while (1)
00395 {
00396 try {
00397 proxy_consumer->connect_pull_supplier(supplierRef.in());
00398 break;
00399 }
00400 catch (CORBA::BAD_PARAM& ex) {
00401 cerr<<"Caught BAD_PARAM Exception connecting Pull Supplier!"<<endl;
00402 exit(1);
00403 }
00404 catch (CosEventChannelAdmin::AlreadyConnected& ex) {
00405 cerr << "Pull Supplier already connected!"
00406 << endl;
00407 break;
00408 }
00409 catch (CORBA::COMM_FAILURE& ex) {
00410 cerr << "Caught COMM_FAILURE exception "
00411 << "connecting Pull Supplier! Retrying..."
00412 << endl;
00413 continue;
00414 }
00415 }
00416 cout << "Connected Pull Supplier." << endl;
00417
00418
00419 connect_cond.wait();
00420
00421
00422 while (1)
00423 {
00424 try {
00425 proxy_consumer->disconnect_pull_consumer();
00426 break;
00427 }
00428 catch (CORBA::COMM_FAILURE& ex) {
00429 cerr << "Caught COMM_FAILURE exception "
00430 << "disconnecting Pull Supplier! Retrying..."
00431 << endl;
00432 continue;
00433 }
00434 }
00435 cout << "Disconnected Pull Supplier." << endl;
00436
00437
00438 cout << "Sleeping " << sleepInterval << " seconds." << endl;
00439 omni_thread::sleep(sleepInterval);
00440 }
00441
00442
00443 return 0;
00444 }
00445
00446 static void
00447 usage(int argc, char **argv)
00448 {
00449 cerr<<
00450 "\nCreate a PullSupplier to send events to a channel.\n"
00451 "syntax: "<<(argc?argv[0]:"pullsupp")<<" OPTIONS [CHANNEL_URI]\n"
00452 "\n"
00453 "CHANNEL_URI: The event channel may be specified as a URI.\n"
00454 " This may be an IOR, or a corbaloc::: or corbaname::: URI.\n"
00455 "\n"
00456 "OPTIONS: DEFAULT:\n"
00457 " -d NUM disconnect after sending NUM events [0 - never disconnect]\n"
00458 " -s SECS sleep SECS seconds after disconnecting [0]\n"
00459 " -n NAME channel name (if URI is not specified) [\"EventChannel\"]\n"
00460 " -h display this help text\n" << endl;
00461 }