00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00072 #ifndef __vtkMultiProcessController_h
00073 #define __vtkMultiProcessController_h
00074
00075 #include "vtkObject.h"
00076 #include "vtkDataObject.h"
00077 #include "vtkCommunicator.h"
00078
00079 class vtkDataSet;
00080 class vtkImageData;
00081 class vtkCollection;
00082 class vtkOutputWindow;
00083
00084 class vtkMultiProcessController;
00085
00086
00087
00088
00089 typedef void (*vtkProcessFunctionType)(vtkMultiProcessController *controller,
00090 void *userData);
00091
00092
00093 typedef void (*vtkRMIFunctionType)(void *localArg,
00094 void *remoteArg, int remoteArgLength,
00095 int remoteProcessId);
00096
00097
00098
00099 class VTK_PARALLEL_EXPORT vtkMultiProcessController : public vtkObject
00100 {
00101 public:
00102 static vtkMultiProcessController *New();
00103 vtkTypeMacro(vtkMultiProcessController,vtkObject);
00104 void PrintSelf(ostream& os, vtkIndent indent);
00105
00109 virtual void Initialize(int* vtkNotUsed(argc), char*** vtkNotUsed(arcv))=0;
00110
00113 virtual void Finalize()=0;
00114
00116
00119 virtual void SetNumberOfProcesses(int num);
00120 vtkGetMacro( NumberOfProcesses, int );
00122
00123
00125
00126 be executed by all of the processes when SingleMethodExecute is
00127 called. All the processes will start by calling this function. */
00128 void SetSingleMethod(vtkProcessFunctionType, void *data);
00129
00131
00135 virtual void SingleMethodExecute() = 0;
00136
00137
00139
00140 to be executed by the process index when MultipleMethodExecute is
00141 called. This is for having each process start with a different
00142 function and data argument. */
00143 void SetMultipleMethod(int index, vtkProcessFunctionType, void *data);
00144
00146
00150 virtual void MultipleMethodExecute() = 0;
00151
00153 virtual int GetLocalProcessId() { return this->LocalProcessId; }
00154
00159 static vtkMultiProcessController *GetGlobalController();
00160
00163 virtual void CreateOutputWindow() = 0;
00164
00166
00171 vtkSetMacro(ForceDeepCopy, int);
00172 vtkGetMacro(ForceDeepCopy, int);
00173 vtkBooleanMacro(ForceDeepCopy, int);
00175
00176
00177
00178
00184 void AddRMI(vtkRMIFunctionType, void *localArg, int tag);
00185
00187
00188 void RemoveRMI(vtkRMIFunctionType f, void *arg, int tag)
00189 {f = f; arg = arg; tag = tag; vtkErrorMacro("RemoveRMI Not Implemented Yet");};
00190
00192
00194 void TriggerRMI(int remoteProcessId, void *arg, int argLength, int tag);
00195
00197
00198 void TriggerRMI(int remoteProcessId, char *arg, int tag)
00199 { this->TriggerRMI(remoteProcessId, (void*)arg, strlen(arg)+1, tag); }
00201
00203
00204 void TriggerRMI(int remoteProcessId, int tag)
00205 { this->TriggerRMI(remoteProcessId, NULL, 0, tag); }
00207
00210 void ProcessRMIs();
00211
00213
00216 vtkSetMacro(BreakFlag, int);
00217 vtkGetMacro(BreakFlag, int);
00219
00221 vtkGetObjectMacro(Communicator, vtkCommunicator);
00223
00224
00225
00226 enum Consts {
00227 MAX_PROCESSES=8192,
00228 ANY_SOURCE=-1,
00229 INVALID_SOURCE=-2,
00230 RMI_TAG=315167,
00231 RMI_ARG_TAG=315168,
00232 BREAK_RMI_TAG=239954
00233 };
00234
00235
00236
00238 virtual void Barrier() = 0;
00239
00240 static void SetGlobalController(vtkMultiProcessController *controller);
00241
00242
00243
00245
00247 int Send(int* data, int length, int remoteProcessId, int tag);
00248 int Send(unsigned long* data, int length, int remoteProcessId,
00249 int tag);
00250 int Send(char* data, int length, int remoteProcessId, int tag);
00251 int Send(unsigned char* data, int length, int remoteProcessId, int tag);
00252 int Send(float* data, int length, int remoteProcessId, int tag);
00253 int Send(double* data, int length, int remoteProcessId, int tag);
00254 #ifdef VTK_USE_64BIT_IDS
00255 int Send(vtkIdType* data, int length, int remoteProcessId, int tag);
00257 #endif
00258 int Send(vtkDataObject *data, int remoteId, int tag);
00259 int Send(vtkDataArray *data, int remoteId, int tag);
00260
00262
00265 int Receive(int* data, int length, int remoteProcessId, int tag);
00266 int Receive(unsigned long* data, int length, int remoteProcessId,
00267 int tag);
00268 int Receive(char* data, int length, int remoteProcessId, int tag);
00269 int Receive(unsigned char* data, int length, int remoteProcessId, int tag);
00270 int Receive(float* data, int length, int remoteProcessId, int tag);
00271 int Receive(double* data, int length, int remoteProcessId, int tag);
00272 #ifdef VTK_USE_64BIT_IDS
00273 int Receive(vtkIdType* data, int length, int remoteProcessId, int tag);
00275 #endif
00276 int Receive(vtkDataObject* data, int remoteId, int tag);
00277 int Receive(vtkDataArray* data, int remoteId, int tag);
00278
00279
00280
00281 protected:
00282 vtkMultiProcessController();
00283 ~vtkMultiProcessController();
00284
00285 int MaximumNumberOfProcesses;
00286 int NumberOfProcesses;
00287
00288 int LocalProcessId;
00289
00290 vtkProcessFunctionType SingleMethod;
00291 void *SingleData;
00292 vtkProcessFunctionType MultipleMethod[MAX_PROCESSES];
00293 void *MultipleData[MAX_PROCESSES];
00294
00295 vtkCollection *RMIs;
00296
00297
00298
00299 int BreakFlag;
00300
00301 void ProcessRMI(int remoteProcessId, void *arg, int argLength, int rmiTag);
00302
00303
00304
00305 virtual vtkMultiProcessController *GetLocalController();
00306
00307
00308
00309 int ForceDeepCopy;
00310
00311 vtkOutputWindow* OutputWindow;
00312
00313
00314
00315
00316 vtkCommunicator* Communicator;
00317
00318
00319
00320
00321
00322
00323
00324
00325
00326 vtkCommunicator* RMICommunicator;
00327
00328 private:
00329 vtkMultiProcessController(const vtkMultiProcessController&);
00330 void operator=(const vtkMultiProcessController&);
00331 };
00332
00333
00334 inline int vtkMultiProcessController::Send(vtkDataObject *data,
00335 int remoteThreadId, int tag)
00336 {
00337 if (this->Communicator)
00338 {
00339 return this->Communicator->Send(data, remoteThreadId, tag);
00340 }
00341 else
00342 {
00343 return 0;
00344 }
00345 }
00346
00347 inline int vtkMultiProcessController::Send(vtkDataArray *data,
00348 int remoteThreadId, int tag)
00349 {
00350 if (this->Communicator)
00351 {
00352 return this->Communicator->Send(data, remoteThreadId, tag);
00353 }
00354 else
00355 {
00356 return 0;
00357 }
00358 }
00359
00360 inline int vtkMultiProcessController::Send(int* data, int length,
00361 int remoteThreadId, int tag)
00362 {
00363 if (this->Communicator)
00364 {
00365 return this->Communicator->Send(data, length, remoteThreadId, tag);
00366 }
00367 else
00368 {
00369 return 0;
00370 }
00371 }
00372
00373 inline int vtkMultiProcessController::Send(unsigned long* data,
00374 int length, int remoteThreadId,
00375 int tag)
00376 {
00377 if (this->Communicator)
00378 {
00379 return this->Communicator->Send(data, length, remoteThreadId, tag);
00380 }
00381 else
00382 {
00383 return 0;
00384 }
00385 }
00386
00387 inline int vtkMultiProcessController::Send(char* data, int length,
00388 int remoteThreadId, int tag)
00389 {
00390 if (this->Communicator)
00391 {
00392 return this->Communicator->Send(data, length, remoteThreadId, tag);
00393 }
00394 else
00395 {
00396 return 0;
00397 }
00398 }
00399
00400 inline int vtkMultiProcessController::Send(unsigned char* data, int length,
00401 int remoteThreadId, int tag)
00402 {
00403 if (this->Communicator)
00404 {
00405 return this->Communicator->Send(data, length, remoteThreadId, tag);
00406 }
00407 else
00408 {
00409 return 0;
00410 }
00411 }
00412
00413 inline int vtkMultiProcessController::Send(float* data, int length,
00414 int remoteThreadId, int tag)
00415 {
00416 if (this->Communicator)
00417 {
00418 return this->Communicator->Send(data, length, remoteThreadId, tag);
00419 }
00420 else
00421 {
00422 return 0;
00423 }
00424 }
00425
00426 inline int vtkMultiProcessController::Send(double* data, int length,
00427 int remoteThreadId, int tag)
00428 {
00429 if (this->Communicator)
00430 {
00431 return this->Communicator->Send(data, length, remoteThreadId, tag);
00432 }
00433 else
00434 {
00435 return 0;
00436 }
00437 }
00438
00439 #ifdef VTK_USE_64BIT_IDS
00440 inline int vtkMultiProcessController::Send(vtkIdType* data, int length,
00441 int remoteThreadId, int tag)
00442 {
00443 if (this->Communicator)
00444 {
00445 return this->Communicator->Send(data, length, remoteThreadId, tag);
00446 }
00447 else
00448 {
00449 return 0;
00450 }
00451 }
00452 #endif
00453
00454 inline int vtkMultiProcessController::Receive(vtkDataObject* data,
00455 int remoteThreadId, int tag)
00456 {
00457 if (this->Communicator)
00458 {
00459 return this->Communicator->Receive(data, remoteThreadId, tag);
00460 }
00461 else
00462 {
00463 return 0;
00464 }
00465 }
00466
00467 inline int vtkMultiProcessController::Receive(vtkDataArray* data,
00468 int remoteThreadId, int tag)
00469 {
00470 if (this->Communicator)
00471 {
00472 return this->Communicator->Receive(data, remoteThreadId, tag);
00473 }
00474 else
00475 {
00476 return 0;
00477 }
00478 }
00479
00480 inline int vtkMultiProcessController::Receive(int* data, int length,
00481 int remoteThreadId, int tag)
00482 {
00483 if (this->Communicator)
00484 {
00485 return this->Communicator->Receive(data, length, remoteThreadId, tag);
00486 }
00487 else
00488 {
00489 return 0;
00490 }
00491 }
00492
00493 inline int vtkMultiProcessController::Receive(unsigned long* data,
00494 int length,int remoteThreadId,
00495 int tag)
00496 {
00497 if (this->Communicator)
00498 {
00499 return this->Communicator->Receive(data, length, remoteThreadId, tag);
00500 }
00501 else
00502 {
00503 return 0;
00504 }
00505 }
00506
00507 inline int vtkMultiProcessController::Receive(char* data, int length,
00508 int remoteThreadId, int tag)
00509 {
00510 if (this->Communicator)
00511 {
00512 return this->Communicator->Receive(data, length, remoteThreadId, tag);
00513 }
00514 else
00515 {
00516 return 0;
00517 }
00518 }
00519
00520 inline int vtkMultiProcessController::Receive(unsigned char* data, int length,
00521 int remoteThreadId, int tag)
00522 {
00523 if (this->Communicator)
00524 {
00525 return this->Communicator->Receive(data, length, remoteThreadId, tag);
00526 }
00527 else
00528 {
00529 return 0;
00530 }
00531 }
00532
00533 inline int vtkMultiProcessController::Receive(float* data, int length,
00534 int remoteThreadId, int tag)
00535 {
00536 if (this->Communicator)
00537 {
00538 return this->Communicator->Receive(data, length, remoteThreadId, tag);
00539 }
00540 else
00541 {
00542 return 0;
00543 }
00544 }
00545
00546 inline int vtkMultiProcessController::Receive(double* data, int length,
00547 int remoteThreadId, int tag)
00548 {
00549 if (this->Communicator)
00550 {
00551 return this->Communicator->Receive(data, length, remoteThreadId, tag);
00552 }
00553 else
00554 {
00555 return 0;
00556 }
00557 }
00558
00559 #ifdef VTK_USE_64BIT_IDS
00560 inline int vtkMultiProcessController::Receive(vtkIdType* data, int length,
00561 int remoteThreadId, int tag)
00562 {
00563 if (this->Communicator)
00564 {
00565 return this->Communicator->Receive(data, length, remoteThreadId, tag);
00566 }
00567 else
00568 {
00569 return 0;
00570 }
00571 }
00572 #endif
00573
00574 #endif