Main Page   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members   Related Pages  

Parallel/vtkMultiProcessController.h

Go to the documentation of this file.
00001 /*=========================================================================
00002 
00003   Program:   Visualization Toolkit
00004   Module:    $RCSfile: vtkMultiProcessController.h,v $
00005   Language:  C++
00006 
00007   Copyright (c) 1993-2002 Ken Martin, Will Schroeder, Bill Lorensen 
00008   All rights reserved.
00009   See Copyright.txt or http://www.kitware.com/Copyright.htm for details.
00010 
00011      This software is distributed WITHOUT ANY WARRANTY; without even 
00012      the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR 
00013      PURPOSE.  See the above copyright notice for more information.
00014 
00015 =========================================================================*/
00060 #ifndef __vtkMultiProcessController_h
00061 #define __vtkMultiProcessController_h
00062 
00063 #include "vtkObject.h"
00064 
00065 #include "vtkCommunicator.h" // Needed for direct access to communicator
00066 
00067 class vtkDataSet;
00068 class vtkImageData;
00069 class vtkCollection;
00070 class vtkOutputWindow;
00071 class vtkDataObject;
00072 class vtkDataArray; // used by the vtkDataArray Send()/Receive() overloads
00073 class vtkMultiProcessController;
00074 //BTX
00075 // Signature of the function called when new processes are initiated (see SetSingleMethod / SetMultipleMethod).
00076 typedef void (*vtkProcessFunctionType)(vtkMultiProcessController *controller, 
00077                                        void *userData); // userData: presumably the data pointer given at registration -- confirm
00078 
00079 // Signature of the handler called when an RMI is triggered (handlers are registered with AddRMI).
00080 typedef void (*vtkRMIFunctionType)(void *localArg, // presumably the localArg given to AddRMI -- confirm
00081                                    void *remoteArg, int remoteArgLength, // presumably the arg/argLength given to TriggerRMI -- confirm
00082                                    int remoteProcessId);
00083 //ETX
00084 
00085 
00086 class VTK_PARALLEL_EXPORT vtkMultiProcessController : public vtkObject
00087 {
00088 public:
00089   static vtkMultiProcessController *New();
00090   vtkTypeRevisionMacro(vtkMultiProcessController,vtkObject);
00091   void PrintSelf(ostream& os, vtkIndent indent);
00092 
00096   virtual void Initialize(int* vtkNotUsed(argc), char*** vtkNotUsed(argv))=0;
00097 
00099 
00102   virtual void Initialize(int* vtkNotUsed(argc), char*** vtkNotUsed(argv),
00103                           int initializedExternally)=0;
00105 
00108   virtual void Finalize()=0;
00109 
00113   virtual void Finalize(int finalizedExternally)=0;
00114 
00116 
00119   virtual void SetNumberOfProcesses(int num);
00120   vtkGetMacro( NumberOfProcesses, int );
00122 
00123   //BTX
00125   /*! Set the SingleMethod to f() and the UserData of the for the method to
00126       be executed by all of the processes when SingleMethodExecute is
00127       called.  All the processes will start by calling this function. */
00128   void SetSingleMethod(vtkProcessFunctionType, void *data);
00129   //ETX
00131 
00135   virtual void SingleMethodExecute() = 0;
00136   
00137   //BTX
00139   /*! Set the MultipleMethod to f() and the UserData of the for the method
00140       to be executed by the process index when MultipleMethodExecute is
00141       called.  This is for having each process start with a different
00142       function and data argument. */
00143   void SetMultipleMethod(int index, vtkProcessFunctionType, void *data); 
00144   //ETX
00146 
00150   virtual void MultipleMethodExecute() = 0;
00151 
00153   virtual int GetLocalProcessId() { return this->LocalProcessId; }
00154 
00159   static vtkMultiProcessController *GetGlobalController();
00160 
00163   virtual void CreateOutputWindow() = 0;
00164   
00166 
00171   vtkSetMacro(ForceDeepCopy, int);
00172   vtkGetMacro(ForceDeepCopy, int);
00173   vtkBooleanMacro(ForceDeepCopy, int);
00175 
00176   
00177   //------------------ RMIs --------------------
00178   //BTX
00184   void AddRMI(vtkRMIFunctionType, void *localArg, int tag);
00185   
00187 
00188   void RemoveRMI(vtkRMIFunctionType f, void *arg, int tag)
00189     {f = f; arg = arg; tag = tag; vtkErrorMacro("RemoveRMI Not Implemented Yet");};
00190   //ETX
00192   
00194   void TriggerRMI(int remoteProcessId, void *arg, int argLength, int tag);
00195 
00198   void TriggerBreakRMIs();
00199 
00201 
00202   void TriggerRMI(int remoteProcessId, char *arg, int tag) 
00203     { this->TriggerRMI(remoteProcessId, (void*)arg, 
00204                        static_cast<int>(strlen(arg))+1, tag); }
00206 
00208 
00209   void TriggerRMI(int remoteProcessId, int tag)
00210     { this->TriggerRMI(remoteProcessId, NULL, 0, tag); }
00212 
00215   void ProcessRMIs();
00216 
00218 
00221   vtkSetMacro(BreakFlag, int);
00222   vtkGetMacro(BreakFlag, int);
00224 
00226   vtkGetObjectMacro(Communicator, vtkCommunicator);
00228   
00229 //BTX
00230 
00231   enum Consts {
00232     MAX_PROCESSES=8192,
00233     ANY_SOURCE=-1,
00234     INVALID_SOURCE=-2,
00235     RMI_TAG=315167,
00236     RMI_ARG_TAG=315168,
00237     BREAK_RMI_TAG=239954
00238   };
00239 
00240 //ETX
00241 
00243   virtual void Barrier() = 0;
00244 
00245   static void SetGlobalController(vtkMultiProcessController *controller);
00246 
00247   //------------------ Communication --------------------
00248   
00250 
00252   int Send(int* data, int length, int remoteProcessId, int tag);
00253   int Send(unsigned long* data, int length, int remoteProcessId, 
00254            int tag);
00255   int Send(char* data, int length, int remoteProcessId, int tag);
00256   int Send(unsigned char* data, int length, int remoteProcessId, int tag);
00257   int Send(float* data, int length, int remoteProcessId, int tag);
00258   int Send(double* data, int length, int remoteProcessId, int tag);
00259 #ifdef VTK_USE_64BIT_IDS
00260   int Send(vtkIdType* data, int length, int remoteProcessId, int tag);
00262 #endif
00263   int Send(vtkDataObject *data, int remoteId, int tag);
00264   int Send(vtkDataArray *data, int remoteId, int tag);
00265 
00267 
00270   int Receive(int* data, int length, int remoteProcessId, int tag);
00271   int Receive(unsigned long* data, int length, int remoteProcessId, 
00272               int tag);
00273   int Receive(char* data, int length, int remoteProcessId, int tag);
00274   int Receive(unsigned char* data, int length, int remoteProcessId, int tag);
00275   int Receive(float* data, int length, int remoteProcessId, int tag);
00276   int Receive(double* data, int length, int remoteProcessId, int tag);
00277 #ifdef VTK_USE_64BIT_IDS
00278   int Receive(vtkIdType* data, int length, int remoteProcessId, int tag);
00280 #endif
00281   int Receive(vtkDataObject* data, int remoteId, int tag);
00282   int Receive(vtkDataArray* data, int remoteId, int tag);
00283 
00284 // Internally implemented RMI to break the process loop.
00285 
00286 protected:
00287   vtkMultiProcessController();
00288   ~vtkMultiProcessController();
00289   
00290   int MaximumNumberOfProcesses;
00291   int NumberOfProcesses;
00292 
00293   int LocalProcessId;
00294   
00295   vtkProcessFunctionType      SingleMethod;
00296   void                       *SingleData;
00297   vtkProcessFunctionType      MultipleMethod[MAX_PROCESSES];
00298   void                       *MultipleData[MAX_PROCESSES];  
00299   
00300   vtkCollection *RMIs;
00301   
00302   // This is a flag that can be used by the ports to break
00303   // their update loop. (same as ProcessRMIs)
00304   int BreakFlag;
00305 
00306   void ProcessRMI(int remoteProcessId, void *arg, int argLength, int rmiTag);
00307 
00308   // This method implements "GetGlobalController".  
00309   // It needs to be virtual and static.
00310   virtual vtkMultiProcessController *GetLocalController();
00311 
00312   
00313   // This flag can force deep copies during send.
00314   int ForceDeepCopy;
00315 
00316   vtkOutputWindow* OutputWindow;
00317 
00318   // Note that since the communicators can be created differently
00319   // depending on the type of controller, the subclasses are
00320   // responsible of deleting them.
00321   vtkCommunicator* Communicator;
00322 
00323   // Communicator which is a copy of the current user
00324   // level communicator except the context; i.e. even if the tags 
00325   // are the same, the RMI messages will not interfere with user 
00326   // level messages. (This only works with MPI. When using threads,
00327   // the tags have to be unique.)
00328   // Note that since the communicators can be created differently
00329   // depending on the type of controller, the subclasses are
00330   // responsible of deleting them.
00331   vtkCommunicator* RMICommunicator;
00332 
00333 private:
00334   vtkMultiProcessController(const vtkMultiProcessController&);  // Not implemented.
00335   void operator=(const vtkMultiProcessController&);  // Not implemented.
00336 };
00337 
00338 
00339 // Forwards a vtkDataObject send to Communicator->Send() and returns its
00340 // result; returns 0 when no Communicator is set.
00341 inline int vtkMultiProcessController::Send(vtkDataObject *data, 
00342                                            int remoteThreadId, int tag)
00343 {
00344   vtkCommunicator* const comm = this->Communicator;
00345   return comm ? comm->Send(data, remoteThreadId, tag) : 0;
00346 }
00351 
00352 // Forwards a vtkDataArray send to Communicator->Send() and returns its
00353 // result; returns 0 when no Communicator is set.
00354 inline int vtkMultiProcessController::Send(vtkDataArray *data, 
00355                                            int remoteThreadId, int tag)
00356 {
00357   vtkCommunicator* const comm = this->Communicator;
00358   return comm ? comm->Send(data, remoteThreadId, tag) : 0;
00359 }
00364 
00365 // Forwards an int-array send to Communicator->Send() and returns its
00366 // result; returns 0 when no Communicator is set.
00367 inline int vtkMultiProcessController::Send(int* data, int length, 
00368                                            int remoteThreadId, int tag)
00369 {
00370   vtkCommunicator* const comm = this->Communicator;
00371   return comm ? comm->Send(data, length, remoteThreadId, tag) : 0;
00372 }
00377 
00378 // Forwards an unsigned-long-array send to Communicator->Send() and returns
00379 // its result; returns 0 when no Communicator is set.
00380 inline int vtkMultiProcessController::Send(unsigned long* data, 
00381                                            int length, int remoteThreadId, 
00382                                            int tag)
00383 {
00384   vtkCommunicator* const comm = this->Communicator;
00385   return comm ? comm->Send(data, length, remoteThreadId, tag) : 0;
00386 }
00391 
00392 // Forwards a char-array send to Communicator->Send() and returns its
00393 // result; returns 0 when no Communicator is set.
00394 inline int vtkMultiProcessController::Send(char* data, int length, 
00395                                            int remoteThreadId, int tag)
00396 {
00397   vtkCommunicator* const comm = this->Communicator;
00398   return comm ? comm->Send(data, length, remoteThreadId, tag) : 0;
00399 }
00404 
00405 // Forwards an unsigned-char-array send to Communicator->Send() and returns
00406 // its result; returns 0 when no Communicator is set.
00407 inline int vtkMultiProcessController::Send(unsigned char* data, int length, 
00408                                            int remoteThreadId, int tag)
00409 {
00410   vtkCommunicator* const comm = this->Communicator;
00411   return comm ? comm->Send(data, length, remoteThreadId, tag) : 0;
00412 }
00417 
00418 // Forwards a float-array send to Communicator->Send() and returns its
00419 // result; returns 0 when no Communicator is set.
00420 inline int vtkMultiProcessController::Send(float* data, int length, 
00421                                            int remoteThreadId, int tag)
00422 {
00423   vtkCommunicator* const comm = this->Communicator;
00424   return comm ? comm->Send(data, length, remoteThreadId, tag) : 0;
00425 }
00430 
00431 // Forwards a double-array send to Communicator->Send() and returns its
00432 // result; returns 0 when no Communicator is set.
00433 inline int vtkMultiProcessController::Send(double* data, int length, 
00434                                            int remoteThreadId, int tag)
00435 {
00436   vtkCommunicator* const comm = this->Communicator;
00437   return comm ? comm->Send(data, length, remoteThreadId, tag) : 0;
00438 }
00443 
00444 #ifdef VTK_USE_64BIT_IDS
00445 // Forwards a vtkIdType-array send to Communicator->Send() and returns its
00446 // result; returns 0 when no Communicator is set.
00447 inline int vtkMultiProcessController::Send(vtkIdType* data, int length, 
00448                                            int remoteThreadId, int tag)
00449 {
00450   vtkCommunicator* const comm = this->Communicator;
00451   return comm ? comm->Send(data, length, remoteThreadId, tag) : 0;
00452 }
00457 #endif
00458 
00459 // Forwards a vtkDataObject receive to Communicator->Receive() and returns
00460 // its result; returns 0 when no Communicator is set.
00461 inline int vtkMultiProcessController::Receive(vtkDataObject* data, 
00462                                               int remoteThreadId, int tag)
00463 {
00464   vtkCommunicator* const comm = this->Communicator;
00465   return comm ? comm->Receive(data, remoteThreadId, tag) : 0;
00466 }
00471 
00472 // Forwards a vtkDataArray receive to Communicator->Receive() and returns
00473 // its result; returns 0 when no Communicator is set.
00474 inline int vtkMultiProcessController::Receive(vtkDataArray* data, 
00475                                               int remoteThreadId, int tag)
00476 {
00477   vtkCommunicator* const comm = this->Communicator;
00478   return comm ? comm->Receive(data, remoteThreadId, tag) : 0;
00479 }
00484 
00485 // Forwards an int-array receive to Communicator->Receive() and returns
00486 // its result; returns 0 when no Communicator is set.
00487 inline int vtkMultiProcessController::Receive(int* data, int length, 
00488                                               int remoteThreadId, int tag)
00489 {
00490   vtkCommunicator* const comm = this->Communicator;
00491   return comm ? comm->Receive(data, length, remoteThreadId, tag) : 0;
00492 }
00497 
00498 // Forwards an unsigned-long-array receive to Communicator->Receive() and
00499 // returns its result; returns 0 when no Communicator is set.
00500 inline int vtkMultiProcessController::Receive(unsigned long* data, 
00501                                               int length,int remoteThreadId, 
00502                                               int tag)
00503 {
00504   vtkCommunicator* const comm = this->Communicator;
00505   return comm ? comm->Receive(data, length, remoteThreadId, tag) : 0;
00506 }
00511 
00512 // Forwards a char-array receive to Communicator->Receive() and returns
00513 // its result; returns 0 when no Communicator is set.
00514 inline int vtkMultiProcessController::Receive(char* data, int length, 
00515                                               int remoteThreadId, int tag)
00516 {
00517   vtkCommunicator* const comm = this->Communicator;
00518   return comm ? comm->Receive(data, length, remoteThreadId, tag) : 0;
00519 }
00524 
00525 // Forwards an unsigned-char-array receive to Communicator->Receive() and
00526 // returns its result; returns 0 when no Communicator is set.
00527 inline int vtkMultiProcessController::Receive(unsigned char* data, int length, 
00528                                               int remoteThreadId, int tag)
00529 {
00530   vtkCommunicator* const comm = this->Communicator;
00531   return comm ? comm->Receive(data, length, remoteThreadId, tag) : 0;
00532 }
00537 
00538 // Forwards a float-array receive to Communicator->Receive() and returns
00539 // its result; returns 0 when no Communicator is set.
00540 inline int vtkMultiProcessController::Receive(float* data, int length, 
00541                                               int remoteThreadId, int tag)
00542 {
00543   vtkCommunicator* const comm = this->Communicator;
00544   return comm ? comm->Receive(data, length, remoteThreadId, tag) : 0;
00545 }
00550 
00551 // Forwards a double-array receive to Communicator->Receive() and returns
00552 // its result; returns 0 when no Communicator is set.
00553 inline int vtkMultiProcessController::Receive(double* data, int length, 
00554                                               int remoteThreadId, int tag)
00555 {
00556   vtkCommunicator* const comm = this->Communicator;
00557   return comm ? comm->Receive(data, length, remoteThreadId, tag) : 0;
00558 }
00563 
00564 #ifdef VTK_USE_64BIT_IDS
00565 // Forwards a vtkIdType-array receive to Communicator->Receive() and returns
00566 // its result; returns 0 when no Communicator is set.
00567 inline int vtkMultiProcessController::Receive(vtkIdType* data, int length, 
00568                                               int remoteThreadId, int tag)
00569 {
00570   vtkCommunicator* const comm = this->Communicator;
00571   return comm ? comm->Receive(data, length, remoteThreadId, tag) : 0;
00572 }
00577 #endif
00578 
00579 #endif