mt indexing: do not pass copies of Xapian::Document between threads: the reference counting is not mt-safe. Replace with pointers

This commit is contained in:
Jean-Francois Dockes 2014-05-05 10:59:50 +02:00
parent e1b746bcfd
commit deb4cc8d12
2 changed files with 29 additions and 12 deletions

View File

@ -559,13 +559,18 @@ int Db::Native::getPageNumberForPosition(const vector<int>& pbreaks,
return it - pbreaks.begin() + 1;
}
// Note: we're passed a Xapian::Document* because Xapian can't do
// reference-counting properly. We take ownership and need to delete
// it before returning.
bool Db::Native::addOrUpdateWrite(const string& udi, const string& uniterm,
Xapian::Document& newdocument, size_t textlen)
Xapian::Document *newdocument_ptr,
size_t textlen)
{
#ifdef IDX_THREADS
Chrono chron;
PTMutexLocker lock(m_mutex);
#endif
RefCntr<Xapian::Document> doc_cleaner(newdocument_ptr);
// Check file system full every mbyte of indexed text. It's a bit wasteful
// to do this after having prepared the document, but it needs to be in
@ -590,8 +595,11 @@ bool Db::Native::addOrUpdateWrite(const string& udi, const string& uniterm,
// Add db entry or update existing entry:
try {
Xapian::docid did =
xwdb.replace_document(uniterm, newdocument);
xwdb.replace_document(uniterm, *newdocument_ptr);
if (did < m_rcldb->updated.size()) {
// This is necessary because only the file-level docs are tested
// by needUpdate(), so the subdocs existence flags are only set
// here.
m_rcldb->updated[did] = true;
LOGINFO(("Db::add: docid %d updated [%s]\n", did, fnc));
} else {
@ -604,7 +612,7 @@ bool Db::Native::addOrUpdateWrite(const string& udi, const string& uniterm,
ermsg.erase();
// FIXME: is this ever actually needed?
try {
xwdb.add_document(newdocument);
xwdb.add_document(*newdocument_ptr);
LOGDEB(("Db::add: %s added (failed re-seek for duplicate)\n",
fnc));
} XCATCHERROR(ermsg);
@ -1245,7 +1253,12 @@ bool Db::addOrUpdate(const string &udi, const string &parent_udi, Doc &doc)
if (m_ndb == 0)
return false;
Xapian::Document newdocument;
// This document is potentially going to be passed to the index
// update thread. The reference counters are not mt-safe, so we
// need to do this through a pointer. The reference is just there
// to avoid changing too much code (the previous version passed a copy).
Xapian::Document *newdocument_ptr = new Xapian::Document;
Xapian::Document &newdocument(*newdocument_ptr);
// The term processing pipeline:
TermProcIdx tpidx;
@ -1270,8 +1283,10 @@ bool Db::addOrUpdate(const string &udi, const string &parent_udi, Doc &doc)
// first. This is so different from the normal processing that
// it uses a fully separate code path (with some duplication
// unfortunately)
if (!m_ndb->docToXdocXattrOnly(&splitter, udi, doc, newdocument))
if (!m_ndb->docToXdocXattrOnly(&splitter, udi, doc, newdocument)) {
delete newdocument_ptr;
return false;
}
} else {
// If the ipath is like a path, index the last element. This is
@ -1365,6 +1380,7 @@ bool Db::addOrUpdate(const string &udi, const string &parent_udi, Doc &doc)
"count %d avglen %.4f sigma %.4f url [%s] ipath [%s] text %s\n",
v.count, v.avglen, v.sigma, doc.url.c_str(),
doc.ipath.c_str(), doc.text.c_str()));
delete newdocument_ptr;
return true;
}
#endif
@ -1551,9 +1567,10 @@ bool Db::addOrUpdate(const string &udi, const string &parent_udi, Doc &doc)
#ifdef IDX_THREADS
if (m_ndb->m_havewriteq) {
DbUpdTask *tp = new DbUpdTask(DbUpdTask::AddOrUpdate, udi, uniterm,
newdocument, doc.text.length());
newdocument_ptr, doc.text.length());
if (!m_ndb->m_wqueue.put(tp)) {
LOGERR(("Db::addOrUpdate:Cant queue task\n"));
delete newdocument_ptr;
return false;
} else {
return true;
@ -1561,7 +1578,7 @@ bool Db::addOrUpdate(const string &udi, const string &parent_udi, Doc &doc)
}
#endif
return m_ndb->addOrUpdateWrite(udi, uniterm, newdocument,
return m_ndb->addOrUpdateWrite(udi, uniterm, newdocument_ptr,
doc.text.length());
}
@ -1959,7 +1976,7 @@ bool Db::purgeFile(const string &udi, bool *existed)
#ifdef IDX_THREADS
if (m_ndb->m_havewriteq) {
DbUpdTask *tp = new DbUpdTask(DbUpdTask::Delete, udi, uniterm,
Xapian::Document(), (size_t)-1);
0, (size_t)-1);
if (!m_ndb->m_wqueue.put(tp)) {
LOGERR(("Db::purgeFile:Cant queue task\n"));
return false;
@ -1986,7 +2003,7 @@ bool Db::purgeOrphans(const string &udi)
#ifdef IDX_THREADS
if (m_ndb->m_havewriteq) {
DbUpdTask *tp = new DbUpdTask(DbUpdTask::PurgeOrphans, udi, uniterm,
Xapian::Document(), (size_t)-1);
0, (size_t)-1);
if (!m_ndb->m_wqueue.put(tp)) {
LOGERR(("Db::purgeFile:Cant queue task\n"));
return false;

View File

@ -51,14 +51,14 @@ public:
// passed both just to avoid recomputing uniterm which is
// available on the caller site.
DbUpdTask(Op _op, const string& ud, const string& un,
const Xapian::Document &d, size_t tl)
Xapian::Document *d, size_t tl)
: op(_op), udi(ud), uniterm(un), doc(d), txtlen(tl)
{}
// Udi and uniterm equivalently designate the doc
Op op;
string udi;
string uniterm;
Xapian::Document doc;
Xapian::Document *doc;
// txtlen is used to update the flush interval. It's -1 for a
// purge because we actually don't know it, and the code fakes a
// text length based on the term count.
@ -101,7 +101,7 @@ class Db::Native {
// Final steps of doc update, part which need to be single-threaded
bool addOrUpdateWrite(const string& udi, const string& uniterm,
Xapian::Document& doc, size_t txtlen);
Xapian::Document *doc, size_t txtlen);
/** Delete all documents which are contained in the input document,
* which must be a file-level one.