merge common code

This commit is contained in:
Jean-Francois Dockes 2016-11-15 18:30:01 +01:00
parent b3c64e6ac4
commit 554ae238ec
2 changed files with 93 additions and 51 deletions

View File

@ -67,9 +67,9 @@ using namespace std;
static const int one = 1; static const int one = 1;
static const int zero = 0; static const int zero = 0;
#define LOGSYSERR(who, call, spar) \ #define LOGSYSERR(who, call, spar) \
LOGERR((who) << ": " << (call) << "(" << (spar) << ") errno " << \ LOGERR(who << ": " << call << "(" << spar << ") errno " << \
(errno) << " (" << (strerror(errno)) << ")\n") errno << " (" << strerror(errno) << ")\n")
#ifndef MIN #ifndef MIN
#define MIN(a,b) ((a)<(b)?(a):(b)) #define MIN(a,b) ((a)<(b)?(a):(b))
@ -186,7 +186,8 @@ int SelectLoop::doLoop()
it != m_polldata.end(); it++) { it != m_polldata.end(); it++) {
NetconP& pll = it->second; NetconP& pll = it->second;
int fd = it->first; int fd = it->first;
LOGDEB2("Selectloop: fd " << (fd) << " flags 0x" << (pll->m_wantedEvents) << "\n" ); LOGDEB2("Selectloop: fd " << fd << " flags 0x" <<
pll->m_wantedEvents << "\n");
if (pll->m_wantedEvents & Netcon::NETCONPOLL_READ) { if (pll->m_wantedEvents & Netcon::NETCONPOLL_READ) {
FD_SET(fd, &rd); FD_SET(fd, &rd);
nfds = MAX(nfds, fd + 1); nfds = MAX(nfds, fd + 1);
@ -210,7 +211,7 @@ int SelectLoop::doLoop()
return 0; return 0;
} }
LOGDEB2("Netcon::selectloop: selecting, nfds = " << (nfds) << "\n" ); LOGDEB2("Netcon::selectloop: selecting, nfds = " << nfds << "\n");
// Compute the next timeout according to what might need to be // Compute the next timeout according to what might need to be
// done apart from waiting for data // done apart from waiting for data
@ -362,6 +363,7 @@ int Netcon::settcpnodelay(int on)
return 0; return 0;
} }
// Set/reset non-blocking flag on fd // Set/reset non-blocking flag on fd
int Netcon::set_nonblock(int onoff) int Netcon::set_nonblock(int onoff)
{ {
@ -379,23 +381,46 @@ int Netcon::set_nonblock(int onoff)
///////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////
// Data socket (NetconData) methods // Data socket (NetconData) methods
NetconData::NetconData(bool cancellable)
: m_buf(0), m_bufbase(0), m_bufbytes(0), m_bufsize(0), m_wkfds{-1,-1}
{
if (cancellable) {
if (pipe(m_wkfds) < 0) {
LOGSYSERR("NetconData::NetconData", "pipe", "");
m_wkfds[0] = m_wkfds[1] = -1;
}
LOGDEB2("NetconData:: m_wkfds[0] " << m_wkfds[0] << " m_wkfds[1] " <<
m_wkfds[1] << endl);
for (int i = 0; i < 2; i++) {
int flags = fcntl(m_wkfds[i], F_GETFL, 0);
fcntl(m_wkfds[i], F_SETFL, flags | O_NONBLOCK);
}
}
}
NetconData::~NetconData() NetconData::~NetconData()
{ {
freeZ(m_buf); freeZ(m_buf);
m_bufbase = 0; m_bufbase = 0;
m_bufbytes = m_bufsize = 0; m_bufbytes = m_bufsize = 0;
for (int i = 0; i < 2; i++) {
if (m_wkfds[i] >= 0) {
close(m_wkfds[i]);
}
}
} }
int NetconData::send(const char *buf, int cnt, int expedited) int NetconData::send(const char *buf, int cnt, int expedited)
{ {
LOGDEB2("NetconData::send: fd " << (m_fd) << " cnt " << (cnt) << " expe " << (expedited) << "\n" ); LOGDEB2("NetconData::send: fd " << m_fd << " cnt " << cnt <<
" expe " << expedited << "\n");
int flag = 0; int flag = 0;
if (m_fd < 0) { if (m_fd < 0) {
LOGERR("NetconData::send: connection not opened\n" ); LOGERR("NetconData::send: connection not opened\n" );
return -1; return -1;
} }
if (expedited) { if (expedited) {
LOGDEB2("NetconData::send: expedited data, count " << (cnt) << " bytes\n" ); LOGDEB2("NetconData::send: expedited data, count " <<cnt << " bytes\n");
flag = MSG_OOB; flag = MSG_OOB;
} }
int ret; int ret;
@ -416,36 +441,25 @@ int NetconData::send(const char *buf, int cnt, int expedited)
return ret; return ret;
} }
// Test for data available void NetconData::cancelReceive()
int NetconData::readready()
{ {
LOGDEB2("NetconData::readready\n" ); if (m_wkfds[1] >= 0) {
if (m_fd < 0) { LOGDEB2("NetconData::cancelReceive: writing to " << m_wkfds[1] << endl);
LOGERR("NetconData::readready: connection not opened\n" ); ::write(m_wkfds[1], "!", 1);
return -1;
} }
return select1(m_fd, 0);
}
// Test for writable
int NetconData::writeready()
{
LOGDEB2("NetconData::writeready\n" );
if (m_fd < 0) {
LOGERR("NetconData::writeready: connection not opened\n" );
return -1;
}
return select1(m_fd, 0, 1);
} }
// Receive at most cnt bytes (maybe less) // Receive at most cnt bytes (maybe less)
int NetconData::receive(char *buf, int cnt, int timeo) int NetconData::receive(char *buf, int cnt, int timeo)
{ {
LOGDEB2("NetconData::receive: cnt " << (cnt) << " timeo " << (timeo) << " m_buf 0x" << (m_buf) << " m_bufbytes " << (m_bufbytes) << "\n" ); LOGDEB2("NetconData::receive: cnt " << cnt << " timeo " << timeo <<
" m_buf 0x" << m_buf << " m_bufbytes " << m_bufbytes << "\n");
if (m_fd < 0) { if (m_fd < 0) {
LOGERR("NetconData::receive: connection not opened\n" ); LOGERR("NetconData::receive: connection not opened\n" );
return -1; return -1;
} }
int fromibuf = 0; int fromibuf = 0;
// Get whatever might have been left in the buffer by a previous // Get whatever might have been left in the buffer by a previous
// getline, except if we're called to fill the buffer of course // getline, except if we're called to fill the buffer of course
@ -455,31 +469,54 @@ int NetconData::receive(char *buf, int cnt, int timeo)
m_bufbytes -= fromibuf; m_bufbytes -= fromibuf;
m_bufbase += fromibuf; m_bufbase += fromibuf;
cnt -= fromibuf; cnt -= fromibuf;
LOGDEB2("NetconData::receive: transferred " << (fromibuf) << " from mbuf\n" ); LOGDEB2("NetconData::receive: got " << fromibuf << " from mbuf\n");
if (cnt <= 0) { if (cnt <= 0) {
return fromibuf; return fromibuf;
} }
} }
if (timeo > 0) { if (timeo > 0) {
int ret = select1(m_fd, timeo); struct timeval tv;
if (ret == 0) { tv.tv_sec = timeo;
LOGDEB2("NetconData::receive timed out\n" ); tv.tv_usec = 0;
m_didtimo = 1; fd_set rd;
return -1; FD_ZERO(&rd);
FD_SET(m_fd, &rd);
bool cancellable = (m_wkfds[0] >= 0);
if (cancellable) {
LOGDEB2("NetconData::receive: cancel fd " << m_wkfds[0] << endl);
FD_SET(m_wkfds[0], &rd);
} }
int nfds = MAX(m_fd, m_wkfds[0]) + 1;
int ret = select(nfds, &rd, 0, 0, &tv);
LOGDEB2("NetconData::receive: select returned " << ret << endl);
if (cancellable && FD_ISSET(m_wkfds[0], &rd)) {
char b[100];
read(m_wkfds[0], b, 100);
return Cancelled;
}
if (!FD_ISSET(m_fd, &rd)) {
m_didtimo = 1;
return TimeoutOrError;
}
if (ret < 0) { if (ret < 0) {
LOGSYSERR("NetconData::receive", "select", ""); LOGSYSERR("NetconData::receive", "select", "");
return -1; m_didtimo = 0;
return TimeoutOrError;
} }
} }
m_didtimo = 0; m_didtimo = 0;
if ((cnt = read(m_fd, buf + fromibuf, cnt)) < 0) { if ((cnt = read(m_fd, buf + fromibuf, cnt)) < 0) {
char fdcbuf[20]; LOGSYSERR("NetconData::receive", "read", m_fd);
sprintf(fdcbuf, "%d", m_fd);
LOGSYSERR("NetconData::receive", "read", fdcbuf);
return -1; return -1;
} }
LOGDEB2("NetconData::receive: normal return, cnt " << (cnt) << "\n" ); LOGDEB2("NetconData::receive: normal return, fromibuf " << fromibuf <<
" cnt " << cnt << "\n");
return fromibuf + cnt; return fromibuf + cnt;
} }
@ -487,13 +524,13 @@ int NetconData::receive(char *buf, int cnt, int timeo)
int NetconData::doreceive(char *buf, int cnt, int timeo) int NetconData::doreceive(char *buf, int cnt, int timeo)
{ {
int got, cur; int got, cur;
LOGDEB2("Netcon::doreceive: cnt " << (cnt) << ", timeo " << (timeo) << "\n" ); LOGDEB2("Netcon::doreceive: cnt " << cnt << ", timeo " << timeo << "\n");
cur = 0; cur = 0;
while (cnt > cur) { while (cnt > cur) {
got = receive(buf, cnt - cur, timeo); got = receive(buf, cnt - cur, timeo);
LOGDEB2("Netcon::doreceive: got " << (got) << "\n" ); LOGDEB2("Netcon::doreceive: got " << (got) << "\n" );
if (got < 0) { if (got < 0) {
return -1; return got;
} }
if (got == 0) { if (got == 0) {
return cur; return cur;

View File

@ -215,8 +215,7 @@ public:
/// Base class for connections that actually transfer data. T /// Base class for connections that actually transfer data. T
class NetconData : public Netcon { class NetconData : public Netcon {
public: public:
NetconData() : m_buf(0), m_bufbase(0), m_bufbytes(0), m_bufsize(0) { NetconData(bool cancellable = false);
}
virtual ~NetconData(); virtual ~NetconData();
/// Write data to the connection. /// Write data to the connection.
@ -232,18 +231,20 @@ public:
/// @param cnt the number of bytes we should try to read (but we return /// @param cnt the number of bytes we should try to read (but we return
/// as soon as we get data) /// as soon as we get data)
/// @param timeo maximum number of seconds we should be waiting for data. /// @param timeo maximum number of seconds we should be waiting for data.
/// @return the count of bytes actually read. 0 for timeout (call /// @return the count of bytes actually read (0 for EOF), or
/// didtimo() to discriminate from EOF). -1 if an error occurred. /// TimeoutOrError (-1) for timeout or error (call timedout() to
/// discriminate and reset), Cancelled (-2) if cancelled.
enum RcvReason {Eof = 0, TimeoutOrError = -1, Cancelled = -2};
virtual int receive(char *buf, int cnt, int timeo = -1); virtual int receive(char *buf, int cnt, int timeo = -1);
virtual void cancelReceive();
/// Loop on receive until cnt bytes are actually read or a timeout occurs /// Loop on receive until cnt bytes are actually read or a timeout occurs
virtual int doreceive(char *buf, int cnt, int timeo = -1); virtual int doreceive(char *buf, int cnt, int timeo = -1);
/// Check for data being available for reading
virtual int readready();
/// Check for data being available for writing
virtual int writeready();
/// Read a line of text on an ascii connection. Returns -1 or byte count /// Read a line of text on an ascii connection. Returns -1 or byte count
/// including final 0. \n is kept /// including final 0. \n is kept
virtual int getline(char *buf, int cnt, int timeo = -1); virtual int getline(char *buf, int cnt, int timeo = -1);
/// Set handler to be called when the connection is placed in the /// Set handler to be called when the connection is placed in the
/// selectloop and an event occurs. /// selectloop and an event occurs.
virtual void setcallback(std::shared_ptr<NetconWorker> user) { virtual void setcallback(std::shared_ptr<NetconWorker> user) {
@ -251,10 +252,14 @@ public:
} }
private: private:
char *m_buf; // Buffer. Only used when doing getline()s char *m_buf; // Buffer. Only used when doing getline()s
char *m_bufbase; // Pointer to current 1st byte of useful data char *m_bufbase; // Pointer to current 1st byte of useful data
int m_bufbytes; // Bytes of data. int m_bufbytes; // Bytes of data.
int m_bufsize; // Total buffer size int m_bufsize; // Total buffer size
int m_wkfds[2];
std::shared_ptr<NetconWorker> m_user; std::shared_ptr<NetconWorker> m_user;
virtual int cando(Netcon::Event reason); // Selectloop slot virtual int cando(Netcon::Event reason); // Selectloop slot
}; };
@ -262,8 +267,8 @@ private:
/// Network endpoint, client side. /// Network endpoint, client side.
class NetconCli : public NetconData { class NetconCli : public NetconData {
public: public:
NetconCli(int silent = 0) { NetconCli(bool cancellable = false)
m_silentconnectfailure = silent; : NetconData(cancellable), m_silentconnectfailure(false) {
} }
/// Open connection to specified host and named service. Set host /// Open connection to specified host and named service. Set host
@ -284,12 +289,12 @@ public:
int setconn(int fd); int setconn(int fd);
/// Do not log message if openconn() fails. /// Do not log message if openconn() fails.
void setSilentFail(int onoff) { void setSilentFail(bool onoff) {
m_silentconnectfailure = onoff; m_silentconnectfailure = onoff;
} }
private: private:
int m_silentconnectfailure; // No logging of connection failures if set bool m_silentconnectfailure; // No logging of connection failures if set
}; };
class NetconServCon; class NetconServCon;