00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00049 #ifndef __vtkMultiProcessController_h
00050 #define __vtkMultiProcessController_h
00051
00052 #include "vtkObject.h"
00053
00054 #include "vtkCommunicator.h"
00055
00056 class vtkDataSet;
00057 class vtkImageData;
00058 class vtkCollection;
00059 class vtkOutputWindow;
00060 class vtkDataObject;
00061 class vtkMultiProcessController;
00062
00063
00064
00065 typedef void (*vtkProcessFunctionType)(vtkMultiProcessController *controller,
00066 void *userData);
00067
00068
00069 typedef void (*vtkRMIFunctionType)(void *localArg,
00070 void *remoteArg, int remoteArgLength,
00071 int remoteProcessId);
00072
00073
00074
00075 class VTK_PARALLEL_EXPORT vtkMultiProcessController : public vtkObject
00076 {
00077 public:
00078 static vtkMultiProcessController *New();
00079 vtkTypeRevisionMacro(vtkMultiProcessController,vtkObject);
00080 void PrintSelf(ostream& os, vtkIndent indent);
00081
00085 virtual void Initialize(int* vtkNotUsed(argc), char*** vtkNotUsed(argv))=0;
00086
00088
00091 virtual void Initialize(int* vtkNotUsed(argc), char*** vtkNotUsed(argv),
00092 int initializedExternally)=0;
00094
00097 virtual void Finalize()=0;
00098
00102 virtual void Finalize(int finalizedExternally)=0;
00103
00105
00108 virtual void SetNumberOfProcesses(int num);
00109 vtkGetMacro( NumberOfProcesses, int );
00111
00112
00114
00117 void SetSingleMethod(vtkProcessFunctionType, void *data);
00118
00120
00124 virtual void SingleMethodExecute() = 0;
00125
00126
00128
00132 void SetMultipleMethod(int index, vtkProcessFunctionType, void *data);
00133
00135
00139 virtual void MultipleMethodExecute() = 0;
00140
00142 virtual int GetLocalProcessId() { return this->LocalProcessId; }
00143
00148 static vtkMultiProcessController *GetGlobalController();
00149
00152 virtual void CreateOutputWindow() = 0;
00153
00155
00160 vtkSetMacro(ForceDeepCopy, int);
00161 vtkGetMacro(ForceDeepCopy, int);
00162 vtkBooleanMacro(ForceDeepCopy, int);
00164
00165
00166
00167
00173 void AddRMI(vtkRMIFunctionType, void *localArg, int tag);
00174
00176
00177 void RemoveRMI(vtkRMIFunctionType f, void *arg, int tag)
00178 {f = f; arg = arg; tag = tag; vtkErrorMacro("RemoveRMI Not Implemented Yet");};
00179
00181
00183 void TriggerRMI(int remoteProcessId, void *arg, int argLength, int tag);
00184
00187 void TriggerBreakRMIs();
00188
00190
00191 void TriggerRMI(int remoteProcessId, char *arg, int tag)
00192 { this->TriggerRMI(remoteProcessId, (void*)arg,
00193 static_cast<int>(strlen(arg))+1, tag); }
00195
00197
00198 void TriggerRMI(int remoteProcessId, int tag)
00199 { this->TriggerRMI(remoteProcessId, NULL, 0, tag); }
00201
00204 void ProcessRMIs();
00205
00207
00210 vtkSetMacro(BreakFlag, int);
00211 vtkGetMacro(BreakFlag, int);
00213
00215 vtkGetObjectMacro(Communicator, vtkCommunicator);
00217
00218
00219
00220 enum Consts {
00221 MAX_PROCESSES=8192,
00222 ANY_SOURCE=-1,
00223 INVALID_SOURCE=-2,
00224 RMI_TAG=315167,
00225 RMI_ARG_TAG=315168,
00226 BREAK_RMI_TAG=239954
00227 };
00228
00229
00230
00232 virtual void Barrier() = 0;
00233
00234 static void SetGlobalController(vtkMultiProcessController *controller);
00235
00236
00237
00239
00241 int Send(int* data, int length, int remoteProcessId, int tag);
00242 int Send(unsigned long* data, int length, int remoteProcessId,
00243 int tag);
00244 int Send(char* data, int length, int remoteProcessId, int tag);
00245 int Send(unsigned char* data, int length, int remoteProcessId, int tag);
00246 int Send(float* data, int length, int remoteProcessId, int tag);
00247 int Send(double* data, int length, int remoteProcessId, int tag);
00248 #ifdef VTK_USE_64BIT_IDS
00249 int Send(vtkIdType* data, int length, int remoteProcessId, int tag);
00251 #endif
00252 int Send(vtkDataObject *data, int remoteId, int tag);
00253 int Send(vtkDataArray *data, int remoteId, int tag);
00254
00256
00259 int Receive(int* data, int length, int remoteProcessId, int tag);
00260 int Receive(unsigned long* data, int length, int remoteProcessId,
00261 int tag);
00262 int Receive(char* data, int length, int remoteProcessId, int tag);
00263 int Receive(unsigned char* data, int length, int remoteProcessId, int tag);
00264 int Receive(float* data, int length, int remoteProcessId, int tag);
00265 int Receive(double* data, int length, int remoteProcessId, int tag);
00266 #ifdef VTK_USE_64BIT_IDS
00267 int Receive(vtkIdType* data, int length, int remoteProcessId, int tag);
00269 #endif
00270 int Receive(vtkDataObject* data, int remoteId, int tag);
00271 int Receive(vtkDataArray* data, int remoteId, int tag);
00272
00273
00274
00275 protected:
00276 vtkMultiProcessController();
00277 ~vtkMultiProcessController();
00278
00279 int MaximumNumberOfProcesses;
00280 int NumberOfProcesses;
00281
00282 int LocalProcessId;
00283
00284 vtkProcessFunctionType SingleMethod;
00285 void *SingleData;
00286 vtkProcessFunctionType MultipleMethod[MAX_PROCESSES];
00287 void *MultipleData[MAX_PROCESSES];
00288
00289 vtkCollection *RMIs;
00290
00291
00292
00293 int BreakFlag;
00294
00295 void ProcessRMI(int remoteProcessId, void *arg, int argLength, int rmiTag);
00296
00297
00298
00299 virtual vtkMultiProcessController *GetLocalController();
00300
00301
00302
00303 int ForceDeepCopy;
00304
00305 vtkOutputWindow* OutputWindow;
00306
00307
00308
00309
00310 vtkCommunicator* Communicator;
00311
00312
00313
00314
00315
00316
00317
00318
00319
00320 vtkCommunicator* RMICommunicator;
00321
00322 private:
00323 vtkMultiProcessController(const vtkMultiProcessController&);
00324 void operator=(const vtkMultiProcessController&);
00325 };
00326
00327
00328 inline int vtkMultiProcessController::Send(vtkDataObject *data,
00329 int remoteThreadId, int tag)
00330 {
00331 if (this->Communicator)
00332 {
00333 return this->Communicator->Send(data, remoteThreadId, tag);
00334 }
00335 else
00336 {
00337 return 0;
00338 }
00339 }
00340
00341 inline int vtkMultiProcessController::Send(vtkDataArray *data,
00342 int remoteThreadId, int tag)
00343 {
00344 if (this->Communicator)
00345 {
00346 return this->Communicator->Send(data, remoteThreadId, tag);
00347 }
00348 else
00349 {
00350 return 0;
00351 }
00352 }
00353
00354 inline int vtkMultiProcessController::Send(int* data, int length,
00355 int remoteThreadId, int tag)
00356 {
00357 if (this->Communicator)
00358 {
00359 return this->Communicator->Send(data, length, remoteThreadId, tag);
00360 }
00361 else
00362 {
00363 return 0;
00364 }
00365 }
00366
00367 inline int vtkMultiProcessController::Send(unsigned long* data,
00368 int length, int remoteThreadId,
00369 int tag)
00370 {
00371 if (this->Communicator)
00372 {
00373 return this->Communicator->Send(data, length, remoteThreadId, tag);
00374 }
00375 else
00376 {
00377 return 0;
00378 }
00379 }
00380
00381 inline int vtkMultiProcessController::Send(char* data, int length,
00382 int remoteThreadId, int tag)
00383 {
00384 if (this->Communicator)
00385 {
00386 return this->Communicator->Send(data, length, remoteThreadId, tag);
00387 }
00388 else
00389 {
00390 return 0;
00391 }
00392 }
00393
00394 inline int vtkMultiProcessController::Send(unsigned char* data, int length,
00395 int remoteThreadId, int tag)
00396 {
00397 if (this->Communicator)
00398 {
00399 return this->Communicator->Send(data, length, remoteThreadId, tag);
00400 }
00401 else
00402 {
00403 return 0;
00404 }
00405 }
00406
00407 inline int vtkMultiProcessController::Send(float* data, int length,
00408 int remoteThreadId, int tag)
00409 {
00410 if (this->Communicator)
00411 {
00412 return this->Communicator->Send(data, length, remoteThreadId, tag);
00413 }
00414 else
00415 {
00416 return 0;
00417 }
00418 }
00419
00420 inline int vtkMultiProcessController::Send(double* data, int length,
00421 int remoteThreadId, int tag)
00422 {
00423 if (this->Communicator)
00424 {
00425 return this->Communicator->Send(data, length, remoteThreadId, tag);
00426 }
00427 else
00428 {
00429 return 0;
00430 }
00431 }
00432
00433 #ifdef VTK_USE_64BIT_IDS
00434 inline int vtkMultiProcessController::Send(vtkIdType* data, int length,
00435 int remoteThreadId, int tag)
00436 {
00437 if (this->Communicator)
00438 {
00439 return this->Communicator->Send(data, length, remoteThreadId, tag);
00440 }
00441 else
00442 {
00443 return 0;
00444 }
00445 }
00446 #endif
00447
00448 inline int vtkMultiProcessController::Receive(vtkDataObject* data,
00449 int remoteThreadId, int tag)
00450 {
00451 if (this->Communicator)
00452 {
00453 return this->Communicator->Receive(data, remoteThreadId, tag);
00454 }
00455 else
00456 {
00457 return 0;
00458 }
00459 }
00460
00461 inline int vtkMultiProcessController::Receive(vtkDataArray* data,
00462 int remoteThreadId, int tag)
00463 {
00464 if (this->Communicator)
00465 {
00466 return this->Communicator->Receive(data, remoteThreadId, tag);
00467 }
00468 else
00469 {
00470 return 0;
00471 }
00472 }
00473
00474 inline int vtkMultiProcessController::Receive(int* data, int length,
00475 int remoteThreadId, int tag)
00476 {
00477 if (this->Communicator)
00478 {
00479 return this->Communicator->Receive(data, length, remoteThreadId, tag);
00480 }
00481 else
00482 {
00483 return 0;
00484 }
00485 }
00486
00487 inline int vtkMultiProcessController::Receive(unsigned long* data,
00488 int length,int remoteThreadId,
00489 int tag)
00490 {
00491 if (this->Communicator)
00492 {
00493 return this->Communicator->Receive(data, length, remoteThreadId, tag);
00494 }
00495 else
00496 {
00497 return 0;
00498 }
00499 }
00500
00501 inline int vtkMultiProcessController::Receive(char* data, int length,
00502 int remoteThreadId, int tag)
00503 {
00504 if (this->Communicator)
00505 {
00506 return this->Communicator->Receive(data, length, remoteThreadId, tag);
00507 }
00508 else
00509 {
00510 return 0;
00511 }
00512 }
00513
00514 inline int vtkMultiProcessController::Receive(unsigned char* data, int length,
00515 int remoteThreadId, int tag)
00516 {
00517 if (this->Communicator)
00518 {
00519 return this->Communicator->Receive(data, length, remoteThreadId, tag);
00520 }
00521 else
00522 {
00523 return 0;
00524 }
00525 }
00526
00527 inline int vtkMultiProcessController::Receive(float* data, int length,
00528 int remoteThreadId, int tag)
00529 {
00530 if (this->Communicator)
00531 {
00532 return this->Communicator->Receive(data, length, remoteThreadId, tag);
00533 }
00534 else
00535 {
00536 return 0;
00537 }
00538 }
00539
00540 inline int vtkMultiProcessController::Receive(double* data, int length,
00541 int remoteThreadId, int tag)
00542 {
00543 if (this->Communicator)
00544 {
00545 return this->Communicator->Receive(data, length, remoteThreadId, tag);
00546 }
00547 else
00548 {
00549 return 0;
00550 }
00551 }
00552
00553 #ifdef VTK_USE_64BIT_IDS
00554 inline int vtkMultiProcessController::Receive(vtkIdType* data, int length,
00555 int remoteThreadId, int tag)
00556 {
00557 if (this->Communicator)
00558 {
00559 return this->Communicator->Receive(data, length, remoteThreadId, tag);
00560 }
00561 else
00562 {
00563 return 0;
00564 }
00565 }
00566 #endif
00567
00568 #endif