From 9bc2fc89585f05184b54d41f4fe16e600da0e0c5 Mon Sep 17 00:00:00 2001 From: Jean-Francois Dockes Date: Tue, 21 Feb 2012 17:09:02 +0100 Subject: [PATCH] Experimented with multithreading the indexing pipeline. Left undef'd as 15%-30% improvement of indexing time does not seem worth the complexity --- src/index/fsindexer.cpp | 52 +++++++++++ src/index/fsindexer.h | 21 +++++ src/rcldb/rcldb.cpp | 85 ++++++++++++++++-- src/rcldb/rcldb.h | 6 ++ src/rcldb/rcldb_p.h | 31 ++++++- src/rcldb/rclquery.cpp | 4 +- src/utils/workqueue.cpp | 102 +++++++++++++++++++++ src/utils/workqueue.h | 195 ++++++++++++++++++++++++++++++++++++++++ 8 files changed, 487 insertions(+), 9 deletions(-) create mode 100644 src/utils/workqueue.cpp create mode 100644 src/utils/workqueue.h diff --git a/src/index/fsindexer.cpp b/src/index/fsindexer.cpp index af238cb9..997cae48 100644 --- a/src/index/fsindexer.cpp +++ b/src/index/fsindexer.cpp @@ -66,10 +66,23 @@ using namespace std; FsIndexer::FsIndexer(RclConfig *cnf, Rcl::Db *db, DbIxStatusUpdater *updfunc) : m_config(cnf), m_db(db), m_updater(updfunc), m_missing(new FIMissingStore) +#ifdef IDX_THREADS + , m_wqueue(10) +#endif // IDX_THREADS { m_havelocalfields = m_config->hasNameAnywhere("localfields"); +#ifdef IDX_THREADS + if (!m_wqueue.start(FsIndexerIndexWorker, this)) { + LOGERR(("FsIndexer::FsIndexer: worker start failed\n")); + return; + } +#endif // IDX_THREADS } FsIndexer::~FsIndexer() { +#ifdef IDX_THREADS + void *status = m_wqueue.setTerminateAndWait(); + LOGERR(("FsIndexer: worker status: %ld\n", long(status))); +#endif // IDX_THREADS delete m_missing; } @@ -127,6 +140,9 @@ bool FsIndexer::index() } } +#ifdef IDX_THREADS + m_wqueue.waitIdle(); +#endif // IDX_THREADS string missing; FileInterner::getMissingDescription(m_missing, missing); if (!missing.empty()) { @@ -289,6 +305,28 @@ void FsIndexer::makesig(const struct stat *stp, string& out) out = cbuf; } +#ifdef IDX_THREADS +void *FsIndexerIndexWorker(void * fsp) +{ + FsIndexer *fip = 
(FsIndexer*)fsp; + WorkQueue *tqp = &fip->m_wqueue; + IndexingTask *tsk; + for (;;) { + if (!tqp->take(&tsk)) { + tqp->workerExit(); + return (void*)1; + } + LOGDEB(("FsIndexerIndexWorker: got task, ql %d\n", int(tqp->size()))); + if (!fip->m_db->addOrUpdate(tsk->udi, tsk->parent_udi, tsk->doc)) { + tqp->setTerminateAndWait(); + tqp->workerExit(); + return (void*)0; + } + delete tsk; + } +} +#endif // IDX_THREADS + /// This method gets called for every file and directory found by the /// tree walker. /// @@ -443,8 +481,16 @@ FsIndexer::processone(const std::string &fn, const struct stat *stp, // of the file document. string udi; make_udi(fn, doc.ipath, udi); + +#ifdef IDX_THREADS + IndexingTask *tp = new IndexingTask(udi, doc.ipath.empty() ? + cstr_null : parent_udi, doc); + if (!m_wqueue.put(tp)) + return FsTreeWalker::FtwError; +#else if (!m_db->addOrUpdate(udi, doc.ipath.empty() ? cstr_null : parent_udi, doc)) return FsTreeWalker::FtwError; +#endif // IDX_THREADS // Tell what we are doing and check for interrupt request if (m_updater) { @@ -476,8 +522,14 @@ FsIndexer::processone(const std::string &fn, const struct stat *stp, fileDoc.fbytes = cbuf; // Document signature for up to date checks. 
makesig(stp, fileDoc.sig); +#ifdef IDX_THREADS + IndexingTask *tp = new IndexingTask(parent_udi, cstr_null, fileDoc); + if (!m_wqueue.put(tp)) + return FsTreeWalker::FtwError; +#else if (!m_db->addOrUpdate(parent_udi, cstr_null, fileDoc)) return FsTreeWalker::FtwError; +#endif // IDX_THREADS } return FsTreeWalker::FtwOk; diff --git a/src/index/fsindexer.h b/src/index/fsindexer.h index 7a254ce4..97602ed7 100644 --- a/src/index/fsindexer.h +++ b/src/index/fsindexer.h @@ -24,11 +24,27 @@ using std::list; #include "indexer.h" #include "fstreewalk.h" +#ifdef IDX_THREADS +#include "workqueue.h" +#endif // IDX_THREADS class DbIxStatusUpdater; class FIMissingStore; struct stat; +#ifdef IDX_THREADS +class IndexingTask { +public: + IndexingTask(const string& u, const string& p, const Rcl::Doc& d) + :udi(u), parent_udi(p), doc(d) + {} + string udi; + string parent_udi; + Rcl::Doc doc; +}; +extern void *FsIndexerIndexWorker(void*); +#endif // IDX_THREADS + /** Index selected parts of the file system Tree indexing: we inherits FsTreeWalkerCB so that, the processone() @@ -89,6 +105,11 @@ class FsIndexer : public FsTreeWalkerCB { bool m_havelocalfields; map m_localfields; +#ifdef IDX_THREADS + friend void *FsIndexerIndexWorker(void*); + WorkQueue m_wqueue; +#endif // IDX_THREADS + bool init(); void localfieldsfromconf(); void setlocalfields(Rcl::Doc& doc); diff --git a/src/rcldb/rcldb.cpp b/src/rcldb/rcldb.cpp index 8c2a656b..622eb0f2 100644 --- a/src/rcldb/rcldb.cpp +++ b/src/rcldb/rcldb.cpp @@ -34,6 +34,7 @@ using namespace std; #include "xapian.h" #include "rclconfig.h" +#include "debuglog.h" #include "rcldb.h" #include "rcldb_p.h" #include "stemdb.h" @@ -41,7 +42,6 @@ using namespace std; #include "transcode.h" #include "unacpp.h" #include "conftree.h" -#include "debuglog.h" #include "pathut.h" #include "smallut.h" #include "utf8iter.h" @@ -571,6 +571,12 @@ Db::Db(RclConfig *cfp) m_config->getConfParam("maxfsoccuppc", &m_maxFsOccupPc); m_config->getConfParam("idxflushmb", 
&m_flushMb); } +#ifdef IDX_THREADS + if (m_ndb && !m_ndb->m_wqueue.start(DbUpdWorker, this)) { + LOGERR(("Db::Db: Worker start failed\n")); + return; + } +#endif // IDX_THREADS } Db::~Db() @@ -862,7 +868,6 @@ bool Db::fieldToTraits(const string& fld, const FieldTraits **ftpp) // fields and position jumps to separate fields class TextSplitDb : public TextSplitP { public: - Xapian::WritableDatabase db; Xapian::Document &doc; // Xapian document // Base for document section. Gets large increment when we change // sections, to avoid cross-section proximity matches. @@ -875,10 +880,9 @@ class TextSplitDb : public TextSplitP { // to compute the first position of the next section. Xapian::termpos curpos; - TextSplitDb(Xapian::WritableDatabase idb, - Xapian::Document &d, TermProc *prc) + TextSplitDb(Xapian::Document &d, TermProc *prc) : TextSplitP(prc), - db(idb), doc(d), basepos(1), curpos(0), wdfinc(1) + doc(d), basepos(1), curpos(0), wdfinc(1) {} // Reimplement text_to_words to add start and end special terms virtual bool text_to_words(const string &in); @@ -1003,6 +1007,58 @@ static const string cstr_nc("\n\r\x0c"); #define RECORD_APPEND(R, NM, VAL) {R += NM + "=" + VAL + "\n";} +#ifdef IDX_THREADS +void *DbUpdWorker(void* vdbp) +{ + Db *dbp = (Db *)vdbp; + WorkQueue *tqp = &(dbp->m_ndb->m_wqueue); + DbUpdTask *tsk; + + for (;;) { + if (!tqp->take(&tsk)) { + tqp->workerExit(); + return (void*)1; + } + LOGDEB(("DbUpdWorker: got task, ql %d\n", int(tqp->size()))); + + const char *fnc = tsk->udi.c_str(); + string ermsg; + + // Add db entry or update existing entry: + try { + Xapian::docid did = + dbp->m_ndb->xwdb.replace_document(tsk->uniterm, + tsk->doc); + if (did < dbp->updated.size()) { + dbp->updated[did] = true; + LOGINFO(("Db::add: docid %d updated [%s]\n", did, fnc)); + } else { + LOGINFO(("Db::add: docid %d added [%s]\n", did, fnc)); + } + } XCATCHERROR(ermsg); + + if (!ermsg.empty()) { + LOGERR(("Db::add: replace_document failed: %s\n", ermsg.c_str())); + 
ermsg.erase(); + // FIXME: is this ever actually needed? + try { + dbp->m_ndb->xwdb.add_document(tsk->doc); + LOGDEB(("Db::add: %s added (failed re-seek for duplicate)\n", + fnc)); + } XCATCHERROR(ermsg); + if (!ermsg.empty()) { + LOGERR(("Db::add: add_document failed: %s\n", ermsg.c_str())); + tqp->workerExit(); + return (void*)0; + } + } + dbp->maybeflush(tsk->txtlen); + + delete tsk; + } +} +#endif // IDX_THREADS + // Add document in internal form to the database: index the terms in // the title abstract and body and add special terms for file name, // date, mime type etc. , create the document data record (more @@ -1039,7 +1095,7 @@ bool Db::addOrUpdate(const string &udi, const string &parent_udi, // TermProcCommongrams tpcommon(nxt, m_stops); nxt = &tpcommon; TermProcPrep tpprep(nxt); nxt = &tpprep; - TextSplitDb splitter(m_ndb->xwdb, newdocument, nxt); + TextSplitDb splitter(newdocument, nxt); tpidx.setTSD(&splitter); // Split and index file name as document term(s) @@ -1271,6 +1327,13 @@ bool Db::addOrUpdate(const string &udi, const string &parent_udi, LOGDEB0(("Rcl::Db::add: new doc record:\n%s\n", record.c_str())); newdocument.set_data(record); +#ifdef IDX_THREADS + DbUpdTask *tp = new DbUpdTask(udi, uniterm, newdocument, doc.text.length()); + if (!m_ndb->m_wqueue.put(tp)) { + LOGERR(("Db::addOrUpdate:Cant queue task\n")); + return false; + } +#else const char *fnc = udi.c_str(); string ermsg; @@ -1303,6 +1366,7 @@ bool Db::addOrUpdate(const string &udi, const string &parent_udi, // Test if we're over the flush threshold (limit memory usage): maybeflush(doc.text.length()); +#endif // IDX_THREADS return true; } @@ -1455,6 +1519,10 @@ bool Db::purge() if (m_ndb->m_isopen == false || m_ndb->m_iswritable == false) return false; +#ifdef IDX_THREADS + m_ndb->m_wqueue.waitIdle(); +#endif // IDX_THREADS + // For xapian versions up to 1.0.1, deleting a non-existant // document would trigger an exception that would discard any // pending update. 
This could lose both previous added documents @@ -1520,6 +1588,11 @@ bool Db::purgeFile(const string &udi, bool *existed) LOGDEB(("Db:purgeFile: [%s]\n", udi.c_str())); if (m_ndb == 0 || !m_ndb->m_iswritable) return false; + +#ifdef IDX_THREADS + m_ndb->m_wqueue.waitIdle(); +#endif // IDX_THREADS + Xapian::WritableDatabase db = m_ndb->xwdb; string uniterm = make_uniterm(udi); string ermsg; diff --git a/src/rcldb/rcldb.h b/src/rcldb/rcldb.h index 559500e3..271d28ce 100644 --- a/src/rcldb/rcldb.h +++ b/src/rcldb/rcldb.h @@ -83,6 +83,9 @@ public: double dbavgdoclen; }; +#ifdef IDX_THREADS +extern void *DbUpdWorker(void*); +#endif // IDX_THREADS /** * Wrapper class for the native database. */ @@ -91,6 +94,9 @@ class Db { // A place for things we don't want visible here. class Native; friend class Native; +#ifdef IDX_THREADS + friend void *DbUpdWorker(void*); +#endif // IDX_THREADS /* General stuff (valid for query or update) ****************************/ Db(RclConfig *cfp); diff --git a/src/rcldb/rcldb_p.h b/src/rcldb/rcldb_p.h index 3a5ef8e8..42a23bd4 100644 --- a/src/rcldb/rcldb_p.h +++ b/src/rcldb/rcldb_p.h @@ -18,6 +18,9 @@ #ifndef _rcldb_p_h_included_ #define _rcldb_p_h_included_ +#ifdef IDX_THREADS +#include "workqueue.h" +#endif // IDX_THREADS #include "xapian.h" namespace Rcl { @@ -63,6 +66,20 @@ enum value_slot { class Query; +#ifdef IDX_THREADS +class DbUpdTask { +public: + DbUpdTask(const string& ud, const string& un, const Xapian::Document &d, + size_t tl) + : udi(ud), uniterm(un), doc(d), txtlen(tl) + {} + string udi; + string uniterm; + Xapian::Document doc; + size_t txtlen; +}; +#endif // IDX_THREADS + // A class for data and methods that would have to expose // Xapian-specific stuff if they were in Rcl::Db. There could actually be // 2 different ones for indexing or query as there is not much in @@ -73,6 +90,9 @@ class Db::Native { bool m_isopen; bool m_iswritable; bool m_noversionwrite; //Set if open failed because of version mismatch! 
+#ifdef IDX_THREADS + WorkQueue m_wqueue; +#endif // IDX_THREADS // Indexing Xapian::WritableDatabase xwdb; @@ -86,9 +106,18 @@ class Db::Native { Native(Db *db) : m_rcldb(db), m_isopen(false), m_iswritable(false), m_noversionwrite(false) +#ifdef IDX_THREADS + , m_wqueue(10) +#endif // IDX_THREADS { } - ~Native() { + ~Native() { +#ifdef IDX_THREADS + if (m_iswritable) { + void *status = m_wqueue.setTerminateAndWait(); + LOGDEB(("Native: worker status %ld\n", long(status))); + } +#endif // IDX_THREADS } vector makeAbstract(Xapian::docid id, Query *query); diff --git a/src/rcldb/rclquery.cpp b/src/rcldb/rclquery.cpp index c4046f73..6755d0be 100644 --- a/src/rcldb/rclquery.cpp +++ b/src/rcldb/rclquery.cpp @@ -25,15 +25,15 @@ #include "xapian.h" #include "cstr.h" +#include "rclconfig.h" +#include "debuglog.h" #include "rcldb.h" #include "rcldb_p.h" #include "rclquery.h" #include "rclquery_p.h" -#include "debuglog.h" #include "conftree.h" #include "smallut.h" #include "searchdata.h" -#include "rclconfig.h" #include "unacpp.h" #ifndef NO_NAMESPACES diff --git a/src/utils/workqueue.cpp b/src/utils/workqueue.cpp new file mode 100644 index 00000000..b3d6a2f7 --- /dev/null +++ b/src/utils/workqueue.cpp @@ -0,0 +1,102 @@ +#include +#include +#include +#include +#include + +#include "workqueue.h" + +static char *thisprog; + +static char usage [] = +" \n\n" +; +static void +Usage(void) +{ + fprintf(stderr, "%s: usage:\n%s", thisprog, usage); + exit(1); +} + +static int op_flags; +#define OPT_MOINS 0x1 +#define OPT_s 0x2 +#define OPT_b 0x4 + +class Task { +public: + Task() + : m_id(o_id++) + {} + int m_id; + static int o_id; +}; +int Task::o_id; + +void *worker(void *vtp) +{ + fprintf(stderr, "Worker working\n"); + WorkQueue *tqp = (WorkQueue *)vtp; + Task tsk; + for (;;) { + if (!tqp->take(&tsk)) { + fprintf(stderr, "Worker: take failed\n"); + return (void*)0; + } + fprintf(stderr, "WORKER: got task %d\n", tsk.m_id); + if (tsk.m_id > 20) { + tqp->workerExit(); + break; + } + } 
+ return (void*)1; +} + +int main(int argc, char **argv) +{ + int count = 10; + + thisprog = argv[0]; + argc--; argv++; + + while (argc > 0 && **argv == '-') { + (*argv)++; + if (!(**argv)) + /* Case of a bare "-" argument, as in "adb - core" */ + Usage(); + while (**argv) + switch (*(*argv)++) { + case 's': op_flags |= OPT_s; break; + case 'b': op_flags |= OPT_b; if (argc < 2) Usage(); + if ((sscanf(*(++argv), "%d", &count)) != 1) + Usage(); + argc--; + goto b1; + default: Usage(); break; + } + b1: argc--; argv++; + } + + if (argc != 0) + Usage(); + + WorkQueue wq(10); + + if (!wq.start(&worker, &wq)) { + fprintf(stderr, "Start failed\n"); + exit(1); + } + + for (;;) { + Task tsk; + fprintf(stderr, "BOSS: put task %d\n", tsk.m_id); + if (!wq.put(tsk)) { + fprintf(stderr, "Boss: put failed\n"); + exit(1); + } + if ((tsk.m_id % 10) == 0) + sleep(1); + } + exit(0); +} + diff --git a/src/utils/workqueue.h b/src/utils/workqueue.h new file mode 100644 index 00000000..946ae513 --- /dev/null +++ b/src/utils/workqueue.h @@ -0,0 +1,195 @@ +/* + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the + * Free Software Foundation, Inc., + * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. 
+ */ +#ifndef _WORKQUEUE_H_INCLUDED_ +#define _WORKQUEUE_H_INCLUDED_ + +#include "pthread.h" +#include +#include +using std::queue; +using std::string; + +/** + * A WorkQueue manages the synchronisation around a queue of work items, + * where a single client thread queues tasks and a single worker takes + * and executes them. The goal is to introduce some level of + * parallelism between the successive steps of a previously single + * threaded pipe-line (data extraction / data preparation / index + * update). + * + * There is no individual task status return. In case of fatal error, + * the client or worker sets an end condition on the queue. A second + * queue could conceivably be used for returning individual task + * status. + */ +template class WorkQueue { +public: + WorkQueue(int hi = 0, int lo = 1) + : m_high(hi), m_low(lo), m_size(0), m_worker_up(false), + m_worker_waiting(false), m_jobcnt(0), m_lenacc(0) + { + m_ok = (pthread_cond_init(&m_cond, 0) == 0) && + (pthread_mutex_init(&m_mutex, 0) == 0); + } + + ~WorkQueue() + { + if (m_worker_up) + setTerminateAndWait(); + } + + /** Start the worker thread. The start_routine will loop + * taking and executing tasks. */ + bool start(void *(*start_routine)(void *), void *arg) + { + bool status = pthread_create(&m_worker_thread, 0, + start_routine, arg) == 0; + if (status) + m_worker_up = true; + return status; + } + + /** + * Add item to work queue. Sleep if there are already too many. + * Called from client. + */ + bool put(T t) + { + if (!ok() || pthread_mutex_lock(&m_mutex) != 0) + return false; + + while (ok() && m_high > 0 && m_queue.size() >= m_high) { + // Keep the order: we test ok() AFTER the sleep... 
+ if (pthread_cond_wait(&m_cond, &m_mutex) || !ok()) { + pthread_mutex_unlock(&m_mutex); + return false; + } + } + + m_queue.push(t); + ++m_size; + pthread_cond_broadcast(&m_cond); + pthread_mutex_unlock(&m_mutex); + return true; + } + + /** Wait until the queue is empty and the worker is + * back waiting for task. Called from the client when it needs to + * perform work that couldn't be done in parallel with the + * worker's tasks. + */ + bool waitIdle() + { + if (!ok() || pthread_mutex_lock(&m_mutex) != 0) + return false; + + // We're done when the queue is empty AND the worker is back + // for a task (has finished the last) + while (ok() && (m_queue.size() > 0 || !m_worker_waiting)) { + if (pthread_cond_wait(&m_cond, &m_mutex)) { + pthread_mutex_unlock(&m_mutex); + return false; + } + } + pthread_mutex_unlock(&m_mutex); + return ok(); + } + + /** Tell the worker to exit, and wait for it. There may still + be tasks on the queue. */ + void* setTerminateAndWait() + { + if (!m_worker_up) + return (void *)0; + + pthread_mutex_lock(&m_mutex); + m_ok = false; + pthread_cond_broadcast(&m_cond); + pthread_mutex_unlock(&m_mutex); + void *status; + pthread_join(m_worker_thread, &status); + m_worker_up = false; + return status; + } + + /** Remove task from queue. Sleep if there are not enough. 
Signal if we go + to sleep on empty queue: client may be waiting for our going idle */ + bool take(T* tp) + { + if (!ok() || pthread_mutex_lock(&m_mutex) != 0) + return false; + + while (ok() && m_queue.size() < m_low) { + m_worker_waiting = true; + if (m_queue.empty()) + pthread_cond_broadcast(&m_cond); + if (pthread_cond_wait(&m_cond, &m_mutex) || !ok()) { + pthread_mutex_unlock(&m_mutex); + m_worker_waiting = false; + return false; + } + m_worker_waiting = false; + } + + ++m_jobcnt; + m_lenacc += m_size; + + *tp = m_queue.front(); + m_queue.pop(); + --m_size; + + pthread_cond_broadcast(&m_cond); + pthread_mutex_unlock(&m_mutex); + return true; + } + + /** Take note of the worker exit. This would normally happen after an + unrecoverable error */ + void workerExit() + { + if (!ok() || pthread_mutex_lock(&m_mutex) != 0) + return; + m_ok = false; + pthread_cond_broadcast(&m_cond); + pthread_mutex_unlock(&m_mutex); + } + + /** Debug only: as the size is returned while the queue is unlocked, there + * is no guarantee of its consistency. Note that we use the member size, not + * the container size() call which would need locking. + */ + size_t size() {return m_size;} + +private: + bool ok() {return m_ok && m_worker_up;} + + size_t m_high; + size_t m_low; + size_t m_size; + bool m_worker_up; + bool m_worker_waiting; + int m_jobcnt; + int m_lenacc; + + pthread_t m_worker_thread; + queue m_queue; + pthread_cond_t m_cond; + pthread_mutex_t m_mutex; + bool m_ok; +}; + +#endif /* _WORKQUEUE_H_INCLUDED_ */