From 5c4f6e48a5e170aa2ea399cb0e993a7eb3aa5f52 Mon Sep 17 00:00:00 2001 From: Jean-Francois Dockes Date: Mon, 1 Aug 2011 14:52:21 +0200 Subject: [PATCH] real time indexing: implement delaying for fast changing files --- src/doc/user/usermanual.sgml | 50 +++++- src/index/rclmon.h | 15 +- src/index/rclmonprc.cpp | 300 +++++++++++++++++++++++++++++++---- 3 files changed, 328 insertions(+), 37 deletions(-) diff --git a/src/doc/user/usermanual.sgml b/src/doc/user/usermanual.sgml index 448b51c3..72afc01e 100644 --- a/src/doc/user/usermanual.sgml +++ b/src/doc/user/usermanual.sgml @@ -683,6 +683,21 @@ fvwm your system is short on resources. Periodic indexing is adequate in most cases. + + Slowing down the reindexing rate for fast changing + files + + When using the real time monitor, it may happen that some + files need to be indexed, but change so often that they impose an + excessive load on the system. + + &RCL; provides a configuration option to specify the minimum + time before which a file, specified by a wildcard pattern, cannot be + reindexed. See the mondelaypatterns parameter in + the + configuration section. + + @@ -3561,7 +3576,6 @@ skippedPaths = ~/somedir/∗.txt - Miscellaneous parameters: @@ -3585,6 +3599,40 @@ skippedPaths = ~/somedir/∗.txt + + mondelaypatterns + This allows specifying wildcard path patterns + (processed with fnmatch(3) with flags set to 0), to match files which + change too often and for which a delay should be observed before + re-indexing. This is a space-separated list, each entry being a + pattern and a time in seconds, separated by a colon. You can + use double quotes if a path entry contains white + space. Example: + +mondelaypatterns = *.log:20 "this one has spaces*:10" + + + + + monixinterval + Minimum interval (seconds) for processing the + indexing queue.
The real time monitor does not process each + event when it comes in, but will wait this long for the queue + to accumulate, to diminish overhead and in order to aggregate + multiple events to the same file. The default is 30 seconds. + + + + monauxinterval + Period (in seconds) at which the real time + monitor will regenerate the auxiliary databases (spelling, + stemming) if needed. The default is one hour. + + + + + + filtermaxseconds Maximum filter execution time, after which it is aborted. Some postscript programs just loop... diff --git a/src/index/rclmon.h b/src/index/rclmon.h index ed8339bb..649eddb9 100644 --- a/src/index/rclmon.h +++ b/src/index/rclmon.h @@ -29,7 +29,7 @@ * actually a hash map indexed by file path for easy coalescing of * multiple events to the same file. */ - +#include #include #include @@ -48,9 +48,18 @@ class RclMonEvent { enum EvType {RCLEVT_NONE, RCLEVT_MODIFY, RCLEVT_DELETE, RCLEVT_DIRCREATE}; string m_path; - string m_opath; EvType m_etyp; - RclMonEvent() : m_etyp(RCLEVT_NONE) {} + + ///// For fast changing files: minimum time interval before reindex + // Minimum interval (from config) + int m_itvsecs; + // Don't process this entry before: + time_t m_minclock; + // Changed since put in purgatory after reindex + bool m_needidx; + + RclMonEvent() : m_etyp(RCLEVT_NONE), + m_itvsecs(0), m_minclock(0), m_needidx(false) {} }; enum RclMonitorOption {RCLMON_NONE=0, RCLMON_NOFORK=1, RCLMON_NOX11=2}; diff --git a/src/index/rclmonprc.cpp b/src/index/rclmonprc.cpp index bd134a22..75ad40d5 100644 --- a/src/index/rclmonprc.cpp +++ b/src/index/rclmonprc.cpp @@ -20,7 +20,7 @@ /** * Recoll real time monitor processing. This file has the code to retrieve - * event from the event queue and do the database-side processing, and the + * event from the event queue and do the database-side processing. Also the * initialization function.
*/ @@ -29,9 +29,12 @@ #include #include #include +#include + #include #include #include +#include #include "debuglog.h" #include "rclmon.h" @@ -40,15 +43,90 @@ #include "pathut.h" #include "x11mon.h" +typedef unsigned long mttcast; + +static pthread_t rcv_thrid; + +// Seconds between auxiliary db (stem, spell) updates: +static const int dfltauxinterval = 60 *60; +static int auxinterval = dfltauxinterval; + +// Seconds between indexing queue processing: for merging events to +// fast changing files and saving some of the indexing overhead. +static const int dfltixinterval = 30; +static int ixinterval = dfltixinterval; + +static RclMonEventQueue rclEQ; + +// +// Delayed events: this is a special feature for fast changing files. +// A list of pattern/delays can be specified in the configuration so +// that they don't get re-indexed before some timeout is elapsed. Such +// events are kept on a separate queue (m_dqueue) with an auxiliary +// list in time-to-reindex order, while the normal events are on +// m_iqueue. + +// Queue management performance: on a typical recoll system there will +// be only a few entries on the event queues and no significant time +// will be needed to manage them. Even on a busy system, the time used +// would most probably be negligible compared to the actual processing +// of the indexing events. So this is just for reference. Let I be the +// number of immediate events and D the number of delayed ones, N +// stands for either. +// +// Periodic timeout polling: the recollindex process periodically (2S) +// wakes up to check for exit requests. At this time it also checks +// the queues for new entries (should not happen because the producer +// would normally wake up the consumer threads), or ready entries +// among the delayed ones. At this time it calls the "empty()" +// routine. This has constant time behaviour (checks for stl container +// emptiness and the top entry of the delays list). 
+// +// Adding a new event (pushEvent()): this performs a search for an +// existing event with the same path (O(log(N)), then an insert on the +// appropriate queue (O(log(N))) and an insert on the times list (O(D)). +// +// Popping an event: this is constant time as it just looks at the +// tops of the normal and delayed queues. + + +// Indexing event container: a map indexed by file path for fast +// insertion of duplicate events to the same file typedef map queue_type; +// Entries for delayed events are duplicated (as iterators) on an +// auxiliary, sorted by time-to-reindex list. We could get rid of +// this, the price would be that the RclEQ.empty() call would have to +// walk the whole queue instead of only looking at the first delays +// entry. +typedef list delays_type; + +// DelayPat stores a path wildcard pattern and a minimum time between +// reindexes, it is read from the recoll configuration +struct DelayPat { + string pattern; + int seconds; + DelayPat() : seconds(0) {} +}; + /** Private part of RclEQ: things that we don't wish to exist in the interface * include file. */ class RclEQData { public: int m_opts; - queue_type m_queue; + // Queue for normal files (unlimited reindex) + queue_type m_iqueue; + // Queue for delayed reindex files + queue_type m_dqueue; + // The delays list stores pointers (iterators) to elements on + // m_dqueue. The list is kept in time-to-index order. Elements of + // m_dqueue which are also in m_delays can only be deleted while + // walking m_delays, so we are certain that the m_dqueue iterators + // stored in m_delays remain valid. + delays_type m_delays; + // Configured intervals for path patterns, read from the configuration. 
+ vector m_delaypats; RclConfig *m_config; bool m_ok; pthread_mutex_t m_mutex; @@ -59,9 +137,68 @@ public: if (!pthread_mutex_init(&m_mutex, 0) && !pthread_cond_init(&m_cond, 0)) m_ok = true; } + void readDelayPats(int dfltsecs); + DelayPat searchDelayPats(const string& path) + { + for (vector::iterator it = m_delaypats.begin(); + it != m_delaypats.end(); it++) { + if (fnmatch(it->pattern.c_str(), path.c_str(), 0) == 0) { + return *it; + } + } + return DelayPat(); + } + void delayInsert(const queue_type::iterator &qit); }; -static RclMonEventQueue rclEQ; +void RclEQData::readDelayPats(int dfltsecs) +{ + if (m_config == 0) + return; + string patstring; + if (!m_config->getConfParam("mondelaypatterns", patstring) || + patstring.empty()) + return; + + vector dplist; + if (!stringToStrings(patstring, dplist)) { + LOGERR(("rclEQData: bad pattern list: [%s]\n", patstring.c_str())); + return; + } + + for (vector::iterator it = dplist.begin(); + it != dplist.end(); it++) { + string::size_type pos = it->find_last_of(":"); + DelayPat dp; + dp.pattern = it->substr(0, pos); + if (pos != string::npos && pos != it->size()-1) { + dp.seconds = atoi(it->substr(pos+1).c_str()); + } else { + dp.seconds = dfltsecs; + } + m_delaypats.push_back(dp); + LOGDEB2(("rclmon::readDelayPats: add [%s] %d\n", + dp.pattern.c_str(), dp.seconds)); + } +} + +// Insert event (as queue iterator) into delays list, in time order, +// We DO NOT take care of duplicate qits. erase should be called first +// when necessary. 
+void RclEQData::delayInsert(const queue_type::iterator &qit) +{ + MONDEB(("RclEQData::delayInsert: minclock %lu\n", + (mttcast)qit->second.m_minclock)); + for (delays_type::iterator dit = m_delays.begin(); + dit != m_delays.end(); dit++) { + queue_type::iterator qit1 = *dit; + if ((*qit1).second.m_minclock > qit->second.m_minclock) { + m_delays.insert(dit, qit); + return; + } + } + m_delays.push_back(qit); +} RclMonEventQueue::RclMonEventQueue() { @@ -73,29 +210,13 @@ RclMonEventQueue::~RclMonEventQueue() delete m_data; } -bool RclMonEventQueue::empty() -{ - return m_data == 0 ? true : m_data->m_queue.empty(); -} - void RclMonEventQueue::setopts(int opts) { if (m_data) m_data->m_opts = opts; } -// Must be called with the queue locked -RclMonEvent RclMonEventQueue::pop() -{ - RclMonEvent ev; - if (!empty()) { - ev = m_data->m_queue.begin()->second; - m_data->m_queue.erase(m_data->m_queue.begin()); - } - return ev; -} - -/** Wait until there is something to process on the queue. +/** Wait until there is something to process on the queue, or timeout. * Must be called with the queue locked */ bool RclMonEventQueue::wait(int seconds, bool *top) @@ -145,6 +266,7 @@ bool RclMonEventQueue::lock() MONDEB(("RclMonEventQueue:: lock return\n")); return true; } + bool RclMonEventQueue::unlock() { MONDEB(("RclMonEventQueue:: unlock\n")); @@ -158,6 +280,9 @@ bool RclMonEventQueue::unlock() void RclMonEventQueue::setConfig(RclConfig *cnf) { m_data->m_config = cnf; + // Don't use ixinterval here, could be 0 ! 
Base the default + // delayed reindex delay on the default ixinterval delay + m_data->readDelayPats(10 * dfltixinterval); } RclConfig *RclMonEventQueue::getConfig() @@ -168,15 +293,15 @@ RclConfig *RclMonEventQueue::getConfig() bool RclMonEventQueue::ok() { if (m_data == 0) { - LOGDEB(("RclMonEventQueue: not ok: bad state\n")); + LOGINFO(("RclMonEventQueue: not ok: bad state\n")); return false; } if (stopindexing) { - LOGDEB(("RclMonEventQueue: not ok: stop request\n")); + LOGINFO(("RclMonEventQueue: not ok: stop request\n")); return false; } if (!m_data->m_ok) { - LOGDEB(("RclMonEventQueue: not ok: queue terminated\n")); + LOGINFO(("RclMonEventQueue: not ok: queue terminated\n")); return false; } return true; @@ -191,24 +316,133 @@ void RclMonEventQueue::setTerminate() unlock(); } +// Returns true if nothing is ready for processing (no immediate event, and no delayed event whose time has come). Must be called with the queue locked +bool RclMonEventQueue::empty() +{ + if (m_data == 0) { + MONDEB(("RclMonEventQueue::empty(): true (m_data==0)\n")); + return true; + } + if (!m_data->m_iqueue.empty()) { + MONDEB(("RclMonEventQueue::empty(): false (m_iqueue not empty)\n")); + return false; // immediate events are pending: the queue is not empty + } + if (m_data->m_dqueue.empty()) { + MONDEB(("RclMonEventQueue::empty(): true (m_Xqueue both empty)\n")); + return true; + } + // Only dqueue has events. Have to check the delays (only the + // first, earliest one): + queue_type::iterator qit = *(m_data->m_delays.begin()); + if (qit->second.m_minclock > time(0)) { + MONDEB(("RclMonEventQueue::empty(): true (no delay ready %lu)\n", + (mttcast)qit->second.m_minclock)); + return true; + } + MONDEB(("RclMonEventQueue::empty(): returning false (delay expired)\n")); + return false; +} + +// Retrieve indexing event for processing.
Returns an empty event (empty m_path) if +// nothing is ready for processing. +// Must be called with the queue locked +RclMonEvent RclMonEventQueue::pop() +{ + time_t now = time(0); + MONDEB(("RclMonEventQueue::pop(), now %lu\n", (mttcast)now)); + + // Look at the delayed events, get rid of the expired/inactive + // ones, possibly return an expired/needidx one. + while (!m_data->m_delays.empty()) { + delays_type::iterator dit = m_data->m_delays.begin(); + queue_type::iterator qit = *dit; + MONDEB(("RclMonEventQueue::pop(): in delays: evt minclock %lu\n", + (mttcast)qit->second.m_minclock)); + if (qit->second.m_minclock <= now) { + if (qit->second.m_needidx) { + RclMonEvent ev = qit->second; + qit->second.m_minclock = time(0) + qit->second.m_itvsecs; // not again before the interval has elapsed + qit->second.m_needidx = false; + m_data->m_delays.erase(dit); + m_data->delayInsert(qit); // re-queue at the position matching the new minclock + return ev; + } else { + // Delay elapsed without new update, get rid of event. + m_data->m_dqueue.erase(qit); + m_data->m_delays.erase(dit); + } + } else { + // This and following events are for later processing, we + // are done with the delayed event list. + break; + } + } + + // Look for non-delayed event + if (!m_data->m_iqueue.empty()) { + queue_type::iterator qit = m_data->m_iqueue.begin(); + RclMonEvent ev = qit->second; + m_data->m_iqueue.erase(qit); + return ev; + } + + return RclMonEvent(); +} + +// Add new event (update or delete) to the processing queue. +// It seems that a newer event is always correct to override any +// older. TBVerified ? +// Some configuration-designated files, supposedly updated at a high rate, get +// special processing to limit their reindexing rate. bool RclMonEventQueue::pushEvent(const RclMonEvent &ev) { MONDEB(("RclMonEventQueue::pushEvent for %s\n", ev.m_path.c_str())); lock(); - // It seems that a newer event is always correct to override any - // older. TBVerified ? - m_data->m_queue[ev.m_path] = ev; + + DelayPat pat = m_data->searchDelayPats(ev.m_path); + if (pat.seconds != 0) { + // Using delayed reindex queue.
Need to take care of minclock and also + // insert into the in-minclock-order list + queue_type::iterator qit = m_data->m_dqueue.find(ev.m_path); + if (qit == m_data->m_dqueue.end()) { + // Not there yet, insert new + qit = + m_data->m_dqueue.insert(queue_type::value_type(ev.m_path, ev)).first; + // Set the time to next index to "now" as it has not been + // indexed recently (otherwise it would still be in the + // queue), and add the iterator to the delay queue. + qit->second.m_minclock = time(0); + qit->second.m_needidx = true; + qit->second.m_itvsecs = pat.seconds; + m_data->delayInsert(qit); + } else { + // Already in queue. Possibly update type, but keep minclock (so no need to touch m_delays) and restore the configured interval, which the assignment below overwrites with the incoming event's value. Flag as needing indexing + time_t saved_clock = qit->second.m_minclock; + qit->second = ev; + qit->second.m_minclock = saved_clock; + qit->second.m_itvsecs = pat.seconds; + qit->second.m_needidx = true; + } + } else { + // Immediate event: just insert it, erasing any previously + // existing entry + m_data->m_iqueue[ev.m_path] = ev; + } + pthread_cond_broadcast(&m_data->m_cond); unlock(); return true; } -pthread_t rcv_thrid; - bool startMonitor(RclConfig *conf, int opts) { + if (!conf->getConfParam("monauxinterval", &auxinterval)) + auxinterval = dfltauxinterval; + if (!conf->getConfParam("monixinterval", &ixinterval)) + ixinterval = dfltixinterval; rclEQ.setConfig(conf); rclEQ.setopts(opts); + if (pthread_create(&rcv_thrid, 0, &rclMonRcvRun, &rclEQ) != 0) { LOGERR(("startMonitor: cant create event-receiving thread\n")); return false; @@ -219,13 +453,11 @@ bool startMonitor(RclConfig *conf, int opts) return false; } LOGDEB(("start_monitoring: entering main loop\n")); + bool timedout; time_t lastauxtime = time(0); time_t lastixtime = lastauxtime; bool didsomething = false; - const int auxinterval = 60 *60; - const int ixinterval = 30; - list modified; list deleted; @@ -244,9 +476,11 @@ bool startMonitor(RclConfig *conf, int opts) } // Process event queue - while (!rclEQ.empty()) { + for (;;) { // Retrieve
event RclMonEvent ev = rclEQ.pop(); + if (ev.m_path.empty()) + break; switch (ev.m_etyp) { case RclMonEvent::RCLEVT_MODIFY: LOGDEB(("Monitor: Modify/Check on %s\n", ev.m_path.c_str())); @@ -257,7 +491,7 @@ bool startMonitor(RclConfig *conf, int opts) deleted.push_back(ev.m_path); break; default: - LOGDEB(("Monitor: got Other on %s\n", ev.m_path.c_str())); + LOGDEB(("Monitor: got Other on [%s]\n", ev.m_path.c_str())); } } // Unlock queue before processing lists