00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00060 #ifndef __vtkMultiProcessController_h
00061 #define __vtkMultiProcessController_h
00062 
00063 #include "vtkObject.h"
00064 
00065 #include "vtkCommunicator.h" 
00066 
00067 class vtkDataSet;
00068 class vtkImageData;
00069 class vtkCollection;
00070 class vtkOutputWindow;
00071 class vtkDataObject;
00072 class vtkMultiProcessController;
00073 
00074 
00075 
00076 typedef void (*vtkProcessFunctionType)(vtkMultiProcessController *controller, 
00077                                        void *userData);
00078 
00079 
00080 typedef void (*vtkRMIFunctionType)(void *localArg, 
00081                                    void *remoteArg, int remoteArgLength, 
00082                                    int remoteProcessId);
00083 
00084 
00085 
00086 class VTK_PARALLEL_EXPORT vtkMultiProcessController : public vtkObject
00087 {
00088 public:
00089   static vtkMultiProcessController *New();
00090   vtkTypeRevisionMacro(vtkMultiProcessController,vtkObject);
00091   void PrintSelf(ostream& os, vtkIndent indent);
00092 
00096   virtual void Initialize(int* vtkNotUsed(argc), char*** vtkNotUsed(argv))=0;
00097 
00099 
00102   virtual void Initialize(int* vtkNotUsed(argc), char*** vtkNotUsed(argv),
00103                           int initializedExternally)=0;
00105 
00108   virtual void Finalize()=0;
00109 
00113   virtual void Finalize(int finalizedExternally)=0;
00114 
00116 
00119   virtual void SetNumberOfProcesses(int num);
00120   vtkGetMacro( NumberOfProcesses, int );
00122 
00123   
00125 
00126       be executed by all of the processes when SingleMethodExecute is
00127       called.  All the processes will start by calling this function. */
00128   void SetSingleMethod(vtkProcessFunctionType, void *data);
00129   
00131 
00135   virtual void SingleMethodExecute() = 0;
00136   
00137   
00139 
00140       to be executed by the process index when MultipleMethodExecute is
00141       called.  This is for having each process start with a different
00142       function and data argument. */
00143   void SetMultipleMethod(int index, vtkProcessFunctionType, void *data); 
00144   
00146 
00150   virtual void MultipleMethodExecute() = 0;
00151 
00153   virtual int GetLocalProcessId() { return this->LocalProcessId; }
00154 
00159   static vtkMultiProcessController *GetGlobalController();
00160 
00163   virtual void CreateOutputWindow() = 0;
00164   
00166 
00171   vtkSetMacro(ForceDeepCopy, int);
00172   vtkGetMacro(ForceDeepCopy, int);
00173   vtkBooleanMacro(ForceDeepCopy, int);
00175 
00176   
00177   
00178   
00184   void AddRMI(vtkRMIFunctionType, void *localArg, int tag);
00185   
00187 
00188   void RemoveRMI(vtkRMIFunctionType f, void *arg, int tag)
00189     {f = f; arg = arg; tag = tag; vtkErrorMacro("RemoveRMI Not Implemented Yet");};
00190   
00192 
00194   void TriggerRMI(int remoteProcessId, void *arg, int argLength, int tag);
00195 
00198   void TriggerBreakRMIs();
00199 
00201 
00202   void TriggerRMI(int remoteProcessId, char *arg, int tag) 
00203     { this->TriggerRMI(remoteProcessId, (void*)arg, 
00204                        static_cast<int>(strlen(arg))+1, tag); }
00206 
00208 
00209   void TriggerRMI(int remoteProcessId, int tag)
00210     { this->TriggerRMI(remoteProcessId, NULL, 0, tag); }
00212 
00215   void ProcessRMIs();
00216 
00218 
00221   vtkSetMacro(BreakFlag, int);
00222   vtkGetMacro(BreakFlag, int);
00224 
00226   vtkGetObjectMacro(Communicator, vtkCommunicator);
00228   
00229 
00230 
00231   enum Consts {
00232     MAX_PROCESSES=8192,
00233     ANY_SOURCE=-1,
00234     INVALID_SOURCE=-2,
00235     RMI_TAG=315167,
00236     RMI_ARG_TAG=315168,
00237     BREAK_RMI_TAG=239954
00238   };
00239 
00240 
00241 
00243   virtual void Barrier() = 0;
00244 
00245   static void SetGlobalController(vtkMultiProcessController *controller);
00246 
00247   
00248   
00250 
00252   int Send(int* data, int length, int remoteProcessId, int tag);
00253   int Send(unsigned long* data, int length, int remoteProcessId, 
00254            int tag);
00255   int Send(char* data, int length, int remoteProcessId, int tag);
00256   int Send(unsigned char* data, int length, int remoteProcessId, int tag);
00257   int Send(float* data, int length, int remoteProcessId, int tag);
00258   int Send(double* data, int length, int remoteProcessId, int tag);
00259 #ifdef VTK_USE_64BIT_IDS
00260   int Send(vtkIdType* data, int length, int remoteProcessId, int tag);
00262 #endif
00263   int Send(vtkDataObject *data, int remoteId, int tag);
00264   int Send(vtkDataArray *data, int remoteId, int tag);
00265 
00267 
00270   int Receive(int* data, int length, int remoteProcessId, int tag);
00271   int Receive(unsigned long* data, int length, int remoteProcessId, 
00272               int tag);
00273   int Receive(char* data, int length, int remoteProcessId, int tag);
00274   int Receive(unsigned char* data, int length, int remoteProcessId, int tag);
00275   int Receive(float* data, int length, int remoteProcessId, int tag);
00276   int Receive(double* data, int length, int remoteProcessId, int tag);
00277 #ifdef VTK_USE_64BIT_IDS
00278   int Receive(vtkIdType* data, int length, int remoteProcessId, int tag);
00280 #endif
00281   int Receive(vtkDataObject* data, int remoteId, int tag);
00282   int Receive(vtkDataArray* data, int remoteId, int tag);
00283 
00284 
00285 
00286 protected:
00287   vtkMultiProcessController();
00288   ~vtkMultiProcessController();
00289   
00290   int MaximumNumberOfProcesses;
00291   int NumberOfProcesses;
00292 
00293   int LocalProcessId;
00294   
00295   vtkProcessFunctionType      SingleMethod;
00296   void                       *SingleData;
00297   vtkProcessFunctionType      MultipleMethod[MAX_PROCESSES];
00298   void                       *MultipleData[MAX_PROCESSES];  
00299   
00300   vtkCollection *RMIs;
00301   
00302   
00303   
00304   int BreakFlag;
00305 
00306   void ProcessRMI(int remoteProcessId, void *arg, int argLength, int rmiTag);
00307 
00308   
00309   
00310   virtual vtkMultiProcessController *GetLocalController();
00311 
00312   
00313   
00314   int ForceDeepCopy;
00315 
00316   vtkOutputWindow* OutputWindow;
00317 
00318   
00319   
00320   
00321   vtkCommunicator* Communicator;
00322 
00323   
00324   
00325   
00326   
00327   
00328   
00329   
00330   
00331   vtkCommunicator* RMICommunicator;
00332 
00333 private:
00334   vtkMultiProcessController(const vtkMultiProcessController&);  
00335   void operator=(const vtkMultiProcessController&);  
00336 };
00337 
00338 
00339 inline int vtkMultiProcessController::Send(vtkDataObject *data, 
00340                                            int remoteThreadId, int tag)
00341 {
00342   if (this->Communicator)
00343     {
00344     return this->Communicator->Send(data, remoteThreadId, tag);
00345     }
00346   else
00347     {
00348     return 0;
00349     }
00350 }
00351 
00352 inline int vtkMultiProcessController::Send(vtkDataArray *data, 
00353                                            int remoteThreadId, int tag)
00354 {
00355   if (this->Communicator)
00356     {
00357     return this->Communicator->Send(data, remoteThreadId, tag);
00358     }
00359   else
00360     {
00361     return 0;
00362     }
00363 }
00364 
00365 inline int vtkMultiProcessController::Send(int* data, int length, 
00366                                            int remoteThreadId, int tag)
00367 {
00368   if (this->Communicator)
00369     {
00370     return this->Communicator->Send(data, length, remoteThreadId, tag);
00371     }
00372   else
00373     {
00374     return 0;
00375     }
00376 }
00377 
00378 inline int vtkMultiProcessController::Send(unsigned long* data, 
00379                                            int length, int remoteThreadId, 
00380                                            int tag)
00381 {
00382   if (this->Communicator)
00383     {
00384     return this->Communicator->Send(data, length, remoteThreadId, tag);
00385     }
00386   else
00387     {
00388     return 0;
00389     }
00390 }
00391 
00392 inline int vtkMultiProcessController::Send(char* data, int length, 
00393                                            int remoteThreadId, int tag)
00394 {
00395   if (this->Communicator)
00396     {
00397     return this->Communicator->Send(data, length, remoteThreadId, tag);
00398     }
00399   else
00400     {
00401     return 0;
00402     }
00403 }
00404 
00405 inline int vtkMultiProcessController::Send(unsigned char* data, int length, 
00406                                            int remoteThreadId, int tag)
00407 {
00408   if (this->Communicator)
00409     {
00410     return this->Communicator->Send(data, length, remoteThreadId, tag);
00411     }
00412   else
00413     {
00414     return 0;
00415     }
00416 }
00417 
00418 inline int vtkMultiProcessController::Send(float* data, int length, 
00419                                            int remoteThreadId, int tag)
00420 {
00421   if (this->Communicator)
00422     {
00423     return this->Communicator->Send(data, length, remoteThreadId, tag);
00424     }
00425   else
00426     {
00427     return 0;
00428     }
00429 }
00430 
00431 inline int vtkMultiProcessController::Send(double* data, int length, 
00432                                            int remoteThreadId, int tag)
00433 {
00434   if (this->Communicator)
00435     {
00436     return this->Communicator->Send(data, length, remoteThreadId, tag);
00437     }
00438   else
00439     {
00440     return 0;
00441     }
00442 }
00443 
00444 #ifdef VTK_USE_64BIT_IDS
00445 inline int vtkMultiProcessController::Send(vtkIdType* data, int length, 
00446                                            int remoteThreadId, int tag)
00447 {
00448   if (this->Communicator)
00449     {
00450     return this->Communicator->Send(data, length, remoteThreadId, tag);
00451     }
00452   else
00453     {
00454     return 0;
00455     }
00456 }
00457 #endif
00458 
00459 inline int vtkMultiProcessController::Receive(vtkDataObject* data, 
00460                                               int remoteThreadId, int tag)
00461 {
00462   if (this->Communicator)
00463     {
00464     return this->Communicator->Receive(data, remoteThreadId, tag);
00465     }
00466   else
00467     {
00468     return 0;
00469     }
00470 }
00471 
00472 inline int vtkMultiProcessController::Receive(vtkDataArray* data, 
00473                                               int remoteThreadId, int tag)
00474 {
00475   if (this->Communicator)
00476     {
00477     return this->Communicator->Receive(data, remoteThreadId, tag);
00478     }
00479   else
00480     {
00481     return 0;
00482     }
00483 }
00484 
00485 inline int vtkMultiProcessController::Receive(int* data, int length, 
00486                                               int remoteThreadId, int tag)
00487 {
00488   if (this->Communicator)
00489     {
00490     return this->Communicator->Receive(data, length, remoteThreadId, tag);
00491     }
00492   else
00493     {
00494     return 0;
00495     }
00496 }
00497 
00498 inline int vtkMultiProcessController::Receive(unsigned long* data, 
00499                                               int length,int remoteThreadId, 
00500                                               int tag)
00501 {
00502   if (this->Communicator)
00503     {
00504     return this->Communicator->Receive(data, length, remoteThreadId, tag);
00505     }
00506   else
00507     {
00508     return 0;
00509     }
00510 }
00511 
00512 inline int vtkMultiProcessController::Receive(char* data, int length, 
00513                                               int remoteThreadId, int tag)
00514 {
00515   if (this->Communicator)
00516     {
00517     return this->Communicator->Receive(data, length, remoteThreadId, tag);
00518     }
00519   else
00520     {
00521     return 0;
00522     }
00523 }
00524 
00525 inline int vtkMultiProcessController::Receive(unsigned char* data, int length, 
00526                                               int remoteThreadId, int tag)
00527 {
00528   if (this->Communicator)
00529     {
00530     return this->Communicator->Receive(data, length, remoteThreadId, tag);
00531     }
00532   else
00533     {
00534     return 0;
00535     }
00536 }
00537 
00538 inline int vtkMultiProcessController::Receive(float* data, int length, 
00539                                               int remoteThreadId, int tag)
00540 {
00541   if (this->Communicator)
00542     {
00543     return this->Communicator->Receive(data, length, remoteThreadId, tag);
00544     }
00545   else
00546     {
00547     return 0;
00548     }
00549 }
00550 
00551 inline int vtkMultiProcessController::Receive(double* data, int length, 
00552                                               int remoteThreadId, int tag)
00553 {
00554   if (this->Communicator)
00555     {
00556     return this->Communicator->Receive(data, length, remoteThreadId, tag);
00557     }
00558   else
00559     {
00560     return 0;
00561     }
00562 }
00563 
00564 #ifdef VTK_USE_64BIT_IDS
00565 inline int vtkMultiProcessController::Receive(vtkIdType* data, int length, 
00566                                               int remoteThreadId, int tag)
00567 {
00568   if (this->Communicator)
00569     {
00570     return this->Communicator->Receive(data, length, remoteThreadId, tag);
00571     }
00572   else
00573     {
00574     return 0;
00575     }
00576 }
00577 #endif
00578 
00579 #endif