21 #ifndef FXWorkerThread_h
22 #define FXWorkerThread_h
27 #define WORKLOAD_INTERVAL 100
33 #ifdef WORKLOAD_PROFILING
97 #ifdef WORKLOAD_PROFILING
98 , myNumBatches(0), myTotalMaxLoad(0.), myTotalSpread(0.)
101 #ifdef WORKLOAD_PROFILING
102 long long int timeDiff = 0;
103 for (
int i = 0; i < 100; i++) {
104 const auto begin = std::chrono::high_resolution_clock::now();
105 const auto end = std::chrono::high_resolution_clock::now();
106 timeDiff += std::chrono::duration_cast<std::chrono::nanoseconds>(end - begin).count();
110 while (numThreads > 0) {
151 #ifdef WORKLOAD_PROFILING
154 worker->startProfile();
156 myProfileStart = std::chrono::high_resolution_clock::now();
185 void waitAll(
const bool deleteFinished =
true) {
190 #ifdef WORKLOAD_PROFILING
192 const auto end = std::chrono::high_resolution_clock::now();
193 const long long int elapsed = std::chrono::duration_cast<std::chrono::microseconds>(end - myProfileStart).count();
194 double minLoad = std::numeric_limits<double>::max();
197 const double load = worker->endProfile(elapsed);
198 minLoad =
MIN2(minLoad, load);
199 maxLoad =
MAX2(maxLoad, load);
201 #ifdef WORKLOAD_INTERVAL
202 myTotalMaxLoad += maxLoad;
203 myTotalSpread += maxLoad / minLoad;
213 if (deleteFinished) {
223 if (toRaise !=
nullptr) {
275 #ifdef WORKLOAD_PROFILING
279 double myTotalMaxLoad;
281 double myTotalSpread;
283 std::chrono::high_resolution_clock::time_point myProfileStart;
295 #ifdef WORKLOAD_PROFILING
296 , myCounter(0), myBusyTime(0), myTotalBusyTime(0), myTotalTime(0)
309 #ifdef WORKLOAD_PROFILING
310 const double load = 100. * myTotalBusyTime / myTotalTime;
312 " tasks and had a load of " +
toString(load) +
"% (" +
toString(myTotalBusyTime) +
313 "us / " +
toString(myTotalTime) +
"us), " +
toString(myTotalBusyTime / (
double)myCounter) +
" per task.");
348 #ifdef WORKLOAD_PROFILING
349 const auto before = std::chrono::high_resolution_clock::now();
352 #ifdef WORKLOAD_PROFILING
353 const auto after = std::chrono::high_resolution_clock::now();
354 myBusyTime += std::chrono::duration_cast<std::chrono::microseconds>(after - before).count();
378 #ifdef WORKLOAD_PROFILING
379 void startProfile() {
383 double endProfile(
const long long int time) {
385 myTotalBusyTime += myBusyTime;
386 return time == 0 ? 100. : 100. * myBusyTime / time;
403 #ifdef WORKLOAD_PROFILING
407 long long int myBusyTime;
409 long long int myTotalBusyTime;
411 long long int myTotalTime;
#define WORKLOAD_INTERVAL
#define WRITE_MESSAGE(msg)
std::string toString(const T &t, std::streamsize accuracy=gPrecision)
A pool of worker threads which distributes the tasks and collects the results.
bool isFull() const
Checks whether there are currently more pending tasks than threads.
int myRunningIndex
the running index for the next task
void clear()
Stops and deletes all worker threads.
void addFinished(std::list< Task * > &tasks)
Adds the given tasks to the list of finished tasks.
void addWorker(FXWorkerThread *const w)
Adds the given thread to the pool.
void lock()
locks the pool mutex
std::list< Task * > myFinishedTasks
list of finished tasks
void add(Task *const t, int index=-1)
Gives a number to the given task and assigns it to the worker with the given index....
FXMutex myMutex
the internal mutex for the task list
FXCondition myCondition
the semaphore to wait on for finishing all tasks
FXMutex myPoolMutex
the pool mutex for external sync
const std::vector< FXWorkerThread * > & getWorkers()
std::vector< FXWorkerThread * > myWorkers
the current worker threads
virtual ~Pool()
Destructor.
void unlock()
unlocks the pool mutex
ProcessError * myException
the exception from a child thread
int size() const
Returns the number of threads in the pool.
void waitAll(const bool deleteFinished=true)
waits for all tasks to be finished
void setException(ProcessError &e)
Pool(int numThreads=0)
Constructor.
Abstract superclass of a task to be run with an index to keep track of pending tasks.
int myIndex
the index of the task, valid only after the task has been added to the pool
virtual void run(FXWorkerThread *context)=0
Abstract method which in subclasses should contain the computations to be performed.
void setIndex(const int newIndex)
Sets the running index of this task.
virtual ~Task()
Desctructor.
A thread repeatingly calculating incoming tasks.
std::list< Task * > myCurrentTasks
the list of tasks which are currently calculated
void add(Task *t)
Adds the given task to this thread to be calculated.
FXMutex myMutex
the mutex for the task list
virtual ~FXWorkerThread()
Destructor.
FXCondition myCondition
the semaphore when waiting for new tasks
FXWorkerThread(Pool &pool)
Constructor.
FXint run()
Main execution method of this thread.
void stop()
Stops the thread.
std::list< Task * > myTasks
the list of pending tasks
bool myStopped
whether we are still running
Pool & myPool
the pool for this thread