Cleaned up execmd/netcon to enable multiple exec without locking

This commit is contained in:
Jean-Francois Dockes 2011-04-29 15:01:14 +02:00
parent 4869dfa775
commit 7d2928022c
9 changed files with 175 additions and 132 deletions

View File

@ -17,6 +17,12 @@ autom4*
config.cache
config.log
config.status
doc/user/usermanual.aux
doc/user/usermanual.log
doc/user/usermanual.out
doc/user/usermanual.pdf
doc/user/usermanual.tex-pdf
doc/user/usermanual.tex-pdf-tmp
excludefile
kde/recoll_applet
lib/librcl.a

View File

@ -437,6 +437,7 @@ utils/netcon.cpp
utils/netcon.h
utils/pathut.cpp
utils/pathut.h
utils/ptmutex.h
utils/pxattr.cpp
utils/pxattr.h
utils/readfile.cpp

View File

@ -114,7 +114,7 @@ static void recollCleanup()
LOGDEB2(("recollCleanup: closing database\n"));
deleteZ(rcldb);
deleteZ(theconfig);
deleteZ(thestableconfig);
// deleteZ(thestableconfig);
#ifdef RCL_USE_ASPELL
deleteZ(aspell);
#endif

View File

@ -1,11 +1,11 @@
depth = ..
include $(depth)/mk/sysconf
PROGS = trcopyfile trcircache trmd5 trreadfile trfileudi trconftree \
wipedir smallut \
trfstreewalk trpathut \
transcode trbase64 \
trmimeparse trexecmd utf8iter idfile
PROGS = trnetcon trcopyfile trcircache trmd5 trreadfile trfileudi trconftree \
wipedir smallut \
trfstreewalk trpathut \
transcode trbase64 \
trmimeparse trexecmd utf8iter idfile
all: $(PROGS) $(BIGLIB)
@ -54,6 +54,12 @@ trpathut : $(PATHUT_OBJS)
trpathut.o : pathut.cpp pathut.h
$(CXX) -o trpathut.o -c $(ALL_CXXFLAGS) -DTEST_PATHUT pathut.cpp
NETCON_OBJS= trnetcon.o $(BIGLIB)
trnetcon : $(NETCON_OBJS)
$(CXX) $(ALL_CXXFLAGS) -o trnetcon $(NETCON_OBJS) $(LIBICONV) $(LIBSYS)
trnetcon.o : netcon.cpp netcon.h
$(CXX) -o trnetcon.o -c $(ALL_CXXFLAGS) -DTEST_NETCON netcon.cpp
FILEUDI_OBJS= trfileudi.o $(BIGLIB)
trfileudi : $(FILEUDI_OBJS)
$(CXX) $(ALL_CXXFLAGS) -o trfileudi $(FILEUDI_OBJS)

View File

@ -76,7 +76,7 @@ static bool inflateToDynBuf(void *inp, UINT inlen, void **outpp, UINT *outlenp);
// Entry header.
// 3 x 32 bits sizes as hex integers + 1 x 16 bits flag + at least 1 zero
// 15 + 3x(9) + 3 + 1 = 46
const char *headerformat = "circacheSizes = %x %x %x %hx";
static const char *headerformat = "circacheSizes = %x %x %x %hx";
#define CIRCACHE_HEADER_SIZE 64
class EntryHeaderData {

View File

@ -335,7 +335,11 @@ DebugLog *getdbl()
}
DebugLog *dbl;
if (!(dbl = (DebugLog *)pthread_getspecific(dbl_key))) {
dbl = new DebugLog;
if ((dbl = new DebugLog) == 0) {
fprintf(stderr, "debuglog: new DebugLog returned 0! ");
abort();
}
dbl->setwriter(theWriter);
initfiles();
status = pthread_setspecific(dbl_key, dbl);

View File

@ -313,16 +313,9 @@ private:
};
// The netcon selectloop that doexec() uses for reading/writing would
// be complicated to render thread-safe. Use locking to ensure only
// one thread in there
static PTMutexInit o_lock;
int ExecCmd::doexec(const string &cmd, const list<string>& args,
const string *input, string *output)
{
// Only one thread allowed in here...
PTMutexLocker locker(o_lock);
if (startExec(cmd, args, input != 0, output != 0) < 0) {
return -1;
@ -330,7 +323,7 @@ int ExecCmd::doexec(const string &cmd, const list<string>& args,
// Cleanup in case we return early
ExecCmdRsrc e(this);
SelectLoop myloop;
int ret = 0;
if (input || output) {
// Setup output
@ -342,7 +335,7 @@ int ExecCmd::doexec(const string &cmd, const list<string>& args,
}
oclicon->setcallback(RefCntr<NetconWorker>
(new ExecReader(output, m_advise)));
Netcon::addselcon(m_fromcmd, Netcon::NETCONPOLL_READ);
myloop.addselcon(m_fromcmd, Netcon::NETCONPOLL_READ);
// Give up ownership
m_fromcmd.release();
}
@ -355,14 +348,14 @@ int ExecCmd::doexec(const string &cmd, const list<string>& args,
}
iclicon->setcallback(RefCntr<NetconWorker>
(new ExecWriter(input, m_provide)));
Netcon::addselcon(m_tocmd, Netcon::NETCONPOLL_WRITE);
myloop.addselcon(m_tocmd, Netcon::NETCONPOLL_WRITE);
// Give up ownership
m_tocmd.release();
}
// Do the actual reading/writing/waiting
Netcon::setperiodichandler(0, 0, m_timeoutMs);
while ((ret = Netcon::selectloop()) > 0) {
myloop.setperiodichandler(0, 0, m_timeoutMs);
while ((ret = myloop.doLoop()) > 0) {
LOGDEB(("ExecCmd::doexec: selectloop returned %d\n", ret));
if (m_advise)
m_advise->newData(0);

View File

@ -83,46 +83,23 @@ int Netcon::select1(int fd, int timeo, int write)
return ret;
}
// The select loop mechanism allows several netcons to be used for io
// in a program without blocking as long as there is data to be read
// or written.
// Set by client callback to tell selectloop to return.
bool Netcon::o_selectloopDoReturn;
int Netcon::o_selectloopReturnValue;
// Other static data for the selectloop mecanism
// Could be declared as static members, but I don't see any advantage
// to it as all code in this file is in Netcon:: anyway.
// Map of NetconP indexed by fd
static map<int, NetconP> polldata;
// The last time we did the periodic thing
static struct timeval lasthdlcall;
// The call back function and its parameter
static int (*periodichandler)(void *);
static void *periodicparam;
// The periodic interval
static int periodicmillis;
void Netcon::setperiodichandler(int (*handler)(void *), void *p, int ms)
void SelectLoop::setperiodichandler(int (*handler)(void *), void *p, int ms)
{
periodichandler = handler;
periodicparam = p;
periodicmillis = ms;
if (periodicmillis > 0)
gettimeofday(&lasthdlcall, 0);
m_periodichandler = handler;
m_periodicparam = p;
m_periodicmillis = ms;
if (m_periodicmillis > 0)
gettimeofday(&m_lasthdlcall, 0);
}
// Compute the appropriate timeout so that the select call returns in
// time to call the periodic routine.
static void periodictimeout(struct timeval *tv)
void SelectLoop::periodictimeout(struct timeval *tv)
{
// If periodic not set, the select call times out and we loop
// after a very long time (we'd need to pass NULL to select for an
// infinite wait, and I'm too lazy to handle it)
if (periodicmillis <= 0) {
if (m_periodicmillis <= 0) {
tv->tv_sec = 10000;
tv->tv_usec = 0;
return;
@ -130,7 +107,7 @@ static void periodictimeout(struct timeval *tv)
struct timeval mtv;
gettimeofday(&mtv, 0);
int millis = periodicmillis - MILLIS(lasthdlcall, mtv);
int millis = m_periodicmillis - MILLIS(m_lasthdlcall, mtv);
// millis <= 0 means we should have already done the thing. *dont* set the
// tv to 0, which means no timeout at all !
@ -142,50 +119,49 @@ static void periodictimeout(struct timeval *tv)
// Check if it's time to call the handler. selectloop will return to
// caller if it or we return 0
static int maybecallperiodic()
int SelectLoop::maybecallperiodic()
{
if (periodicmillis <= 0)
if (m_periodicmillis <= 0)
return 1;
struct timeval mtv;
gettimeofday(&mtv, 0);
int millis = periodicmillis - MILLIS(lasthdlcall, mtv);
int millis = m_periodicmillis - MILLIS(m_lasthdlcall, mtv);
if (millis <= 0) {
gettimeofday(&lasthdlcall, 0);
if (periodichandler)
return periodichandler(periodicparam);
gettimeofday(&m_lasthdlcall, 0);
if (m_periodichandler)
return m_periodichandler(m_periodicparam);
else
return 0;
}
return 1;
}
int Netcon::selectloop()
int SelectLoop::doLoop()
{
static int placetostart;
for (;;) {
if (o_selectloopDoReturn) {
o_selectloopDoReturn = false;
if (m_selectloopDoReturn) {
m_selectloopDoReturn = false;
LOGDEB(("Netcon::selectloop: returning on request\n"));
return o_selectloopReturnValue;
return m_selectloopReturnValue;
}
int nfds;
fd_set rd, wd;
FD_ZERO(&rd);
FD_ZERO(&wd);
// Look for all descriptors in the map and set up the read and
// write fd_sets for select()
// Walk the netcon map and set up the read and write fd_sets
// for select()
nfds = 0;
for (map<int,NetconP>::iterator it = polldata.begin();
it != polldata.end(); it++) {
for (map<int,NetconP>::iterator it = m_polldata.begin();
it != m_polldata.end(); it++) {
NetconP &pll = it->second;
int fd = it->first;
LOGDEB2(("Selectloop: fd %d flags 0x%x\n",fd, pll->m_wantedEvents));
if (pll->m_wantedEvents & NETCONPOLL_READ) {
if (pll->m_wantedEvents & Netcon::NETCONPOLL_READ) {
FD_SET(fd, &rd);
nfds = MAX(nfds, fd + 1);
}
if (pll->m_wantedEvents & NETCONPOLL_WRITE) {
if (pll->m_wantedEvents & Netcon::NETCONPOLL_WRITE) {
FD_SET(fd, &wd);
nfds = MAX(nfds, fd + 1);
}
@ -199,7 +175,7 @@ int Netcon::selectloop()
// Just in case there would still be open fds in there
// (with no r/w flags set). Should not be needed, but safer
polldata.clear();
m_polldata.clear();
LOGDEB1(("Netcon::selectloop: no fds\n"));
return 0;
}
@ -217,7 +193,7 @@ int Netcon::selectloop()
LOGSYSERR("Netcon::selectloop", "select", "");
return -1;
}
if (periodicmillis > 0)
if (m_periodicmillis > 0)
if (maybecallperiodic() <= 0)
return 1;
@ -231,10 +207,10 @@ int Netcon::selectloop()
// map may change between 2 sweeps, so that we'd have to be smart
// with the iterator. As the cost per unused fd is low (just 2 bit
// flag tests), we keep it like this for now
if (placetostart >= nfds)
placetostart = 0;
if (m_placetostart >= nfds)
m_placetostart = 0;
int i, fd;
for (i = 0, fd = placetostart; i < nfds;i++, fd++) {
for (i = 0, fd = m_placetostart; i < nfds;i++, fd++) {
if (fd >= nfds)
fd = 0;
@ -247,53 +223,56 @@ int Netcon::selectloop()
if (none)
continue;
map<int,NetconP>::iterator it = polldata.find(fd);
if (it == polldata.end()) {
map<int,NetconP>::iterator it = m_polldata.find(fd);
if (it == m_polldata.end()) {
/// This should not happen actually
LOGDEB2(("Netcon::selectloop: fd %d not found\n", fd));
continue;
}
// Next start will be one beyond last serviced (modulo nfds)
placetostart = fd + 1;
m_placetostart = fd + 1;
NetconP &pll = it->second;
if (canread && pll->cando(NETCONPOLL_READ) <= 0)
pll->m_wantedEvents &= ~NETCONPOLL_READ;
if (canwrite && pll->cando(NETCONPOLL_WRITE) <= 0)
pll->m_wantedEvents &= ~NETCONPOLL_WRITE;
if (!(pll->m_wantedEvents & (NETCONPOLL_WRITE|NETCONPOLL_READ))) {
if (canread && pll->cando(Netcon::NETCONPOLL_READ) <= 0)
pll->m_wantedEvents &= ~Netcon::NETCONPOLL_READ;
if (canwrite && pll->cando(Netcon::NETCONPOLL_WRITE) <= 0)
pll->m_wantedEvents &= ~Netcon::NETCONPOLL_WRITE;
if (!(pll->m_wantedEvents & (Netcon::NETCONPOLL_WRITE|Netcon::NETCONPOLL_READ))) {
LOGDEB0(("Netcon::selectloop: fd %d has 0x%x mask, erasing\n",
it->first, it->second->m_wantedEvents));
polldata.erase(it);
m_polldata.erase(it);
}
} // fd sweep
} // forever loop
LOGERR(("Netcon::selectloop: got out of loop !\n"));
LOGERR(("SelectLoop::doLoop: got out of loop !\n"));
return -1;
}
// Add a connection to the monitored set.
int Netcon::addselcon(NetconP con, int events)
int SelectLoop::addselcon(NetconP con, int events)
{
if (con.isNull()) return -1;
LOGDEB1(("Netcon::addselcon: fd %d\n", con->m_fd));
con->set_nonblock(1);
con->setselevents(events);
polldata[con->m_fd] = con;
m_polldata[con->m_fd] = con;
con->setloop(this);
return 0;
}
// Remove a connection from the monitored set.
int Netcon::remselcon(NetconP con)
int SelectLoop::remselcon(NetconP con)
{
if (con.isNull()) return -1;
LOGDEB1(("Netcon::remselcon: fd %d\n", con->m_fd));
map<int,NetconP>::iterator it = polldata.find(con->m_fd);
if (it == polldata.end()) {
map<int,NetconP>::iterator it = m_polldata.find(con->m_fd);
if (it == m_polldata.end()) {
LOGDEB1(("Netcon::remselcon: con not found for fd %d\n", con->m_fd));
return -1;
}
polldata.erase(it);
con->setloop(0);
m_polldata.erase(it);
return 0;
}
@ -969,7 +948,7 @@ int main(int argc, char **argv)
argv++;
}
DebugLog::setfilename("stderr");
DebugLog::getdbl()->setloglevel(DEBINFO);
DebugLog::getdbl()->setloglevel(DEBDEB2);
if (op_flags & OPT_c) {
if (argc != 2) {
@ -1032,7 +1011,8 @@ public:
}
if (m_count >= 10) {
fprintf(stderr, "Did 10, enough\n");
Netcon::selectloopReturn(0);
if (con->getloop())
con->getloop()->loopReturn(0);
}
return 0;
}
@ -1077,9 +1057,10 @@ int trycli(char *host, char *serv)
RefCntr<NetconWorker> worker =
RefCntr<NetconWorker>(new CliNetconWorker());
clicon->setcallback(worker);
Netcon::addselcon(con, Netcon::NETCONPOLL_WRITE);
SelectLoop myloop;
myloop.addselcon(con, Netcon::NETCONPOLL_WRITE);
fprintf(stderr, "client ready\n");
int ret = Netcon::selectloop();
int ret = myloop.doLoop();
if (ret < 0) {
fprintf(stderr, "selectloop failed\n");
exit(1);
@ -1130,6 +1111,11 @@ private:
};
class MyNetconServLis : public NetconServLis {
public:
MyNetconServLis(SelectLoop &loop)
: NetconServLis(), m_loop(loop)
{
}
protected:
int cando(Netcon::Event reason) {
NetconServCon *con = accept();
@ -1138,9 +1124,10 @@ protected:
RefCntr<NetconWorker> worker =
RefCntr<NetconWorker>(new ServNetconWorker());
con->setcallback(worker);
addselcon(NetconP(con), NETCONPOLL_READ);
m_loop.addselcon(NetconP(con), NETCONPOLL_READ);
return 1;
}
SelectLoop& m_loop;
};
NetconP lis;
@ -1158,7 +1145,8 @@ onexit(int sig)
int tryserv(char *serv)
{
signal(SIGCHLD, SIG_IGN);
MyNetconServLis *servlis = new MyNetconServLis();
SelectLoop myloop;
MyNetconServLis *servlis = new MyNetconServLis(myloop);
lis = NetconP(servlis);
if (lis.isNull()) {
fprintf(stderr, "new NetconServLis failed\n");
@ -1178,9 +1166,9 @@ int tryserv(char *serv)
fprintf(stderr, "openservice(%s) failed\n", serv);
return 1;
}
Netcon::addselcon(lis, Netcon::NETCONPOLL_READ);
myloop.addselcon(lis, Netcon::NETCONPOLL_READ);
fprintf(stderr, "openservice(%s) Ok\n", serv);
if (Netcon::selectloop() < 0) {
if (myloop.doLoop() < 0) {
fprintf(stderr, "selectloop failed\n");
exit(1);
}

View File

@ -16,8 +16,12 @@
* Free Software Foundation, Inc.,
* 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
#include <sys/time.h>
#include <map>
#include "refcntr.h"
using std::map;
/// A set of classes to manage client-server communication over a
/// connection-oriented network, or a pipe.
///
@ -34,12 +38,14 @@
/// Base class for all network endpoints:
class Netcon;
typedef RefCntr<Netcon> NetconP;
class SelectLoop;
class Netcon {
public:
enum Event {NETCONPOLL_READ = 0x1, NETCONPOLL_WRITE=0x2};
Netcon()
: m_peer(0), m_fd(-1), m_ownfd(true), m_didtimo(0), m_wantedEvents(0)
: m_peer(0), m_fd(-1), m_ownfd(true), m_didtimo(0), m_wantedEvents(0),
m_loop(0)
{}
virtual ~Netcon();
/// Remember whom we're talking to. We let external code do this because
@ -75,12 +81,41 @@ public:
/// Clear events from current set
int clearselevents(int evs) {return m_wantedEvents &= ~evs;}
friend class SelectLoop;
SelectLoop *getloop() {return m_loop;}
/// Utility function for a simplified select() interface: check one fd
/// for reading or writing, for a specified maximum number of seconds.
static int select1(int fd, int secs, int writing = 0);
/// The selectloop interface is used to implement parallel servers.
/// All the interface is static (independant of any given object).
protected:
char *m_peer; // Name of the connected host
int m_fd;
bool m_ownfd;
int m_didtimo;
// Used when part of the selectloop map.
short m_wantedEvents;
SelectLoop *m_loop;
// Method called by the selectloop when something can be done with a netcon
virtual int cando(Netcon::Event reason) = 0;
// Called when added to loop
virtual void setloop(SelectLoop *loop) {m_loop = loop;}
};
/// The selectloop interface is used to implement parallel servers.
// The select loop mechanism allows several netcons to be used for io
// in a program without blocking as long as there is data to be read
// or written. In a multithread program which is also using select, it
// would typically make sense to have one SelectLoop active per
// thread.
class SelectLoop {
public:
SelectLoop()
: m_selectloopDoReturn(false), m_selectloopReturnValue(0),
m_placetostart(0),
m_periodichandler(0), m_periodicparam(0), m_periodicmillis(0)
{}
/// Loop waiting for events on the connections and call the
/// cando() method on the object when something happens (this will in
@ -88,20 +123,20 @@ public:
/// call the periodic handler (if set) at regular intervals.
/// @return -1 for error. 0 if no descriptors left for i/o. 1 for periodic
/// timeout (should call back in after processing)
static int selectloop();
int doLoop();
/// Call from data handler: make selectloop return the param value
static void selectloopReturn(int value)
void loopReturn(int value)
{
o_selectloopDoReturn = true;
o_selectloopReturnValue = value;
m_selectloopDoReturn = true;
m_selectloopReturnValue = value;
}
/// Add a connection to be monitored (this will usually be called
/// from the server's listen connection's accept callback)
static int addselcon(NetconP con, int);
/// Remove a connection from the monitored set. Note that this is
/// automatically called from the Netcon destructor, and when EOF is
/// detected on a connection.
static int remselcon(NetconP con);
int addselcon(NetconP con, int events);
/// Remove a connection from the monitored set. This is
/// automatically called when EOF is detected on a connection.
int remselcon(NetconP con);
/// Set a function to be called periodically, or a time before return.
/// @param handler the function to be called.
@ -112,35 +147,45 @@ public:
/// @param clp client data to be passed to handler at every call.
/// @param ms milliseconds interval between handler calls or
/// before return. Set to 0 for no periodic handler.
static void setperiodichandler(int (*handler)(void *), void *clp, int ms);
void setperiodichandler(int (*handler)(void *), void *clp, int ms);
protected:
static bool o_selectloopDoReturn;
static int o_selectloopReturnValue;
char *m_peer; // Name of the connected host
int m_fd;
bool m_ownfd;
int m_didtimo;
// Used when part of the selectloop map.
short m_wantedEvents;
// Method called by the selectloop when something can be done with a netcon
virtual int cando(Netcon::Event reason) = 0;
private:
// Set by client callback to tell selectloop to return.
bool m_selectloopDoReturn;
int m_selectloopReturnValue;
int m_placetostart;
// Map of NetconP indexed by fd
map<int, NetconP> m_polldata;
// The last time we did the periodic thing. Initialized by setperiodic()
struct timeval m_lasthdlcall;
// The call back function and its parameter
int (*m_periodichandler)(void *);
void *m_periodicparam;
// The periodic interval
int m_periodicmillis;
void periodictimeout(struct timeval *tv);
int maybecallperiodic();
};
///////////////////////
class NetconData;
/// Class for the application callback routine (when in
/// selectloop). It would be nicer to override cando() in a subclass
/// instead of setting a callback, but this can't be done conveniently
/// because accept() always creates a base NetconData (another way
/// would be to pass a factory function function to the listener, to create
/// NetconData derivatives).
/// Class for the application callback routine (when in selectloop).
///
/// This is set by the app on the NetconData by calling
/// setcallback(). It is then called from the NetconData's cando()
/// routine, itself called by selectloop.
///
/// It would be nicer to override cando() in a subclass instead of
/// setting a callback, but this can't be done conveniently because
/// accept() always creates a base NetconData (another approach would
/// be to pass a factory function to the listener, to create
/// NetconData derived classes).
class NetconWorker {
public:
virtual ~NetconWorker() {}
// NetconP holds a NetconData oeuf corse
virtual int data(NetconData *con, Netcon::Event reason) = 0;
};
@ -185,7 +230,7 @@ private:
int m_bufbytes; // Bytes of data.
int m_bufsize; // Total buffer size
RefCntr<NetconWorker> m_user;
virtual int cando(Netcon::Event reason);
virtual int cando(Netcon::Event reason); // Selectloop slot
};
/// Network endpoint, client side.