VTK  9.4.20250102
vtkThreadedCallbackQueue.h
Go to the documentation of this file.
1// SPDX-FileCopyrightText: Copyright (c) Ken Martin, Will Schroeder, Bill Lorensen
2// SPDX-License-Identifier: BSD-3-Clause
23#ifndef vtkThreadedCallbackQueue_h
24#define vtkThreadedCallbackQueue_h
25
26#include "vtkCommonCoreModule.h" // For export macro
27#include "vtkObject.h"
28#include "vtkSmartPointer.h" // For vtkSmartPointer
29
30#include <atomic> // For atomic_bool
31#include <condition_variable> // For condition variable
32#include <deque> // For deque
33#include <functional> // For greater
34#include <memory> // For unique_ptr
35#include <mutex> // For mutex
36#include <thread> // For thread
37#include <unordered_map> // For unordered_map
38#include <unordered_set> // For unordered_set
39#include <vector> // For vector
40
41#if !defined(__WRAP__)
42
43VTK_ABI_NAMESPACE_BEGIN
44
45class VTKCOMMONCORE_EXPORT vtkThreadedCallbackQueue : public vtkObject
46{
47private:
51 template <class FT>
52 struct Signature;
53
// Primary template of the Dereference trait.
// It only forward-declares the nested `Type` and never defines it here, so
// `Dereference<T>::Type` is an incomplete type unless a specialization supplies one.
// NOTE(review): specializations are presumably provided in the .txx included at the
// end of this header, with `DummyT` (defaulting to std::nullptr_t) used as the
// selection hook -- confirm against that file.
58 template <class T, class DummyT = std::nullptr_t>
59 struct Dereference
60 {
61 struct Type;
62 };
63
// Convenience alias: the nested `Dereference<T>::Type` passed through std::decay,
// i.e. with references and top-level cv-qualifiers removed (and array/function
// types decayed to pointers).
67 template <class T>
68 using DereferencedType = typename std::decay<typename Dereference<T>::Type>::type;
69
// Convenience alias: the nested `InvokeResult` of `Signature` instantiated on the
// dereferenced, decayed form of `FT`.
// `Signature` is only forward-declared in this header; its definition is not visible here.
73 template <class FT>
74 using InvokeResult = typename Signature<DereferencedType<FT>>::InvokeResult;
75
// Primary template of the wrapper used to hold a task's return value
// (vtkSharedFuture stores a `ReturnValueWrapper<ReturnT>` member named ReturnValue).
// The second parameter defaults to whether `ReturnT` is an lvalue reference.
// Only the two nested accessor types are forward-declared here; vtkSharedFuture
// re-exports them as `ReturnLValueRef` and `ReturnConstLValueRef`.
// NOTE(review): the actual storage and the definitions of the nested types are
// presumably supplied by specializations outside this listing -- confirm.
79 template <class ReturnT, bool IsLValueReference = std::is_lvalue_reference<ReturnT>::value>
80 class ReturnValueWrapper
81 {
82 class ReturnLValueRef;
83 class ReturnConstLValueRef;
84 };
85
86public:
89 void PrintSelf(ostream& os, vtkIndent indent) override;
90
92
97
103 {
104 public:
106
108 : NumberOfPriorSharedFuturesRemaining(0)
109 , Status(CONSTRUCTING)
110 {
111 }
112
// Blocks the calling thread until this future's Status equals READY.
// Callable on a const object: Mutex and ConditionVariable are declared `mutable`.
116 virtual void Wait() const
117 {
// Fast path: Status is a std::atomic_int, so this read needs no lock.
// If the task is already READY there is nothing to wait for.
118 if (this->Status == READY)
119 {
120 return;
121 }
// Slow path: take the mutex before sleeping on the condition variable.
122 std::unique_lock<std::mutex> lock(this->Mutex);
// Re-check under the lock, since Status may have become READY between the first
// test and the lock acquisition. The predicate overload of wait() evaluates the
// predicate before blocking as well, so this outer test is redundant but harmless.
123 if (this->Status != READY)
124 {
// The predicate form re-tests Status after every wake-up, so spurious wake-ups
// do not end the wait early.
// NOTE(review): the code that sets READY and notifies is not visible here --
// it presumably does so while holding Mutex; confirm in the .txx / .cxx.
125 this->ConditionVariable.wait(lock, [this] { return this->Status == READY; });
126 }
127 }
128
130
131 private:
135 virtual void operator()() = 0;
136
140 std::atomic_int NumberOfPriorSharedFuturesRemaining;
141
147 std::atomic_int Status;
148
154 vtkIdType InvokerIndex;
155
160 bool IsHighPriority = false;
161
166 std::vector<vtkSmartPointer<vtkSharedFutureBase>> Dependents;
167
168 mutable std::mutex Mutex;
169 mutable std::condition_variable ConditionVariable;
170
171 vtkSharedFutureBase(const vtkSharedFutureBase& other) = delete;
172 void operator=(const vtkSharedFutureBase& other) = delete;
173 };
174
178 template <class ReturnT>
180 {
181 public:
183
184 using ReturnLValueRef = typename ReturnValueWrapper<ReturnT>::ReturnLValueRef;
185 using ReturnConstLValueRef = typename ReturnValueWrapper<ReturnT>::ReturnConstLValueRef;
186
187 vtkSharedFuture() = default;
188
194
200
202
203 private:
204 ReturnValueWrapper<ReturnT> ReturnValue;
205
206 vtkSharedFuture(const vtkSharedFuture<ReturnT>& other) = delete;
207 void operator=(const vtkSharedFuture<ReturnT>& other) = delete;
208 };
209
211 template <class ReturnT>
213
275 template <class FT, class... ArgsT>
277
286 template <class SharedFutureContainerT, class FT, class... ArgsT>
288 SharedFutureContainerT&& priorSharedFutures, FT&& f, ArgsT&&... args);
289
304 template <class SharedFutureContainerT>
305 void Wait(SharedFutureContainerT&& priorSharedFuture);
306
308
316 template <class ReturnT>
318 template <class ReturnT>
320 const SharedFuturePointer<ReturnT>& future);
322
331 void SetNumberOfThreads(int numberOfThreads);
332
// Returns the number of allocated threads.
// NumberOfThreads is a std::atomic_int, so the return performs an atomic load that
// is converted to a plain int; the value may be stale by the time the caller uses it
// if another thread changes the thread count concurrently.
340 int GetNumberOfThreads() const { return this->NumberOfThreads; }
341
342private:
344
348 template <class FT, class... ArgsT>
349 class vtkInvoker;
351
352 struct InvokerImpl;
353
354 template <class FT, class... ArgsT>
355 using InvokerPointer = vtkSmartPointer<vtkInvoker<FT, ArgsT...>>;
356
357 class ThreadWorker;
358
359 friend class ThreadWorker;
360
// States stored in a shared future's `Status` member (a std::atomic_int).
// Apart from CONSTRUCTING (zero), each value is a distinct single bit.
// NOTE(review): the per-state descriptions below are inferred from the enumerator
// names and from the uses visible in this header; confirm against the implementation.
366 enum Status
367 {
// Initial state: the shared future's constructor initializes Status to this value.
374 CONSTRUCTING = 0x00,
375
// Presumably: the task cannot run yet (e.g. waiting on prior shared futures) -- confirm.
379 ON_HOLD = 0x01,
380
// Presumably: the task sits in the invoker queue waiting for a worker -- confirm.
385 ENQUEUED = 0x02,
386
// Presumably: a thread is currently executing the task -- confirm.
390 RUNNING = 0x04,
391
// Terminal state tested by Wait(): waiters are released once Status equals READY.
395 READY = 0x08
396 };
397
404 void Sync(int startId = 0);
405
410 void PopFrontNullptr();
411
417 void SignalDependentSharedFutures(vtkSharedFutureBase* invoker);
418
424 template <class SharedFutureContainerT, class InvokerT>
425 void HandleDependentInvoker(SharedFutureContainerT&& priorSharedFutures, InvokerT&& invoker);
426
430 void Invoke(vtkSharedFutureBase* invoker);
431
436 bool TryInvoke(vtkSharedFutureBase* invoker);
437
442 template <class FT, class... ArgsT>
443 void PushControl(FT&& f, ArgsT&&... args);
444
448 template <class SharedFutureContainerT>
449 static bool MustWait(SharedFutureContainerT&& priorSharedFutures);
450
454 std::deque<SharedFutureBasePointer> InvokerQueue;
455
459 std::mutex Mutex;
460
464 std::mutex ControlMutex;
465
470 std::mutex DestroyMutex;
471
475 std::mutex ThreadIdToIndexMutex;
476
477 std::condition_variable ConditionVariable;
478
483 std::atomic_bool Destroying{ false };
484
488 std::atomic_int NumberOfThreads;
489
490 std::vector<std::thread> Threads;
491
499 std::unordered_map<std::thread::id, std::shared_ptr<std::atomic_int>> ThreadIdToIndex;
500
505 std::unordered_set<SharedFutureBasePointer> ControlFutures;
506
508 void operator=(const vtkThreadedCallbackQueue&) = delete;
509};
510
511VTK_ABI_NAMESPACE_END
512
513#include "vtkThreadedCallbackQueue.txx"
514
515#endif
516#endif
517// VTK-HeaderTest-Exclude: vtkThreadedCallbackQueue.h
a simple class to control print indentation
Definition vtkIndent.h:108
abstract base class for most VTK objects
abstract base class for most VTK objects
Definition vtkObject.h:162
Hold a reference to a vtkObjectBase instance.
vtkSharedFutureBase is the base block to store, run, get the returned value of the tasks that are pushed in the queue.
virtual void Wait() const
Blocks current thread until the task associated with this future has terminated.
vtkBaseTypeMacro(vtkSharedFutureBase, vtkObjectBase)
A vtkSharedFuture is an object returned by the methods Push and PushDependent.
vtkAbstractTypeMacro(vtkSharedFuture< ReturnT >, vtkSharedFutureBase)
ReturnConstLValueRef Get() const
This returns the return value of the pushed function.
ReturnLValueRef Get()
This returns the return value of the pushed function.
typename ReturnValueWrapper< ReturnT >::ReturnLValueRef ReturnLValueRef
typename ReturnValueWrapper< ReturnT >::ReturnConstLValueRef ReturnConstLValueRef
simple threaded callback queue
int GetNumberOfThreads() const
Returns the number of allocated threads.
void PrintSelf(ostream &os, vtkIndent indent) override
Methods invoked by print to print information about the object including superclasses.
vtkSharedFuture< ReturnT >::ReturnConstLValueRef Get(const SharedFuturePointer< ReturnT > &future)
Get the returned value from the task associated with the input future.
SharedFuturePointer< InvokeResult< FT > > Push(FT &&f, ArgsT &&... args)
Pushes a function f to be passed args... as arguments.
void Wait(SharedFutureContainerT &&priorSharedFuture)
This method blocks the current thread until all the tasks associated with each shared future inside priorSharedFuture have terminated.
static vtkThreadedCallbackQueue * New()
SharedFuturePointer< InvokeResult< FT > > PushDependent(SharedFutureContainerT &&priorSharedFutures, FT &&f, ArgsT &&... args)
This method behaves the same way Push does, with the addition of a container of futures.
void SetNumberOfThreads(int numberOfThreads)
Sets the number of threads.
~vtkThreadedCallbackQueue() override
Any remaining function that was not executed yet will be executed in this destructor.
vtkSharedFuture< ReturnT >::ReturnLValueRef Get(SharedFuturePointer< ReturnT > &future)
Get the returned value from the task associated with the input future.
int vtkIdType
Definition vtkType.h:315