00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00060 #ifndef __vtkMultiProcessController_h
00061 #define __vtkMultiProcessController_h
00062
00063 #include "vtkObject.h"
00064
00065 #include "vtkCommunicator.h"
00066
00067 class vtkDataSet;
00068 class vtkImageData;
00069 class vtkCollection;
00070 class vtkOutputWindow;
00071 class vtkDataObject;
00072 class vtkMultiProcessController;
00073
00074
00075
00076 typedef void (*vtkProcessFunctionType)(vtkMultiProcessController *controller,
00077 void *userData);
00078
00079
00080 typedef void (*vtkRMIFunctionType)(void *localArg,
00081 void *remoteArg, int remoteArgLength,
00082 int remoteProcessId);
00083
00084
00085
00086 class VTK_PARALLEL_EXPORT vtkMultiProcessController : public vtkObject
00087 {
00088 public:
00089 static vtkMultiProcessController *New();
00090 vtkTypeRevisionMacro(vtkMultiProcessController,vtkObject);
00091 void PrintSelf(ostream& os, vtkIndent indent);
00092
00096 virtual void Initialize(int* vtkNotUsed(argc), char*** vtkNotUsed(argv))=0;
00097
00099
00102 virtual void Initialize(int* vtkNotUsed(argc), char*** vtkNotUsed(argv),
00103 int initializedExternally)=0;
00105
00108 virtual void Finalize()=0;
00109
00113 virtual void Finalize(int finalizedExternally)=0;
00114
00116
00119 virtual void SetNumberOfProcesses(int num);
00120 vtkGetMacro( NumberOfProcesses, int );
00122
00123
00125
00126 be executed by all of the processes when SingleMethodExecute is
00127 called. All the processes will start by calling this function. */
00128 void SetSingleMethod(vtkProcessFunctionType, void *data);
00129
00131
00135 virtual void SingleMethodExecute() = 0;
00136
00137
00139
00140 to be executed by the process index when MultipleMethodExecute is
00141 called. This is for having each process start with a different
00142 function and data argument. */
00143 void SetMultipleMethod(int index, vtkProcessFunctionType, void *data);
00144
00146
00150 virtual void MultipleMethodExecute() = 0;
00151
00153 virtual int GetLocalProcessId() { return this->LocalProcessId; }
00154
00159 static vtkMultiProcessController *GetGlobalController();
00160
00163 virtual void CreateOutputWindow() = 0;
00164
00166
00171 vtkSetMacro(ForceDeepCopy, int);
00172 vtkGetMacro(ForceDeepCopy, int);
00173 vtkBooleanMacro(ForceDeepCopy, int);
00175
00176
00177
00178
00184 void AddRMI(vtkRMIFunctionType, void *localArg, int tag);
00185
00187
00188 void RemoveRMI(vtkRMIFunctionType f, void *arg, int tag)
00189 {f = f; arg = arg; tag = tag; vtkErrorMacro("RemoveRMI Not Implemented Yet");};
00190
00192
00194 void TriggerRMI(int remoteProcessId, void *arg, int argLength, int tag);
00195
00198 void TriggerBreakRMIs();
00199
00201
00202 void TriggerRMI(int remoteProcessId, char *arg, int tag)
00203 { this->TriggerRMI(remoteProcessId, (void*)arg,
00204 static_cast<int>(strlen(arg))+1, tag); }
00206
00208
00209 void TriggerRMI(int remoteProcessId, int tag)
00210 { this->TriggerRMI(remoteProcessId, NULL, 0, tag); }
00212
00215 void ProcessRMIs();
00216
00218
00221 vtkSetMacro(BreakFlag, int);
00222 vtkGetMacro(BreakFlag, int);
00224
00226 vtkGetObjectMacro(Communicator, vtkCommunicator);
00228
00229
00230
00231 enum Consts {
00232 MAX_PROCESSES=8192,
00233 ANY_SOURCE=-1,
00234 INVALID_SOURCE=-2,
00235 RMI_TAG=315167,
00236 RMI_ARG_TAG=315168,
00237 BREAK_RMI_TAG=239954
00238 };
00239
00240
00241
00243 virtual void Barrier() = 0;
00244
00245 static void SetGlobalController(vtkMultiProcessController *controller);
00246
00247
00248
00250
00252 int Send(int* data, int length, int remoteProcessId, int tag);
00253 int Send(unsigned long* data, int length, int remoteProcessId,
00254 int tag);
00255 int Send(char* data, int length, int remoteProcessId, int tag);
00256 int Send(unsigned char* data, int length, int remoteProcessId, int tag);
00257 int Send(float* data, int length, int remoteProcessId, int tag);
00258 int Send(double* data, int length, int remoteProcessId, int tag);
00259 #ifdef VTK_USE_64BIT_IDS
00260 int Send(vtkIdType* data, int length, int remoteProcessId, int tag);
00262 #endif
00263 int Send(vtkDataObject *data, int remoteId, int tag);
00264 int Send(vtkDataArray *data, int remoteId, int tag);
00265
00267
00270 int Receive(int* data, int length, int remoteProcessId, int tag);
00271 int Receive(unsigned long* data, int length, int remoteProcessId,
00272 int tag);
00273 int Receive(char* data, int length, int remoteProcessId, int tag);
00274 int Receive(unsigned char* data, int length, int remoteProcessId, int tag);
00275 int Receive(float* data, int length, int remoteProcessId, int tag);
00276 int Receive(double* data, int length, int remoteProcessId, int tag);
00277 #ifdef VTK_USE_64BIT_IDS
00278 int Receive(vtkIdType* data, int length, int remoteProcessId, int tag);
00280 #endif
00281 int Receive(vtkDataObject* data, int remoteId, int tag);
00282 int Receive(vtkDataArray* data, int remoteId, int tag);
00283
00284
00285
00286 protected:
00287 vtkMultiProcessController();
00288 ~vtkMultiProcessController();
00289
00290 int MaximumNumberOfProcesses;
00291 int NumberOfProcesses;
00292
00293 int LocalProcessId;
00294
00295 vtkProcessFunctionType SingleMethod;
00296 void *SingleData;
00297 vtkProcessFunctionType MultipleMethod[MAX_PROCESSES];
00298 void *MultipleData[MAX_PROCESSES];
00299
00300 vtkCollection *RMIs;
00301
00302
00303
00304 int BreakFlag;
00305
00306 void ProcessRMI(int remoteProcessId, void *arg, int argLength, int rmiTag);
00307
00308
00309
00310 virtual vtkMultiProcessController *GetLocalController();
00311
00312
00313
00314 int ForceDeepCopy;
00315
00316 vtkOutputWindow* OutputWindow;
00317
00318
00319
00320
00321 vtkCommunicator* Communicator;
00322
00323
00324
00325
00326
00327
00328
00329
00330
00331 vtkCommunicator* RMICommunicator;
00332
00333 private:
00334 vtkMultiProcessController(const vtkMultiProcessController&);
00335 void operator=(const vtkMultiProcessController&);
00336 };
00337
00338
00339 inline int vtkMultiProcessController::Send(vtkDataObject *data,
00340 int remoteThreadId, int tag)
00341 {
00342 if (this->Communicator)
00343 {
00344 return this->Communicator->Send(data, remoteThreadId, tag);
00345 }
00346 else
00347 {
00348 return 0;
00349 }
00350 }
00351
00352 inline int vtkMultiProcessController::Send(vtkDataArray *data,
00353 int remoteThreadId, int tag)
00354 {
00355 if (this->Communicator)
00356 {
00357 return this->Communicator->Send(data, remoteThreadId, tag);
00358 }
00359 else
00360 {
00361 return 0;
00362 }
00363 }
00364
00365 inline int vtkMultiProcessController::Send(int* data, int length,
00366 int remoteThreadId, int tag)
00367 {
00368 if (this->Communicator)
00369 {
00370 return this->Communicator->Send(data, length, remoteThreadId, tag);
00371 }
00372 else
00373 {
00374 return 0;
00375 }
00376 }
00377
00378 inline int vtkMultiProcessController::Send(unsigned long* data,
00379 int length, int remoteThreadId,
00380 int tag)
00381 {
00382 if (this->Communicator)
00383 {
00384 return this->Communicator->Send(data, length, remoteThreadId, tag);
00385 }
00386 else
00387 {
00388 return 0;
00389 }
00390 }
00391
00392 inline int vtkMultiProcessController::Send(char* data, int length,
00393 int remoteThreadId, int tag)
00394 {
00395 if (this->Communicator)
00396 {
00397 return this->Communicator->Send(data, length, remoteThreadId, tag);
00398 }
00399 else
00400 {
00401 return 0;
00402 }
00403 }
00404
00405 inline int vtkMultiProcessController::Send(unsigned char* data, int length,
00406 int remoteThreadId, int tag)
00407 {
00408 if (this->Communicator)
00409 {
00410 return this->Communicator->Send(data, length, remoteThreadId, tag);
00411 }
00412 else
00413 {
00414 return 0;
00415 }
00416 }
00417
00418 inline int vtkMultiProcessController::Send(float* data, int length,
00419 int remoteThreadId, int tag)
00420 {
00421 if (this->Communicator)
00422 {
00423 return this->Communicator->Send(data, length, remoteThreadId, tag);
00424 }
00425 else
00426 {
00427 return 0;
00428 }
00429 }
00430
00431 inline int vtkMultiProcessController::Send(double* data, int length,
00432 int remoteThreadId, int tag)
00433 {
00434 if (this->Communicator)
00435 {
00436 return this->Communicator->Send(data, length, remoteThreadId, tag);
00437 }
00438 else
00439 {
00440 return 0;
00441 }
00442 }
00443
00444 #ifdef VTK_USE_64BIT_IDS
00445 inline int vtkMultiProcessController::Send(vtkIdType* data, int length,
00446 int remoteThreadId, int tag)
00447 {
00448 if (this->Communicator)
00449 {
00450 return this->Communicator->Send(data, length, remoteThreadId, tag);
00451 }
00452 else
00453 {
00454 return 0;
00455 }
00456 }
00457 #endif
00458
00459 inline int vtkMultiProcessController::Receive(vtkDataObject* data,
00460 int remoteThreadId, int tag)
00461 {
00462 if (this->Communicator)
00463 {
00464 return this->Communicator->Receive(data, remoteThreadId, tag);
00465 }
00466 else
00467 {
00468 return 0;
00469 }
00470 }
00471
00472 inline int vtkMultiProcessController::Receive(vtkDataArray* data,
00473 int remoteThreadId, int tag)
00474 {
00475 if (this->Communicator)
00476 {
00477 return this->Communicator->Receive(data, remoteThreadId, tag);
00478 }
00479 else
00480 {
00481 return 0;
00482 }
00483 }
00484
00485 inline int vtkMultiProcessController::Receive(int* data, int length,
00486 int remoteThreadId, int tag)
00487 {
00488 if (this->Communicator)
00489 {
00490 return this->Communicator->Receive(data, length, remoteThreadId, tag);
00491 }
00492 else
00493 {
00494 return 0;
00495 }
00496 }
00497
00498 inline int vtkMultiProcessController::Receive(unsigned long* data,
00499 int length,int remoteThreadId,
00500 int tag)
00501 {
00502 if (this->Communicator)
00503 {
00504 return this->Communicator->Receive(data, length, remoteThreadId, tag);
00505 }
00506 else
00507 {
00508 return 0;
00509 }
00510 }
00511
00512 inline int vtkMultiProcessController::Receive(char* data, int length,
00513 int remoteThreadId, int tag)
00514 {
00515 if (this->Communicator)
00516 {
00517 return this->Communicator->Receive(data, length, remoteThreadId, tag);
00518 }
00519 else
00520 {
00521 return 0;
00522 }
00523 }
00524
00525 inline int vtkMultiProcessController::Receive(unsigned char* data, int length,
00526 int remoteThreadId, int tag)
00527 {
00528 if (this->Communicator)
00529 {
00530 return this->Communicator->Receive(data, length, remoteThreadId, tag);
00531 }
00532 else
00533 {
00534 return 0;
00535 }
00536 }
00537
00538 inline int vtkMultiProcessController::Receive(float* data, int length,
00539 int remoteThreadId, int tag)
00540 {
00541 if (this->Communicator)
00542 {
00543 return this->Communicator->Receive(data, length, remoteThreadId, tag);
00544 }
00545 else
00546 {
00547 return 0;
00548 }
00549 }
00550
00551 inline int vtkMultiProcessController::Receive(double* data, int length,
00552 int remoteThreadId, int tag)
00553 {
00554 if (this->Communicator)
00555 {
00556 return this->Communicator->Receive(data, length, remoteThreadId, tag);
00557 }
00558 else
00559 {
00560 return 0;
00561 }
00562 }
00563
00564 #ifdef VTK_USE_64BIT_IDS
00565 inline int vtkMultiProcessController::Receive(vtkIdType* data, int length,
00566 int remoteThreadId, int tag)
00567 {
00568 if (this->Communicator)
00569 {
00570 return this->Communicator->Receive(data, length, remoteThreadId, tag);
00571 }
00572 else
00573 {
00574 return 0;
00575 }
00576 }
00577 #endif
00578
00579 #endif