code moved around for clarity, no real changes

This commit is contained in:
Jean-Francois Dockes 2013-04-23 15:59:06 +02:00
parent fc4adfdbaa
commit 9aad81c387
3 changed files with 202 additions and 198 deletions

View File

@ -88,6 +88,8 @@ static const string unsplitfilename_prefix = "XSFS";
// MD5 digest of the empty string, as a hexadecimal string
static const string cstr_md5empty("d41d8cd98f00b204e9800998ecf8427e");
// Number of bytes in one megabyte; used to scale byte counts to MB
static const int MB = 1024 * 1024;
string version_string(){
return string("Recoll ") + string(rclversionstr) + string(" + Xapian ") +
string(Xapian::version_string());
@ -122,7 +124,8 @@ Db::Native::Native(Db *db)
: m_rcldb(db), m_isopen(false), m_iswritable(false),
m_noversionwrite(false)
#ifdef IDX_THREADS
, m_wqueue("DbUpd", m_rcldb->m_config->getThrConf(RclConfig::ThrDbWrite).first),
, m_wqueue("DbUpd",
m_rcldb->m_config->getThrConf(RclConfig::ThrDbWrite).first),
m_loglevel(4),
m_totalworkns(0LL), m_havewriteq(false)
#endif // IDX_THREADS
@ -352,6 +355,142 @@ int Db::Native::getPageNumberForPosition(const vector<int>& pbreaks,
return it - pbreaks.begin() + 1;
}
bool Db::Native::addOrUpdateWrite(const string& udi, const string& uniterm,
Xapian::Document& newdocument, size_t textlen)
{
#ifdef IDX_THREADS
Chrono chron;
// In the case where there is a separate (single) db update
// thread, we only need to protect the update map update below
// (against interaction with threads calling needUpdate()). Else,
// all threads from above need to synchronize here
PTMutexLocker lock(m_mutex, m_havewriteq);
#endif
// Check file system full every mbyte of indexed text. It's a bit wasteful
// to do this after having prepared the document, but it needs to be in
// the single-threaded section.
if (m_rcldb->m_maxFsOccupPc > 0 &&
(m_rcldb->m_occFirstCheck ||
(m_rcldb->m_curtxtsz - m_rcldb->m_occtxtsz) / MB >= 1)) {
LOGDEB(("Db::add: checking file system usage\n"));
int pc;
m_rcldb->m_occFirstCheck = 0;
if (fsocc(m_rcldb->m_basedir, &pc) && pc >= m_rcldb->m_maxFsOccupPc) {
LOGERR(("Db::add: stop indexing: file system "
"%d%% full > max %d%%\n", pc, m_rcldb->m_maxFsOccupPc));
return false;
}
m_rcldb->m_occtxtsz = m_rcldb->m_curtxtsz;
}
const char *fnc = udi.c_str();
string ermsg;
// Add db entry or update existing entry:
try {
Xapian::docid did =
xwdb.replace_document(uniterm, newdocument);
#ifdef IDX_THREADS
// Need to protect against interaction with the up-to-date checks
// which also update the existence map
PTMutexLocker lock(m_mutex, !m_havewriteq);
#endif
if (did < m_rcldb->updated.size()) {
m_rcldb->updated[did] = true;
LOGINFO(("Db::add: docid %d updated [%s]\n", did, fnc));
} else {
LOGINFO(("Db::add: docid %d added [%s]\n", did, fnc));
}
} XCATCHERROR(ermsg);
if (!ermsg.empty()) {
LOGERR(("Db::add: replace_document failed: %s\n", ermsg.c_str()));
ermsg.erase();
// FIXME: is this ever actually needed?
try {
xwdb.add_document(newdocument);
LOGDEB(("Db::add: %s added (failed re-seek for duplicate)\n",
fnc));
} XCATCHERROR(ermsg);
if (!ermsg.empty()) {
LOGERR(("Db::add: add_document failed: %s\n", ermsg.c_str()));
return false;
}
}
// Test if we're over the flush threshold (limit memory usage):
bool ret = m_rcldb->maybeflush(textlen);
#ifdef IDX_THREADS
m_totalworkns += chron.nanos();
#endif
return ret;
}
bool Db::Native::purgeFileWrite(bool orphansOnly, const string& udi,
const string& uniterm)
{
#if defined(IDX_THREADS)
// We need a mutex even if we have a write queue (so we can only
// be called by a single thread) to protect about multiple acces
// to xrdb from subDocs() which is also called from needupdate()
// (called from outside the write thread !
PTMutexLocker lock(m_mutex);
#endif // IDX_THREADS
string ermsg;
try {
Xapian::PostingIterator docid = xwdb.postlist_begin(uniterm);
if (docid == xwdb.postlist_end(uniterm)) {
return true;
}
if (m_rcldb->m_flushMb > 0) {
Xapian::termcount trms = xwdb.get_doclength(*docid);
m_rcldb->maybeflush(trms * 5);
}
string sig;
if (orphansOnly) {
Xapian::Document doc = xwdb.get_document(*docid);
sig = doc.get_value(VALUE_SIG);
if (sig.empty()) {
LOGINFO(("purgeFileWrite: got empty sig\n"));
return false;
}
} else {
LOGDEB(("purgeFile: delete docid %d\n", *docid));
xwdb.delete_document(*docid);
}
vector<Xapian::docid> docids;
subDocs(udi, docids);
LOGDEB(("purgeFile: subdocs cnt %d\n", docids.size()));
for (vector<Xapian::docid>::iterator it = docids.begin();
it != docids.end(); it++) {
if (m_rcldb->m_flushMb > 0) {
Xapian::termcount trms = xwdb.get_doclength(*it);
m_rcldb->maybeflush(trms * 5);
}
string subdocsig;
if (orphansOnly) {
Xapian::Document doc = xwdb.get_document(*it);
subdocsig = doc.get_value(VALUE_SIG);
if (subdocsig.empty()) {
LOGINFO(("purgeFileWrite: got empty sig for subdoc??\n"));
continue;
}
}
if (!orphansOnly || sig != subdocsig) {
LOGDEB(("Db::purgeFile: delete subdoc %d\n", *it));
xwdb.delete_document(*it);
}
}
return true;
} XCATCHERROR(ermsg);
if (!ermsg.empty()) {
LOGERR(("Db::purgeFileWrite: %s\n", ermsg.c_str()));
}
return false;
}
/* Rcl::Db methods ///////////////////////////////// */
@ -515,7 +654,8 @@ bool Db::i_close(bool final)
bool w = m_ndb->m_iswritable;
if (w) {
if (!m_ndb->m_noversionwrite)
m_ndb->xwdb.set_metadata(cstr_RCL_IDX_VERSION_KEY, cstr_RCL_IDX_VERSION);
m_ndb->xwdb.set_metadata(cstr_RCL_IDX_VERSION_KEY,
cstr_RCL_IDX_VERSION);
LOGDEB(("Rcl::Db:close: xapian will close. May take some time\n"));
#ifdef IDX_THREADS
waitUpdIdle();
@ -699,8 +839,45 @@ class TextSplitDb : public TextSplitP {
: TextSplitP(prc),
doc(d), basepos(1), curpos(0), wdfinc(1)
{}
// Reimplement text_to_words to add start and end special terms
virtual bool text_to_words(const string &in);
// Reimplement text_to_words to insert the begin and end anchor terms.
//
// The (possibly prefixed) start-of-field term is posted first, then the
// text is split by TextSplitP::text_to_words(), then the end-of-field
// term is posted. Processing stops at the first failing step, which is
// logged. In all cases basepos is then advanced by curpos + 100.
//
// Note that failures are only logged: the method always returns true,
// so callers can't use the return value to detect an error.
virtual bool text_to_words(const string &in)
{
    string ermsg;

    try {
        // Index the possibly prefixed start term.
        doc.add_posting(prefix + start_of_field_term, basepos, wdfinc);
        ++basepos;
    } XCATCHERROR(ermsg);

    if (!ermsg.empty()) {
        LOGERR(("Db: xapian add_posting error %s\n", ermsg.c_str()));
    } else if (!TextSplitP::text_to_words(in)) {
        LOGDEB(("TextSplitDb: TextSplit::text_to_words failed\n"));
    } else {
        try {
            // Index the possibly prefixed end term.
            doc.add_posting(prefix + end_of_field_term, basepos + curpos + 1,
                            wdfinc);
            ++basepos;
        } XCATCHERROR(ermsg);
        if (!ermsg.empty()) {
            LOGERR(("Db: xapian add_posting error %s\n", ermsg.c_str()));
        }
    }

    // Executed on success and failure alike: move the base position
    // past the positions used so far, plus 100.
    basepos += curpos + 100;
    return true;
}
void setprefix(const string& pref)
{
@ -725,44 +902,6 @@ private:
int wdfinc;
};
// Reimplement text_to_words to insert the begin and end anchor terms.
//
// The (possibly prefixed) start-of-field term is posted first, then the
// text is split by TextSplitP::text_to_words(), then the end-of-field
// term is posted. Processing stops at the first failing step, which is
// logged. In all cases basepos is then advanced by curpos + 100.
//
// Note that failures are only logged: the method always returns true,
// so callers can't use the return value to detect an error.
bool TextSplitDb::text_to_words(const string &in)
{
    string ermsg;

    try {
        // Index the possibly prefixed start term.
        doc.add_posting(prefix + start_of_field_term, basepos, wdfinc);
        ++basepos;
    } XCATCHERROR(ermsg);

    if (!ermsg.empty()) {
        LOGERR(("Db: xapian add_posting error %s\n", ermsg.c_str()));
    } else if (!TextSplitP::text_to_words(in)) {
        LOGDEB(("TextSplitDb: TextSplit::text_to_words failed\n"));
    } else {
        try {
            // Index the possibly prefixed end term.
            doc.add_posting(prefix + end_of_field_term, basepos+curpos+1,
                            wdfinc);
            ++basepos;
        } XCATCHERROR(ermsg);
        if (!ermsg.empty()) {
            LOGERR(("Db: xapian add_posting error %s\n", ermsg.c_str()));
        }
    }

    // Executed on success and failure alike: move the base position
    // past the positions used so far, plus 100.
    basepos += curpos + 100;
    return true;
}
class TermProcIdx : public TermProc {
public:
TermProcIdx() : TermProc(0), m_ts(0), m_lastpagepos(0), m_pageincr(0) {}
@ -880,9 +1019,7 @@ void Db::setAbstractParams(int idxtrunc, int syntlen, int syntctxlen)
m_synthAbsWordCtxLen = syntctxlen;
}
// Number of bytes in one megabyte; used to scale byte counts to MB
static const int MB = 1024 * 1024;
// Line feed, carriage return and form feed characters
static const string cstr_nc("\n\r\x0c");
// Append a "NM=VAL" line (newline-terminated) to the string R
#define RECORD_APPEND(R, NM, VAL) {R += NM + "=" + VAL + "\n";}
// Add document in internal form to the database: index the terms in
@ -1177,78 +1314,6 @@ bool Db::addOrUpdate(const string &udi, const string &parent_udi, Doc &doc)
doc.text.length());
}
bool Db::Native::addOrUpdateWrite(const string& udi, const string& uniterm,
Xapian::Document& newdocument, size_t textlen)
{
#ifdef IDX_THREADS
Chrono chron;
// In the case where there is a separate (single) db update
// thread, we only need to protect the update map update below
// (against interaction with threads calling needUpdate()). Else,
// all threads from above need to synchronize here
PTMutexLocker lock(m_mutex, m_havewriteq);
#endif
// Check file system full every mbyte of indexed text. It's a bit wasteful
// to do this after having prepared the document, but it needs to be in
// the single-threaded section.
if (m_rcldb->m_maxFsOccupPc > 0 &&
(m_rcldb->m_occFirstCheck ||
(m_rcldb->m_curtxtsz - m_rcldb->m_occtxtsz) / MB >= 1)) {
LOGDEB(("Db::add: checking file system usage\n"));
int pc;
m_rcldb->m_occFirstCheck = 0;
if (fsocc(m_rcldb->m_basedir, &pc) && pc >= m_rcldb->m_maxFsOccupPc) {
LOGERR(("Db::add: stop indexing: file system "
"%d%% full > max %d%%\n", pc, m_rcldb->m_maxFsOccupPc));
return false;
}
m_rcldb->m_occtxtsz = m_rcldb->m_curtxtsz;
}
const char *fnc = udi.c_str();
string ermsg;
// Add db entry or update existing entry:
try {
Xapian::docid did =
xwdb.replace_document(uniterm, newdocument);
#ifdef IDX_THREADS
// Need to protect against interaction with the up-to-date checks
// which also update the existence map
PTMutexLocker lock(m_mutex, !m_havewriteq);
#endif
if (did < m_rcldb->updated.size()) {
m_rcldb->updated[did] = true;
LOGINFO(("Db::add: docid %d updated [%s]\n", did, fnc));
} else {
LOGINFO(("Db::add: docid %d added [%s]\n", did, fnc));
}
} XCATCHERROR(ermsg);
if (!ermsg.empty()) {
LOGERR(("Db::add: replace_document failed: %s\n", ermsg.c_str()));
ermsg.erase();
// FIXME: is this ever actually needed?
try {
xwdb.add_document(newdocument);
LOGDEB(("Db::add: %s added (failed re-seek for duplicate)\n",
fnc));
} XCATCHERROR(ermsg);
if (!ermsg.empty()) {
LOGERR(("Db::add: add_document failed: %s\n", ermsg.c_str()));
return false;
}
}
// Test if we're over the flush threshold (limit memory usage):
bool ret = m_rcldb->maybeflush(textlen);
#ifdef IDX_THREADS
m_totalworkns += chron.nanos();
#endif
return ret;
}
#ifdef IDX_THREADS
void Db::waitUpdIdle()
{
@ -1602,71 +1667,6 @@ bool Db::purgeOrphans(const string &udi)
return m_ndb->purgeFileWrite(true, udi, uniterm);
}
bool Db::Native::purgeFileWrite(bool orphansOnly, const string& udi,
const string& uniterm)
{
#if defined(IDX_THREADS)
// We need a mutex even if we have a write queue (so we can only
// be called by a single thread) to protect about multiple acces
// to xrdb from subDocs() which is also called from needupdate()
// (called from outside the write thread !
PTMutexLocker lock(m_mutex);
#endif // IDX_THREADS
string ermsg;
try {
Xapian::PostingIterator docid = xwdb.postlist_begin(uniterm);
if (docid == xwdb.postlist_end(uniterm)) {
return true;
}
if (m_rcldb->m_flushMb > 0) {
Xapian::termcount trms = xwdb.get_doclength(*docid);
m_rcldb->maybeflush(trms * 5);
}
string sig;
if (orphansOnly) {
Xapian::Document doc = xwdb.get_document(*docid);
sig = doc.get_value(VALUE_SIG);
if (sig.empty()) {
LOGINFO(("purgeFileWrite: got empty sig\n"));
return false;
}
} else {
LOGDEB(("purgeFile: delete docid %d\n", *docid));
xwdb.delete_document(*docid);
}
vector<Xapian::docid> docids;
subDocs(udi, docids);
LOGDEB(("purgeFile: subdocs cnt %d\n", docids.size()));
for (vector<Xapian::docid>::iterator it = docids.begin();
it != docids.end(); it++) {
if (m_rcldb->m_flushMb > 0) {
Xapian::termcount trms = xwdb.get_doclength(*it);
m_rcldb->maybeflush(trms * 5);
}
string subdocsig;
if (orphansOnly) {
Xapian::Document doc = xwdb.get_document(*it);
subdocsig = doc.get_value(VALUE_SIG);
if (subdocsig.empty()) {
LOGINFO(("purgeFileWrite: got empty sig for subdoc??\n"));
continue;
}
}
if (!orphansOnly || sig != subdocsig) {
LOGDEB(("Db::purgeFile: delete subdoc %d\n", *it));
xwdb.delete_document(*it);
}
}
return true;
} XCATCHERROR(ermsg);
if (!ermsg.empty()) {
LOGERR(("Db::purgeFileWrite: %s\n", ermsg.c_str()));
}
return false;
}
bool Db::dbStats(DbStats& res)
{
if (!m_ndb || !m_ndb->m_isopen)

View File

@ -19,31 +19,32 @@
#include "debuglog.h"
namespace Rcl {
// Definitions of the static string constants of class Doc: each one
// holds the name of a document field/attribute key.
const string Doc::keyurl("url");
const string Doc::keyabs("abstract");
const string Doc::keyanc("rclanc");
const string Doc::keyapptg("rclaptg");
const string Doc::keyau("author");
const string Doc::keybcknd("rclbes");
const string Doc::keybght("beagleHitType");
const string Doc::keycc("collapsecount");
const string Doc::keychildurl("childurl");
const string Doc::keyfn("filename");
const string Doc::keyipt("ipath");
const string Doc::keytp("mtype");
const string Doc::keyfmt("fmtime");
const string Doc::keydmt("dmtime");
const string Doc::keyds("dbytes");
const string Doc::keyfmt("fmtime");
const string Doc::keyfn("filename");
const string Doc::keyfs("fbytes");
const string Doc::keyipt("ipath");
const string Doc::keykw("keywords");
const string Doc::keymd5("md5");
const string Doc::keymt("mtime");
const string Doc::keyoc("origcharset");
const string Doc::keypcs("pcbytes");
const string Doc::keyfs("fbytes");
const string Doc::keyds("dbytes");
const string Doc::keysz("size");
const string Doc::keysig("sig");
const string Doc::keyrr("relevancyrating");
const string Doc::keycc("collapsecount");
const string Doc::keyabs("abstract");
const string Doc::keyau("author");
const string Doc::keysig("sig");
const string Doc::keysz("size");
const string Doc::keytp("mtype");
const string Doc::keytt("title");
const string Doc::keykw("keywords");
const string Doc::keymd5("md5");
const string Doc::keybcknd("rclbes");
const string Doc::keyudi("rcludi");
const string Doc::keyapptg("rclaptg");
const string Doc::keybght("beagleHitType");
const string Doc::keyurl("url");
void Doc::dump(bool dotext) const
{

View File

@ -225,6 +225,9 @@ class Doc {
static const string keyudi;
static const string keyapptg; // apptag. Set from localfields (fsindexer)
static const string keybght; // beagle hit type ("beagleHitType")
// Boolean used to indicate if the doc has descendants in the ipath sense
// (different from the file/content parent_udi thing).
static const string keyanc;
};