Avoid purging existing subdocuments when indexing a file fails (e.g. because of a file lock that may go away later)

This commit is contained in:
Jean-Francois Dockes 2019-06-21 17:18:15 +02:00
parent db9fd248f3
commit ee8c5410bd
3 changed files with 336 additions and 327 deletions

View File

@ -56,10 +56,10 @@ class DbUpdTask {
public: public:
// Take some care to avoid sharing string data (if string impl is cow) // Take some care to avoid sharing string data (if string impl is cow)
DbUpdTask(const string& u, const string& p, const Rcl::Doc& d) DbUpdTask(const string& u, const string& p, const Rcl::Doc& d)
: udi(u.begin(), u.end()), parent_udi(p.begin(), p.end()) : udi(u.begin(), u.end()), parent_udi(p.begin(), p.end())
{ {
d.copyto(&doc); d.copyto(&doc);
} }
string udi; string udi;
string parent_udi; string parent_udi;
Rcl::Doc doc; Rcl::Doc doc;
@ -70,11 +70,11 @@ class InternfileTask {
public: public:
// Take some care to avoid sharing string data (if string impl is cow) // Take some care to avoid sharing string data (if string impl is cow)
InternfileTask(const std::string &f, const struct stat *i_stp, InternfileTask(const std::string &f, const struct stat *i_stp,
map<string,string> lfields) map<string,string> lfields)
: fn(f.begin(), f.end()), statbuf(*i_stp) : fn(f.begin(), f.end()), statbuf(*i_stp)
{ {
map_ss_cp_noshr(lfields, &localfields); map_ss_cp_noshr(lfields, &localfields);
} }
string fn; string fn;
struct stat statbuf; struct stat statbuf;
map<string,string> localfields; map<string,string> localfields;
@ -91,12 +91,12 @@ class FSIFIMissingStore : public FIMissingStore {
#endif #endif
public: public:
virtual void addMissing(const string& prog, const string& mt) virtual void addMissing(const string& prog, const string& mt)
{ {
#ifdef IDX_THREADS #ifdef IDX_THREADS
std::unique_lock<std::mutex> locker(m_mutex); std::unique_lock<std::mutex> locker(m_mutex);
#endif #endif
FIMissingStore::addMissing(prog, mt); FIMissingStore::addMissing(prog, mt);
} }
}; };
FsIndexer::FsIndexer(RclConfig *cnf, Rcl::Db *db, DbIxStatusUpdater *updfunc) FsIndexer::FsIndexer(RclConfig *cnf, Rcl::Db *db, DbIxStatusUpdater *updfunc)
@ -118,20 +118,20 @@ FsIndexer::FsIndexer(RclConfig *cnf, Rcl::Db *db, DbIxStatusUpdater *updfunc)
int internqlen = cnf->getThrConf(RclConfig::ThrIntern).first; int internqlen = cnf->getThrConf(RclConfig::ThrIntern).first;
int internthreads = cnf->getThrConf(RclConfig::ThrIntern).second; int internthreads = cnf->getThrConf(RclConfig::ThrIntern).second;
if (internqlen >= 0) { if (internqlen >= 0) {
if (!m_iwqueue.start(internthreads, FsIndexerInternfileWorker, this)) { if (!m_iwqueue.start(internthreads, FsIndexerInternfileWorker, this)) {
LOGERR("FsIndexer::FsIndexer: intern worker start failed\n"); LOGERR("FsIndexer::FsIndexer: intern worker start failed\n");
return; return;
} }
m_haveInternQ = true; m_haveInternQ = true;
} }
int splitqlen = cnf->getThrConf(RclConfig::ThrSplit).first; int splitqlen = cnf->getThrConf(RclConfig::ThrSplit).first;
int splitthreads = cnf->getThrConf(RclConfig::ThrSplit).second; int splitthreads = cnf->getThrConf(RclConfig::ThrSplit).second;
if (splitqlen >= 0) { if (splitqlen >= 0) {
if (!m_dwqueue.start(splitthreads, FsIndexerDbUpdWorker, this)) { if (!m_dwqueue.start(splitthreads, FsIndexerDbUpdWorker, this)) {
LOGERR("FsIndexer::FsIndexer: split worker start failed\n"); LOGERR("FsIndexer::FsIndexer: split worker start failed\n");
return; return;
} }
m_haveSplitQ = true; m_haveSplitQ = true;
} }
LOGDEB("FsIndexer: threads: haveIQ " << m_haveInternQ << " iql " << LOGDEB("FsIndexer: threads: haveIQ " << m_haveInternQ << " iql " <<
internqlen << " iqts " << internthreads << " haveSQ " << internqlen << " iqts " << internthreads << " haveSQ " <<
@ -147,12 +147,12 @@ FsIndexer::~FsIndexer()
#ifdef IDX_THREADS #ifdef IDX_THREADS
void *status; void *status;
if (m_haveInternQ) { if (m_haveInternQ) {
status = m_iwqueue.setTerminateAndWait(); status = m_iwqueue.setTerminateAndWait();
LOGDEB0("FsIndexer: internfile wrkr status: "<< status << " (1->ok)\n"); LOGDEB0("FsIndexer: internfile wrkr status: "<< status << " (1->ok)\n");
} }
if (m_haveSplitQ) { if (m_haveSplitQ) {
status = m_dwqueue.setTerminateAndWait(); status = m_dwqueue.setTerminateAndWait();
LOGDEB0("FsIndexer: dbupd worker status: " << status << " (1->ok)\n"); LOGDEB0("FsIndexer: dbupd worker status: " << status << " (1->ok)\n");
} }
delete m_stableconfig; delete m_stableconfig;
#endif // IDX_THREADS #endif // IDX_THREADS
@ -179,23 +179,23 @@ bool FsIndexer::index(int flags)
m_noretryfailed = (flags & ConfIndexer::IxFNoRetryFailed) != 0; m_noretryfailed = (flags & ConfIndexer::IxFNoRetryFailed) != 0;
Chrono chron; Chrono chron;
if (!init()) if (!init())
return false; return false;
if (m_updater) { if (m_updater) {
#ifdef IDX_THREADS #ifdef IDX_THREADS
std::unique_lock<std::mutex> locker(m_updater->m_mutex); std::unique_lock<std::mutex> locker(m_updater->m_mutex);
#endif #endif
m_updater->status.dbtotdocs = m_db->docCnt(); m_updater->status.dbtotdocs = m_db->docCnt();
} }
m_walker.setSkippedPaths(m_config->getSkippedPaths()); m_walker.setSkippedPaths(m_config->getSkippedPaths());
if (quickshallow) { if (quickshallow) {
m_walker.setOpts(m_walker.getOpts() | FsTreeWalker::FtwSkipDotFiles); m_walker.setOpts(m_walker.getOpts() | FsTreeWalker::FtwSkipDotFiles);
m_walker.setMaxDepth(2); m_walker.setMaxDepth(2);
} }
for (const auto& topdir : m_tdl) { for (const auto& topdir : m_tdl) {
LOGDEB("FsIndexer::index: Indexing " << topdir << " into " << LOGDEB("FsIndexer::index: Indexing " << topdir << " into " <<
getDbDir() << "\n"); getDbDir() << "\n");
// If a topdirs member appears to be not here or not mounted // If a topdirs member appears to be not here or not mounted
@ -206,48 +206,48 @@ bool FsIndexer::index(int flags)
continue; continue;
} }
// Set the current directory in config so that subsequent // Set the current directory in config so that subsequent
// getConfParams() will get local values // getConfParams() will get local values
m_config->setKeyDir(topdir); m_config->setKeyDir(topdir);
// Adjust the "follow symlinks" option // Adjust the "follow symlinks" option
bool follow; bool follow;
int opts = m_walker.getOpts(); int opts = m_walker.getOpts();
if (m_config->getConfParam("followLinks", &follow) && follow) { if (m_config->getConfParam("followLinks", &follow) && follow) {
opts |= FsTreeWalker::FtwFollow; opts |= FsTreeWalker::FtwFollow;
} else { } else {
opts &= ~FsTreeWalker::FtwFollow; opts &= ~FsTreeWalker::FtwFollow;
} }
m_walker.setOpts(opts); m_walker.setOpts(opts);
int abslen; int abslen;
if (m_config->getConfParam("idxabsmlen", &abslen)) if (m_config->getConfParam("idxabsmlen", &abslen))
m_db->setAbstractParams(abslen, -1, -1); m_db->setAbstractParams(abslen, -1, -1);
// Walk the directory tree // Walk the directory tree
if (m_walker.walk(topdir, *this) != FsTreeWalker::FtwOk) { if (m_walker.walk(topdir, *this) != FsTreeWalker::FtwOk) {
LOGERR("FsIndexer::index: error while indexing " << topdir << LOGERR("FsIndexer::index: error while indexing " << topdir <<
": " << m_walker.getReason() << "\n"); ": " << m_walker.getReason() << "\n");
return false; return false;
} }
} }
#ifdef IDX_THREADS #ifdef IDX_THREADS
if (m_haveInternQ) if (m_haveInternQ)
m_iwqueue.waitIdle(); m_iwqueue.waitIdle();
if (m_haveSplitQ) if (m_haveSplitQ)
m_dwqueue.waitIdle(); m_dwqueue.waitIdle();
m_db->waitUpdIdle(); m_db->waitUpdIdle();
#endif // IDX_THREADS #endif // IDX_THREADS
if (m_missing) { if (m_missing) {
string missing; string missing;
m_missing->getMissingDescription(missing); m_missing->getMissingDescription(missing);
if (!missing.empty()) { if (!missing.empty()) {
LOGINFO("FsIndexer::index missing helper program(s):\n" << LOGINFO("FsIndexer::index missing helper program(s):\n" <<
missing << "\n"); missing << "\n");
} }
m_config->storeMissingHelperDesc(missing); m_config->storeMissingHelperDesc(missing);
} }
LOGINFO("fsindexer index time: " << chron.millis() << " mS\n"); LOGINFO("fsindexer index time: " << chron.millis() << " mS\n");
return true; return true;
@ -342,7 +342,7 @@ bool FsIndexer::indexFiles(list<string>& files, int flags)
int abslen; int abslen;
if (m_config->getConfParam("idxabsmlen", &abslen)) if (m_config->getConfParam("idxabsmlen", &abslen))
m_db->setAbstractParams(abslen, -1, -1); m_db->setAbstractParams(abslen, -1, -1);
m_purgeCandidates.setRecord(true); m_purgeCandidates.setRecord(true);
@ -354,29 +354,29 @@ bool FsIndexer::indexFiles(list<string>& files, int flags)
LOGDEB2("FsIndexer::indexFiles: [" << it << "]\n"); LOGDEB2("FsIndexer::indexFiles: [" << it << "]\n");
m_config->setKeyDir(path_getfather(*it)); m_config->setKeyDir(path_getfather(*it));
if (m_havelocalfields) if (m_havelocalfields)
localfieldsfromconf(); localfieldsfromconf();
bool follow = false; bool follow = false;
m_config->getConfParam("followLinks", &follow); m_config->getConfParam("followLinks", &follow);
walker.setOnlyNames(m_config->getOnlyNames()); walker.setOnlyNames(m_config->getOnlyNames());
walker.setSkippedNames(m_config->getSkippedNames()); walker.setSkippedNames(m_config->getSkippedNames());
// Check path against indexed areas and skipped names/paths // Check path against indexed areas and skipped names/paths
if (!(flags & ConfIndexer::IxFIgnoreSkip) && if (!(flags & ConfIndexer::IxFIgnoreSkip) &&
matchesSkipped(m_tdl, walker, *it)) { matchesSkipped(m_tdl, walker, *it)) {
it++; it++;
continue; continue;
} }
struct stat stb; struct stat stb;
int ststat = path_fileprops(*it, &stb, follow); int ststat = path_fileprops(*it, &stb, follow);
if (ststat != 0) { if (ststat != 0) {
LOGERR("FsIndexer::indexFiles: (l)stat " << *it << ": " << LOGERR("FsIndexer::indexFiles: (l)stat " << *it << ": " <<
strerror(errno) << "\n"); strerror(errno) << "\n");
it++; it++;
continue; continue;
} }
if (!(flags & ConfIndexer::IxFIgnoreSkip) && if (!(flags & ConfIndexer::IxFIgnoreSkip) &&
(S_ISREG(stb.st_mode) || S_ISLNK(stb.st_mode))) { (S_ISREG(stb.st_mode) || S_ISLNK(stb.st_mode))) {
if (!walker.inOnlyNames(path_getsimple(*it))) { if (!walker.inOnlyNames(path_getsimple(*it))) {
@ -384,11 +384,11 @@ bool FsIndexer::indexFiles(list<string>& files, int flags)
continue; continue;
} }
} }
if (processone(*it, &stb, FsTreeWalker::FtwRegular) != if (processone(*it, &stb, FsTreeWalker::FtwRegular) !=
FsTreeWalker::FtwOk) { FsTreeWalker::FtwOk) {
LOGERR("FsIndexer::indexFiles: processone failed\n"); LOGERR("FsIndexer::indexFiles: processone failed\n");
goto out; goto out;
} }
it = files.erase(it); it = files.erase(it);
} }
@ -396,23 +396,23 @@ bool FsIndexer::indexFiles(list<string>& files, int flags)
out: out:
#ifdef IDX_THREADS #ifdef IDX_THREADS
if (m_haveInternQ) if (m_haveInternQ)
m_iwqueue.waitIdle(); m_iwqueue.waitIdle();
if (m_haveSplitQ) if (m_haveSplitQ)
m_dwqueue.waitIdle(); m_dwqueue.waitIdle();
m_db->waitUpdIdle(); m_db->waitUpdIdle();
#endif // IDX_THREADS #endif // IDX_THREADS
// Purge possible orphan documents // Purge possible orphan documents
if (ret == true) { if (ret == true) {
LOGDEB("Indexfiles: purging orphans\n"); LOGDEB("Indexfiles: purging orphans\n");
const vector<string>& purgecandidates = m_purgeCandidates.getCandidates(); const vector<string>& purgecandidates = m_purgeCandidates.getCandidates();
for (vector<string>::const_iterator it = purgecandidates.begin(); for (vector<string>::const_iterator it = purgecandidates.begin();
it != purgecandidates.end(); it++) { it != purgecandidates.end(); it++) {
LOGDEB("Indexfiles: purging orphans for " << *it << "\n"); LOGDEB("Indexfiles: purging orphans for " << *it << "\n");
m_db->purgeOrphans(*it); m_db->purgeOrphans(*it);
} }
#ifdef IDX_THREADS #ifdef IDX_THREADS
m_db->waitUpdIdle(); m_db->waitUpdIdle();
#endif // IDX_THREADS #endif // IDX_THREADS
} }
@ -427,18 +427,18 @@ bool FsIndexer::purgeFiles(list<string>& files)
LOGDEB("FsIndexer::purgeFiles\n"); LOGDEB("FsIndexer::purgeFiles\n");
bool ret = false; bool ret = false;
if (!init()) if (!init())
return false; return false;
for (list<string>::iterator it = files.begin(); it != files.end(); ) { for (list<string>::iterator it = files.begin(); it != files.end(); ) {
string udi; string udi;
make_udi(*it, cstr_null, udi); make_udi(*it, cstr_null, udi);
// rcldb::purgefile returns true if the udi was either not // rcldb::purgefile returns true if the udi was either not
// found or deleted, false only in case of actual error // found or deleted, false only in case of actual error
bool existed; bool existed;
if (!m_db->purgeFile(udi, &existed)) { if (!m_db->purgeFile(udi, &existed)) {
LOGERR("FsIndexer::purgeFiles: Database error\n"); LOGERR("FsIndexer::purgeFiles: Database error\n");
goto out; goto out;
} }
// If we actually deleted something, take it off the list // If we actually deleted something, take it off the list
if (existed) { if (existed) {
it = files.erase(it); it = files.erase(it);
@ -451,9 +451,9 @@ bool FsIndexer::purgeFiles(list<string>& files)
out: out:
#ifdef IDX_THREADS #ifdef IDX_THREADS
if (m_haveInternQ) if (m_haveInternQ)
m_iwqueue.waitIdle(); m_iwqueue.waitIdle();
if (m_haveSplitQ) if (m_haveSplitQ)
m_dwqueue.waitIdle(); m_dwqueue.waitIdle();
m_db->waitUpdIdle(); m_db->waitUpdIdle();
#endif // IDX_THREADS #endif // IDX_THREADS
LOGDEB("FsIndexer::purgeFiles: done\n"); LOGDEB("FsIndexer::purgeFiles: done\n");
@ -468,12 +468,12 @@ void FsIndexer::localfieldsfromconf()
string sfields; string sfields;
m_config->getConfParam("localfields", sfields); m_config->getConfParam("localfields", sfields);
if (!sfields.compare(m_slocalfields)) if (!sfields.compare(m_slocalfields))
return; return;
m_slocalfields = sfields; m_slocalfields = sfields;
m_localfields.clear(); m_localfields.clear();
if (sfields.empty()) if (sfields.empty())
return; return;
string value; string value;
ConfSimple attrs; ConfSimple attrs;
@ -481,9 +481,9 @@ void FsIndexer::localfieldsfromconf()
vector<string> nmlst = attrs.getNames(cstr_null); vector<string> nmlst = attrs.getNames(cstr_null);
for (vector<string>::const_iterator it = nmlst.begin(); for (vector<string>::const_iterator it = nmlst.begin();
it != nmlst.end(); it++) { it != nmlst.end(); it++) {
string nm = m_config->fieldCanon(*it); string nm = m_config->fieldCanon(*it);
attrs.get(*it, m_localfields[nm]); attrs.get(*it, m_localfields[nm]);
LOGDEB2("FsIndexer::localfieldsfromconf: [" << nm << "]->[" << LOGDEB2("FsIndexer::localfieldsfromconf: [" << nm << "]->[" <<
m_localfields[nm] << "]\n"); m_localfields[nm] << "]\n");
} }
} }
@ -491,11 +491,11 @@ void FsIndexer::localfieldsfromconf()
void FsIndexer::setlocalfields(const map<string, string>& fields, Rcl::Doc& doc) void FsIndexer::setlocalfields(const map<string, string>& fields, Rcl::Doc& doc)
{ {
for (map<string, string>::const_iterator it = fields.begin(); for (map<string, string>::const_iterator it = fields.begin();
it != fields.end(); it++) { it != fields.end(); it++) {
// Being chosen by the user, localfields override values from // Being chosen by the user, localfields override values from
// the filter. The key is already canonic (see // the filter. The key is already canonic (see
// localfieldsfromconf()) // localfieldsfromconf())
doc.meta[it->first] = it->second; doc.meta[it->first] = it->second;
} }
} }
@ -518,18 +518,18 @@ void *FsIndexerDbUpdWorker(void * fsp)
DbUpdTask *tsk; DbUpdTask *tsk;
for (;;) { for (;;) {
size_t qsz; size_t qsz;
if (!tqp->take(&tsk, &qsz)) { if (!tqp->take(&tsk, &qsz)) {
tqp->workerExit(); tqp->workerExit();
return (void*)1; return (void*)1;
} }
LOGDEB0("FsIndexerDbUpdWorker: task ql " << qsz << "\n"); LOGDEB0("FsIndexerDbUpdWorker: task ql " << qsz << "\n");
if (!fip->m_db->addOrUpdate(tsk->udi, tsk->parent_udi, tsk->doc)) { if (!fip->m_db->addOrUpdate(tsk->udi, tsk->parent_udi, tsk->doc)) {
LOGERR("FsIndexerDbUpdWorker: addOrUpdate failed\n"); LOGERR("FsIndexerDbUpdWorker: addOrUpdate failed\n");
tqp->workerExit(); tqp->workerExit();
return (void*)0; return (void*)0;
} }
delete tsk; delete tsk;
} }
} }
@ -542,20 +542,20 @@ void *FsIndexerInternfileWorker(void * fsp)
InternfileTask *tsk = 0; InternfileTask *tsk = 0;
for (;;) { for (;;) {
if (!tqp->take(&tsk)) { if (!tqp->take(&tsk)) {
tqp->workerExit(); tqp->workerExit();
return (void*)1; return (void*)1;
} }
LOGDEB0("FsIndexerInternfileWorker: task fn " << tsk->fn << "\n"); LOGDEB0("FsIndexerInternfileWorker: task fn " << tsk->fn << "\n");
if (fip->processonefile(&myconf, tsk->fn, &tsk->statbuf, if (fip->processonefile(&myconf, tsk->fn, &tsk->statbuf,
tsk->localfields) != tsk->localfields) !=
FsTreeWalker::FtwOk) { FsTreeWalker::FtwOk) {
LOGERR("FsIndexerInternfileWorker: processone failed\n"); LOGERR("FsIndexerInternfileWorker: processone failed\n");
tqp->workerExit(); tqp->workerExit();
return (void*)0; return (void*)0;
} }
LOGDEB1("FsIndexerInternfileWorker: done fn " << tsk->fn << "\n"); LOGDEB1("FsIndexerInternfileWorker: done fn " << tsk->fn << "\n");
delete tsk; delete tsk;
} }
} }
#endif // IDX_THREADS #endif // IDX_THREADS
@ -573,33 +573,29 @@ void *FsIndexerInternfileWorker(void * fsp)
/// mostly contains pretty raw utf8 data. /// mostly contains pretty raw utf8 data.
FsTreeWalker::Status FsTreeWalker::Status
FsIndexer::processone(const std::string &fn, const struct stat *stp, FsIndexer::processone(const std::string &fn, const struct stat *stp,
FsTreeWalker::CbFlag flg) FsTreeWalker::CbFlag flg)
{ {
if (m_updater) { if (m_updater) {
#ifdef IDX_THREADS #ifdef IDX_THREADS
std::unique_lock<std::mutex> locker(m_updater->m_mutex); std::unique_lock<std::mutex> locker(m_updater->m_mutex);
#endif #endif
if (!m_updater->update()) { if (!m_updater->update()) {
return FsTreeWalker::FtwStop; return FsTreeWalker::FtwStop;
} }
} }
// If we're changing directories, possibly adjust parameters (set // If we're changing directories, possibly adjust parameters (set
// the current directory in configuration object) // the current directory in configuration object)
if (flg == FsTreeWalker::FtwDirEnter || if (flg == FsTreeWalker::FtwDirEnter || flg == FsTreeWalker::FtwDirReturn) {
flg == FsTreeWalker::FtwDirReturn) { m_config->setKeyDir(fn);
m_config->setKeyDir(fn); // Set up filter/skipped patterns for this subtree.
m_walker.setOnlyNames(m_config->getOnlyNames());
// Set up filter/skipped patterns for this subtree. m_walker.setSkippedNames(m_config->getSkippedNames());
m_walker.setOnlyNames(m_config->getOnlyNames());
m_walker.setSkippedNames(m_config->getSkippedNames());
// Adjust local fields from config for this subtree // Adjust local fields from config for this subtree
if (m_havelocalfields) if (m_havelocalfields)
localfieldsfromconf(); localfieldsfromconf();
if (flg == FsTreeWalker::FtwDirReturn)
if (flg == FsTreeWalker::FtwDirReturn) return FsTreeWalker::FtwOk;
return FsTreeWalker::FtwOk;
} }
#ifdef IDX_THREADS #ifdef IDX_THREADS
@ -608,7 +604,7 @@ FsIndexer::processone(const std::string &fn, const struct stat *stp,
if (m_iwqueue.put(tp)) { if (m_iwqueue.put(tp)) {
return FsTreeWalker::FtwOk; return FsTreeWalker::FtwOk;
} else { } else {
return FsTreeWalker::FtwError; return FsTreeWalker::FtwError;
} }
} }
#endif #endif
@ -616,10 +612,29 @@ FsIndexer::processone(const std::string &fn, const struct stat *stp,
return processonefile(m_config, fn, stp, m_localfields); return processonefile(m_config, fn, stp, m_localfields);
} }
// Send one document to the index, either through the db update queue
// or by a direct database call.
//
// When built with IDX_THREADS and m_haveSplitQ is set, a DbUpdTask
// holding udi, parent_udi and doc is allocated and put on m_dwqueue;
// the result of put() is returned. In all other cases
// m_db->addOrUpdate() is called directly and its result is returned.
//
// @param udi         identifier of the document to add or update
// @param parent_udi  identifier passed through as the parent document
// @param doc         document data handed to the task or to the db
// @return true if the task was queued, or if the direct update
//         succeeded; false otherwise.
bool FsIndexer::launchAddOrUpdate(const string& udi, const string& parent_udi,
                                  Rcl::Doc& doc)
{
#ifdef IDX_THREADS
    if (m_haveSplitQ) {
        DbUpdTask *task = new DbUpdTask(udi, parent_udi, doc);
        const bool queued = m_dwqueue.put(task);
        if (!queued) {
            // NOTE(review): the task is not freed here when put() fails;
            // confirm whether the queue releases rejected tasks.
            LOGERR("processonefile: wqueue.put failed\n");
        }
        return queued;
    }
#endif
    // No split queue in use: update the database synchronously.
    return m_db->addOrUpdate(udi, parent_udi, doc);
}
FsTreeWalker::Status FsTreeWalker::Status
FsIndexer::processonefile(RclConfig *config, FsIndexer::processonefile(RclConfig *config,
const std::string &fn, const struct stat *stp, const std::string &fn, const struct stat *stp,
const map<string, string>& localfields) const map<string, string>& localfields)
{ {
//////////////////// ////////////////////
// Check db up to date ? Doing this before file type // Check db up to date ? Doing this before file type
@ -633,10 +648,9 @@ FsIndexer::processonefile(RclConfig *config,
// excludedmimetypes, etc. // excludedmimetypes, etc.
config->setKeyDir(path_getfather(fn)); config->setKeyDir(path_getfather(fn));
// Document signature. This is based on m/ctime and size and used // File signature and up to date check. The sig is based on
// for the uptodate check (the value computed here is checked // m/ctime and size and the possibly new value is checked against
// against the stored one). Changing the computation forces a full // the stored one.
// reindex of course.
string sig; string sig;
makesig(stp, sig); makesig(stp, sig);
string udi; string udi;
@ -657,7 +671,7 @@ FsIndexer::processonefile(RclConfig *config,
// miss the data update. We would have to store both the mtime and // miss the data update. We would have to store both the mtime and
// the ctime to avoid this // the ctime to avoid this
bool xattronly = m_detectxattronly && !m_db->inFullReset() && bool xattronly = m_detectxattronly && !m_db->inFullReset() &&
existingDoc && needupdate && (stp->st_mtime < stp->st_ctime); existingDoc && needupdate && (stp->st_mtime < stp->st_ctime);
LOGDEB("processone: needupdate " << needupdate << " noretry " << LOGDEB("processone: needupdate " << needupdate << " noretry " <<
m_noretryfailed << " existing " << existingDoc << " oldsig [" << m_noretryfailed << " existing " << existingDoc << " oldsig [" <<
@ -678,19 +692,19 @@ FsIndexer::processonefile(RclConfig *config,
} }
if (!needupdate) { if (!needupdate) {
LOGDEB0("processone: up to date: " << fn << "\n"); LOGDEB0("processone: up to date: " << fn << "\n");
if (m_updater) { if (m_updater) {
#ifdef IDX_THREADS #ifdef IDX_THREADS
std::unique_lock<std::mutex> locker(m_updater->m_mutex); std::unique_lock<std::mutex> locker(m_updater->m_mutex);
#endif #endif
// Status bar update, abort request etc. // Status bar update, abort request etc.
m_updater->status.fn = fn; m_updater->status.fn = fn;
++(m_updater->status.filesdone); ++(m_updater->status.filesdone);
if (!m_updater->update()) { if (!m_updater->update()) {
return FsTreeWalker::FtwStop; return FsTreeWalker::FtwStop;
} }
} }
return FsTreeWalker::FtwOk; return FsTreeWalker::FtwOk;
} }
LOGDEB0("processone: processing: [" << LOGDEB0("processone: processing: [" <<
@ -712,137 +726,135 @@ FsIndexer::processonefile(RclConfig *config,
string mimetype; string mimetype;
if (!xattronly) { if (!xattronly) {
FileInterner interner(fn, stp, config, FileInterner::FIF_none); FileInterner interner(fn, stp, config, FileInterner::FIF_none);
if (!interner.ok()) { if (!interner.ok()) {
// no indexing whatsoever in this case. This typically means that // no indexing whatsoever in this case. This typically means that
// indexallfilenames is not set // indexallfilenames is not set
return FsTreeWalker::FtwOk; return FsTreeWalker::FtwOk;
} }
mimetype = interner.getMimetype(); mimetype = interner.getMimetype();
interner.setMissingStore(m_missing); interner.setMissingStore(m_missing);
FileInterner::Status fis = FileInterner::FIAgain; FileInterner::Status fis = FileInterner::FIAgain;
bool hadNonNullIpath = false; bool hadNonNullIpath = false;
while (fis == FileInterner::FIAgain) { while (fis == FileInterner::FIAgain) {
doc.erase(); doc.erase();
try { try {
fis = interner.internfile(doc); fis = interner.internfile(doc);
} catch (CancelExcept) { } catch (CancelExcept) {
LOGERR("fsIndexer::processone: interrupted\n"); LOGERR("fsIndexer::processone: interrupted\n");
return FsTreeWalker::FtwStop; return FsTreeWalker::FtwStop;
} }
// We index at least the file name even if there was an error. // We index at least the file name even if there was an error.
// We'll change the signature to ensure that the indexing will // We'll change the signature to ensure that the indexing will
// be retried every time. // be retried every time.
// If there is an error and the base doc was already seen, // If there is an error and the base doc was already seen,
// we're done // we're done
if (fis == FileInterner::FIError && hadNullIpath) if (fis == FileInterner::FIError && hadNullIpath) {
return FsTreeWalker::FtwOk; return FsTreeWalker::FtwOk;
}
// Internal access path for multi-document files. If empty, this is // Internal access path for multi-document files. If empty, this is
// for the main file. // for the main file.
if (doc.ipath.empty()) { if (doc.ipath.empty()) {
hadNullIpath = true; hadNullIpath = true;
if (hadNonNullIpath) { if (hadNonNullIpath) {
// Note that only the filters can reliably compute // Note that only the filters can reliably compute
// this. What we do is dependant of the doc order (if // this. What we do is dependant of the doc order (if
// we see the top doc first, we won't set the flag) // we see the top doc first, we won't set the flag)
doc.haschildren = true; doc.haschildren = true;
} }
} else { } else {
hadNonNullIpath = true; hadNonNullIpath = true;
} }
make_udi(fn, doc.ipath, udi); make_udi(fn, doc.ipath, udi);
// Set file name, mod time and url if not done by // Set file name, mod time and url if not done by
// filter. We used to set the top-level container file // filter. We used to set the top-level container file
// name for all subdocs without a proper file name, but // name for all subdocs without a proper file name, but
// this did not make sense (resulted in multiple not // this did not make sense (resulted in multiple not
// useful hits on the subdocs when searching for the // useful hits on the subdocs when searching for the
// file name). // file name).
if (doc.fmtime.empty()) if (doc.fmtime.empty())
doc.fmtime = ascdate; doc.fmtime = ascdate;
if (doc.url.empty()) if (doc.url.empty())
doc.url = path_pathtofileurl(fn); doc.url = path_pathtofileurl(fn);
const string *fnp = 0; const string *fnp = 0;
if (doc.ipath.empty()) { if (doc.ipath.empty()) {
if (!doc.peekmeta(Rcl::Doc::keyfn, &fnp) || fnp->empty()) if (!doc.peekmeta(Rcl::Doc::keyfn, &fnp) || fnp->empty())
doc.meta[Rcl::Doc::keyfn] = utf8fn; doc.meta[Rcl::Doc::keyfn] = utf8fn;
} }
// Set container file name for all docs, top or subdoc // Set container file name for all docs, top or subdoc
doc.meta[Rcl::Doc::keytcfn] = utf8fn; doc.meta[Rcl::Doc::keytcfn] = utf8fn;
doc.pcbytes = lltodecstr(stp->st_size); doc.pcbytes = lltodecstr(stp->st_size);
// Document signature for up to date checks. All subdocs inherit the // Document signature for up to date checks. All subdocs inherit the
// file's. // file's.
doc.sig = sig; doc.sig = sig;
// If there was an error, ensure indexing will be // If there was an error, ensure indexing will be
// retried. This is for the once missing, later installed // retried. This is for the once missing, later installed
// filter case. It can make indexing much slower (if there are // filter case. It can make indexing much slower (if there are
// myriads of such files, the ext script is executed for them // myriads of such files, the ext script is executed for them
// and fails every time) // and fails every time)
if (fis == FileInterner::FIError) { if (fis == FileInterner::FIError) {
doc.sig += cstr_plus; doc.sig += cstr_plus;
} }
// Possibly add fields from local config // Possibly add fields from local config
if (m_havelocalfields) if (m_havelocalfields)
setlocalfields(localfields, doc); setlocalfields(localfields, doc);
// Add document to database. If there is an ipath, add it // Add document to database. If there is an ipath, add it
// as a child of the file document. // as a child of the file document.
#ifdef IDX_THREADS if (!launchAddOrUpdate(udi, doc.ipath.empty() ?
if (m_haveSplitQ) { cstr_null : parent_udi, doc)) {
DbUpdTask *tp = new DbUpdTask(udi, doc.ipath.empty() ? return FsTreeWalker::FtwError;
cstr_null : parent_udi, doc); }
if (!m_dwqueue.put(tp)) {
LOGERR("processonefile: wqueue.put failed\n");
return FsTreeWalker::FtwError;
}
} else {
#endif
if (!m_db->addOrUpdate(udi, doc.ipath.empty() ?
cstr_null : parent_udi, doc)) {
return FsTreeWalker::FtwError;
}
#ifdef IDX_THREADS
}
#endif
// Tell what we are doing and check for interrupt request // Tell what we are doing and check for interrupt request
if (m_updater) { if (m_updater) {
#ifdef IDX_THREADS #ifdef IDX_THREADS
std::unique_lock<std::mutex> locker(m_updater->m_mutex); std::unique_lock<std::mutex> locker(m_updater->m_mutex);
#endif #endif
++(m_updater->status.docsdone); ++(m_updater->status.docsdone);
if (m_updater->status.dbtotdocs < m_updater->status.docsdone) if (m_updater->status.dbtotdocs < m_updater->status.docsdone)
m_updater->status.dbtotdocs = m_updater->status.docsdone; m_updater->status.dbtotdocs = m_updater->status.docsdone;
m_updater->status.fn = fn; m_updater->status.fn = fn;
if (!doc.ipath.empty()) { if (!doc.ipath.empty()) {
m_updater->status.fn += "|" + doc.ipath; m_updater->status.fn += "|" + doc.ipath;
} else { } else {
if (fis == FileInterner::FIError) { if (fis == FileInterner::FIError) {
++(m_updater->status.fileerrors); ++(m_updater->status.fileerrors);
} }
++(m_updater->status.filesdone); ++(m_updater->status.filesdone);
} }
if (!m_updater->update()) { if (!m_updater->update()) {
return FsTreeWalker::FtwStop; return FsTreeWalker::FtwStop;
} }
} }
} }
// If this doc existed and it's a container, recording for if (fis == FileInterner::FIError) {
// possible subdoc purge (this will be used only if we don't do a // In case of error, avoid purging any existing
// db-wide purge, e.g. if we're called from indexfiles()). // subdoc. For example on windows, this will avoid erasing
LOGDEB2("processOnefile: existingDoc " << existingDoc << // all the emails from a .ost because it is currently
" hadNonNullIpath " << hadNonNullIpath << "\n"); // locked by Outlook.
if (existingDoc && hadNonNullIpath) { LOGDEB("processonefile: internfile error, marking "
m_purgeCandidates.record(parent_udi); "subdocs as existing\n");
} m_db->udiTreeMarkExisting(parent_udi);
} else {
// If this doc existed and it's a container, recording for
// possible subdoc purge (this will be used only if we don't do a
// db-wide purge, e.g. if we're called from indexfiles()).
LOGDEB2("processOnefile: existingDoc " << existingDoc <<
" hadNonNullIpath " << hadNonNullIpath << "\n");
if (existingDoc && hadNonNullIpath) {
m_purgeCandidates.record(parent_udi);
}
}
} }
// If we had no instance with a null ipath, we create an empty // If we had no instance with a null ipath, we create an empty
@ -852,38 +864,30 @@ FsIndexer::processonefile(RclConfig *config,
// If xattronly is set, ONLY the extattr metadata is valid and will be used // If xattronly is set, ONLY the extattr metadata is valid and will be used
// by the following step. // by the following step.
if (xattronly || hadNullIpath == false) { if (xattronly || hadNullIpath == false) {
LOGDEB("Creating empty doc for file or pure xattr update\n"); LOGDEB("Creating empty doc for file or pure xattr update\n");
Rcl::Doc fileDoc; Rcl::Doc fileDoc;
if (xattronly) { if (xattronly) {
map<string, string> xfields; map<string, string> xfields;
reapXAttrs(config, fn, xfields); reapXAttrs(config, fn, xfields);
docFieldsFromXattrs(config, xfields, fileDoc); docFieldsFromXattrs(config, xfields, fileDoc);
fileDoc.onlyxattr = true; fileDoc.onlyxattr = true;
} else { } else {
fileDoc.fmtime = ascdate; fileDoc.fmtime = ascdate;
fileDoc.meta[Rcl::Doc::keyfn] = fileDoc.meta[Rcl::Doc::keyfn] =
fileDoc.meta[Rcl::Doc::keytcfn] = utf8fn; fileDoc.meta[Rcl::Doc::keytcfn] = utf8fn;
fileDoc.haschildren = true; fileDoc.haschildren = true;
fileDoc.mimetype = mimetype; fileDoc.mimetype = mimetype;
fileDoc.url = path_pathtofileurl(fn); fileDoc.url = path_pathtofileurl(fn);
if (m_havelocalfields) if (m_havelocalfields)
setlocalfields(localfields, fileDoc); setlocalfields(localfields, fileDoc);
fileDoc.pcbytes = lltodecstr(stp->st_size); fileDoc.pcbytes = lltodecstr(stp->st_size);
} }
fileDoc.sig = sig; fileDoc.sig = sig;
#ifdef IDX_THREADS if (!launchAddOrUpdate(parent_udi, cstr_null, fileDoc)) {
if (m_haveSplitQ) { return FsTreeWalker::FtwError;
DbUpdTask *tp = new DbUpdTask(parent_udi, cstr_null, fileDoc); }
if (!m_dwqueue.put(tp))
return FsTreeWalker::FtwError;
else
return FsTreeWalker::FtwOk;
}
#endif
if (!m_db->addOrUpdate(parent_udi, cstr_null, fileDoc))
return FsTreeWalker::FtwError;
} }
return FsTreeWalker::FtwOk; return FsTreeWalker::FtwOk;

View File

@ -1,4 +1,4 @@
/* Copyright (C) 2009 J.F.Dockes /* Copyright (C) 2009-2019 J.F.Dockes
* This program is free software; you can redistribute it and/or modify * This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by * it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or * the Free Software Foundation; either version 2 of the License, or
@ -33,19 +33,23 @@ struct stat;
class DbUpdTask; class DbUpdTask;
class InternfileTask; class InternfileTask;
namespace Rcl {
class Doc;
}
/** Index selected parts of the file system /** Index selected parts of the file system
Tree indexing: we inherit from FsTreeWalkerCB so that the processone() Tree indexing: we inherit from FsTreeWalkerCB so that the processone()
method is called by the file-system tree walk code for each file and method is called by the file-system tree walk code for each file and
directory. We keep all state needed while indexing, and finally call directory. We keep all state needed while indexing, and finally call
the methods to purge the db of stale entries and create the stemming the methods to purge the db of stale entries and create the stemming
databases. databases.
Single file(s) indexing: there are also calls to index or purge lists of files. Single file(s) indexing: there are also calls to index or purge lists of files.
No database purging or stem db updating in this case. No database purging or stem db updating in this case.
*/ */
class FsIndexer : public FsTreeWalkerCB { class FsIndexer : public FsTreeWalkerCB {
public: public:
/** Constructor does nothing but store parameters /** Constructor does nothing but store parameters
* *
* @param cnf Configuration data * @param cnf Configuration data
@ -76,39 +80,40 @@ class FsIndexer : public FsTreeWalkerCB {
/** Make signature for file up to date checks */ /** Make signature for file up to date checks */
static void makesig(const struct stat *stp, string& out); static void makesig(const struct stat *stp, string& out);
private:
private:
// Accumulates the udis of documents which are candidates for a later
// subdocument purge. Recording is off by default (dorecord is
// initialized to false) and is switched with setRecord(); while it is
// off, record() returns without storing anything. When IDX_THREADS is
// defined, the append to the list is done while holding the mutex.
class PurgeCandidateRecorder { class PurgeCandidateRecorder {
public: public:
PurgeCandidateRecorder() PurgeCandidateRecorder()
: dorecord(false) {} : dorecord(false) {}
void setRecord(bool onoff) void setRecord(bool onoff) {
{ dorecord = onoff;
dorecord = onoff; }
} void record(const string& udi) {
void record(const string& udi) // This test does not need to be protected: the value is set at
{ // init and never changed.
// This test does not need to be protected: the value is set at if (!dorecord)
// init and never changed. return;
if (!dorecord)
return;
#ifdef IDX_THREADS #ifdef IDX_THREADS
std::unique_lock<std::mutex> locker(mutex); std::unique_lock<std::mutex> locker(mutex);
#endif #endif
udis.push_back(udi); udis.push_back(udi);
} }
// Returns a reference to the recorded list. No lock is taken here.
// NOTE(review): presumably only called once the indexing threads are
// done adding entries -- confirm against the callers.
const vector<string>& getCandidates() const vector<string>& getCandidates() {
{ return udis;
return udis; }
}
private: private:
#ifdef IDX_THREADS #ifdef IDX_THREADS
std::mutex mutex; std::mutex mutex;
#endif #endif
bool dorecord; bool dorecord;
std::vector<std::string> udis; std::vector<std::string> udis;
}; };
bool launchAddOrUpdate(const std::string& udi,
const std::string& parent_udi, Rcl::Doc& doc);
FsTreeWalker m_walker; FsTreeWalker m_walker;
RclConfig *m_config; RclConfig *m_config;
Rcl::Db *m_db; Rcl::Db *m_db;
@ -155,7 +160,7 @@ class FsIndexer : public FsTreeWalkerCB {
string getDbDir() {return m_config->getDbDir();} string getDbDir() {return m_config->getDbDir();}
FsTreeWalker::Status FsTreeWalker::Status
processonefile(RclConfig *config, const string &fn, processonefile(RclConfig *config, const string &fn,
const struct stat *, const map<string,string>& localfields); const struct stat *, const map<string,string>& localfields);
}; };
#endif /* _fsindexer_h_included_ */ #endif /* _fsindexer_h_included_ */

View File

@ -2590,7 +2590,7 @@ bool Db::getSubDocs(const Doc &idoc, vector<Doc>& subdocs)
// used for absent FS mountable volumes. // used for absent FS mountable volumes.
bool Db::udiTreeMarkExisting(const string& udi) bool Db::udiTreeMarkExisting(const string& udi)
{ {
LOGDEB("Db::udiTreeWalk: " << udi << endl); LOGDEB("Db::udiTreeMarkExisting: " << udi << endl);
string wrapd = wrap_prefix(udi_prefix); string wrapd = wrap_prefix(udi_prefix);
string expr = udi + "*"; string expr = udi + "*";