Execmd: added count parameter to receive(), and new getline() function

Netcon: fix receive() to properly handle the case where there is initially
   data in the line buffer
This commit is contained in:
dockes 2009-10-09 13:34:18 +00:00
parent 83409a1069
commit 6bfdc232d1
4 changed files with 97 additions and 43 deletions

View File

@ -51,7 +51,12 @@ static char rcsid[] = "@(#$Id: execmd.cpp,v 1.27 2008-10-06 06:22:47 dockes Exp
using namespace std; using namespace std;
#endif /* NO_NAMESPACES */ #endif /* NO_NAMESPACES */
#ifndef MAX
#define MAX(A,B) ((A) > (B) ? (A) : (B)) #define MAX(A,B) ((A) > (B) ? (A) : (B))
#endif
#ifndef MIN
#define MIN(A,B) ((A) < (B) ? (A) : (B))
#endif
/* From FreeBSD's which command */ /* From FreeBSD's which command */
static bool static bool
@ -373,19 +378,49 @@ int ExecCmd::send(const string& data)
return nwritten; return nwritten;
} }
int ExecCmd::receive(string& data) int ExecCmd::receive(string& data, int cnt)
{ {
NetconCli *con = dynamic_cast<NetconCli *>(m_fromcmd.getptr()); NetconCli *con = dynamic_cast<NetconCli *>(m_fromcmd.getptr());
if (con == 0) { if (con == 0) {
LOGERR(("ExecCmd::receive: outpipe is closed\n")); LOGERR(("ExecCmd::receive: inpipe is closed\n"));
return -1; return -1;
} }
char buf[8192]; const int BS = 4096;
int n = con->receive(buf, 8192); char buf[BS];
int ntot = 0;
do {
int toread = cnt > 0 ? MIN(cnt - ntot, BS) : BS;
int n = con->receive(buf, toread);
if (n < 0) {
LOGERR(("ExecCmd::receive: error\n"));
return -1;
} else if (n > 0) {
ntot += n;
data.append(buf, n);
} else {
LOGDEB(("ExecCmd::receive: got 0\n"));
break;
}
} while (cnt > 0 && ntot < cnt);
return ntot;
}
int ExecCmd::getline(string& data)
{
NetconCli *con = dynamic_cast<NetconCli *>(m_fromcmd.getptr());
if (con == 0) {
LOGERR(("ExecCmd::receive: inpipe is closed\n"));
return -1;
}
const int BS = 1024;
char buf[BS];
int n = con->getline(buf, BS);
if (n < 0) { if (n < 0) {
LOGERR(("ExecCmd::receive: error\n")); LOGERR(("ExecCmd::getline: error\n"));
} else if (n > 0) { } else if (n > 0) {
data.append(buf, n); data.append(buf, n);
} else {
LOGDEB(("ExecCmd::getline: got 0\n"));
} }
return n; return n;
} }
@ -395,7 +430,7 @@ int ExecCmd::wait(bool haderror)
{ {
ExecCmdRsrc e(this); ExecCmdRsrc e(this);
int status = -1; int status = -1;
if (!m_cancelRequest) { if (!m_cancelRequest && m_pid > 0) {
if (waitpid(m_pid, &status, 0) < 0) if (waitpid(m_pid, &status, 0) < 0)
status = -1; status = -1;
m_pid = -1; m_pid = -1;

View File

@ -134,9 +134,12 @@ class ExecCmd {
int startExec(const string &cmd, const list<string>& args, int startExec(const string &cmd, const list<string>& args,
bool has_input, bool has_output); bool has_input, bool has_output);
int send(const string& data); int send(const string& data);
int receive(string& data); int receive(string& data, int cnt = -1);
int getline(string& data);
int wait(bool haderror = false); int wait(bool haderror = false);
pid_t getChildPid() {return m_pid;}
/** /**
* Cancel/kill command. This can be called from another thread or * Cancel/kill command. This can be called from another thread or
* from the advise callback, which could also raise an exception to * from the advise callback, which could also raise an exception to

View File

@ -59,6 +59,7 @@ static const int zero = 0;
#define MILLIS(OLD, NEW) ( (long)(((NEW).tv_sec - (OLD).tv_sec) * 1000 + \ #define MILLIS(OLD, NEW) ( (long)(((NEW).tv_sec - (OLD).tv_sec) * 1000 + \
((NEW).tv_usec - (OLD).tv_usec) / 1000)) ((NEW).tv_usec - (OLD).tv_usec) / 1000))
// Static method
// Simplified interface to 'select()'. Only use one fd, for either // Simplified interface to 'select()'. Only use one fd, for either
// reading or writing. This is only used when not using the // reading or writing. This is only used when not using the
// selectloop() style of network i/o. // selectloop() style of network i/o.
@ -116,9 +117,9 @@ void Netcon::setperiodichandler(int (*handler)(void *), void *p, int ms)
gettimeofday(&lasthdlcall, 0); gettimeofday(&lasthdlcall, 0);
} }
// set the appropriate timeout so that the select call returns in time // Compute the appropriate timeout so that the select call returns in
// to call the periodic routine. // time to call the periodic routine.
void periodictimeout(struct timeval *tv) static void periodictimeout(struct timeval *tv)
{ {
// If periodic not set, the select call times out and we loop // If periodic not set, the select call times out and we loop
// after a very long time (we'd need to pass NULL to select for an // after a very long time (we'd need to pass NULL to select for an
@ -143,7 +144,7 @@ void periodictimeout(struct timeval *tv)
// Check if it's time to call the handler. selectloop will return to // Check if it's time to call the handler. selectloop will return to
// caller if it or we return 0 // caller if it or we return 0
int maybecallperiodic() static int maybecallperiodic()
{ {
if (periodicmillis <= 0) if (periodicmillis <= 0)
return 1; return 1;
@ -310,10 +311,11 @@ Netcon::~Netcon() {
void Netcon::closeconn() void Netcon::closeconn()
{ {
if (m_fd >= 0) { if (m_ownfd && m_fd >= 0) {
close(m_fd); close(m_fd);
m_fd = -1;
} }
m_fd = -1;
m_ownfd = true;
} }
char *Netcon::sterror() char *Netcon::sterror()
@ -424,16 +426,18 @@ int NetconData::receive(char *buf, int cnt, int timeo)
LOGERR(("NetconData::receive: connection not opened\n")); LOGERR(("NetconData::receive: connection not opened\n"));
return -1; return -1;
} }
int fromibuf = 0;
// Get whatever might have been left in the buffer by a previous // Get whatever might have been left in the buffer by a previous
// getline, except if we're called to fill the buffer of course // getline, except if we're called to fill the buffer of course
if (m_buf && m_bufbytes > 0 && (buf < m_buf || buf > m_buf + m_bufsize)) { if (m_buf && m_bufbytes > 0 && (buf < m_buf || buf > m_buf + m_bufsize)) {
int frombuf = MIN(m_bufbytes, cnt); fromibuf = MIN(m_bufbytes, cnt);
memcpy(buf, m_bufbase, frombuf); memcpy(buf, m_bufbase, fromibuf);
m_bufbytes -= frombuf; m_bufbytes -= fromibuf;
m_bufbase += frombuf; m_bufbase += fromibuf;
cnt -= frombuf; cnt -= fromibuf;
LOGDEB2(("NetconData::receive: transferred %d from mbuf\n", fromibuf));
if (cnt <= 0) if (cnt <= 0)
return frombuf; return fromibuf;
} }
if (timeo > 0) { if (timeo > 0) {
int ret = select1(m_fd, timeo); int ret = select1(m_fd, timeo);
@ -448,14 +452,13 @@ int NetconData::receive(char *buf, int cnt, int timeo)
} }
} }
m_didtimo = 0; m_didtimo = 0;
int flags = 0; if ((cnt = read(m_fd, buf + fromibuf, cnt)) < 0) {
if ((cnt = read(m_fd, buf, cnt)) < 0) {
char fdcbuf[10];sprintf(fdcbuf, "%d", m_fd); char fdcbuf[10];sprintf(fdcbuf, "%d", m_fd);
LOGSYSERR("NetconData::receive", "read", fdcbuf); LOGSYSERR("NetconData::receive", "read", fdcbuf);
return -1; return -1;
} }
LOGDEB2(("NetconData::receive: normal return, cnt %d\n", cnt)); LOGDEB2(("NetconData::receive: normal return, cnt %d\n", cnt));
return cnt; return fromibuf + cnt;
} }
// Receive exactly cnt bytes (except for timeout) // Receive exactly cnt bytes (except for timeout)
@ -510,9 +513,9 @@ int NetconData::getline(char *buf, int cnt, int timeo)
LOGDEB2(("Before loop, bufbytes %d, maxtransf %d, nn: %d\n", LOGDEB2(("Before loop, bufbytes %d, maxtransf %d, nn: %d\n",
m_bufbytes, maxtransf, nn)); m_bufbytes, maxtransf, nn));
for (nn = maxtransf; nn > 0;) { for (nn = maxtransf; nn > 0;) {
// This is not pretty but we want nn to be decremented for each // This is not pretty but we want nn to be decremented for
// byte copied (even newline), and not become -1 if we go to the end // each byte copied (even newline), and not become -1 if
// Better ways welcome! // we go to the end. Better ways welcome!
nn--; nn--;
if ((*cp++ = *m_bufbase++) == '\n') if ((*cp++ = *m_bufbase++) == '\n')
break; break;
@ -657,10 +660,10 @@ int NetconCli::openconn(const char *host, char *serv, int timeo)
int NetconCli::setconn(int fd) int NetconCli::setconn(int fd)
{ {
LOGDEB2(("Netconcli::setconn: fd %d\n", fd)); LOGDEB2(("Netconcli::setconn: fd %d\n", fd));
closeconn(); closeconn();
m_fd = fd; m_fd = fd;
m_ownfd = false;
setpeer(""); setpeer("");
return 0; return 0;

View File

@ -6,12 +6,15 @@
/// A set of classes to manage client-server communication over a /// A set of classes to manage client-server communication over a
/// connection-oriented network, or a pipe. /// connection-oriented network, or a pipe.
/// ///
/// Currently only uses TCP. The classes include client-side and /// The listening/connection-accepting code currently only uses
/// server-side (accepting) endpoints, and server-side code to handle /// TCP. The classes include client-side and server-side (accepting)
/// a set of client connections in parallel. /// endpoints. Netcon also has server-side static code to handle a set
/// of client connections in parallel. This should be moved to a
/// friend class.
/// ///
/// The client data transfer class can also be used for /// The client data transfer class can also be used for
/// timeout-protected/asynchronous io using a given fd. /// timeout-protected/asynchronous io using a given fd (ie a pipe
/// descriptor)
/// Base class for all network endpoints: /// Base class for all network endpoints:
class Netcon; class Netcon;
@ -21,7 +24,7 @@ class Netcon {
public: public:
enum Event {NETCONPOLL_READ = 0x1, NETCONPOLL_WRITE=0x2}; enum Event {NETCONPOLL_READ = 0x1, NETCONPOLL_WRITE=0x2};
Netcon() Netcon()
: m_peer(0), m_fd(-1), m_didtimo(0), m_wantedEvents(0) : m_peer(0), m_fd(-1), m_ownfd(true), m_didtimo(0), m_wantedEvents(0)
{} {}
virtual ~Netcon(); virtual ~Netcon();
/// Remember whom we're talking to. We let external code do this because /// Remember whom we're talking to. We let external code do this because
@ -65,7 +68,7 @@ public:
/// All the interface is static (independant of any given object). /// All the interface is static (independant of any given object).
/// Loop waiting for events on the connections and call the /// Loop waiting for events on the connections and call the
/// cando() method on the object where something happens (this will in /// cando() method on the object when something happens (this will in
/// turn typically call the app callback set on the netcon). Possibly /// turn typically call the app callback set on the netcon). Possibly
/// call the periodic handler (if set) at regular intervals. /// call the periodic handler (if set) at regular intervals.
/// @return -1 for error. 0 if no descriptors left for i/o. 1 for periodic /// @return -1 for error. 0 if no descriptors left for i/o. 1 for periodic
@ -89,19 +92,20 @@ public:
/// @param handler the function to be called. /// @param handler the function to be called.
/// - if it is 0, selectloop() will return after ms mS (and can be called /// - if it is 0, selectloop() will return after ms mS (and can be called
/// again /// again
/// - if it is not 0, it will be called at ms mS intervals. If its return /// - if it is not 0, it will be called at ms mS intervals. If its return
/// value is <= 0, selectloop will return. /// value is <= 0, selectloop will return.
/// @param clp client data to be passed to handler at every call. /// @param clp client data to be passed to handler at every call.
/// @param ms milliseconds interval between handler calls or before return. /// @param ms milliseconds interval between handler calls or
/// set ms to 0 for no periodic handler /// before return. Set to 0 for no periodic handler.
static void setperiodichandler(int (*handler)(void *), void *clp, int ms); static void setperiodichandler(int (*handler)(void *), void *clp, int ms);
protected: protected:
static bool o_selectloopDoReturn; static bool o_selectloopDoReturn;
static int o_selectloopReturnValue; static int o_selectloopReturnValue;
char *m_peer; // Name of the connected host char *m_peer; // Name of the connected host
int m_fd; int m_fd;
int m_didtimo; bool m_ownfd;
int m_didtimo;
// Used when part of the selectloop map. // Used when part of the selectloop map.
short m_wantedEvents; short m_wantedEvents;
// Method called by the selectloop when something can be done with a netcon // Method called by the selectloop when something can be done with a netcon
@ -125,7 +129,7 @@ public:
virtual int data(NetconData *con, Netcon::Event reason) = 0; virtual int data(NetconData *con, Netcon::Event reason) = 0;
}; };
/// Base class for connections that actually transfer data. /// Base class for connections that actually transfer data. T
class NetconData : public Netcon { class NetconData : public Netcon {
public: public:
NetconData() : m_buf(0), m_bufbase(0), m_bufbytes(0), m_bufsize(0) NetconData() : m_buf(0), m_bufbase(0), m_bufbytes(0), m_bufsize(0)
@ -142,7 +146,8 @@ public:
/// Read from the connection /// Read from the connection
/// @param buf the data buffer /// @param buf the data buffer
/// @param cnt the number of bytes we should try to read /// @param cnt the number of bytes we should try to read (but we return
/// as soon as we get data)
/// @param timeo maximum number of seconds we should be waiting for data. /// @param timeo maximum number of seconds we should be waiting for data.
/// @return the count of bytes actually read. 0 for timeout (call /// @return the count of bytes actually read. 0 for timeout (call
/// didtimo() to discriminate from EOF). -1 if an error occurred. /// didtimo() to discriminate from EOF). -1 if an error occurred.
@ -172,16 +177,24 @@ private:
class NetconCli : public NetconData { class NetconCli : public NetconData {
public: public:
NetconCli(int silent = 0) {m_silentconnectfailure = silent;} NetconCli(int silent = 0) {m_silentconnectfailure = silent;}
/// Open connection to specified host and named service. /// Open connection to specified host and named service.
int openconn(const char *host, char *serv, int timeo = -1); int openconn(const char *host, char *serv, int timeo = -1);
/// Open connection to specified host and numeric port. port is in /// Open connection to specified host and numeric port. port is in
/// HOST byte order /// HOST byte order
int openconn(const char *host, unsigned int port, int timeo = -1); int openconn(const char *host, unsigned int port, int timeo = -1);
/// Reuse existing fd. We DONT take ownership of the fd, and do no closin'
/// EVEN on an explicit closeconn() (use getfd(), close, setconn(-1)). /// Reuse existing fd.
/// We DONT take ownership of the fd, and do no closin' EVEN on an
/// explicit closeconn() or setconn() (use getfd(), close,
/// setconn(-1) if you need to really close the fd and have no
/// other copy).
int setconn(int fd); int setconn(int fd);
/// Do not log message if openconn() fails. /// Do not log message if openconn() fails.
void setSilentFail(int onoff) {m_silentconnectfailure = onoff;} void setSilentFail(int onoff) {m_silentconnectfailure = onoff;}
private: private:
int m_silentconnectfailure; // No logging of connection failures if set int m_silentconnectfailure; // No logging of connection failures if set
}; };