This commit is contained in:
Jean-Francois Dockes 2012-12-04 10:17:57 +01:00
parent 4ca419ce2b
commit 8bd3e35ac8

View File

@ -1,4 +1,4 @@
/* /* Copyright (C) 2012 J.F.Dockes
* This program is free software; you can redistribute it and/or modify * This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by * it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or * the Free Software Foundation; either version 2 of the License, or
@ -32,7 +32,8 @@ using std::string;
#include "debuglog.h" #include "debuglog.h"
#include "ptmutex.h" #include "ptmutex.h"
/// Just an initialized timespec. Not really used any more. /// Store per-worker-thread data. Just an initialized timespec, and
/// used at the moment.
class WQTData { class WQTData {
public: public:
WQTData() {wstart.tv_sec = 0; wstart.tv_nsec = 0;} WQTData() {wstart.tv_sec = 0; wstart.tv_nsec = 0;}
@ -44,8 +45,8 @@ class WQTData {
* where a number of client threads queue tasks and a number of worker * where a number of client threads queue tasks and a number of worker
* threads take and execute them. The goal is to introduce some level * threads take and execute them. The goal is to introduce some level
* of parallelism between the successive steps of a previously single * of parallelism between the successive steps of a previously single
* threaded pipe-line (data extraction / data preparation / index * threaded pipeline. For example data extraction / data preparation / index
* update). * update, but this could have other uses.
* *
* There is no individual task status return. In case of fatal error, * There is no individual task status return. In case of fatal error,
* the client or worker sets an end condition on the queue. A second * the client or worker sets an end condition on the queue. A second
@ -81,7 +82,7 @@ public:
* *
* @param nworkers number of threads copies to start. * @param nworkers number of threads copies to start.
* @param start_routine thread function. It should loop * @param start_routine thread function. It should loop
* taking (QueueWorker::take() and executing tasks. * taking (QueueWorker::take()) and executing tasks.
* @param arg initial parameter to thread function. * @param arg initial parameter to thread function.
* @return true if ok. * @return true if ok.
*/ */
@ -142,9 +143,9 @@ public:
* back sleeping. Used by the client to wait for all current work * back sleeping. Used by the client to wait for all current work
* to be completed, when it needs to perform work that couldn't be * to be completed, when it needs to perform work that couldn't be
* done in parallel with the worker's tasks, or before shutting * done in parallel with the worker's tasks, or before shutting
* down. Work can be resumed after calling this. Note that the only thread * down. Work can be resumed after calling this. Note that the
* which can call it safely is the client just above (which can * only thread which can call it safely is the client just above
control the task flow), else there could be * (which can control the task flow), else there could be
* tasks in the intermediate queues. * tasks in the intermediate queues.
*/ */
bool waitIdle() bool waitIdle()
@ -175,9 +176,10 @@ public:
} }
/** Tell the workers to exit, and wait for them. Does not bother about /** Tell the workers to exit, and wait for them.
* tasks possibly remaining on the queue, so should be called *
* after waitIdle() for an orderly shutdown. * Does not bother about tasks possibly remaining on the queue, so
* should be called after waitIdle() for an orderly shutdown.
*/ */
void* setTerminateAndWait() void* setTerminateAndWait()
{ {
@ -230,8 +232,8 @@ public:
/** Take task from queue. Called from worker. /** Take task from queue. Called from worker.
* *
* Sleeps if there are not enough. Signal if we go * Sleeps if there are not enough. Signal if we go to sleep on empty
* to sleep on empty queue: client may be waiting for our going idle. * queue: client may be waiting for our going idle.
*/ */
bool take(T* tp, size_t *szp = 0) bool take(T* tp, size_t *szp = 0)
{ {
@ -272,11 +274,12 @@ public:
} }
/** Advertise exit and abort queue. Called from worker /** Advertise exit and abort queue. Called from worker
* This would normally happen after an unrecoverable error, or when *
* This would happen after an unrecoverable error, or when
* the queue is terminated by the client. Workers never exit normally, * the queue is terminated by the client. Workers never exit normally,
* except when the queue is shut down (at which point m_ok is set to false * except when the queue is shut down (at which point m_ok is set to
* by the shutdown code anyway). The thread must return/exit immediately * false by the shutdown code anyway). The thread must return/exit
* after calling this * immediately after calling this.
*/ */
void workerExit() void workerExit()
{ {