Slightly experimental change to speed up indexer exit after interrupt by closing up all queues.

This commit is contained in:
Jean-Francois Dockes 2021-11-24 19:46:56 +01:00
parent 5f2716e628
commit 02c9a6f3f7
5 changed files with 59 additions and 38 deletions

View File

@ -236,20 +236,12 @@ bool FsIndexer::index(int flags)
} }
} }
#ifdef IDX_THREADS shutdownQueues(walkok);
if (m_haveInternQ)
m_iwqueue.waitIdle();
if (m_haveSplitQ)
m_dwqueue.waitIdle();
m_db->waitUpdIdle();
#endif // IDX_THREADS
if (m_missing) { if (m_missing) {
string missing; string missing;
m_missing->getMissingDescription(missing); m_missing->getMissingDescription(missing);
if (!missing.empty()) { if (!missing.empty()) {
LOGINFO("FsIndexer::index missing helper program(s):\n" << LOGINFO("FsIndexer::index missing helper program(s):\n" << missing << "\n");
missing << "\n");
} }
m_config->storeMissingHelperDesc(missing); m_config->storeMissingHelperDesc(missing);
} }
@ -257,6 +249,29 @@ bool FsIndexer::index(int flags)
return walkok; return walkok;
} }
void FsIndexer::shutdownQueues(bool ok)
{
#ifdef IDX_THREADS
if (!ok) {
// Error or more probably interrupt. Discard everything for fast shutdown
if (m_haveInternQ) {
m_iwqueue.closeShop();
}
if (m_haveSplitQ) {
m_dwqueue.closeShop();
}
m_db->closeQueue();
}
if (m_haveInternQ) {
m_iwqueue.waitIdle();
}
if (m_haveSplitQ) {
m_dwqueue.waitIdle();
}
m_db->waitUpdIdle();
#endif // IDX_THREADS
}
static bool matchesSkipped( static bool matchesSkipped(
const vector<string>& tdl, FsTreeWalker& walker, const string& path) const vector<string>& tdl, FsTreeWalker& walker, const string& path)
{ {
@ -406,13 +421,7 @@ bool FsIndexer::indexFiles(list<string>& files, int flags)
ret = true; ret = true;
out: out:
#ifdef IDX_THREADS shutdownQueues(ret);
if (m_haveInternQ)
m_iwqueue.waitIdle();
if (m_haveSplitQ)
m_dwqueue.waitIdle();
m_db->waitUpdIdle();
#endif // IDX_THREADS
// Purge possible orphan documents // Purge possible orphan documents
if (ret == true) { if (ret == true) {
@ -461,13 +470,7 @@ bool FsIndexer::purgeFiles(list<string>& files)
ret = true; ret = true;
out: out:
#ifdef IDX_THREADS shutdownQueues(ret);
if (m_haveInternQ)
m_iwqueue.waitIdle();
if (m_haveSplitQ)
m_dwqueue.waitIdle();
m_db->waitUpdIdle();
#endif // IDX_THREADS
LOGDEB("FsIndexer::purgeFiles: done\n"); LOGDEB("FsIndexer::purgeFiles: done\n");
return ret; return ret;
} }

View File

@ -159,6 +159,7 @@ private:
processonefile(RclConfig *config, const string &fn, processonefile(RclConfig *config, const string &fn,
const struct PathStat *, const struct PathStat *,
const map<string,string>& localfields); const map<string,string>& localfields);
void shutdownQueues(bool);
}; };
#endif /* _fsindexer_h_included_ */ #endif /* _fsindexer_h_included_ */

View File

@ -1028,11 +1028,11 @@ bool Db::i_close(bool final)
bool w = m_ndb->m_iswritable; bool w = m_ndb->m_iswritable;
if (w) { if (w) {
#ifdef IDX_THREADS #ifdef IDX_THREADS
m_ndb->m_wqueue.closeShop();
waitUpdIdle(); waitUpdIdle();
#endif #endif
if (!m_ndb->m_noversionwrite) if (!m_ndb->m_noversionwrite)
m_ndb->xwdb.set_metadata(cstr_RCL_IDX_VERSION_KEY, m_ndb->xwdb.set_metadata(cstr_RCL_IDX_VERSION_KEY, cstr_RCL_IDX_VERSION);
cstr_RCL_IDX_VERSION);
LOGDEB("Rcl::Db:close: xapian will close. May take some time\n"); LOGDEB("Rcl::Db:close: xapian will close. May take some time\n");
} }
deleteZ(m_ndb); deleteZ(m_ndb);
@ -1974,6 +1974,12 @@ bool Db::Native::docToXdocXattrOnly(TextSplitDb *splitter, const string &udi,
} }
#ifdef IDX_THREADS #ifdef IDX_THREADS
void Db::closeQueue()
{
if (m_ndb->m_iswritable && m_ndb->m_havewriteq) {
m_ndb->m_wqueue.closeShop();
}
}
void Db::waitUpdIdle() void Db::waitUpdIdle()
{ {
if (m_ndb->m_iswritable && m_ndb->m_havewriteq) { if (m_ndb->m_iswritable && m_ndb->m_havewriteq) {

View File

@ -335,6 +335,7 @@ public:
bool addOrUpdate(const string &udi, const string &parent_udi, Doc &doc); bool addOrUpdate(const string &udi, const string &parent_udi, Doc &doc);
#ifdef IDX_THREADS #ifdef IDX_THREADS
void closeQueue();
void waitUpdIdle(); void waitUpdIdle();
#endif #endif

View File

@ -84,6 +84,12 @@ public:
m_taskfreefunc = func; m_taskfreefunc = func;
} }
/// Forbid inputting new tasks. This is mostly useful for abnormal terminations as some data will
/// probably be lost, depending on how the upstream handles the put() error.
void closeShop() {
m_openforbusiness = false;
}
/** Start the worker threads. /** Start the worker threads.
* *
* @param nworkers number of threads copies to start. * @param nworkers number of threads copies to start.
@ -114,10 +120,12 @@ public:
*/ */
bool put(T t, bool flushprevious = false) { bool put(T t, bool flushprevious = false) {
std::unique_lock<std::mutex> lock(m_mutex); std::unique_lock<std::mutex> lock(m_mutex);
if (!ok()) { if (!ok() || !m_openforbusiness) {
LOGERR("WorkQueue::put:" << m_name << ": !ok\n"); LOGERR("WorkQueue::put: " << m_name << ": ok: " << ok() << " openforbusiness " <<
m_openforbusiness << "\n");
return false; return false;
} }
LOGDEB2("WorkQueue::put: " << m_name << "\n");
while (ok() && m_high > 0 && m_queue.size() >= m_high) { while (ok() && m_high > 0 && m_queue.size() >= m_high) {
m_clientsleeps++; m_clientsleeps++;
@ -154,7 +162,7 @@ public:
/** Wait until the queue is inactive. Called from client. /** Wait until the queue is inactive. Called from client.
* *
* Waits until the task queue is empty and the workers are all * Waits until the task queue is empty and the workers are all
* back sleeping. Used by the client to wait for all current work * back sleeping (or exited). Used by the client to wait for all current work
* to be completed, when it needs to perform work that couldn't be * to be completed, when it needs to perform work that couldn't be
* done in parallel with the worker's tasks, or before shutting * done in parallel with the worker's tasks, or before shutting
* down. Work can be resumed after calling this. Note that the * down. Work can be resumed after calling this. Note that the
@ -169,15 +177,14 @@ public:
*/ */
bool waitIdle() { bool waitIdle() {
std::unique_lock<std::mutex> lock(m_mutex); std::unique_lock<std::mutex> lock(m_mutex);
if (!ok()) { // We're not done while:
LOGINF("WorkQueue::waitIdle:" << m_name << ": queue already closed\n"); // - the queue is not empty and we have some workers left
return false; // - OR some workers are working (not exited or back waiting for a task).
} while (((m_queue.size() > 0 && m_workers_exited < m_worker_threads.size()) ||
(m_workers_waiting + m_workers_exited) < m_worker_threads.size())) {
// We're done when the queue is empty AND all workers are back LOGDEB0("waitIdle: " << m_name << " qsz " << m_queue.size() <<
// waiting for a task. " wwaiting " << m_workers_waiting << " wexit " << m_workers_exited << " nthr " <<
while (ok() && (m_queue.size() > 0 || m_worker_threads.size() << "\n");
m_workers_waiting != m_worker_threads.size())) {
m_clients_waiting++; m_clients_waiting++;
m_ccond.wait(lock); m_ccond.wait(lock);
m_clients_waiting--; m_clients_waiting--;
@ -352,6 +359,9 @@ private:
// Status // Status
bool m_ok; bool m_ok;
// Accepting new tasks
bool m_openforbusiness{true};
// Our threads. // Our threads.
std::list<Worker> m_worker_threads; std::list<Worker> m_worker_threads;