00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00040 #ifndef __vtkMultiProcessController_h
00041 #define __vtkMultiProcessController_h
00042
00043 #include "vtkObject.h"
00044
00045 #include "vtkCommunicator.h"
00046
00047 class vtkDataSet;
00048 class vtkImageData;
00049 class vtkCollection;
00050 class vtkOutputWindow;
00051 class vtkDataObject;
00052 class vtkMultiProcessController;
00053
00054
00055
00056 typedef void (*vtkProcessFunctionType)(vtkMultiProcessController *controller,
00057 void *userData);
00058
00059
00060 typedef void (*vtkRMIFunctionType)(void *localArg,
00061 void *remoteArg, int remoteArgLength,
00062 int remoteProcessId);
00063
00064
00065
00066 class VTK_PARALLEL_EXPORT vtkMultiProcessController : public vtkObject
00067 {
00068 public:
00069 vtkTypeRevisionMacro(vtkMultiProcessController,vtkObject);
00070 void PrintSelf(ostream& os, vtkIndent indent);
00071
00075 virtual void Initialize(int* vtkNotUsed(argc), char*** vtkNotUsed(argv))=0;
00076
00078
00081 virtual void Initialize(int* vtkNotUsed(argc), char*** vtkNotUsed(argv),
00082 int initializedExternally)=0;
00084
00087 virtual void Finalize()=0;
00088
00092 virtual void Finalize(int finalizedExternally)=0;
00093
00095
00098 virtual void SetNumberOfProcesses(int num);
00099 vtkGetMacro( NumberOfProcesses, int );
00101
00102
00104
00107 void SetSingleMethod(vtkProcessFunctionType, void *data);
00108
00110
00114 virtual void SingleMethodExecute() = 0;
00115
00116
00118
00122 void SetMultipleMethod(int index, vtkProcessFunctionType, void *data);
00123
00125
00129 virtual void MultipleMethodExecute() = 0;
00130
00132
00133 vtkGetMacro(LocalProcessId, int);
00135
00140 static vtkMultiProcessController *GetGlobalController();
00141
00144 virtual void CreateOutputWindow() = 0;
00145
00146
00147
00153 void AddRMI(vtkRMIFunctionType, void *localArg, int tag);
00154
00156 int RemoveFirstRMI(int tag);
00157
00159
00160 void RemoveRMI(vtkRMIFunctionType f, void *arg, int tag)
00161 {f = f; arg = arg; tag = tag; vtkErrorMacro("RemoveRMI Not Implemented Yet");};
00162
00164
00166 void TriggerRMI(int remoteProcessId, void *arg, int argLength, int tag);
00167
00170 void TriggerBreakRMIs();
00171
00173
00174 void TriggerRMI(int remoteProcessId, const char *arg, int tag)
00175 { this->TriggerRMI(remoteProcessId, (void*)arg,
00176 static_cast<int>(strlen(arg))+1, tag); }
00178
00180
00181 void TriggerRMI(int remoteProcessId, int tag)
00182 { this->TriggerRMI(remoteProcessId, NULL, 0, tag); }
00184
00186
00192 int ProcessRMIs(int reportErrors);
00193 int ProcessRMIs();
00195
00197
00200 vtkSetMacro(BreakFlag, int);
00201 vtkGetMacro(BreakFlag, int);
00203
00205 vtkGetObjectMacro(Communicator, vtkCommunicator);
00207
00208
00209
00210 enum Errors
00211 {
00212 RMI_NO_ERROR,
00213 RMI_TAG_ERROR,
00214 RMI_ARG_ERROR
00215 };
00216
00217 enum Consts
00218 {
00219 MAX_PROCESSES = 8192,
00220 ANY_SOURCE = -1,
00221 INVALID_SOURCE = -2,
00222 RMI_TAG = 315167,
00223 RMI_ARG_TAG = 315168,
00224 BREAK_RMI_TAG = 239954
00225 };
00226
00227
00228
00230 virtual void Barrier() = 0;
00231
00232 static void SetGlobalController(vtkMultiProcessController *controller);
00233
00234
00235
00237
00239 int Send(int* data, int length, int remoteProcessId, int tag);
00240 int Send(unsigned long* data, int length, int remoteProcessId,
00241 int tag);
00242 int Send(char* data, int length, int remoteProcessId, int tag);
00243 int Send(unsigned char* data, int length, int remoteProcessId, int tag);
00244 int Send(float* data, int length, int remoteProcessId, int tag);
00245 int Send(double* data, int length, int remoteProcessId, int tag);
00246 #ifdef VTK_USE_64BIT_IDS
00247 int Send(vtkIdType* data, int length, int remoteProcessId, int tag);
00249 #endif
00250 int Send(vtkDataObject *data, int remoteId, int tag);
00251 int Send(vtkDataArray *data, int remoteId, int tag);
00252
00254
00257 int Receive(int* data, int length, int remoteProcessId, int tag);
00258 int Receive(unsigned long* data, int length, int remoteProcessId,
00259 int tag);
00260 int Receive(char* data, int length, int remoteProcessId, int tag);
00261 int Receive(unsigned char* data, int length, int remoteProcessId, int tag);
00262 int Receive(float* data, int length, int remoteProcessId, int tag);
00263 int Receive(double* data, int length, int remoteProcessId, int tag);
00264 #ifdef VTK_USE_64BIT_IDS
00265 int Receive(vtkIdType* data, int length, int remoteProcessId, int tag);
00267 #endif
00268 int Receive(vtkDataObject* data, int remoteId, int tag);
00269 int Receive(vtkDataArray* data, int remoteId, int tag);
00270
00271
00272
00273 protected:
00274 vtkMultiProcessController();
00275 ~vtkMultiProcessController();
00276
00277 int MaximumNumberOfProcesses;
00278 int NumberOfProcesses;
00279
00280 int LocalProcessId;
00281
00282 vtkProcessFunctionType SingleMethod;
00283 void *SingleData;
00284 vtkProcessFunctionType MultipleMethod[MAX_PROCESSES];
00285 void *MultipleData[MAX_PROCESSES];
00286
00287 vtkCollection *RMIs;
00288
00289
00290
00291 int BreakFlag;
00292
00293 void ProcessRMI(int remoteProcessId, void *arg, int argLength, int rmiTag);
00294
00295
00296
00297 virtual vtkMultiProcessController *GetLocalController();
00298
00299
00300
00301 int ForceDeepCopy;
00302
00303 vtkOutputWindow* OutputWindow;
00304
00305
00306
00307
00308 vtkCommunicator* Communicator;
00309
00310
00311
00312
00313
00314
00315
00316
00317 vtkCommunicator* RMICommunicator;
00318
00319 private:
00320 vtkMultiProcessController(const vtkMultiProcessController&);
00321 void operator=(const vtkMultiProcessController&);
00322 };
00323
00324
00325 inline int vtkMultiProcessController::Send(vtkDataObject *data,
00326 int remoteProcessId, int tag)
00327 {
00328 if (this->Communicator)
00329 {
00330 return this->Communicator->Send(data, remoteProcessId, tag);
00331 }
00332 else
00333 {
00334 return 0;
00335 }
00336 }
00337
00338 inline int vtkMultiProcessController::Send(vtkDataArray *data,
00339 int remoteProcessId, int tag)
00340 {
00341 if (this->Communicator)
00342 {
00343 return this->Communicator->Send(data, remoteProcessId, tag);
00344 }
00345 else
00346 {
00347 return 0;
00348 }
00349 }
00350
00351 inline int vtkMultiProcessController::Send(int* data, int length,
00352 int remoteProcessId, int tag)
00353 {
00354 if (this->Communicator)
00355 {
00356 return this->Communicator->Send(data, length, remoteProcessId, tag);
00357 }
00358 else
00359 {
00360 return 0;
00361 }
00362 }
00363
00364 inline int vtkMultiProcessController::Send(unsigned long* data,
00365 int length, int remoteProcessId,
00366 int tag)
00367 {
00368 if (this->Communicator)
00369 {
00370 return this->Communicator->Send(data, length, remoteProcessId, tag);
00371 }
00372 else
00373 {
00374 return 0;
00375 }
00376 }
00377
00378 inline int vtkMultiProcessController::Send(char* data, int length,
00379 int remoteProcessId, int tag)
00380 {
00381 if (this->Communicator)
00382 {
00383 return this->Communicator->Send(data, length, remoteProcessId, tag);
00384 }
00385 else
00386 {
00387 return 0;
00388 }
00389 }
00390
00391 inline int vtkMultiProcessController::Send(unsigned char* data, int length,
00392 int remoteProcessId, int tag)
00393 {
00394 if (this->Communicator)
00395 {
00396 return this->Communicator->Send(data, length, remoteProcessId, tag);
00397 }
00398 else
00399 {
00400 return 0;
00401 }
00402 }
00403
00404 inline int vtkMultiProcessController::Send(float* data, int length,
00405 int remoteProcessId, int tag)
00406 {
00407 if (this->Communicator)
00408 {
00409 return this->Communicator->Send(data, length, remoteProcessId, tag);
00410 }
00411 else
00412 {
00413 return 0;
00414 }
00415 }
00416
00417 inline int vtkMultiProcessController::Send(double* data, int length,
00418 int remoteProcessId, int tag)
00419 {
00420 if (this->Communicator)
00421 {
00422 return this->Communicator->Send(data, length, remoteProcessId, tag);
00423 }
00424 else
00425 {
00426 return 0;
00427 }
00428 }
00429
00430 #ifdef VTK_USE_64BIT_IDS
00431 inline int vtkMultiProcessController::Send(vtkIdType* data, int length,
00432 int remoteProcessId, int tag)
00433 {
00434 if (this->Communicator)
00435 {
00436 return this->Communicator->Send(data, length, remoteProcessId, tag);
00437 }
00438 else
00439 {
00440 return 0;
00441 }
00442 }
00443 #endif
00444
00445 inline int vtkMultiProcessController::Receive(vtkDataObject* data,
00446 int remoteProcessId, int tag)
00447 {
00448 if (this->Communicator)
00449 {
00450 return this->Communicator->Receive(data, remoteProcessId, tag);
00451 }
00452 else
00453 {
00454 return 0;
00455 }
00456 }
00457
00458 inline int vtkMultiProcessController::Receive(vtkDataArray* data,
00459 int remoteProcessId, int tag)
00460 {
00461 if (this->Communicator)
00462 {
00463 return this->Communicator->Receive(data, remoteProcessId, tag);
00464 }
00465 else
00466 {
00467 return 0;
00468 }
00469 }
00470
00471 inline int vtkMultiProcessController::Receive(int* data, int length,
00472 int remoteProcessId, int tag)
00473 {
00474 if (this->Communicator)
00475 {
00476 return this->Communicator->Receive(data, length, remoteProcessId, tag);
00477 }
00478 else
00479 {
00480 return 0;
00481 }
00482 }
00483
00484 inline int vtkMultiProcessController::Receive(unsigned long* data,
00485 int length,int remoteProcessId,
00486 int tag)
00487 {
00488 if (this->Communicator)
00489 {
00490 return this->Communicator->Receive(data, length, remoteProcessId, tag);
00491 }
00492 else
00493 {
00494 return 0;
00495 }
00496 }
00497
00498 inline int vtkMultiProcessController::Receive(char* data, int length,
00499 int remoteProcessId, int tag)
00500 {
00501 if (this->Communicator)
00502 {
00503 return this->Communicator->Receive(data, length, remoteProcessId, tag);
00504 }
00505 else
00506 {
00507 return 0;
00508 }
00509 }
00510
00511 inline int vtkMultiProcessController::Receive(unsigned char* data, int length,
00512 int remoteProcessId, int tag)
00513 {
00514 if (this->Communicator)
00515 {
00516 return this->Communicator->Receive(data, length, remoteProcessId, tag);
00517 }
00518 else
00519 {
00520 return 0;
00521 }
00522 }
00523
00524 inline int vtkMultiProcessController::Receive(float* data, int length,
00525 int remoteProcessId, int tag)
00526 {
00527 if (this->Communicator)
00528 {
00529 return this->Communicator->Receive(data, length, remoteProcessId, tag);
00530 }
00531 else
00532 {
00533 return 0;
00534 }
00535 }
00536
00537 inline int vtkMultiProcessController::Receive(double* data, int length,
00538 int remoteProcessId, int tag)
00539 {
00540 if (this->Communicator)
00541 {
00542 return this->Communicator->Receive(data, length, remoteProcessId, tag);
00543 }
00544 else
00545 {
00546 return 0;
00547 }
00548 }
00549
00550 #ifdef VTK_USE_64BIT_IDS
00551 inline int vtkMultiProcessController::Receive(vtkIdType* data, int length,
00552 int remoteProcessId, int tag)
00553 {
00554 if (this->Communicator)
00555 {
00556 return this->Communicator->Receive(data, length, remoteProcessId, tag);
00557 }
00558 else
00559 {
00560 return 0;
00561 }
00562 }
00563 #endif
00564
00565 #endif