diff --git a/src/index/fsindexer.cpp b/src/index/fsindexer.cpp index 704f48fa..e89ce7ec 100644 --- a/src/index/fsindexer.cpp +++ b/src/index/fsindexer.cpp @@ -236,20 +236,12 @@ bool FsIndexer::index(int flags) } } -#ifdef IDX_THREADS - if (m_haveInternQ) - m_iwqueue.waitIdle(); - if (m_haveSplitQ) - m_dwqueue.waitIdle(); - m_db->waitUpdIdle(); -#endif // IDX_THREADS - + shutdownQueues(walkok); if (m_missing) { string missing; m_missing->getMissingDescription(missing); if (!missing.empty()) { - LOGINFO("FsIndexer::index missing helper program(s):\n" << - missing << "\n"); + LOGINFO("FsIndexer::index missing helper program(s):\n" << missing << "\n"); } m_config->storeMissingHelperDesc(missing); } @@ -257,6 +249,29 @@ bool FsIndexer::index(int flags) return walkok; } +void FsIndexer::shutdownQueues(bool ok) +{ +#ifdef IDX_THREADS + if (!ok) { + // Error or more probably interrupt. Discard everything for fast shutdown + if (m_haveInternQ) { + m_iwqueue.closeShop(); + } + if (m_haveSplitQ) { + m_dwqueue.closeShop(); + } + m_db->closeQueue(); + } + if (m_haveInternQ) { + m_iwqueue.waitIdle(); + } + if (m_haveSplitQ) { + m_dwqueue.waitIdle(); + } + m_db->waitUpdIdle(); +#endif // IDX_THREADS +} + static bool matchesSkipped( const vector& tdl, FsTreeWalker& walker, const string& path) { @@ -406,13 +421,7 @@ bool FsIndexer::indexFiles(list& files, int flags) ret = true; out: -#ifdef IDX_THREADS - if (m_haveInternQ) - m_iwqueue.waitIdle(); - if (m_haveSplitQ) - m_dwqueue.waitIdle(); - m_db->waitUpdIdle(); -#endif // IDX_THREADS + shutdownQueues(ret); // Purge possible orphan documents if (ret == true) { @@ -461,13 +470,7 @@ bool FsIndexer::purgeFiles(list& files) ret = true; out: -#ifdef IDX_THREADS - if (m_haveInternQ) - m_iwqueue.waitIdle(); - if (m_haveSplitQ) - m_dwqueue.waitIdle(); - m_db->waitUpdIdle(); -#endif // IDX_THREADS + shutdownQueues(ret); LOGDEB("FsIndexer::purgeFiles: done\n"); return ret; } diff --git a/src/index/fsindexer.h b/src/index/fsindexer.h index 4934a985..e6ef0101 100644 --- a/src/index/fsindexer.h +++ b/src/index/fsindexer.h @@ -159,6 +159,7 @@ private: processonefile(RclConfig *config, const string &fn, const struct PathStat *, const map& localfields); + void shutdownQueues(bool); }; #endif /* _fsindexer_h_included_ */ diff --git a/src/rcldb/rcldb.cpp b/src/rcldb/rcldb.cpp index af0882fe..557affcb 100644 --- a/src/rcldb/rcldb.cpp +++ b/src/rcldb/rcldb.cpp @@ -1028,11 +1028,11 @@ bool Db::i_close(bool final) bool w = m_ndb->m_iswritable; if (w) { #ifdef IDX_THREADS + m_ndb->m_wqueue.closeShop(); waitUpdIdle(); #endif if (!m_ndb->m_noversionwrite) - m_ndb->xwdb.set_metadata(cstr_RCL_IDX_VERSION_KEY, - cstr_RCL_IDX_VERSION); + m_ndb->xwdb.set_metadata(cstr_RCL_IDX_VERSION_KEY, cstr_RCL_IDX_VERSION); LOGDEB("Rcl::Db:close: xapian will close. May take some time\n"); } deleteZ(m_ndb); @@ -1974,6 +1974,12 @@ bool Db::Native::docToXdocXattrOnly(TextSplitDb *splitter, const string &udi, } #ifdef IDX_THREADS +void Db::closeQueue() +{ + if (m_ndb->m_iswritable && m_ndb->m_havewriteq) { + m_ndb->m_wqueue.closeShop(); + } +} void Db::waitUpdIdle() { if (m_ndb->m_iswritable && m_ndb->m_havewriteq) { diff --git a/src/rcldb/rcldb.h b/src/rcldb/rcldb.h index af8860b2..d96517eb 100644 --- a/src/rcldb/rcldb.h +++ b/src/rcldb/rcldb.h @@ -335,6 +335,7 @@ public: bool addOrUpdate(const string &udi, const string &parent_udi, Doc &doc); #ifdef IDX_THREADS + void closeQueue(); void waitUpdIdle(); #endif diff --git a/src/utils/workqueue.h b/src/utils/workqueue.h index 2a0a7fe7..b4fef8a5 100644 --- a/src/utils/workqueue.h +++ b/src/utils/workqueue.h @@ -84,6 +84,12 @@ public: m_taskfreefunc = func; } + /// Forbid inputting new tasks. This is mostly useful for abnormal terminations as some data will + /// probably be lost, depending on how the upstream handles the put() error. + void closeShop() { + m_openforbusiness = false; + } + /** Start the worker threads. * * @param nworkers number of threads copies to start. @@ -114,10 +120,12 @@ public: */ bool put(T t, bool flushprevious = false) { std::unique_lock lock(m_mutex); - if (!ok()) { - LOGERR("WorkQueue::put:" << m_name << ": !ok\n"); + if (!ok() || !m_openforbusiness) { + LOGERR("WorkQueue::put: " << m_name << ": ok: " << ok() << " openforbusiness " << + m_openforbusiness << "\n"); return false; } + LOGDEB2("WorkQueue::put: " << m_name << "\n"); while (ok() && m_high > 0 && m_queue.size() >= m_high) { m_clientsleeps++; @@ -154,7 +162,7 @@ public: /** Wait until the queue is inactive. Called from client. * * Waits until the task queue is empty and the workers are all - * back sleeping. Used by the client to wait for all current work + * back sleeping (or exited). Used by the client to wait for all current work * to be completed, when it needs to perform work that couldn't be * done in parallel with the worker's tasks, or before shutting * down. Work can be resumed after calling this. Note that the @@ -169,15 +177,14 @@ public: */ bool waitIdle() { std::unique_lock lock(m_mutex); - if (!ok()) { - LOGINF("WorkQueue::waitIdle:" << m_name << ": queue already closed\n"); - return false; - } - - // We're done when the queue is empty AND all workers are back - // waiting for a task. - while (ok() && (m_queue.size() > 0 || - m_workers_waiting != m_worker_threads.size())) { + // We're not done while: + // - the queue is not empty and we have some workers left + // - OR some workers are working (not exited or back waiting for a task). + while (((m_queue.size() > 0 && m_workers_exited < m_worker_threads.size()) || + (m_workers_waiting + m_workers_exited) < m_worker_threads.size())) { + LOGDEB0("waitIdle: " << m_name << " qsz " << m_queue.size() << + " wwaiting " << m_workers_waiting << " wexit " << m_workers_exited << " nthr " << + m_worker_threads.size() << "\n"); m_clients_waiting++; m_ccond.wait(lock); m_clients_waiting--; @@ -352,6 +359,9 @@ private: // Status bool m_ok; + // Accepting new tasks + bool m_openforbusiness{true}; + // Our threads. std::list m_worker_threads;