Experimented with multithreading the indexing pipeline. Left undef'd as 15%-30% improvement of indexing time does not seem worth the complexity

This commit is contained in:
Jean-Francois Dockes 2012-02-21 17:09:02 +01:00
parent 26914a823c
commit 9bc2fc8958
8 changed files with 487 additions and 9 deletions

View File

@ -66,10 +66,23 @@ using namespace std;
FsIndexer::FsIndexer(RclConfig *cnf, Rcl::Db *db, DbIxStatusUpdater *updfunc)
: m_config(cnf), m_db(db), m_updater(updfunc), m_missing(new FIMissingStore)
#ifdef IDX_THREADS
, m_wqueue(10)
#endif // IDX_THREADS
{
m_havelocalfields = m_config->hasNameAnywhere("localfields");
#ifdef IDX_THREADS
if (!m_wqueue.start(FsIndexerIndexWorker, this)) {
LOGERR(("FsIndexer::FsIndexer: worker start failed\n"));
return;
}
#endif // IDX_THREADS
}
FsIndexer::~FsIndexer() {
#ifdef IDX_THREADS
void *status = m_wqueue.setTerminateAndWait();
LOGERR(("FsIndexer: worker status: %ld\n", long(status)));
#endif // IDX_THREADS
delete m_missing;
}
@ -127,6 +140,9 @@ bool FsIndexer::index()
}
}
#ifdef IDX_THREADS
m_wqueue.waitIdle();
#endif // IDX_THREADS
string missing;
FileInterner::getMissingDescription(m_missing, missing);
if (!missing.empty()) {
@ -289,6 +305,28 @@ void FsIndexer::makesig(const struct stat *stp, string& out)
out = cbuf;
}
#ifdef IDX_THREADS
void *FsIndexerIndexWorker(void * fsp)
{
FsIndexer *fip = (FsIndexer*)fsp;
WorkQueue<IndexingTask*> *tqp = &fip->m_wqueue;
IndexingTask *tsk;
for (;;) {
if (!tqp->take(&tsk)) {
tqp->workerExit();
return (void*)1;
}
LOGDEB(("FsIndexerIndexWorker: got task, ql %d\n", int(tqp->size())));
if (!fip->m_db->addOrUpdate(tsk->udi, tsk->parent_udi, tsk->doc)) {
tqp->setTerminateAndWait();
tqp->workerExit();
return (void*)0;
}
delete tsk;
}
}
#endif // IDX_THREADS
/// This method gets called for every file and directory found by the
/// tree walker.
///
@ -443,8 +481,16 @@ FsIndexer::processone(const std::string &fn, const struct stat *stp,
// of the file document.
string udi;
make_udi(fn, doc.ipath, udi);
#ifdef IDX_THREADS
IndexingTask *tp = new IndexingTask(udi, doc.ipath.empty() ?
cstr_null : parent_udi, doc);
if (!m_wqueue.put(tp))
return FsTreeWalker::FtwError;
#else
if (!m_db->addOrUpdate(udi, doc.ipath.empty() ? cstr_null : parent_udi, doc))
return FsTreeWalker::FtwError;
#endif // IDX_THREADS
// Tell what we are doing and check for interrupt request
if (m_updater) {
@ -476,8 +522,14 @@ FsIndexer::processone(const std::string &fn, const struct stat *stp,
fileDoc.fbytes = cbuf;
// Document signature for up to date checks.
makesig(stp, fileDoc.sig);
#ifdef IDX_THREADS
IndexingTask *tp = new IndexingTask(parent_udi, cstr_null, fileDoc);
if (!m_wqueue.put(tp))
return FsTreeWalker::FtwError;
#else
if (!m_db->addOrUpdate(parent_udi, cstr_null, fileDoc))
return FsTreeWalker::FtwError;
#endif // IDX_THREADS
}
return FsTreeWalker::FtwOk;

View File

@ -24,11 +24,27 @@ using std::list;
#include "indexer.h"
#include "fstreewalk.h"
#ifdef IDX_THREADS
#include "workqueue.h"
#endif // IDX_THREADS
class DbIxStatusUpdater;
class FIMissingStore;
struct stat;
#ifdef IDX_THREADS
class IndexingTask {
public:
IndexingTask(const string& u, const string& p, const Rcl::Doc& d)
:udi(u), parent_udi(p), doc(d)
{}
string udi;
string parent_udi;
Rcl::Doc doc;
};
extern void *FsIndexerIndexWorker(void*);
#endif // IDX_THREADS
/** Index selected parts of the file system
Tree indexing: we inherits FsTreeWalkerCB so that, the processone()
@ -89,6 +105,11 @@ class FsIndexer : public FsTreeWalkerCB {
bool m_havelocalfields;
map<string, string> m_localfields;
#ifdef IDX_THREADS
friend void *FsIndexerIndexWorker(void*);
WorkQueue<IndexingTask*> m_wqueue;
#endif // IDX_THREADS
bool init();
void localfieldsfromconf();
void setlocalfields(Rcl::Doc& doc);

View File

@ -34,6 +34,7 @@ using namespace std;
#include "xapian.h"
#include "rclconfig.h"
#include "debuglog.h"
#include "rcldb.h"
#include "rcldb_p.h"
#include "stemdb.h"
@ -41,7 +42,6 @@ using namespace std;
#include "transcode.h"
#include "unacpp.h"
#include "conftree.h"
#include "debuglog.h"
#include "pathut.h"
#include "smallut.h"
#include "utf8iter.h"
@ -571,6 +571,12 @@ Db::Db(RclConfig *cfp)
m_config->getConfParam("maxfsoccuppc", &m_maxFsOccupPc);
m_config->getConfParam("idxflushmb", &m_flushMb);
}
#ifdef IDX_THREADS
if (m_ndb && !m_ndb->m_wqueue.start(DbUpdWorker, this)) {
LOGERR(("Db::Db: Worker start failed\n"));
return;
}
#endif // IDX_THREADS
}
Db::~Db()
@ -862,7 +868,6 @@ bool Db::fieldToTraits(const string& fld, const FieldTraits **ftpp)
// fields and position jumps to separate fields
class TextSplitDb : public TextSplitP {
public:
Xapian::WritableDatabase db;
Xapian::Document &doc; // Xapian document
// Base for document section. Gets large increment when we change
// sections, to avoid cross-section proximity matches.
@ -875,10 +880,9 @@ class TextSplitDb : public TextSplitP {
// to compute the first position of the next section.
Xapian::termpos curpos;
TextSplitDb(Xapian::WritableDatabase idb,
Xapian::Document &d, TermProc *prc)
TextSplitDb(Xapian::Document &d, TermProc *prc)
: TextSplitP(prc),
db(idb), doc(d), basepos(1), curpos(0), wdfinc(1)
doc(d), basepos(1), curpos(0), wdfinc(1)
{}
// Reimplement text_to_words to add start and end special terms
virtual bool text_to_words(const string &in);
@ -1003,6 +1007,58 @@ static const string cstr_nc("\n\r\x0c");
#define RECORD_APPEND(R, NM, VAL) {R += NM + "=" + VAL + "\n";}
#ifdef IDX_THREADS
void *DbUpdWorker(void* vdbp)
{
Db *dbp = (Db *)vdbp;
WorkQueue<DbUpdTask*> *tqp = &(dbp->m_ndb->m_wqueue);
DbUpdTask *tsk;
for (;;) {
if (!tqp->take(&tsk)) {
tqp->workerExit();
return (void*)1;
}
LOGDEB(("DbUpdWorker: got task, ql %d\n", int(tqp->size())));
const char *fnc = tsk->udi.c_str();
string ermsg;
// Add db entry or update existing entry:
try {
Xapian::docid did =
dbp->m_ndb->xwdb.replace_document(tsk->uniterm,
tsk->doc);
if (did < dbp->updated.size()) {
dbp->updated[did] = true;
LOGINFO(("Db::add: docid %d updated [%s]\n", did, fnc));
} else {
LOGINFO(("Db::add: docid %d added [%s]\n", did, fnc));
}
} XCATCHERROR(ermsg);
if (!ermsg.empty()) {
LOGERR(("Db::add: replace_document failed: %s\n", ermsg.c_str()));
ermsg.erase();
// FIXME: is this ever actually needed?
try {
dbp->m_ndb->xwdb.add_document(tsk->doc);
LOGDEB(("Db::add: %s added (failed re-seek for duplicate)\n",
fnc));
} XCATCHERROR(ermsg);
if (!ermsg.empty()) {
LOGERR(("Db::add: add_document failed: %s\n", ermsg.c_str()));
tqp->workerExit();
return (void*)0;
}
}
dbp->maybeflush(tsk->txtlen);
delete tsk;
}
}
#endif // IDX_THREADS
// Add document in internal form to the database: index the terms in
// the title abstract and body and add special terms for file name,
// date, mime type etc. , create the document data record (more
@ -1039,7 +1095,7 @@ bool Db::addOrUpdate(const string &udi, const string &parent_udi,
// TermProcCommongrams tpcommon(nxt, m_stops); nxt = &tpcommon;
TermProcPrep tpprep(nxt); nxt = &tpprep;
TextSplitDb splitter(m_ndb->xwdb, newdocument, nxt);
TextSplitDb splitter(newdocument, nxt);
tpidx.setTSD(&splitter);
// Split and index file name as document term(s)
@ -1271,6 +1327,13 @@ bool Db::addOrUpdate(const string &udi, const string &parent_udi,
LOGDEB0(("Rcl::Db::add: new doc record:\n%s\n", record.c_str()));
newdocument.set_data(record);
#ifdef IDX_THREADS
DbUpdTask *tp = new DbUpdTask(udi, uniterm, newdocument, doc.text.length());
if (!m_ndb->m_wqueue.put(tp)) {
LOGERR(("Db::addOrUpdate:Cant queue task\n"));
return false;
}
#else
const char *fnc = udi.c_str();
string ermsg;
@ -1303,6 +1366,7 @@ bool Db::addOrUpdate(const string &udi, const string &parent_udi,
// Test if we're over the flush threshold (limit memory usage):
maybeflush(doc.text.length());
#endif // IDX_THREADS
return true;
}
@ -1455,6 +1519,10 @@ bool Db::purge()
if (m_ndb->m_isopen == false || m_ndb->m_iswritable == false)
return false;
#ifdef IDX_THREADS
m_ndb->m_wqueue.waitIdle();
#endif // IDX_THREADS
// For xapian versions up to 1.0.1, deleting a non-existant
// document would trigger an exception that would discard any
// pending update. This could lose both previous added documents
@ -1520,6 +1588,11 @@ bool Db::purgeFile(const string &udi, bool *existed)
LOGDEB(("Db:purgeFile: [%s]\n", udi.c_str()));
if (m_ndb == 0 || !m_ndb->m_iswritable)
return false;
#ifdef IDX_THREADS
m_ndb->m_wqueue.waitIdle();
#endif // IDX_THREADS
Xapian::WritableDatabase db = m_ndb->xwdb;
string uniterm = make_uniterm(udi);
string ermsg;

View File

@ -83,6 +83,9 @@ public:
double dbavgdoclen;
};
#ifdef IDX_THREADS
extern void *DbUpdWorker(void*);
#endif // IDX_THREADS
/**
* Wrapper class for the native database.
*/
@ -91,6 +94,9 @@ class Db {
// A place for things we don't want visible here.
class Native;
friend class Native;
#ifdef IDX_THREADS
friend void *DbUpdWorker(void*);
#endif // IDX_THREADS
/* General stuff (valid for query or update) ****************************/
Db(RclConfig *cfp);

View File

@ -18,6 +18,9 @@
#ifndef _rcldb_p_h_included_
#define _rcldb_p_h_included_
#ifdef IDX_THREADS
#include "workqueue.h"
#endif // IDX_THREADS
#include "xapian.h"
namespace Rcl {
@ -63,6 +66,20 @@ enum value_slot {
class Query;
#ifdef IDX_THREADS
class DbUpdTask {
public:
DbUpdTask(const string& ud, const string& un, const Xapian::Document &d,
size_t tl)
: udi(ud), uniterm(un), doc(d), txtlen(tl)
{}
string udi;
string uniterm;
Xapian::Document doc;
size_t txtlen;
};
#endif // IDX_THREADS
// A class for data and methods that would have to expose
// Xapian-specific stuff if they were in Rcl::Db. There could actually be
// 2 different ones for indexing or query as there is not much in
@ -73,6 +90,9 @@ class Db::Native {
bool m_isopen;
bool m_iswritable;
bool m_noversionwrite; //Set if open failed because of version mismatch!
#ifdef IDX_THREADS
WorkQueue<DbUpdTask*> m_wqueue;
#endif // IDX_THREADS
// Indexing
Xapian::WritableDatabase xwdb;
@ -86,9 +106,18 @@ class Db::Native {
Native(Db *db)
: m_rcldb(db), m_isopen(false), m_iswritable(false),
m_noversionwrite(false)
#ifdef IDX_THREADS
, m_wqueue(10)
#endif // IDX_THREADS
{ }
~Native() {
~Native() {
#ifdef IDX_THREADS
if (m_iswritable) {
void *status = m_wqueue.setTerminateAndWait();
LOGDEB(("Native: worker status %ld\n", long(status)));
}
#endif // IDX_THREADS
}
vector<string> makeAbstract(Xapian::docid id, Query *query);

View File

@ -25,15 +25,15 @@
#include "xapian.h"
#include "cstr.h"
#include "rclconfig.h"
#include "debuglog.h"
#include "rcldb.h"
#include "rcldb_p.h"
#include "rclquery.h"
#include "rclquery_p.h"
#include "debuglog.h"
#include "conftree.h"
#include "smallut.h"
#include "searchdata.h"
#include "rclconfig.h"
#include "unacpp.h"
#ifndef NO_NAMESPACES

102
src/utils/workqueue.cpp Normal file
View File

@ -0,0 +1,102 @@
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include "workqueue.h"
static char *thisprog;
static char usage [] =
" \n\n"
;
static void
Usage(void)
{
fprintf(stderr, "%s: usage:\n%s", thisprog, usage);
exit(1);
}
static int op_flags;
#define OPT_MOINS 0x1
#define OPT_s 0x2
#define OPT_b 0x4
class Task {
public:
Task()
: m_id(o_id++)
{}
int m_id;
static int o_id;
};
int Task::o_id;
void *worker(void *vtp)
{
fprintf(stderr, "Worker working\n");
WorkQueue<Task> *tqp = (WorkQueue<Task> *)vtp;
Task tsk;
for (;;) {
if (!tqp->take(&tsk)) {
fprintf(stderr, "Worker: take failed\n");
return (void*)0;
}
fprintf(stderr, "WORKER: got task %d\n", tsk.m_id);
if (tsk.m_id > 20) {
tqp->workerExit();
break;
}
}
return (void*)1;
}
int main(int argc, char **argv)
{
int count = 10;
thisprog = argv[0];
argc--; argv++;
while (argc > 0 && **argv == '-') {
(*argv)++;
if (!(**argv))
/* Cas du "adb - core" */
Usage();
while (**argv)
switch (*(*argv)++) {
case 's': op_flags |= OPT_s; break;
case 'b': op_flags |= OPT_b; if (argc < 2) Usage();
if ((sscanf(*(++argv), "%d", &count)) != 1)
Usage();
argc--;
goto b1;
default: Usage(); break;
}
b1: argc--; argv++;
}
if (argc != 0)
Usage();
WorkQueue<Task> wq(10);
if (!wq.start(&worker, &wq)) {
fprintf(stderr, "Start failed\n");
exit(1);
}
for (;;) {
Task tsk;
fprintf(stderr, "BOSS: put task %d\n", tsk.m_id);
if (!wq.put(tsk)) {
fprintf(stderr, "Boss: put failed\n");
exit(1);
}
if ((tsk.m_id % 10) == 0)
sleep(1);
}
exit(0);
}

195
src/utils/workqueue.h Normal file
View File

@ -0,0 +1,195 @@
/*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the
* Free Software Foundation, Inc.,
* 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
#ifndef _WORKQUEUE_H_INCLUDED_
#define _WORKQUEUE_H_INCLUDED_
#include "pthread.h"
#include <string>
#include <queue>
using std::queue;
using std::string;
/**
* A WorkQueue manages the synchronisation around a queue of work items,
* where a single client thread queues tasks and a single worker takes
* and executes them. The goal is to introduce some level of
* parallelism between the successive steps of a previously single
* threaded pipe-line (data extraction / data preparation / index
* update).
*
* There is no individual task status return. In case of fatal error,
* the client or worker sets an end condition on the queue. A second
* queue could conceivably be used for returning individual task
* status.
*/
template <class T> class WorkQueue {
public:
WorkQueue(int hi = 0, int lo = 1)
: m_high(hi), m_low(lo), m_size(0), m_worker_up(false),
m_worker_waiting(false), m_jobcnt(0), m_lenacc(0)
{
m_ok = (pthread_cond_init(&m_cond, 0) == 0) &&
(pthread_mutex_init(&m_mutex, 0) == 0);
}
~WorkQueue()
{
if (m_worker_up)
setTerminateAndWait();
}
/** Start the worker thread. The start_routine will loop
* taking and executing tasks. */
bool start(void *(*start_routine)(void *), void *arg)
{
bool status = pthread_create(&m_worker_thread, 0,
start_routine, arg) == 0;
if (status)
m_worker_up = true;
return status;
}
/**
* Add item to work queue. Sleep if there are already too many.
* Called from client.
*/
bool put(T t)
{
if (!ok() || pthread_mutex_lock(&m_mutex) != 0)
return false;
while (ok() && m_high > 0 && m_queue.size() >= m_high) {
// Keep the order: we test ok() AFTER the sleep...
if (pthread_cond_wait(&m_cond, &m_mutex) || !ok()) {
pthread_mutex_unlock(&m_mutex);
return false;
}
}
m_queue.push(t);
++m_size;
pthread_cond_broadcast(&m_cond);
pthread_mutex_unlock(&m_mutex);
return true;
}
/** Wait until the queue is empty and the worker is
* back waiting for task. Called from the client when it needs to
* perform work that couldn't be done in parallel with the
* worker's tasks.
*/
bool waitIdle()
{
if (!ok() || pthread_mutex_lock(&m_mutex) != 0)
return false;
// We're done when the queue is empty AND the worker is back
// for a task (has finished the last)
while (ok() && (m_queue.size() > 0 || !m_worker_waiting)) {
if (pthread_cond_wait(&m_cond, &m_mutex)) {
pthread_mutex_unlock(&m_mutex);
return false;
}
}
pthread_mutex_unlock(&m_mutex);
return ok();
}
/** Tell the worker to exit, and wait for it. There may still
be tasks on the queue. */
void* setTerminateAndWait()
{
if (!m_worker_up)
return (void *)0;
pthread_mutex_lock(&m_mutex);
m_ok = false;
pthread_cond_broadcast(&m_cond);
pthread_mutex_unlock(&m_mutex);
void *status;
pthread_join(m_worker_thread, &status);
m_worker_up = false;
return status;
}
/** Remove task from queue. Sleep if there are not enough. Signal if we go
to sleep on empty queue: client may be waiting for our going idle */
bool take(T* tp)
{
if (!ok() || pthread_mutex_lock(&m_mutex) != 0)
return false;
while (ok() && m_queue.size() < m_low) {
m_worker_waiting = true;
if (m_queue.empty())
pthread_cond_broadcast(&m_cond);
if (pthread_cond_wait(&m_cond, &m_mutex) || !ok()) {
pthread_mutex_unlock(&m_mutex);
m_worker_waiting = false;
return false;
}
m_worker_waiting = false;
}
++m_jobcnt;
m_lenacc += m_size;
*tp = m_queue.front();
m_queue.pop();
--m_size;
pthread_cond_broadcast(&m_cond);
pthread_mutex_unlock(&m_mutex);
return true;
}
/** Take note of the worker exit. This would normally happen after an
unrecoverable error */
void workerExit()
{
if (!ok() || pthread_mutex_lock(&m_mutex) != 0)
return;
m_ok = false;
pthread_cond_broadcast(&m_cond);
pthread_mutex_unlock(&m_mutex);
}
/** Debug only: as the size is returned while the queue is unlocked, there
* is no warranty on its consistency. Not that we use the member size, not
* the container size() call which would need locking.
*/
size_t size() {return m_size;}
private:
bool ok() {return m_ok && m_worker_up;}
size_t m_high;
size_t m_low;
size_t m_size;
bool m_worker_up;
bool m_worker_waiting;
int m_jobcnt;
int m_lenacc;
pthread_t m_worker_thread;
queue<T> m_queue;
pthread_cond_t m_cond;
pthread_mutex_t m_mutex;
bool m_ok;
};
#endif /* _WORKQUEUE_H_INCLUDED_ */