Fix problems which occurred when using functions like open-parents with multiple indexes containing identical paths (udis)

This commit is contained in:
Jean-Francois Dockes 2013-05-25 11:26:57 +02:00
parent 11e0ccaa4e
commit a1b7018cfd
10 changed files with 99 additions and 59 deletions

View File

@ -120,14 +120,12 @@ void FileInterner::reapXAttrs(const string& path)
// This is used when the user wants to retrieve a search result doc's parent // This is used when the user wants to retrieve a search result doc's parent
// (ie message having a given attachment) // (ie message having a given attachment)
bool FileInterner::getEnclosing(const string &url, const string &ipath, bool FileInterner::getEnclosingUDI(const Rcl::Doc &doc, string& udi)
string &eurl, string &eipath, string& udi)
{ {
eurl = url; LOGDEB(("FileInterner::getEnclosingUDI(): url [%s] ipath [%s]\n",
eipath = ipath; doc.url.c_str(), doc.ipath.c_str()));
string eipath = doc.ipath;
string::size_type colon; string::size_type colon;
LOGDEB(("FileInterner::getEnclosing(): url [%s] ipath [%s]\n",
url.c_str(), eipath.c_str()));
if (eipath.empty()) if (eipath.empty())
return false; return false;
if ((colon = eipath.find_last_of(cstr_isep)) != string::npos) { if ((colon = eipath.find_last_of(cstr_isep)) != string::npos) {
@ -135,10 +133,8 @@ bool FileInterner::getEnclosing(const string &url, const string &ipath,
} else { } else {
eipath.erase(); eipath.erase();
} }
make_udi(url_gpath(eurl), eipath, udi);
LOGDEB(("FileInterner::getEnclosing() after: [%s]\n", eipath.c_str())); make_udi(url_gpath(doc.idxurl.empty() ? doc.url : doc.idxurl), eipath, udi);
return true;
} }
string FileInterner::getLastIpathElt(const string& ipath) string FileInterner::getLastIpathElt(const string& ipath)

View File

@ -195,14 +195,14 @@ class FileInterner {
} }
/** /**
* Get immediate parent for document. * Get UDI for immediate parent for document.
* *
* This is not in general the same as the "parent" document used * This is not in general the same as the "parent" document used
* with Rcl::Db::addOrUpdate(). The latter is the enclosing file, * with Rcl::Db::addOrUpdate(). The latter is the enclosing file,
* this would be for example the email containing the attachment. * this would be for example the email containing the attachment.
* This is in internfile because of the ipath computation.
*/ */
static bool getEnclosing(const string &url, const string &ipath, static bool getEnclosingUDI(const Rcl::Doc &doc, string& udi);
string &eurl, string &eipath, string& udi);
/** Return last element in ipath, like basename */ /** Return last element in ipath, like basename */
static std::string getLastIpathElt(const std::string& ipath); static std::string getLastIpathElt(const std::string& ipath);

View File

@ -430,7 +430,8 @@ void RclMain::viewUrl()
(const char *)qurl.fragment().toLocal8Bit(), udi); (const char *)qurl.fragment().toLocal8Bit(), udi);
Rcl::Doc doc; Rcl::Doc doc;
if (!rcldb->getDoc(udi, doc) || doc.pc == -1) Rcl::Doc idxdoc; // idxdoc.idxi == 0 -> works with base index only
if (!rcldb->getDoc(udi, idxdoc, doc) || doc.pc == -1)
return; return;
// StartNativeViewer needs a db source to call getEnclosing() on. // StartNativeViewer needs a db source to call getEnclosing() on.

View File

@ -38,14 +38,16 @@ int DocSequence::getSeqSlice(int offs, int cnt, vector<ResListEntry>& result)
bool DocSequence::getEnclosing(Rcl::Doc& doc, Rcl::Doc& pdoc) bool DocSequence::getEnclosing(Rcl::Doc& doc, Rcl::Doc& pdoc)
{ {
// Note: no need for setQuery here, we're just passing through a Rcl::Db *db = getDb();
// query-independant request if (db == 0) {
LOGERR(("DocSequence::getEnclosing: no db\n"));
return false;
}
string udi; string udi;
if (!FileInterner::getEnclosing(doc.url, doc.ipath, pdoc.url, pdoc.ipath, if (!FileInterner::getEnclosingUDI(doc, udi))
udi))
return false; return false;
bool dbret = getDb()->getDoc(udi, pdoc);
bool dbret = db->getDoc(udi, doc, pdoc);
return dbret && pdoc.pc != -1; return dbret && pdoc.pc != -1;
} }

View File

@ -136,7 +136,10 @@ bool DocSequenceHistory::getDoc(int num, Rcl::Doc &doc, string *sh)
} else } else
sh->erase(); sh->erase();
} }
bool ret = m_db->getDoc(m_it->udi, doc);
// For now history does not store an index id. Use empty doc as ref.
Rcl::Doc idxdoc;
bool ret = m_db->getDoc(m_it->udi, idxdoc, doc);
if (!ret || doc.pc == -1) { if (!ret || doc.pc == -1) {
doc.url = "UNKNOWN"; doc.url = "UNKNOWN";
doc.ipath = ""; doc.ipath = "";

View File

@ -219,20 +219,25 @@ void Db::Native::maybeStartThreads()
/* See comment in class declaration: return all subdocuments of a /* See comment in class declaration: return all subdocuments of a
* document given by its unique id. * document given by its unique id.
*/ */
bool Db::Native::subDocs(const string &udi, vector<Xapian::docid>& docids) bool Db::Native::subDocs(const string &udi, int idxi,
vector<Xapian::docid>& docids)
{ {
LOGDEB2(("subDocs: [%s]\n", uniterm.c_str())); LOGDEB2(("subDocs: [%s]\n", uniterm.c_str()));
string pterm = make_parentterm(udi); string pterm = make_parentterm(udi);
vector<Xapian::docid> candidates;
XAPTRY(docids.clear(); XAPTRY(docids.clear();
docids.insert(docids.begin(), xrdb.postlist_begin(pterm), candidates.insert(candidates.begin(), xrdb.postlist_begin(pterm),
xrdb.postlist_end(pterm)), xrdb.postlist_end(pterm)),
xrdb, m_rcldb->m_reason); xrdb, m_rcldb->m_reason);
if (!m_rcldb->m_reason.empty()) { if (!m_rcldb->m_reason.empty()) {
LOGERR(("Rcl::Db::subDocs: %s\n", m_rcldb->m_reason.c_str())); LOGERR(("Rcl::Db::subDocs: %s\n", m_rcldb->m_reason.c_str()));
return false; return false;
} else { } else {
for (unsigned int i = 0; i < candidates.size(); i++) {
if (whatDbIdx(candidates[i]) == (size_t)idxi) {
docids.push_back(candidates[i]);
}
}
LOGDEB0(("Db::Native::subDocs: returning %d ids\n", docids.size())); LOGDEB0(("Db::Native::subDocs: returning %d ids\n", docids.size()));
return true; return true;
} }
@ -259,11 +264,11 @@ bool Db::Native::xdocToUdi(Xapian::Document& xdoc, string &udi)
} }
// Check if doc given by udi is indexed by term // Check if doc given by udi is indexed by term
bool Db::Native::hasTerm(const string& udi, const string& term) bool Db::Native::hasTerm(const string& udi, int idxi, const string& term)
{ {
LOGDEB2(("Native::hasTerm: udi [%s] term [%s]\n",udi.c_str(),term.c_str())); LOGDEB2(("Native::hasTerm: udi [%s] term [%s]\n",udi.c_str(),term.c_str()));
Xapian::Document xdoc; Xapian::Document xdoc;
if (getDoc(udi, xdoc)) { if (getDoc(udi, idxi, xdoc)) {
Xapian::TermIterator xit; Xapian::TermIterator xit;
XAPTRY(xit = xdoc.termlist_begin(); XAPTRY(xit = xdoc.termlist_begin();
xit.skip_to(term);, xit.skip_to(term);,
@ -279,20 +284,23 @@ bool Db::Native::hasTerm(const string& udi, const string& term)
return false; return false;
} }
// Retrieve Xapian document, given udi // Retrieve Xapian document, given udi. There may be several identical udis
Xapian::docid Db::Native::getDoc(const string& udi, Xapian::Document& xdoc) // if we are using multiple indexes.
Xapian::docid Db::Native::getDoc(const string& udi, int idxi,
Xapian::Document& xdoc)
{ {
string uniterm = make_uniterm(udi); string uniterm = make_uniterm(udi);
for (int tries = 0; tries < 2; tries++) { for (int tries = 0; tries < 2; tries++) {
try { try {
Xapian::PostingIterator docid = xrdb.postlist_begin(uniterm); Xapian::PostingIterator docid;
if (docid == xrdb.postlist_end(uniterm)) { for (docid = xrdb.postlist_begin(uniterm);
// Udi not in Db. docid != xrdb.postlist_end(uniterm); docid++) {
return 0;
} else {
xdoc = xrdb.get_document(*docid); xdoc = xrdb.get_document(*docid);
return *docid; if (whatDbIdx(*docid) == (size_t)idxi)
return *docid;
} }
// Udi not in Db.
return 0;
} catch (const Xapian::DatabaseModifiedError &e) { } catch (const Xapian::DatabaseModifiedError &e) {
m_rcldb->m_reason = e.get_msg(); m_rcldb->m_reason = e.get_msg();
xrdb.reopen(); xrdb.reopen();
@ -314,23 +322,27 @@ bool Db::Native::dbDataToRclDoc(Xapian::docid docid, std::string &data,
if (!parms.ok()) if (!parms.ok())
return false; return false;
// Set xdocid at once so that we can call whatDbIdx()
doc.xdocid = docid; doc.xdocid = docid;
doc.haspages = hasPages(docid); doc.haspages = hasPages(docid);
// Compute what index this comes from, and check for path translations // Compute what index this comes from, and check for path translations
string dbdir = m_rcldb->m_basedir; string dbdir = m_rcldb->m_basedir;
doc.idxi = 0;
if (!m_rcldb->m_extraDbs.empty()) { if (!m_rcldb->m_extraDbs.empty()) {
unsigned int idxi = m_rcldb->whatDbIdx(doc); unsigned int idxi = whatDbIdx(docid);
// idxi is in [0, extraDbs.size()]. 0 is for the main index, // idxi is in [0, extraDbs.size()]. 0 is for the main index,
// idxi-1 indexes into the additional dbs array. // idxi-1 indexes into the additional dbs array.
if (idxi) { if (idxi) {
dbdir = m_rcldb->m_extraDbs[idxi - 1]; dbdir = m_rcldb->m_extraDbs[idxi - 1];
doc.idxi = idxi;
} }
} }
parms.get(Doc::keyurl, doc.url); parms.get(Doc::keyurl, doc.idxurl);
doc.url = doc.idxurl;
m_rcldb->m_config->urlrewrite(dbdir, doc.url); m_rcldb->m_config->urlrewrite(dbdir, doc.url);
if (!doc.url.compare(doc.idxurl))
doc.idxurl.clear();
// Special cases: // Special cases:
parms.get(Doc::keytp, doc.mimetype); parms.get(Doc::keytp, doc.mimetype);
@ -549,7 +561,7 @@ bool Db::Native::purgeFileWrite(bool orphansOnly, const string& udi,
xwdb.delete_document(*docid); xwdb.delete_document(*docid);
} }
vector<Xapian::docid> docids; vector<Xapian::docid> docids;
subDocs(udi, docids); subDocs(udi, 0, docids);
LOGDEB(("purgeFile: subdocs cnt %d\n", docids.size())); LOGDEB(("purgeFile: subdocs cnt %d\n", docids.size()));
for (vector<Xapian::docid>::iterator it = docids.begin(); for (vector<Xapian::docid>::iterator it = docids.begin();
it != docids.end(); it++) { it != docids.end(); it++) {
@ -864,14 +876,19 @@ bool Db::rmQueryDb(const string &dir)
// modulo of the docid against the db count. Ref: // modulo of the docid against the db count. Ref:
// http://trac.xapian.org/wiki/FAQ/MultiDatabaseDocumentID // http://trac.xapian.org/wiki/FAQ/MultiDatabaseDocumentID
size_t Db::whatDbIdx(const Doc& doc) size_t Db::whatDbIdx(const Doc& doc)
{
return m_ndb->whatDbIdx(doc.xdocid);
}
size_t Db::Native::whatDbIdx(Xapian::docid id)
{ {
LOGDEB1(("Db::whatDbIdx: xdocid %lu, %u extraDbs\n", LOGDEB1(("Db::whatDbIdx: xdocid %lu, %u extraDbs\n",
(unsigned long)doc.xdocid, m_extraDbs.size())); (unsigned long)id, m_extraDbs.size()));
if (doc.xdocid == 0) if (id == 0)
return (size_t)-1; return (size_t)-1;
if (m_extraDbs.size() == 0) if (m_rcldb->m_extraDbs.size() == 0)
return 0; return 0;
return (doc.xdocid - 1) % (m_extraDbs.size() + 1); return (id - 1) % (m_rcldb->m_extraDbs.size() + 1);
} }
bool Db::testDbDir(const string &dir, bool *stripped_p) bool Db::testDbDir(const string &dir, bool *stripped_p)
@ -1556,7 +1573,7 @@ bool Db::needUpdate(const string &udi, const string& sig, bool *existed)
// Set the existence flag for all the subdocs (if any) // Set the existence flag for all the subdocs (if any)
vector<Xapian::docid> docids; vector<Xapian::docid> docids;
if (!m_ndb->subDocs(udi, docids)) { if (!m_ndb->subDocs(udi, 0, docids)) {
LOGERR(("Rcl::Db::needUpdate: can't get subdocs\n")); LOGERR(("Rcl::Db::needUpdate: can't get subdocs\n"));
return true; return true;
} }
@ -1808,7 +1825,7 @@ bool Db::dbStats(DbStats& res)
// by the GUI history feature and by open parent/getenclosing // by the GUI history feature and by open parent/getenclosing
// ! The return value is always true except for fatal errors. Document // ! The return value is always true except for fatal errors. Document
// existence should be tested by looking at doc.pc // existence should be tested by looking at doc.pc
bool Db::getDoc(const string &udi, Doc &doc) bool Db::getDoc(const string &udi, const Doc& idxdoc, Doc &doc)
{ {
LOGDEB(("Db:getDoc: [%s]\n", udi.c_str())); LOGDEB(("Db:getDoc: [%s]\n", udi.c_str()));
if (m_ndb == 0) if (m_ndb == 0)
@ -1820,7 +1837,8 @@ bool Db::getDoc(const string &udi, Doc &doc)
doc.pc = 100; doc.pc = 100;
Xapian::Document xdoc; Xapian::Document xdoc;
Xapian::docid docid; Xapian::docid docid;
if ((docid = m_ndb->getDoc(udi, xdoc))) { int idxi = idxdoc.idxi;
if ((docid = m_ndb->getDoc(udi, idxi, xdoc))) {
string data = xdoc.get_data(); string data = xdoc.get_data();
doc.meta[Rcl::Doc::keyudi] = udi; doc.meta[Rcl::Doc::keyudi] = udi;
return m_ndb->dbDataToRclDoc(docid, data, doc); return m_ndb->dbDataToRclDoc(docid, data, doc);
@ -1845,7 +1863,7 @@ bool Db::hasSubDocs(const Doc &idoc)
return false; return false;
} }
vector<Xapian::docid> docids; vector<Xapian::docid> docids;
if (!m_ndb->subDocs(inudi, docids)) { if (!m_ndb->subDocs(inudi, idoc.idxi, docids)) {
LOGDEB(("Db:getSubDocs: lower level subdocs failed\n")); LOGDEB(("Db:getSubDocs: lower level subdocs failed\n"));
return false; return false;
} }
@ -1853,7 +1871,7 @@ bool Db::hasSubDocs(const Doc &idoc)
return true; return true;
// Check if doc has an has_children term // Check if doc has an has_children term
if (m_ndb->hasTerm(inudi, has_children_term)) if (m_ndb->hasTerm(inudi, idoc.idxi, has_children_term))
return true; return true;
return false; return false;
} }
@ -1879,7 +1897,7 @@ bool Db::getSubDocs(const Doc &idoc, vector<Doc>& subdocs)
} else { } else {
// See if we have a parent term // See if we have a parent term
Xapian::Document xdoc; Xapian::Document xdoc;
if (!m_ndb->getDoc(inudi, xdoc)) { if (!m_ndb->getDoc(inudi, idoc.idxi, xdoc)) {
LOGERR(("Db::getSubDocs: can't get Xapian document\n")); LOGERR(("Db::getSubDocs: can't get Xapian document\n"));
return false; return false;
} }
@ -1902,7 +1920,7 @@ bool Db::getSubDocs(const Doc &idoc, vector<Doc>& subdocs)
// Retrieve all subdoc xapian ids for the root // Retrieve all subdoc xapian ids for the root
vector<Xapian::docid> docids; vector<Xapian::docid> docids;
if (!m_ndb->subDocs(rootudi, docids)) { if (!m_ndb->subDocs(rootudi, idoc.idxi, docids)) {
LOGDEB(("Db:getSubDocs: lower level subdocs failed\n")); LOGDEB(("Db:getSubDocs: lower level subdocs failed\n"));
return false; return false;
} }

View File

@ -228,9 +228,12 @@ class Db {
/* Update-related methods ******************************************/ /* Update-related methods ******************************************/
/** Test if the db entry for the given udi is up to date (by /** Test if the db entry for the given udi is up to date (by
* comparing the input and stored sigs). * comparing the input and stored sigs). This is used both when
* Side-effect: set the existence flag for the file document * indexing and querying (before opening a document using stale info),
* and all subdocs if any (for later use by 'purge()') * **This assumes that the udi pertains to the main index (idxi==0).**
* Side-effect when the db is writeable: set the existence flag
* for the file document and all subdocs if any (for later use by
* 'purge()')
*/ */
bool needUpdate(const string &udi, const string& sig, bool *existed=0); bool needUpdate(const string &udi, const string& sig, bool *existed=0);
@ -355,8 +358,13 @@ class Db {
/** Get document for given udi /** Get document for given udi
* *
* Used by the 'history' feature, and to retrieve ancestor documents. * Used by the 'history' feature, and to retrieve ancestor documents.
* @param udi the unique document identifier
* @param idxdoc used when there are several indexes, as an opaque way to pass
* the index id. Use a doc from the same index
* (e.g.: when looking for parent).
* @param doc the output doc
*/ */
bool getDoc(const string &udi, Doc &doc); bool getDoc(const string &udi, const Doc& idxdoc, Doc &doc);
/** Test if document has sub-documents. /** Test if document has sub-documents.
* *

View File

@ -120,12 +120,14 @@ class Db::Native {
bool dbDataToRclDoc(Xapian::docid docid, std::string &data, Doc &doc); bool dbDataToRclDoc(Xapian::docid docid, std::string &data, Doc &doc);
size_t whatDbIdx(Xapian::docid id);
/** Retrieve Xapian::docid, given unique document identifier, /** Retrieve Xapian::docid, given unique document identifier,
* using the posting list for the derived term. * using the posting list for the derived term.
* *
* @return 0 if not found * @return 0 if not found
*/ */
Xapian::docid getDoc(const string& udi, Xapian::Document& xdoc); Xapian::docid getDoc(const string& udi, int idxi, Xapian::Document& xdoc);
/** Retrieve unique document identifier for given Xapian document, /** Retrieve unique document identifier for given Xapian document,
* using the document termlist * using the document termlist
@ -133,7 +135,7 @@ class Db::Native {
bool xdocToUdi(Xapian::Document& xdoc, string &udi); bool xdocToUdi(Xapian::Document& xdoc, string &udi);
/** Check if doc is indexed by term */ /** Check if doc is indexed by term */
bool hasTerm(const string& udi, const string& term); bool hasTerm(const string& udi, int idxi, const string& term);
/** Compute list of subdocuments for a given udi. We look for documents /** Compute list of subdocuments for a given udi. We look for documents
* indexed by a parent term matching the udi, the posting list for the * indexed by a parent term matching the udi, the posting list for the
@ -149,7 +151,7 @@ class Db::Native {
* indexer (rcldb user), using the ipath. * indexer (rcldb user), using the ipath.
* *
*/ */
bool subDocs(const string &udi, vector<Xapian::docid>& docids); bool subDocs(const string &udi, int idxi, vector<Xapian::docid>& docids);
/** Check if a page position list is defined */ /** Check if a page position list is defined */
bool hasPages(Xapian::docid id); bool hasPages(Xapian::docid id);

View File

@ -48,6 +48,7 @@ namespace Rcl {
void Doc::dump(bool dotext) const void Doc::dump(bool dotext) const
{ {
LOGDEB(("Rcl::Doc::dump: url: [%s]\n", url.c_str())); LOGDEB(("Rcl::Doc::dump: url: [%s]\n", url.c_str()));
LOGDEB(("Rcl::Doc::dump: idxurl: [%s]\n", idxurl.c_str()));
LOGDEB(("Rcl::Doc::dump: ipath: [%s]\n", ipath.c_str())); LOGDEB(("Rcl::Doc::dump: ipath: [%s]\n", ipath.c_str()));
LOGDEB(("Rcl::Doc::dump: mimetype: [%s]\n", mimetype.c_str())); LOGDEB(("Rcl::Doc::dump: mimetype: [%s]\n", mimetype.c_str()));
LOGDEB(("Rcl::Doc::dump: fmtime: [%s]\n", fmtime.c_str())); LOGDEB(("Rcl::Doc::dump: fmtime: [%s]\n", fmtime.c_str()));

View File

@ -48,6 +48,12 @@ class Doc {
// Query: from doc data. // Query: from doc data.
string url; string url;
// When we do path translation for documents from external indexes, we
// save the original path:
string idxurl;
// And the originating db. 0 is base, 1 first external etc.
int idxi;
// Internal path for multi-doc files. Ascii // Internal path for multi-doc files. Ascii
// Set by FsIndexer::processone // Set by FsIndexer::processone
string ipath; string ipath;
@ -142,13 +148,16 @@ class Doc {
sig.erase(); sig.erase();
text.erase(); text.erase();
pc = 0; pc = 0;
xdocid = 0; xdocid = 0;
idxi = 0;
haspages = false; haspages = false;
haschildren = false; haschildren = false;
} }
Doc() Doc()
: syntabs(false), pc(0), xdocid(0), haspages(false), haschildren(false) : idxi(0), syntabs(false), pc(0), xdocid(0),
haspages(false), haschildren(false)
{ {
} }
/** Get value for named field. If value pointer is 0, just test existence */ /** Get value for named field. If value pointer is 0, just test existence */