From 9ba04fc9c74a32d37bbadee06864b87efe774ec8 Mon Sep 17 00:00:00 2001 From: Jean-Francois Dockes Date: Mon, 26 Nov 2012 09:16:06 +0100 Subject: [PATCH] timing and traces --- src/mk/Linux | 2 +- src/rcldb/rcldb.cpp | 18 +++++++++++++---- src/rcldb/rcldb_p.h | 4 ++-- src/utils/workqueue.h | 45 +++++++++++++++++++++++-------------------- 4 files changed, 41 insertions(+), 28 deletions(-) diff --git a/src/mk/Linux b/src/mk/Linux index a74ca3e1..9f5b035e 100644 --- a/src/mk/Linux +++ b/src/mk/Linux @@ -5,5 +5,5 @@ include $(depth)/mk/localdefs ALL_CXXFLAGS = $(CXXFLAGS) $(COMMONCXXFLAGS) $(LOCALCXXFLAGS) \ -D_GNU_SOURCE -LIBSYS = -lpthread -ldl +LIBSYS = -lpthread -ldl -lrt LIBSYSTHREADS = -lrt diff --git a/src/rcldb/rcldb.cpp b/src/rcldb/rcldb.cpp index b4207cfa..3252248d 100644 --- a/src/rcldb/rcldb.cpp +++ b/src/rcldb/rcldb.cpp @@ -801,11 +801,12 @@ void *DbUpdWorker(void* vdbp) DbUpdTask *tsk; for (;;) { - if (!tqp->take(&tsk)) { + size_t qsz; + if (!tqp->take(&tsk, &qsz)) { tqp->workerExit(); return (void*)1; } - LOGDEB(("DbUpdWorker: got task, ql %d\n", int(tqp->size()))); + LOGDEB(("DbUpdWorker: got task, ql %d\n", int(qsz))); if (!dbp->m_ndb->addOrUpdateWrite(tsk->udi, tsk->uniterm, tsk->doc, tsk->txtlen)) { LOGERR(("DbUpdWorker: addOrUpdateWrite failed\n")); @@ -1163,7 +1164,7 @@ bool Db::Native::addOrUpdateWrite(const string& udi, const string& uniterm, // Test if we're over the flush threshold (limit memory usage): bool ret = m_rcldb->maybeflush(textlen); #ifdef IDX_THREADS - m_totalworkus += chron.micros(); + m_totalworkns += chron.nanos(); #endif return ret; } @@ -1171,9 +1172,18 @@ bool Db::Native::addOrUpdateWrite(const string& udi, const string& uniterm, #ifdef IDX_THREADS void Db::waitUpdIdle() { + Chrono chron; m_ndb->m_wqueue.waitIdle(); + string ermsg; + try { + m_ndb->xwdb.flush(); + } XCATCHERROR(ermsg); + if (!ermsg.empty()) { + LOGERR(("Db::waitUpdIdle: flush() failed: %s\n", ermsg.c_str())); + } + m_ndb->m_totalworkns += chron.nanos(); LOGDEB(("Db::waitUpdIdle: total work %lld mS\n", - m_ndb->m_totalworkus/1000)); + m_ndb->m_totalworkns/1000000)); } #endif diff --git a/src/rcldb/rcldb_p.h b/src/rcldb/rcldb_p.h index d5af6152..4eaecea5 100644 --- a/src/rcldb/rcldb_p.h +++ b/src/rcldb/rcldb_p.h @@ -63,7 +63,7 @@ class Db::Native { WorkQueue m_wqueue; int m_loglevel; PTMutexInit m_mutex; - long long m_totalworkus; + long long m_totalworkns; #endif // IDX_THREADS // Indexing @@ -79,7 +79,7 @@ class Db::Native { : m_rcldb(db), m_isopen(false), m_iswritable(false), m_noversionwrite(false) #ifdef IDX_THREADS - , m_wqueue("DbUpd", 2), m_totalworkus(0LL) + , m_wqueue("DbUpd", 2), m_totalworkns(0LL) #endif // IDX_THREADS { LOGDEB2(("Native::Native: me %p\n", this)); diff --git a/src/utils/workqueue.h b/src/utils/workqueue.h index 7f02e43c..b6478bb2 100644 --- a/src/utils/workqueue.h +++ b/src/utils/workqueue.h @@ -31,6 +31,7 @@ using std::string; #include "debuglog.h" +/// Just an initialized timespec. Not really used any more. class WQTData { public: WQTData() {wstart.tv_sec = 0; wstart.tv_nsec = 0;} @@ -40,7 +41,7 @@ class WQTData { /** * A WorkQueue manages the synchronisation around a queue of work items, * where a number of client threads queue tasks and a number of worker - * threads takes and executes them. The goal is to introduce some level + * threads take and execute them. The goal is to introduce some level * of parallelism between the successive steps of a previously single * threaded pipe-line (data extraction / data preparation / index * update). @@ -60,9 +61,10 @@ public: * @param lo minimum count of tasks before worker starts. Default 1. */ WorkQueue(const string& name, int hi = 0, int lo = 1) - : m_name(name), m_high(hi), m_low(lo), m_size(0), + : m_name(name), m_high(hi), m_low(lo), m_workers_waiting(0), m_workers_exited(0), - m_clients_waiting(0), m_tottasks(0), m_nowake(0) + m_clients_waiting(0), m_tottasks(0), m_nowake(0), + m_workersleeps(0) { m_ok = (pthread_cond_init(&m_ccond, 0) == 0) && (pthread_cond_init(&m_wcond, 0) == 0) && @@ -112,15 +114,14 @@ public: // Keep the order: we test ok() AFTER the sleep... m_clients_waiting++; if (pthread_cond_wait(&m_ccond, &m_mutex) || !ok()) { - pthread_mutex_unlock(&m_mutex); m_clients_waiting--; + pthread_mutex_unlock(&m_mutex); return false; } m_clients_waiting--; } m_queue.push(t); - ++m_size; if (m_workers_waiting > 0) { // Just wake one worker, there is only one new task. pthread_cond_signal(&m_wcond); @@ -155,9 +156,9 @@ public: m_clients_waiting++; if (pthread_cond_wait(&m_ccond, &m_mutex)) { m_clients_waiting--; - pthread_mutex_unlock(&m_mutex); m_ok = false; LOGERR(("WorkQueue::waitIdle: cond_wait failed\n")); + pthread_mutex_unlock(&m_mutex); return false; } m_clients_waiting--; @@ -196,8 +197,8 @@ public: m_clients_waiting--; } - LOGDEB(("%s: %u tasks %u nowakes\n", m_name.c_str(), m_tottasks, - m_nowake)); + LOGDEB(("%s: %u tasks %u nowakes %u wsleeps \n", m_name.c_str(), + m_tottasks, m_nowake, m_workersleeps)); // Perform the thread joins and compute overall status // Workers return (void*)1 if ok void *statusall = (void*)1; @@ -210,8 +211,8 @@ public: statusall = status; m_worker_threads.erase(it); } - pthread_mutex_unlock(&m_mutex); LOGDEB(("setTerminateAndWait:%s done\n", m_name.c_str())); + pthread_mutex_unlock(&m_mutex); return statusall; } @@ -220,12 +221,13 @@ public: * Sleeps if there are not enough. Signal if we go * to sleep on empty queue: client may be waiting for our going idle. */ - bool take(T* tp) + bool take(T* tp, size_t *szp = 0) { if (!ok() || pthread_mutex_lock(&m_mutex) != 0) return false; while (ok() && m_queue.size() < m_low) { + m_workersleeps++; m_workers_waiting++; if (m_queue.empty()) pthread_cond_broadcast(&m_ccond); @@ -234,8 +236,8 @@ public: if (ok()) LOGERR(("WorkQueue::take:%s: cond_wait failed or !ok\n", m_name.c_str())); - pthread_mutex_unlock(&m_mutex); m_workers_waiting--; + pthread_mutex_unlock(&m_mutex); return false; } m_workers_waiting--; @@ -243,8 +245,9 @@ public: m_tottasks++; *tp = m_queue.front(); + if (szp) + *szp = m_queue.size(); m_queue.pop(); - --m_size; if (m_clients_waiting > 0) { // No reason to wake up more than one client thread pthread_cond_signal(&m_ccond); @@ -272,15 +275,12 @@ public: pthread_mutex_unlock(&m_mutex); } - /** Return current queue size. Debug only. - * - * As the size is returned while the queue is unlocked, there - * is no warranty on its consistency. Not that we use the member - * size, not the container size() call which would need locking. - */ - size_t size() + size_t qsize() { - return m_size; + pthread_mutex_lock(&m_mutex); + size_t sz = m_queue.size(); + pthread_mutex_unlock(&m_mutex); + return sz; } private: @@ -299,11 +299,13 @@ private: string m_name; size_t m_high; size_t m_low; - size_t m_size; + /* Worker threads currently waiting for a job */ unsigned int m_workers_waiting; unsigned int m_workers_exited; + // Per-thread data. The data is not used currently, this could be + // a set unordered_map m_worker_threads; queue m_queue; pthread_cond_t m_ccond; @@ -312,6 +314,7 @@ private: unsigned int m_clients_waiting; unsigned int m_tottasks; unsigned int m_nowake; + unsigned int m_workersleeps; bool m_ok; };