monitor the beagle queue

This commit is contained in:
dockes 2009-11-14 10:29:29 +00:00
parent b8b7cbb850
commit 061aa959c6
7 changed files with 84 additions and 54 deletions

View File

@ -20,6 +20,7 @@ static char rcsid[] = "@(#$Id: $ (C) 2005 J.F.Dockes";
#include "autoconfig.h"
#include <sys/types.h>
#include <string.h>
#include "autoconfig.h"
#include "pathut.h"
@ -169,14 +170,15 @@ public:
const string badtmpdirname = "/no/such/dir/really/can/exist";
BeagleQueueIndexer::BeagleQueueIndexer(RclConfig *cnf, Rcl::Db *db,
DbIxStatusUpdater *updfunc)
: m_config(cnf), m_db(db), m_cache(0), m_updater(updfunc)
: m_config(cnf), m_db(db), m_cache(0), m_updater(updfunc),
m_nocacheindex(false)
{
if (!m_config->getConfParam("beaglequeuedir", m_queuedir))
m_queuedir = path_tildexpand("~/.beagle/ToIndex/");
path_catslash(m_queuedir);
if (m_db && m_tmpdir.empty() || access(m_tmpdir.c_str(), 0) < 0) {
if (m_db && (m_tmpdir.empty() || access(m_tmpdir.c_str(), 0) < 0)) {
string reason;
if (!maketmpdir(m_tmpdir, reason)) {
LOGERR(("DbIndexer: cannot create temporary directory: %s\n",
@ -300,33 +302,30 @@ bool BeagleQueueIndexer::index()
m_queuedir.c_str()));
m_config->setKeyDir(m_queuedir);
// First walk the cache to set the existence flags. We do not
// actually check uptodateness because all files in the cache are
// supposedly already indexed.
//TBD: change this as the cache needs reindexing after an index reset!
// Also, we need to read the cache backwards so that the newest
// version of each file gets indexed? Or find a way to index
// multiple versions ?
bool eof;
if (!m_cache->rewind(eof)) {
if (!eof)
return false;
}
vector<string> alludis;
alludis.reserve(20000);
while (m_cache->next(eof)) {
string dict;
m_cache->getcurrentdict(dict);
ConfSimple cf(dict, 1);
string udi;
if (!cf.get("udi", udi, ""))
continue;
alludis.push_back(udi);
}
for (vector<string>::reverse_iterator it = alludis.rbegin();
it != alludis.rend(); it++) {
if (m_db->needUpdate(*it, "")) {
indexFromCache(*it);
// First check that files in the cache are in the index, in case the
// index has been reset. We don't do this when called from indexFiles().
if (!m_nocacheindex) {
bool eof;
if (!m_cache->rewind(eof)) {
if (!eof)
return false;
}
vector<string> alludis;
alludis.reserve(20000);
while (m_cache->next(eof)) {
string dict;
m_cache->getcurrentdict(dict);
ConfSimple cf(dict, 1);
string udi;
if (!cf.get("udi", udi, ""))
continue;
alludis.push_back(udi);
}
for (vector<string>::reverse_iterator it = alludis.rbegin();
it != alludis.rend(); it++) {
if (m_db->needUpdate(*it, "")) {
indexFromCache(*it);
}
}
}
@ -339,37 +338,51 @@ bool BeagleQueueIndexer::index()
bool BeagleQueueIndexer::indexFiles(list<string>& files)
{
LOGDEB(("BeagleQueueIndexer::indexFiles\n"));
if (!m_db) {
LOGERR(("BeagleQueueIndexer::indexfiles no db??\n"));
return false;
}
for (list<string>::iterator it = files.begin(); it != files.end(); it++) {
if (it->empty())
continue;//??
for (list<string>::iterator it = files.begin(); it != files.end();) {
if (it->empty()) {//??
it++; continue;
}
string father = path_getfather(*it);
if (father.compare(m_queuedir)) {
LOGDEB(("BeagleQueueIndexer::indexfiles: skipping [%s] (nq)\n",
it->c_str()));
continue;
it++; continue;
}
// Problem: we are often called with the dot file before the
// normal file exists, and sometimes never called for the
// normal file afterwards (i.e. for bookmarks, where the normal
// file is empty). So we perform a normal queue run at the end
// of the function to catch older stuff. Still, this is not
// perfect: sometimes some files will not be indexed before
// the next run.
string fn = path_getsimple(*it);
if (fn.empty() || fn.at(0) == '.')
continue;
if (fn.empty() || fn.at(0) == '.') {
it++; continue;
}
struct stat st;
if (lstat(it->c_str(), &st) != 0) {
LOGERR(("BeagleQueueIndexer::indexfiles: cant stat [%s]\n",
it->c_str()));
continue;
it++; continue;
}
if (!S_ISREG(st.st_mode)) {
LOGDEB(("BeagleQueueIndexer::indexfiles: skipping [%s] (nr)\n",
it->c_str()));
continue;
it++; continue;
}
processone(*it, &st, FsTreeWalker::FtwRegular);
files.erase(it);
it = files.erase(it);
}
m_nocacheindex = true;
index();
// Note: no need to reset m_nocacheindex, we're in the monitor now
return true;
}

View File

@ -67,6 +67,7 @@ private:
string m_queuedir;
string m_tmpdir;
DbIxStatusUpdater *m_updater;
bool m_nocacheindex;
bool indexFromCache(const string& udi);
};

View File

@ -222,21 +222,20 @@ bool FsIndexer::indexFiles(list<string>& files)
if (!init())
return false;
for (list<string>::iterator it = files.begin();
it != files.end(); it++) {
for (list<string>::iterator it = files.begin(); it != files.end(); ) {
LOGDEB2(("FsIndexer::indexFiles: [%s]\n", it->c_str()));
struct stat stb;
if (lstat(it->c_str(), &stb) != 0) {
LOGERR(("FsIndexer::indexFiles: lstat(%s): %s", it->c_str(),
strerror(errno)));
continue;
it++; continue;
}
// If we get to indexing directory names one day, will need to test
// against dbdir here to avoid modification loops (with rclmon).
if (!S_ISREG(stb.st_mode)) {
LOGDEB(("FsIndexer::indexFiles: skipping [%s] (nr)\n",
it->c_str()));
continue;
it++; continue;
}
string dir = path_getfather(*it);
@ -252,8 +251,9 @@ bool FsIndexer::indexFiles(list<string>& files)
}
// Check path against indexed areas and skipped names/paths
if (matchesSkipped(m_tdl, skpnl, skppl, *it))
continue;
if (matchesSkipped(m_tdl, skpnl, skppl, *it)) {
it++; continue;
}
int abslen;
if (m_config->getConfParam("idxabsmlen", &abslen))
@ -264,7 +264,7 @@ bool FsIndexer::indexFiles(list<string>& files)
LOGERR(("FsIndexer::indexFiles: processone failed\n"));
return false;
}
files.erase(it);
it = files.erase(it);
}
return true;
@ -276,8 +276,7 @@ bool FsIndexer::purgeFiles(list<string>& files)
{
if (!init())
return false;
for (list<string>::iterator it = files.begin();
it != files.end(); it++) {
for (list<string>::iterator it = files.begin(); it != files.end(); ) {
string udi;
make_udi(*it, "", udi);
// rcldb::purgefile returns true if the udi was either not
@ -289,7 +288,9 @@ bool FsIndexer::purgeFiles(list<string>& files)
}
// If we actually deleted something, take it off the list
if (existed) {
files.erase(it);
it = files.erase(it);
} else {
it++;
}
}

View File

@ -106,11 +106,11 @@ bool ConfIndexer::index(bool resetbefore, ixType typestorun)
return true;
}
bool ConfIndexer::indexFiles(std::list<string> &files)
bool ConfIndexer::indexFiles(std::list<string>& ifiles)
{
list<string> myfiles;
for (list<string>::const_iterator it = files.begin();
it != files.end(); it++) {
for (list<string>::const_iterator it = ifiles.begin();
it != ifiles.end(); it++) {
myfiles.push_back(path_canon(*it));
}
myfiles.sort();
@ -125,7 +125,9 @@ bool ConfIndexer::indexFiles(std::list<string> &files)
if (!m_fsindexer)
m_fsindexer = new FsIndexer(m_config, &m_db, m_updater);
if (m_fsindexer)
ret = m_fsindexer->indexFiles(files);
ret = m_fsindexer->indexFiles(myfiles);
LOGDEB2(("ConfIndexer::indexFiles: fsindexer returned %d, "
"%d files remainining\n", ret, myfiles.size()));
if (m_dobeagle && !myfiles.empty()) {
if (!m_beagler)
@ -143,6 +145,7 @@ bool ConfIndexer::indexFiles(std::list<string> &files)
m_config->getDbDir().c_str()));
return false;
}
ifiles = myfiles;
return ret;
}

View File

@ -112,6 +112,7 @@ void *rclMonRcvRun(void *q)
LOGDEB(("rclMonRcvRun: running\n"));
recoll_threadinit();
// Create the fam/whatever interface object
RclMonitor *mon;
if ((mon = makeMonitor()) == 0) {
@ -156,6 +157,15 @@ void *rclMonRcvRun(void *q)
walker.walk(*it, walkcb);
}
bool dobeagle = false;
queue->getConfig()->getConfParam("processbeaglequeue", &dobeagle);
if (dobeagle) {
string beaglequeuedir;
if (!queue->getConfig()->getConfParam("beaglequeuedir", beaglequeuedir))
beaglequeuedir = path_tildexpand("~/.beagle/ToIndex/");
mon->addWatch(beaglequeuedir, true);
}
// Forever wait for monitoring events and add them to queue:
MONDEB(("rclMonRcvRun: waiting for events. q->ok() %d\n", queue->ok()));
while (queue->ok() && mon->ok()) {

View File

@ -321,7 +321,7 @@ int main(int argc, const char **argv)
exit(0);
confindexer = new ConfIndexer(config, &updater);
confindexer->index(rezero);
confindexer->index(rezero, ConfIndexer::IxTAll);
deleteZ(confindexer);
int opts = RCLMON_NONE;
if (op_flags & OPT_D)

View File

@ -27,6 +27,8 @@ static char rcsid[] = "@(#$Id: $ (C) 2009 J.F.Dockes";
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>
#include <stdlib.h>
#include <memory.h>
#include <sstream>
#include <iostream>