real time indexing: implement delaying for fast changing files
This commit is contained in:
parent
9c26b2330f
commit
5c4f6e48a5
@ -683,6 +683,21 @@ fvwm
|
||||
your system is short on resources. Periodic indexing is
|
||||
adequate in most cases.</para>
|
||||
|
||||
<sect2 id="rcl.indexing.monitor.fastfiles">
|
||||
<title>Slowing down the reindexing rate for fast changing
|
||||
files</title>
|
||||
|
||||
<para>When using the real time monitor, it may happen that some
|
||||
files need to be indexed, but change so often that they impose an
|
||||
excessive load for the system.</para>
|
||||
|
||||
<para>&RCL; provides a configuration option to specify the minimum
|
||||
time before which a file, specified by a wildcard pattern, cannot be
|
||||
reindexed. See the <literal>mondelaypatterns</literal> parameter in
|
||||
the <link linkend="rcl.install.config.recollconf.misc">
|
||||
configuration section</link>.</para>
|
||||
|
||||
</sect2>
|
||||
</sect1>
|
||||
|
||||
</chapter>
|
||||
@ -3561,7 +3576,6 @@ skippedPaths = ~/somedir/∗.txt
|
||||
</variablelist>
|
||||
</sect3>
|
||||
|
||||
|
||||
<sect3 id="rcl.install.config.recollconf.misc">
|
||||
<title>Miscellaneous parameters:</title>
|
||||
|
||||
@ -3585,6 +3599,40 @@ skippedPaths = ~/somedir/∗.txt
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
|
||||
|
||||
<varlistentry><term><literal>mondelaypatterns</literal></term>
|
||||
<listitem><para>This allows specify wildcard path patterns
|
||||
(processed with fnmatch(3) with 0 flag), to match files which
|
||||
change too often and for which a delay should be observed before
|
||||
re-indexing. This is a space-separated list, each entry being a
|
||||
pattern and a time in seconds, separated by a colon. You can
|
||||
use double quotes if a path entry contains white
|
||||
space. Example:</para>
|
||||
<programlisting>
|
||||
mondelaypatterns = *.log:20 "this one has spaces*:10"
|
||||
</programlisting>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
|
||||
<varlistentry><term><literal>monixinterval</literal></term>
|
||||
<listitem><para>Minimum interval (seconds) for processing the
|
||||
indexing queue. The real time monitor does not process each
|
||||
event when it comes in, but will wait this time for the queue
|
||||
to accumulate to diminish overhead and in order to aggregate
|
||||
multiple events to the same file. Default 30 S.</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
|
||||
<varlistentry><term><literal>monauxinterval</literal></term>
|
||||
<listitem><para>Period (in seconds) at which the real time
|
||||
monitor will regenerate the auxiliary databases (spelling,
|
||||
stemming) if needed. The default is one hour.</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
|
||||
|
||||
|
||||
|
||||
<varlistentry><term><literal>filtermaxseconds</literal></term>
|
||||
<listitem><para>Maximum filter execution time, after which it
|
||||
is aborted. Some postscript programs just loop...</para>
|
||||
|
||||
@ -29,7 +29,7 @@
|
||||
* actually a hash map indexed by file path for easy coalescing of
|
||||
* multiple events to the same file.
|
||||
*/
|
||||
|
||||
#include <time.h>
|
||||
#include <string>
|
||||
#include <map>
|
||||
|
||||
@ -48,9 +48,18 @@ class RclMonEvent {
|
||||
enum EvType {RCLEVT_NONE, RCLEVT_MODIFY, RCLEVT_DELETE,
|
||||
RCLEVT_DIRCREATE};
|
||||
string m_path;
|
||||
string m_opath;
|
||||
EvType m_etyp;
|
||||
RclMonEvent() : m_etyp(RCLEVT_NONE) {}
|
||||
|
||||
///// For fast changing files: minimum time interval before reindex
|
||||
// Minimum interval (from config)
|
||||
int m_itvsecs;
|
||||
// Don't process this entry before:
|
||||
time_t m_minclock;
|
||||
// Changed since put in purgatory after reindex
|
||||
bool m_needidx;
|
||||
|
||||
RclMonEvent() : m_etyp(RCLEVT_NONE),
|
||||
m_itvsecs(0), m_minclock(0), m_needidx(false) {}
|
||||
};
|
||||
|
||||
enum RclMonitorOption {RCLMON_NONE=0, RCLMON_NOFORK=1, RCLMON_NOX11=2};
|
||||
|
||||
@ -20,7 +20,7 @@
|
||||
|
||||
/**
|
||||
* Recoll real time monitor processing. This file has the code to retrieve
|
||||
* event from the event queue and do the database-side processing, and the
|
||||
* event from the event queue and do the database-side processing. Also the
|
||||
* initialization function.
|
||||
*/
|
||||
|
||||
@ -29,9 +29,12 @@
|
||||
#include <unistd.h>
|
||||
#include <errno.h>
|
||||
#include <signal.h>
|
||||
#include <fnmatch.h>
|
||||
|
||||
#include <cstring>
|
||||
#include <cstdio>
|
||||
#include <cstdlib>
|
||||
#include <list>
|
||||
|
||||
#include "debuglog.h"
|
||||
#include "rclmon.h"
|
||||
@ -40,15 +43,90 @@
|
||||
#include "pathut.h"
|
||||
#include "x11mon.h"
|
||||
|
||||
typedef unsigned long mttcast;
|
||||
|
||||
static pthread_t rcv_thrid;
|
||||
|
||||
// Seconds between auxiliary db (stem, spell) updates:
|
||||
static const int dfltauxinterval = 60 *60;
|
||||
static int auxinterval = dfltauxinterval;
|
||||
|
||||
// Seconds between indexing queue processing: for merging events to
|
||||
// fast changing files and saving some of the indexing overhead.
|
||||
static const int dfltixinterval = 30;
|
||||
static int ixinterval = dfltixinterval;
|
||||
|
||||
static RclMonEventQueue rclEQ;
|
||||
|
||||
//
|
||||
// Delayed events: this is a special feature for fast changing files.
|
||||
// A list of pattern/delays can be specified in the configuration so
|
||||
// that they don't get re-indexed before some timeout is elapsed. Such
|
||||
// events are kept on a separate queue (m_dqueue) with an auxiliary
|
||||
// list in time-to-reindex order, while the normal events are on
|
||||
// m_iqueue.
|
||||
|
||||
// Queue management performance: on a typical recoll system there will
|
||||
// be only a few entries on the event queues and no significant time
|
||||
// will be needed to manage them. Even on a busy system, the time used
|
||||
// would most probably be negligible compared to the actual processing
|
||||
// of the indexing events. So this is just for reference. Let I be the
|
||||
// number of immediate events and D the number of delayed ones, N
|
||||
// stands for either.
|
||||
//
|
||||
// Periodic timeout polling: the recollindex process periodically (2S)
|
||||
// wakes up to check for exit requests. At this time it also checks
|
||||
// the queues for new entries (should not happen because the producer
|
||||
// would normally wake up the consumer threads), or ready entries
|
||||
// among the delayed ones. At this time it calls the "empty()"
|
||||
// routine. This has constant time behaviour (checks for stl container
|
||||
// emptiness and the top entry of the delays list).
|
||||
//
|
||||
// Adding a new event (pushEvent()): this performs a search for an
|
||||
// existing event with the same path (O(log(N)), then an insert on the
|
||||
// appropriate queue (O(log(N))) and an insert on the times list (O(D)).
|
||||
//
|
||||
// Popping an event: this is constant time as it just looks at the
|
||||
// tops of the normal and delayed queues.
|
||||
|
||||
|
||||
// Indexing event container: a map indexed by file path for fast
|
||||
// insertion of duplicate events to the same file
|
||||
typedef map<string, RclMonEvent> queue_type;
|
||||
|
||||
// Entries for delayed events are duplicated (as iterators) on an
|
||||
// auxiliary, sorted by time-to-reindex list. We could get rid of
|
||||
// this, the price would be that the RclEQ.empty() call would have to
|
||||
// walk the whole queue instead of only looking at the first delays
|
||||
// entry.
|
||||
typedef list<queue_type::iterator> delays_type;
|
||||
|
||||
// DelayPat stores a path wildcard pattern and a minimum time between
|
||||
// reindexes, it is read from the recoll configuration
|
||||
struct DelayPat {
|
||||
string pattern;
|
||||
int seconds;
|
||||
DelayPat() : seconds(0) {}
|
||||
};
|
||||
|
||||
/** Private part of RclEQ: things that we don't wish to exist in the interface
|
||||
* include file.
|
||||
*/
|
||||
class RclEQData {
|
||||
public:
|
||||
int m_opts;
|
||||
queue_type m_queue;
|
||||
// Queue for normal files (unlimited reindex)
|
||||
queue_type m_iqueue;
|
||||
// Queue for delayed reindex files
|
||||
queue_type m_dqueue;
|
||||
// The delays list stores pointers (iterators) to elements on
|
||||
// m_dqueue. The list is kept in time-to-index order. Elements of
|
||||
// m_dqueue which are also in m_delays can only be deleted while
|
||||
// walking m_delays, so we are certain that the m_dqueue iterators
|
||||
// stored in m_delays remain valid.
|
||||
delays_type m_delays;
|
||||
// Configured intervals for path patterns, read from the configuration.
|
||||
vector<DelayPat> m_delaypats;
|
||||
RclConfig *m_config;
|
||||
bool m_ok;
|
||||
pthread_mutex_t m_mutex;
|
||||
@ -59,9 +137,68 @@ public:
|
||||
if (!pthread_mutex_init(&m_mutex, 0) && !pthread_cond_init(&m_cond, 0))
|
||||
m_ok = true;
|
||||
}
|
||||
void readDelayPats(int dfltsecs);
|
||||
DelayPat searchDelayPats(const string& path)
|
||||
{
|
||||
for (vector<DelayPat>::iterator it = m_delaypats.begin();
|
||||
it != m_delaypats.end(); it++) {
|
||||
if (fnmatch(it->pattern.c_str(), path.c_str(), 0) == 0) {
|
||||
return *it;
|
||||
}
|
||||
}
|
||||
return DelayPat();
|
||||
}
|
||||
void delayInsert(const queue_type::iterator &qit);
|
||||
};
|
||||
|
||||
static RclMonEventQueue rclEQ;
|
||||
void RclEQData::readDelayPats(int dfltsecs)
|
||||
{
|
||||
if (m_config == 0)
|
||||
return;
|
||||
string patstring;
|
||||
if (!m_config->getConfParam("mondelaypatterns", patstring) ||
|
||||
patstring.empty())
|
||||
return;
|
||||
|
||||
vector<string> dplist;
|
||||
if (!stringToStrings(patstring, dplist)) {
|
||||
LOGERR(("rclEQData: bad pattern list: [%s]\n", patstring.c_str()));
|
||||
return;
|
||||
}
|
||||
|
||||
for (vector<string>::iterator it = dplist.begin();
|
||||
it != dplist.end(); it++) {
|
||||
string::size_type pos = it->find_last_of(":");
|
||||
DelayPat dp;
|
||||
dp.pattern = it->substr(0, pos);
|
||||
if (pos != string::npos && pos != it->size()-1) {
|
||||
dp.seconds = atoi(it->substr(pos+1).c_str());
|
||||
} else {
|
||||
dp.seconds = dfltsecs;
|
||||
}
|
||||
m_delaypats.push_back(dp);
|
||||
LOGDEB2(("rclmon::readDelayPats: add [%s] %d\n",
|
||||
dp.pattern.c_str(), dp.seconds));
|
||||
}
|
||||
}
|
||||
|
||||
// Insert event (as queue iterator) into delays list, in time order,
|
||||
// We DO NOT take care of duplicate qits. erase should be called first
|
||||
// when necessary.
|
||||
void RclEQData::delayInsert(const queue_type::iterator &qit)
|
||||
{
|
||||
MONDEB(("RclEQData::delayInsert: minclock %lu\n",
|
||||
(mttcast)qit->second.m_minclock));
|
||||
for (delays_type::iterator dit = m_delays.begin();
|
||||
dit != m_delays.end(); dit++) {
|
||||
queue_type::iterator qit1 = *dit;
|
||||
if ((*qit1).second.m_minclock > qit->second.m_minclock) {
|
||||
m_delays.insert(dit, qit);
|
||||
return;
|
||||
}
|
||||
}
|
||||
m_delays.push_back(qit);
|
||||
}
|
||||
|
||||
RclMonEventQueue::RclMonEventQueue()
|
||||
{
|
||||
@ -73,29 +210,13 @@ RclMonEventQueue::~RclMonEventQueue()
|
||||
delete m_data;
|
||||
}
|
||||
|
||||
bool RclMonEventQueue::empty()
|
||||
{
|
||||
return m_data == 0 ? true : m_data->m_queue.empty();
|
||||
}
|
||||
|
||||
void RclMonEventQueue::setopts(int opts)
|
||||
{
|
||||
if (m_data)
|
||||
m_data->m_opts = opts;
|
||||
}
|
||||
|
||||
// Must be called with the queue locked
|
||||
RclMonEvent RclMonEventQueue::pop()
|
||||
{
|
||||
RclMonEvent ev;
|
||||
if (!empty()) {
|
||||
ev = m_data->m_queue.begin()->second;
|
||||
m_data->m_queue.erase(m_data->m_queue.begin());
|
||||
}
|
||||
return ev;
|
||||
}
|
||||
|
||||
/** Wait until there is something to process on the queue.
|
||||
/** Wait until there is something to process on the queue, or timeout.
|
||||
* Must be called with the queue locked
|
||||
*/
|
||||
bool RclMonEventQueue::wait(int seconds, bool *top)
|
||||
@ -145,6 +266,7 @@ bool RclMonEventQueue::lock()
|
||||
MONDEB(("RclMonEventQueue:: lock return\n"));
|
||||
return true;
|
||||
}
|
||||
|
||||
bool RclMonEventQueue::unlock()
|
||||
{
|
||||
MONDEB(("RclMonEventQueue:: unlock\n"));
|
||||
@ -158,6 +280,9 @@ bool RclMonEventQueue::unlock()
|
||||
void RclMonEventQueue::setConfig(RclConfig *cnf)
|
||||
{
|
||||
m_data->m_config = cnf;
|
||||
// Don't use ixinterval here, could be 0 ! Base the default
|
||||
// delayed reindex delay on the default ixinterval delay
|
||||
m_data->readDelayPats(10 * dfltixinterval);
|
||||
}
|
||||
|
||||
RclConfig *RclMonEventQueue::getConfig()
|
||||
@ -168,15 +293,15 @@ RclConfig *RclMonEventQueue::getConfig()
|
||||
bool RclMonEventQueue::ok()
|
||||
{
|
||||
if (m_data == 0) {
|
||||
LOGDEB(("RclMonEventQueue: not ok: bad state\n"));
|
||||
LOGINFO(("RclMonEventQueue: not ok: bad state\n"));
|
||||
return false;
|
||||
}
|
||||
if (stopindexing) {
|
||||
LOGDEB(("RclMonEventQueue: not ok: stop request\n"));
|
||||
LOGINFO(("RclMonEventQueue: not ok: stop request\n"));
|
||||
return false;
|
||||
}
|
||||
if (!m_data->m_ok) {
|
||||
LOGDEB(("RclMonEventQueue: not ok: queue terminated\n"));
|
||||
LOGINFO(("RclMonEventQueue: not ok: queue terminated\n"));
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
@ -191,24 +316,133 @@ void RclMonEventQueue::setTerminate()
|
||||
unlock();
|
||||
}
|
||||
|
||||
// Must be called with the queue locked
|
||||
bool RclMonEventQueue::empty()
|
||||
{
|
||||
if (m_data == 0) {
|
||||
MONDEB(("RclMonEventQueue::empty(): true (m_data==0)\n"));
|
||||
return true;
|
||||
}
|
||||
if (!m_data->m_iqueue.empty()) {
|
||||
MONDEB(("RclMonEventQueue::empty(): false (m_iqueue not empty)\n"));
|
||||
return true;
|
||||
}
|
||||
if (m_data->m_dqueue.empty()) {
|
||||
MONDEB(("RclMonEventQueue::empty(): true (m_Xqueue both empty)\n"));
|
||||
return true;
|
||||
}
|
||||
// Only dqueue has events. Have to check the delays (only the
|
||||
// first, earliest one):
|
||||
queue_type::iterator qit = *(m_data->m_delays.begin());
|
||||
if (qit->second.m_minclock > time(0)) {
|
||||
MONDEB(("RclMonEventQueue::empty(): true (no delay ready %lu)\n",
|
||||
(mttcast)qit->second.m_minclock));
|
||||
return true;
|
||||
}
|
||||
MONDEB(("RclMonEventQueue::empty(): returning false (delay expired)\n"));
|
||||
return false;
|
||||
}
|
||||
|
||||
// Retrieve indexing event for processing. Returns empty event if
|
||||
// nothing interesting is found
|
||||
// Must be called with the queue locked
|
||||
RclMonEvent RclMonEventQueue::pop()
|
||||
{
|
||||
time_t now = time(0);
|
||||
MONDEB(("RclMonEventQueue::pop(), now %lu\n", (mttcast)now));
|
||||
|
||||
// Look at the delayed events, get rid of the expired/unactive
|
||||
// ones, possibly return an expired/needidx one.
|
||||
while (!m_data->m_delays.empty()) {
|
||||
delays_type::iterator dit = m_data->m_delays.begin();
|
||||
queue_type::iterator qit = *dit;
|
||||
MONDEB(("RclMonEventQueue::pop(): in delays: evt minclock %lu\n",
|
||||
(mttcast)qit->second.m_minclock));
|
||||
if (qit->second.m_minclock <= now) {
|
||||
if (qit->second.m_needidx) {
|
||||
RclMonEvent ev = qit->second;
|
||||
qit->second.m_minclock = time(0) + qit->second.m_itvsecs;
|
||||
qit->second.m_needidx = false;
|
||||
m_data->m_delays.erase(dit);
|
||||
m_data->delayInsert(qit);
|
||||
return ev;
|
||||
} else {
|
||||
// Delay elapsed without new update, get rid of event.
|
||||
m_data->m_dqueue.erase(qit);
|
||||
m_data->m_delays.erase(dit);
|
||||
}
|
||||
} else {
|
||||
// This and following events are for later processing, we
|
||||
// are done with the delayed event list.
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Look for non-delayed event
|
||||
if (!m_data->m_iqueue.empty()) {
|
||||
queue_type::iterator qit = m_data->m_iqueue.begin();
|
||||
RclMonEvent ev = qit->second;
|
||||
m_data->m_iqueue.erase(qit);
|
||||
return ev;
|
||||
}
|
||||
|
||||
return RclMonEvent();
|
||||
}
|
||||
|
||||
// Add new event (update or delete) to the processing queue.
|
||||
// It seems that a newer event is always correct to override any
|
||||
// older. TBVerified ?
|
||||
// Some conf-designated files, supposedly updated at a high rate get
|
||||
// special processing to limit their reindexing rate.
|
||||
bool RclMonEventQueue::pushEvent(const RclMonEvent &ev)
|
||||
{
|
||||
MONDEB(("RclMonEventQueue::pushEvent for %s\n", ev.m_path.c_str()));
|
||||
lock();
|
||||
// It seems that a newer event is always correct to override any
|
||||
// older. TBVerified ?
|
||||
m_data->m_queue[ev.m_path] = ev;
|
||||
|
||||
DelayPat pat = m_data->searchDelayPats(ev.m_path);
|
||||
if (pat.seconds != 0) {
|
||||
// Using delayed reindex queue. Need to take care of minclock and also
|
||||
// insert into the in-minclock-order list
|
||||
queue_type::iterator qit = m_data->m_dqueue.find(ev.m_path);
|
||||
if (qit == m_data->m_dqueue.end()) {
|
||||
// Not there yet, insert new
|
||||
qit =
|
||||
m_data->m_dqueue.insert(queue_type::value_type(ev.m_path, ev)).first;
|
||||
// Set the time to next index to "now" as it has not been
|
||||
// indexed recently (otherwise it would still be in the
|
||||
// queue), and add the iterator to the delay queue.
|
||||
qit->second.m_minclock = time(0);
|
||||
qit->second.m_needidx = true;
|
||||
qit->second.m_itvsecs = pat.seconds;
|
||||
m_data->delayInsert(qit);
|
||||
} else {
|
||||
// Already in queue. Possibly update type but save minclock
|
||||
// (so no need to touch m_delays). Flag as needing indexing
|
||||
time_t saved_clock = qit->second.m_minclock;
|
||||
qit->second = ev;
|
||||
qit->second.m_minclock = saved_clock;
|
||||
qit->second.m_needidx = true;
|
||||
}
|
||||
} else {
|
||||
// Immediate event: just insert it, erasing any previously
|
||||
// existing entry
|
||||
m_data->m_iqueue[ev.m_path] = ev;
|
||||
}
|
||||
|
||||
pthread_cond_broadcast(&m_data->m_cond);
|
||||
unlock();
|
||||
return true;
|
||||
}
|
||||
|
||||
pthread_t rcv_thrid;
|
||||
|
||||
bool startMonitor(RclConfig *conf, int opts)
|
||||
{
|
||||
if (!conf->getConfParam("monauxinterval", &auxinterval))
|
||||
auxinterval = dfltauxinterval;
|
||||
if (!conf->getConfParam("monixinterval", &ixinterval))
|
||||
ixinterval = dfltixinterval;
|
||||
rclEQ.setConfig(conf);
|
||||
rclEQ.setopts(opts);
|
||||
|
||||
if (pthread_create(&rcv_thrid, 0, &rclMonRcvRun, &rclEQ) != 0) {
|
||||
LOGERR(("startMonitor: cant create event-receiving thread\n"));
|
||||
return false;
|
||||
@ -219,13 +453,11 @@ bool startMonitor(RclConfig *conf, int opts)
|
||||
return false;
|
||||
}
|
||||
LOGDEB(("start_monitoring: entering main loop\n"));
|
||||
|
||||
bool timedout;
|
||||
time_t lastauxtime = time(0);
|
||||
time_t lastixtime = lastauxtime;
|
||||
bool didsomething = false;
|
||||
const int auxinterval = 60 *60;
|
||||
const int ixinterval = 30;
|
||||
|
||||
list<string> modified;
|
||||
list<string> deleted;
|
||||
|
||||
@ -244,9 +476,11 @@ bool startMonitor(RclConfig *conf, int opts)
|
||||
}
|
||||
|
||||
// Process event queue
|
||||
while (!rclEQ.empty()) {
|
||||
for (;;) {
|
||||
// Retrieve event
|
||||
RclMonEvent ev = rclEQ.pop();
|
||||
if (ev.m_path.empty())
|
||||
break;
|
||||
switch (ev.m_etyp) {
|
||||
case RclMonEvent::RCLEVT_MODIFY:
|
||||
LOGDEB(("Monitor: Modify/Check on %s\n", ev.m_path.c_str()));
|
||||
@ -257,7 +491,7 @@ bool startMonitor(RclConfig *conf, int opts)
|
||||
deleted.push_back(ev.m_path);
|
||||
break;
|
||||
default:
|
||||
LOGDEB(("Monitor: got Other on %s\n", ev.m_path.c_str()));
|
||||
LOGDEB(("Monitor: got Other on [%s]\n", ev.m_path.c_str()));
|
||||
}
|
||||
}
|
||||
// Unlock queue before processing lists
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user