diff --git a/src/rcldb/rcldb.cpp b/src/rcldb/rcldb.cpp index 19cb5964..c19999ea 100644 --- a/src/rcldb/rcldb.cpp +++ b/src/rcldb/rcldb.cpp @@ -132,7 +132,8 @@ Db::Native::Native(Db *db) m_noversionwrite(false) #ifdef IDX_THREADS , m_wqueue("DbUpd", m_rcldb->m_config->getThrConf(RclConfig::ThrDbWrite).first), - m_totalworkns(0LL) + m_loglevel(4), + m_totalworkns(0LL), m_havewriteq(false) #endif // IDX_THREADS { LOGDEB1(("Native::Native: me %p\n", this)); @@ -142,7 +143,7 @@ Db::Native::~Native() { LOGDEB1(("Native::~Native: me %p\n", this)); #ifdef IDX_THREADS - if (m_haveWriteQ) { + if (m_havewriteq) { void *status = m_wqueue.setTerminateAndWait(); LOGDEB2(("Native::~Native: worker status %ld\n", long(status))); } @@ -186,7 +187,7 @@ void Db::Native::maybeStartThreads() { m_loglevel = DebugLog::getdbl()->getlevel(); - m_haveWriteQ = false; + m_havewriteq = false; const RclConfig *cnf = m_rcldb->m_config; int writeqlen = cnf->getThrConf(RclConfig::ThrDbWrite).first; int writethreads = cnf->getThrConf(RclConfig::ThrDbWrite).second; @@ -199,10 +200,10 @@ void Db::Native::maybeStartThreads() LOGERR(("Db::Db: Worker start failed\n")); return; } - m_haveWriteQ = true; + m_havewriteq = true; } LOGDEB(("RclDb:: threads: haveWriteQ %d, wqlen %d wqts %d\n", - m_haveWriteQ, writeqlen, writethreads)); + m_havewriteq, writeqlen, writethreads)); } #endif // IDX_THREADS @@ -500,6 +501,9 @@ bool Db::i_close(bool final) m_ndb->xwdb.set_metadata(cstr_RCL_IDX_VERSION_KEY, cstr_RCL_IDX_VERSION); LOGDEB(("Rcl::Db:close: xapian will close. May take some time\n")); } +#ifdef IDX_THREADS + waitUpdIdle(); +#endif // Used to do a flush here. Cant see why it should be necessary. deleteZ(m_ndb); if (w) @@ -1138,7 +1142,7 @@ bool Db::addOrUpdate(const string &udi, const string &parent_udi, Doc &doc) newdocument.set_data(record); #ifdef IDX_THREADS - if (m_ndb->m_haveWriteQ) { + if (m_ndb->m_havewriteq) { DbUpdTask *tp = new DbUpdTask(udi, uniterm, newdocument, doc.text.length()); if (!m_ndb->m_wqueue.put(tp)) { @@ -1163,7 +1167,7 @@ bool Db::Native::addOrUpdateWrite(const string& udi, const string& uniterm, // thread, we only need to protect the update map update below // (against interaction with threads calling needUpdate()). Else, // all threads from above need to synchronize here - PTMutexLocker lock(m_mutex, m_haveWriteQ); + PTMutexLocker lock(m_mutex, m_havewriteq); #endif // Check file system full every mbyte of indexed text. It's a bit wasteful @@ -1193,7 +1197,7 @@ bool Db::Native::addOrUpdateWrite(const string& udi, const string& uniterm, #ifdef IDX_THREADS // Need to protect against interaction with the up-to-date checks // which also update the existence map - PTMutexLocker lock(m_mutex, !m_haveWriteQ); + PTMutexLocker lock(m_mutex, !m_havewriteq); #endif if (did < m_rcldb->updated.size()) { m_rcldb->updated[did] = true; @@ -1229,7 +1233,7 @@ bool Db::Native::addOrUpdateWrite(const string& udi, const string& uniterm, #ifdef IDX_THREADS void Db::waitUpdIdle() { - if (m_ndb->m_haveWriteQ) { + if (m_ndb->m_iswritable && m_ndb->m_havewriteq) { Chrono chron; m_ndb->m_wqueue.waitIdle(); // We flush here just for correct measurement of the thread work time @@ -1415,12 +1419,12 @@ bool Db::purge() #ifdef IDX_THREADS // If we manage our own write queue, make sure it's drained and closed - if (m_ndb->m_haveWriteQ) + if (m_ndb->m_havewriteq) m_ndb->m_wqueue.setTerminateAndWait(); // else we need to lock out other top level threads. This is just // a precaution as they should have been waited for by the top // level actor at this point - PTMutexLocker lock(m_ndb->m_mutex, m_ndb->m_haveWriteQ); + PTMutexLocker lock(m_ndb->m_mutex, m_ndb->m_havewriteq); #endif // IDX_THREADS // For xapian versions up to 1.0.1, deleting a non-existant @@ -1488,7 +1492,7 @@ bool Db::docExists(const string& uniterm) #ifdef IDX_THREADS // If we're not running our own (single) thread, need to protect // read db against multiaccess (e.g. from needUpdate(), or this method). - PTMutexLocker lock(m_ndb->m_mutex, m_ndb->m_haveWriteQ); + PTMutexLocker lock(m_ndb->m_mutex, m_ndb->m_havewriteq); #endif string ermsg; @@ -1521,7 +1525,7 @@ bool Db::purgeFile(const string &udi, bool *existed) return true; #ifdef IDX_THREADS - if (m_ndb->m_haveWriteQ) { + if (m_ndb->m_havewriteq) { Xapian::Document xdoc; DbUpdTask *tp = new DbUpdTask(udi, uniterm, xdoc, (size_t)-1); if (!m_ndb->m_wqueue.put(tp)) { @@ -1542,7 +1546,7 @@ bool Db::purgeFileWrite(const string& udi, const string& uniterm) // If we have a write queue we're called from there, and single // threaded, no locking. Else need to mutex other threads from // above - PTMutexLocker lock(m_ndb->m_mutex, m_ndb->m_haveWriteQ); + PTMutexLocker lock(m_ndb->m_mutex, m_ndb->m_havewriteq); #endif // IDX_THREADS Xapian::WritableDatabase db = m_ndb->xwdb; diff --git a/src/rcldb/rcldb_p.h b/src/rcldb/rcldb_p.h index cdfda99f..94c38ca5 100644 --- a/src/rcldb/rcldb_p.h +++ b/src/rcldb/rcldb_p.h @@ -66,7 +66,7 @@ class Db::Native { int m_loglevel; PTMutexInit m_mutex; long long m_totalworkns; - bool m_haveWriteQ; + bool m_havewriteq; void maybeStartThreads(); #endif // IDX_THREADS