diff --git a/src/utils/netcon.cpp b/src/utils/netcon.cpp index 68158ac5..71b52fd4 100644 --- a/src/utils/netcon.cpp +++ b/src/utils/netcon.cpp @@ -67,9 +67,9 @@ using namespace std; static const int one = 1; static const int zero = 0; -#define LOGSYSERR(who, call, spar) \ - LOGERR((who) << ": " << (call) << "(" << (spar) << ") errno " << \ - (errno) << " (" << (strerror(errno)) << ")\n") +#define LOGSYSERR(who, call, spar) \ + LOGERR(who << ": " << call << "(" << spar << ") errno " << \ + errno << " (" << strerror(errno) << ")\n") #ifndef MIN #define MIN(a,b) ((a)<(b)?(a):(b)) @@ -186,7 +186,8 @@ int SelectLoop::doLoop() it != m_polldata.end(); it++) { NetconP& pll = it->second; int fd = it->first; - LOGDEB2("Selectloop: fd " << (fd) << " flags 0x" << (pll->m_wantedEvents) << "\n" ); + LOGDEB2("Selectloop: fd " << fd << " flags 0x" << + pll->m_wantedEvents << "\n"); if (pll->m_wantedEvents & Netcon::NETCONPOLL_READ) { FD_SET(fd, &rd); nfds = MAX(nfds, fd + 1); @@ -210,7 +211,7 @@ int SelectLoop::doLoop() return 0; } - LOGDEB2("Netcon::selectloop: selecting, nfds = " << (nfds) << "\n" ); + LOGDEB2("Netcon::selectloop: selecting, nfds = " << nfds << "\n"); // Compute the next timeout according to what might need to be // done apart from waiting for data @@ -362,6 +363,7 @@ int Netcon::settcpnodelay(int on) return 0; } + // Set/reset non-blocking flag on fd int Netcon::set_nonblock(int onoff) { @@ -379,23 +381,46 @@ int Netcon::set_nonblock(int onoff) ///////////////////////////////////////////////////////////////////// // Data socket (NetconData) methods +NetconData::NetconData(bool cancellable) + : m_buf(0), m_bufbase(0), m_bufbytes(0), m_bufsize(0), m_wkfds{-1,-1} +{ + if (cancellable) { + if (pipe(m_wkfds) < 0) { + LOGSYSERR("NetconData::NetconData", "pipe", ""); + m_wkfds[0] = m_wkfds[1] = -1; + } + LOGDEB2("NetconData:: m_wkfds[0] " << m_wkfds[0] << " m_wkfds[1] " << + m_wkfds[1] << endl); + for (int i = 0; i < 2; i++) { + int flags = fcntl(m_wkfds[i], F_GETFL, 0); + fcntl(m_wkfds[i], F_SETFL, flags | O_NONBLOCK); + } + } +} + NetconData::~NetconData() { freeZ(m_buf); m_bufbase = 0; m_bufbytes = m_bufsize = 0; + for (int i = 0; i < 2; i++) { + if (m_wkfds[i] >= 0) { + close(m_wkfds[i]); + } + } } int NetconData::send(const char *buf, int cnt, int expedited) { - LOGDEB2("NetconData::send: fd " << (m_fd) << " cnt " << (cnt) << " expe " << (expedited) << "\n" ); + LOGDEB2("NetconData::send: fd " << m_fd << " cnt " << cnt << + " expe " << expedited << "\n"); int flag = 0; if (m_fd < 0) { LOGERR("NetconData::send: connection not opened\n" ); return -1; } if (expedited) { - LOGDEB2("NetconData::send: expedited data, count " << (cnt) << " bytes\n" ); + LOGDEB2("NetconData::send: expedited data, count " <= 0) { + LOGDEB2("NetconData::cancelReceive: writing to " << m_wkfds[1] << endl); + ::write(m_wkfds[1], "!", 1); } - return select1(m_fd, 0); -} - -// Test for writable -int NetconData::writeready() -{ - LOGDEB2("NetconData::writeready\n" ); - if (m_fd < 0) { - LOGERR("NetconData::writeready: connection not opened\n" ); - return -1; - } - return select1(m_fd, 0, 1); } // Receive at most cnt bytes (maybe less) int NetconData::receive(char *buf, int cnt, int timeo) { - LOGDEB2("NetconData::receive: cnt " << (cnt) << " timeo " << (timeo) << " m_buf 0x" << (m_buf) << " m_bufbytes " << (m_bufbytes) << "\n" ); + LOGDEB2("NetconData::receive: cnt " << cnt << " timeo " << timeo << + " m_buf 0x" << m_buf << " m_bufbytes " << m_bufbytes << "\n"); + if (m_fd < 0) { LOGERR("NetconData::receive: connection not opened\n" ); return -1; } + int fromibuf = 0; // Get whatever might have been left in the buffer by a previous // getline, except if we're called to fill the buffer of course @@ -455,31 +469,54 @@ int NetconData::receive(char *buf, int cnt, int timeo) m_bufbytes -= fromibuf; m_bufbase += fromibuf; cnt -= fromibuf; - LOGDEB2("NetconData::receive: transferred " << (fromibuf) << " from mbuf\n" ); + LOGDEB2("NetconData::receive: got " << fromibuf << " from mbuf\n"); if (cnt <= 0) { return fromibuf; } } + if (timeo > 0) { - int ret = select1(m_fd, timeo); - if (ret == 0) { - LOGDEB2("NetconData::receive timed out\n" ); - m_didtimo = 1; - return -1; + struct timeval tv; + tv.tv_sec = timeo; + tv.tv_usec = 0; + fd_set rd; + FD_ZERO(&rd); + FD_SET(m_fd, &rd); + bool cancellable = (m_wkfds[0] >= 0); + if (cancellable) { + LOGDEB2("NetconData::receive: cancel fd " << m_wkfds[0] << endl); + FD_SET(m_wkfds[0], &rd); } + int nfds = MAX(m_fd, m_wkfds[0]) + 1; + + int ret = select(nfds, &rd, 0, 0, &tv); + LOGDEB2("NetconData::receive: select returned " << ret << endl); + + if (cancellable && FD_ISSET(m_wkfds[0], &rd)) { + char b[100]; + read(m_wkfds[0], b, 100); + return Cancelled; + } + + if (!FD_ISSET(m_fd, &rd)) { + m_didtimo = 1; + return TimeoutOrError; + } + if (ret < 0) { LOGSYSERR("NetconData::receive", "select", ""); - return -1; + m_didtimo = 0; + return TimeoutOrError; } } + m_didtimo = 0; if ((cnt = read(m_fd, buf + fromibuf, cnt)) < 0) { - char fdcbuf[20]; - sprintf(fdcbuf, "%d", m_fd); - LOGSYSERR("NetconData::receive", "read", fdcbuf); + LOGSYSERR("NetconData::receive", "read", m_fd); return -1; } - LOGDEB2("NetconData::receive: normal return, cnt " << (cnt) << "\n" ); + LOGDEB2("NetconData::receive: normal return, fromibuf " << fromibuf << + " cnt " << cnt << "\n"); return fromibuf + cnt; } @@ -487,13 +524,13 @@ int NetconData::receive(char *buf, int cnt, int timeo) int NetconData::doreceive(char *buf, int cnt, int timeo) { int got, cur; - LOGDEB2("Netcon::doreceive: cnt " << (cnt) << ", timeo " << (timeo) << "\n" ); + LOGDEB2("Netcon::doreceive: cnt " << cnt << ", timeo " << timeo << "\n"); cur = 0; while (cnt > cur) { got = receive(buf, cnt - cur, timeo); LOGDEB2("Netcon::doreceive: got " << (got) << "\n" ); if (got < 0) { - return -1; + return got; } if (got == 0) { return cur; diff --git a/src/utils/netcon.h b/src/utils/netcon.h index 5de75561..471c90ae 100644 --- a/src/utils/netcon.h +++ b/src/utils/netcon.h @@ -215,8 +215,7 @@ public: /// Base class for connections that actually transfer data. T class NetconData : public Netcon { public: - NetconData() : m_buf(0), m_bufbase(0), m_bufbytes(0), m_bufsize(0) { - } + NetconData(bool cancellable = false); virtual ~NetconData(); /// Write data to the connection. @@ -232,18 +231,20 @@ public: /// @param cnt the number of bytes we should try to read (but we return /// as soon as we get data) /// @param timeo maximum number of seconds we should be waiting for data. - /// @return the count of bytes actually read. 0 for timeout (call - /// didtimo() to discriminate from EOF). -1 if an error occurred. + /// @return the count of bytes actually read (0 for EOF), or + /// TimeoutOrError (-1) for timeout or error (call timedout() to + /// discriminate and reset), Cancelled (-2) if cancelled. + enum RcvReason {Eof = 0, TimeoutOrError = -1, Cancelled = -2}; virtual int receive(char *buf, int cnt, int timeo = -1); + virtual void cancelReceive(); + /// Loop on receive until cnt bytes are actually read or a timeout occurs virtual int doreceive(char *buf, int cnt, int timeo = -1); - /// Check for data being available for reading - virtual int readready(); - /// Check for data being available for writing - virtual int writeready(); + /// Read a line of text on an ascii connection. Returns -1 or byte count /// including final 0. \n is kept virtual int getline(char *buf, int cnt, int timeo = -1); + /// Set handler to be called when the connection is placed in the /// selectloop and an event occurs. virtual void setcallback(std::shared_ptr user) { @@ -251,10 +252,14 @@ public: } private: + char *m_buf; // Buffer. Only used when doing getline()s char *m_bufbase; // Pointer to current 1st byte of useful data int m_bufbytes; // Bytes of data. int m_bufsize; // Total buffer size + + int m_wkfds[2]; + std::shared_ptr m_user; virtual int cando(Netcon::Event reason); // Selectloop slot }; @@ -262,8 +267,8 @@ private: /// Network endpoint, client side. class NetconCli : public NetconData { public: - NetconCli(int silent = 0) { - m_silentconnectfailure = silent; + NetconCli(bool cancellable = false) + : NetconData(cancellable), m_silentconnectfailure(false) { } /// Open connection to specified host and named service. Set host @@ -284,12 +289,12 @@ public: int setconn(int fd); /// Do not log message if openconn() fails. - void setSilentFail(int onoff) { + void setSilentFail(bool onoff) { m_silentconnectfailure = onoff; } private: - int m_silentconnectfailure; // No logging of connection failures if set + bool m_silentconnectfailure; // No logging of connection failures if set }; class NetconServCon;