From e5a72da783d09ba2dd29b96bdfaff63e10018dfb Mon Sep 17 00:00:00 2001 From: Jean-Francois Dockes Date: Fri, 30 Jun 2017 15:24:42 +0200 Subject: [PATCH] Add kqueue-based version of netcon selectloop to work around select() issues on dragonfly bsd --- src/common/autoconfig.h.in | 7 +- src/configure.ac | 2 +- src/utils/netcon.cpp | 329 ++++++++++++++++++++++++++++++++----- src/utils/netcon.h | 93 ++++------- 4 files changed, 323 insertions(+), 108 deletions(-) diff --git a/src/common/autoconfig.h.in b/src/common/autoconfig.h.in index 75edc877..747dcc9d 100644 --- a/src/common/autoconfig.h.in +++ b/src/common/autoconfig.h.in @@ -30,6 +30,9 @@ /* Define to 1 if you have the header file. */ #undef HAVE_INTTYPES_H +/* Define to 1 if you have the `kqueue' function. */ +#undef HAVE_KQUEUE + /* Define to 1 if you have the `dl' library (-ldl). */ #undef HAVE_LIBDL @@ -45,8 +48,8 @@ /* Define to 1 if you have the `mkdtemp' function. */ #undef HAVE_MKDTEMP -/* Define to 1 if you have the `posix_spawn,' function. */ -#undef HAVE_POSIX_SPAWN_ +/* Define to 1 if you have the `posix_spawn' function. */ +#undef HAVE_POSIX_SPAWN /* Define to 1 if you have the `setrlimit' function. */ #undef HAVE_SETRLIMIT diff --git a/src/configure.ac b/src/configure.ac index b9db9383..2894c670 100644 --- a/src/configure.ac +++ b/src/configure.ac @@ -37,7 +37,7 @@ AC_SYS_LARGEFILE # OpenBSD needs sys/param.h for mount.h to compile AC_CHECK_HEADERS([sys/param.h, spawn.h]) -AC_CHECK_FUNCS([posix_spawn, setrlimit]) +AC_CHECK_FUNCS([posix_spawn setrlimit kqueue]) if test "x$ac_cv_func_posix_spawn" = xyes; then : AC_ARG_ENABLE(posix_spawn, diff --git a/src/utils/netcon.cpp b/src/utils/netcon.cpp index 7f9e8e86..cd3840ed 100644 --- a/src/utils/netcon.cpp +++ b/src/utils/netcon.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #ifdef _AIX #include @@ -36,13 +37,17 @@ #include #include -#include #include #include #include #include #include #include +#ifdef HAVE_KQUEUE +#include +#include +#include +#endif #include @@ -55,7 +60,7 @@ using namespace std; #endif // Size of path buffer in sockaddr_un (AF_UNIX socket -// addr). Mysteriously it's 108 (explicit value) under linux, no +// addr). Mysteriously it is 108 (explicit value) under linux, no // define accessible. Let's take a little margin as it appears that // some systems use 92. I believe we could also malloc a variable size // struct but why bother. @@ -81,8 +86,8 @@ static const int zero = 0; #define freeZ(X) if (X) {free(X);X=0;} #endif -#define MILLIS(OLD, NEW) ( (long)(((NEW).tv_sec - (OLD).tv_sec) * 1000 + \ - ((NEW).tv_usec - (OLD).tv_usec) / 1000)) +#define MILLIS(OLD, NEW) ( (uint64_t((NEW).tv_sec) - (OLD).tv_sec) * 1000 + \ + ((NEW).tv_usec - (OLD).tv_usec) / 1000 ) // Static method // Simplified interface to 'select()'. Only use one fd, for either @@ -109,24 +114,87 @@ int Netcon::select1(int fd, int timeo, int write) return ret; } + +/////////////////////////////////////////// +// SelectLoop + +class SelectLoop::Internal { +public: + Internal() { +#ifdef HAVE_KQUEUE + if ((kq = kqueue()) == -1) { + LOGSYSERR("Netcon::selectloop", "kqueue", ""); + } +#endif + } + + ~Internal() { +#ifdef HAVE_KQUEUE + if (kq >= 0) + close(kq); +#endif + } + + // Set by client callback to tell selectloop to return. + bool selectloopDoReturn{false}; + int selectloopReturnValue{0}; + int placetostart{0}; + + // Map of NetconP indexed by fd + map polldata; +#ifdef HAVE_KQUEUE + int kq{-1}; +#endif + // The last time we did the periodic thing. Initialized by setperiodic() + struct timeval lasthdlcall; + + // The call back function and its parameter + int (*periodichandler)(void *){0}; + void *periodicparam{0}; + // The periodic interval + int periodicmillis{0}; + + void periodictimeout(struct timeval *tv); + void periodictimeout(struct timespec *ts); + int maybecallperiodic(); + int setselevents(int fd, int events); + int setselevents(NetconP& con, int events); +}; + +SelectLoop::SelectLoop() +{ + m = new Internal; +} + +SelectLoop::~SelectLoop() +{ + delete m; +} + +void SelectLoop::loopReturn(int value) +{ + m->selectloopDoReturn = true; + m->selectloopReturnValue = value; +} + void SelectLoop::setperiodichandler(int (*handler)(void *), void *p, int ms) { - m_periodichandler = handler; - m_periodicparam = p; - m_periodicmillis = ms; - if (m_periodicmillis > 0) { - gettimeofday(&m_lasthdlcall, 0); + m->periodichandler = handler; + m->periodicparam = p; + m->periodicmillis = ms; + if (m->periodicmillis > 0) { + gettimeofday(&m->lasthdlcall, 0); } } // Compute the appropriate timeout so that the select call returns in // time to call the periodic routine. -void SelectLoop::periodictimeout(struct timeval *tv) +void SelectLoop::Internal::periodictimeout(struct timeval *tv) { // If periodic not set, the select call times out and we loop // after a very long time (we'd need to pass NULL to select for an // infinite wait, and I'm too lazy to handle it) - if (m_periodicmillis <= 0) { + if (periodicmillis <= 0) { tv->tv_sec = 10000; tv->tv_usec = 0; return; @@ -134,7 +202,7 @@ void SelectLoop::periodictimeout(struct timeval *tv) struct timeval mtv; gettimeofday(&mtv, 0); - int millis = m_periodicmillis - MILLIS(m_lasthdlcall, mtv); + int millis = periodicmillis - MILLIS(lasthdlcall, mtv); // millis <= 0 means we should have already done the thing. *dont* set the // tv to 0, which means no timeout at all ! @@ -145,20 +213,31 @@ void SelectLoop::periodictimeout(struct timeval *tv) tv->tv_usec = (millis % 1000) * 1000; } -// Check if it's time to call the handler. selectloop will return to -// caller if it or we return 0 -int SelectLoop::maybecallperiodic() +void SelectLoop::Internal::periodictimeout(struct timespec *ts) { - if (m_periodicmillis <= 0) { + struct timeval tv; + periodictimeout(&tv); + ts->tv_sec = tv.tv_sec; + ts->tv_nsec = tv.tv_usec * 1000; +} + + +// Check if it's time to call the handler. selectloop will return to +// caller if either we or the handler return 0 +int SelectLoop::Internal::maybecallperiodic() +{ + if (periodicmillis <= 0) { return 1; } + struct timeval mtv; gettimeofday(&mtv, 0); - int millis = m_periodicmillis - MILLIS(m_lasthdlcall, mtv); + int millis = periodicmillis - MILLIS(lasthdlcall, mtv); + if (millis <= 0) { - gettimeofday(&m_lasthdlcall, 0); - if (m_periodichandler) { - return m_periodichandler(m_periodicparam); + lasthdlcall = mtv; + if (periodichandler) { + return periodichandler(periodicparam); } else { return 0; } @@ -166,14 +245,17 @@ int SelectLoop::maybecallperiodic() return 1; } +#ifndef HAVE_KQUEUE + int SelectLoop::doLoop() { for (;;) { - if (m_selectloopDoReturn) { - m_selectloopDoReturn = false; + if (m->selectloopDoReturn) { + m->selectloopDoReturn = false; LOGDEB("Netcon::selectloop: returning on request\n"); - return m_selectloopReturnValue; + return m->selectloopReturnValue; } + int nfds; fd_set rd, wd; FD_ZERO(&rd); @@ -182,10 +264,9 @@ int SelectLoop::doLoop() // Walk the netcon map and set up the read and write fd_sets // for select() nfds = 0; - for (map::iterator it = m_polldata.begin(); - it != m_polldata.end(); it++) { - NetconP& pll = it->second; - int fd = it->first; + for (auto& entry : m->polldata) { + NetconP& pll = entry.second; + int fd = entry.first; LOGDEB2("Selectloop: fd " << fd << " flags 0x" << pll->m_wantedEvents << "\n"); if (pll->m_wantedEvents & Netcon::NETCONPOLL_READ) { @@ -206,7 +287,7 @@ int SelectLoop::doLoop() // Just in case there would still be open fds in there // (with no r/w flags set). Should not be needed, but safer - m_polldata.clear(); + m->polldata.clear(); LOGDEB1("Netcon::selectloop: no fds\n"); return 0; } @@ -216,7 +297,7 @@ int SelectLoop::doLoop() // Compute the next timeout according to what might need to be // done apart from waiting for data struct timeval tv; - periodictimeout(&tv); + m->periodictimeout(&tv); // Wait for something to happen int ret = select(nfds, &rd, &wd, 0, &tv); LOGDEB2("Netcon::selectloop: nfds " << nfds << @@ -225,7 +306,7 @@ int SelectLoop::doLoop() LOGSYSERR("Netcon::selectloop", "select", ""); return -1; } - if (m_periodicmillis > 0 && maybecallperiodic() <= 0) { + if (m->periodicmillis > 0 && m->maybecallperiodic() <= 0) { return 1; } @@ -242,12 +323,12 @@ int SelectLoop::doLoop() // map may change between 2 sweeps, so that we'd have to be smart // with the iterator. As the cost per unused fd is low (just 2 bit // flag tests), we keep it like this for now - if (m_placetostart >= nfds) { - m_placetostart = 0; + if (m->placetostart >= nfds) { + m->placetostart = 0; } int i, fd; int activefds = 0; - for (i = 0, fd = m_placetostart; i < nfds; i++, fd++) { + for (i = 0, fd = m->placetostart; i < nfds; i++, fd++) { if (fd >= nfds) { fd = 0; } @@ -263,8 +344,8 @@ int SelectLoop::doLoop() continue; } - map::iterator it = m_polldata.find(fd); - if (it == m_polldata.end()) { + auto it = m->polldata.find(fd); + if (it == m->polldata.end()) { // This should never happen, because we only set our // own fds in the mask ! LOGERR("Netcon::selectloop: fd " << fd << " not found\n"); @@ -272,7 +353,7 @@ int SelectLoop::doLoop() } activefds++; // Next start will be one beyond last serviced (modulo nfds) - m_placetostart = fd + 1; + m->placetostart = fd + 1; NetconP& pll = it->second; if (canread && pll->cando(Netcon::NETCONPOLL_READ) <= 0) { @@ -285,7 +366,7 @@ int SelectLoop::doLoop() (Netcon::NETCONPOLL_WRITE | Netcon::NETCONPOLL_READ))) { LOGDEB0("Netcon::selectloop: fd " << it->first << " has 0x" << it->second->m_wantedEvents << " mask, erasing\n"); - m_polldata.erase(it); + m->polldata.erase(it); } } // fd sweep @@ -299,7 +380,155 @@ int SelectLoop::doLoop() return -1; } -// Add a connection to the monitored set. +#else // -> Using kqueue: use select() + +int SelectLoop::doLoop() +{ + for (;;) { + if (m->selectloopDoReturn) { + m->selectloopDoReturn = false; + LOGDEB("Netcon::selectloop: returning on request\n"); + return m->selectloopReturnValue; + } + + // Check that we do have something to wait for. + int nfds = 0; + for (auto& entry : m->polldata) { + NetconP& pll = entry.second; + if (pll->m_wantedEvents & Netcon::NETCONPOLL_READ) { + nfds++; + } else if (pll->m_wantedEvents & Netcon::NETCONPOLL_WRITE) { + nfds++; + } + } + if (nfds == 0) { + // This should never happen in a server as we should at least + // always monitor the main listening server socket. For a + // client, it's up to client code to avoid or process this + // condition. + + // Just in case there would still be open fds in there + // (with no r/w flags set). Should not be needed, but safer + m->polldata.clear(); + LOGDEB1("Netcon::selectloop: no fds\n"); + return 0; + } + + // Compute the next timeout according to what might need to be + // done apart from waiting for data + struct timespec ts; + m->periodictimeout(&ts); + // Wait for something to happen + vector events; + events.resize(nfds); + LOGDEB("Netcon::selectloop: kevent(), nfds = " << nfds << "\n"); + int ret = kevent(m->kq, 0, 0, &events[0], events.size(), &ts); + LOGDEB("Netcon::selectloop: nfds " << nfds << + " kevent returns " << ret << "\n"); + if (ret < 0) { + LOGSYSERR("Netcon::selectloop", "kevent", ""); + return -1; + } + if (m->periodicmillis > 0 && m->maybecallperiodic() <= 0) { + return 1; + } + if (ret == 0) { + // Timeout, do it again. + continue; + } + + for (int i = 0; i < ret; i++) { + struct kevent& ev = events[i]; + if (ev.flags & EV_ERROR) { + LOGSYSERR("Netcon::selectLoop", "kevent", ""); + LOGERR("Netcon::selectLoop: event error: " << + strerror(ev.data)); + return -1; + } + int canread = ev.filter == EVFILT_READ; + int canwrite = ev.filter == EVFILT_WRITE; + bool none = !canread && !canwrite; + LOGDEB("Netcon::selectloop: fd " << int(ev.ident) << " " << + (none ? "blocked" : "can") << " " << + (canread ? "read" : "") << " " << + (canwrite ? "write" : "") << "\n"); + if (none) { + LOGERR("Kevent returned unknown filter " << ev.filter <polldata.find(int(ev.ident)); + if (it == m->polldata.end()) { + LOGERR("Netcon::selectloop: fd " << int(ev.ident) << + " not found\n"); + continue; + } + NetconP& pll = it->second; + if (canread && pll->cando(Netcon::NETCONPOLL_READ) <= 0) { + pll->setselevents(pll->getselevents() & + ~Netcon::NETCONPOLL_READ); + } + if (canwrite && pll->cando(Netcon::NETCONPOLL_WRITE) <= 0) { + pll->setselevents(pll->getselevents() & + ~Netcon::NETCONPOLL_WRITE); + } + if (!(pll->getselevents() & + (Netcon::NETCONPOLL_WRITE | Netcon::NETCONPOLL_READ))) { + LOGDEB0("Netcon::selectloop: fd " << it->first << " has 0x" + << it->second->getselevents() << " mask, erasing\n"); + m->polldata.erase(it); + } + } // fd sweep + + } // forever loop + LOGERR("SelectLoop::doLoop: got out of loop !\n"); + return -1; +} + +#endif // kqueue version + +int SelectLoop::Internal::setselevents(int fd, int events) +{ +#ifdef HAVE_KQUEUE + auto it = polldata.find(fd); + if (it == polldata.end()) { + return -1; + } + return setselevents(it->second, events); +#endif + return 0; +} + +int SelectLoop::Internal::setselevents(NetconP& con, int events) +{ +#ifdef HAVE_KQUEUE + struct kevent event; + if (events & Netcon::NETCONPOLL_READ) { + EV_SET(&event, con->m_fd, EVFILT_READ, EV_ADD, 0, 0, 0); + if(kevent(kq, &event, 1, 0, 0, 0) < 0) { + LOGSYSERR("SelectLoop::addselcon", "kevent", ""); + return -1; + } + } else { + EV_SET(&event, con->m_fd, EVFILT_READ, EV_DELETE, 0, 0, 0); + kevent(kq, &event, 1, 0, 0, 0); + } + if (events & Netcon::NETCONPOLL_WRITE) { + EV_SET(&event, con->m_fd, EVFILT_WRITE, EV_ADD, 0, 0, 0); + if(kevent(kq, &event, 1, 0, 0, 0) < 0) { + LOGSYSERR("SelectLoop::addselcon", "kevent", ""); + return -1; + } + } else { + EV_SET(&event, con->m_fd, EVFILT_WRITE, EV_DELETE, 0, 0, 0); + kevent(kq, &event, 1, 0, 0, 0); + } +#endif + return 0; +} + +// Add a connection to the monitored set. This can be used to change +// the event flags too (won't add duplicates) int SelectLoop::addselcon(NetconP con, int events) { if (!con) { @@ -307,10 +536,10 @@ int SelectLoop::addselcon(NetconP con, int events) } LOGDEB1("Netcon::addselcon: fd " << con->m_fd << "\n"); con->set_nonblock(1); - con->setselevents(events); - m_polldata[con->m_fd] = con; + con->m_wantedEvents = events; + m->polldata[con->m_fd] = con; con->setloop(this); - return 0; + return m->setselevents(con, events); } // Remove a connection from the monitored set. @@ -320,14 +549,15 @@ int SelectLoop::remselcon(NetconP con) return -1; } LOGDEB1("Netcon::remselcon: fd " << con->m_fd << "\n"); - map::iterator it = m_polldata.find(con->m_fd); - if (it == m_polldata.end()) { + m->setselevents(con, 0); + auto it = m->polldata.find(con->m_fd); + if (it == m->polldata.end()) { LOGDEB1("Netcon::remselcon: con not found for fd " << con->m_fd << "\n"); return -1; } con->setloop(0); - m_polldata.erase(it); + m->polldata.erase(it); return 0; } @@ -394,6 +624,15 @@ int Netcon::set_nonblock(int onoff) return flags; } +int Netcon::setselevents(int events) +{ + m_wantedEvents = events; + if (m_loop) { + m_loop->m->setselevents(m_fd, events); + } + return m_wantedEvents; +} + ///////////////////////////////////////////////////////////////////// // Data socket (NetconData) methods @@ -651,7 +890,7 @@ int NetconData::cando(Netcon::Event reason) return 0; } } - clearselevents(NETCONPOLL_WRITE); + m_wantedEvents &= ~NETCONPOLL_WRITE; return 1; } diff --git a/src/utils/netcon.h b/src/utils/netcon.h index 471c90ae..7d52f3fc 100644 --- a/src/utils/netcon.h +++ b/src/utils/netcon.h @@ -23,9 +23,8 @@ #endif #include -#include -#include +#include #include /// A set of classes to manage client-server communication over a @@ -85,21 +84,11 @@ public: /// Decide what events the connection will be looking for /// (NETCONPOLL_READ, NETCONPOLL_WRITE) - int setselevents(int evs) { - return m_wantedEvents = evs; - } + int setselevents(int evs); /// Retrieve the connection's currently monitored set of events int getselevents() { return m_wantedEvents; } - /// Add events to current set - int addselevents(int evs) { - return m_wantedEvents |= evs; - } - /// Clear events from current set - int clearselevents(int evs) { - return m_wantedEvents &= ~evs; - } friend class SelectLoop; SelectLoop *getloop() { @@ -115,7 +104,7 @@ protected: int m_fd; bool m_ownfd; int m_didtimo; - // Used when part of the selectloop map. + // Used when part of the selectloop. short m_wantedEvents; SelectLoop *m_loop; // Method called by the selectloop when something can be done with a netcon @@ -130,16 +119,34 @@ protected: /// The selectloop interface is used to implement parallel servers. // The select loop mechanism allows several netcons to be used for io // in a program without blocking as long as there is data to be read -// or written. In a multithread program which is also using select, it -// would typically make sense to have one SelectLoop active per +// or written. In a multithread program, if each thread needs +// non-blocking IO it may make sense to have one SelectLoop active per // thread. class SelectLoop { public: - SelectLoop() - : m_selectloopDoReturn(false), m_selectloopReturnValue(0), - m_placetostart(0), - m_periodichandler(0), m_periodicparam(0), m_periodicmillis(0) { - } + SelectLoop(); + SelectLoop(const SelectLoop&) = delete; + SelectLoop& operator=(const SelectLoop&) = delete; + ~SelectLoop(); + + /// Add a connection to be monitored (this will usually be called + /// from the server's listen connection's accept callback) + int addselcon(NetconP con, int events); + + /// Remove a connection from the monitored set. This is + /// automatically called when EOF is detected on a connection. + int remselcon(NetconP con); + + /// Set a function to be called periodically, or a time before return. + /// @param handler the function to be called. + /// - if it is 0, doLoop() will return after ms mS (and can be called + /// again) + /// - if it is not 0, it will be called at ms mS intervals. If its return + /// value is <= 0, selectloop will return. + /// @param clp client data to be passed to handler at every call. + /// @param ms milliseconds interval between handler calls or + /// before return. Set to 0 for no periodic handler. + void setperiodichandler(int (*handler)(void *), void *clp, int ms); /// Loop waiting for events on the connections and call the /// cando() method on the object when something happens (this will in @@ -149,47 +156,13 @@ public: /// timeout (should call back in after processing) int doLoop(); - /// Call from data handler: make selectloop return the param value - void loopReturn(int value) { - m_selectloopDoReturn = true; - m_selectloopReturnValue = value; - } - /// Add a connection to be monitored (this will usually be called - /// from the server's listen connection's accept callback) - int addselcon(NetconP con, int events); - /// Remove a connection from the monitored set. This is - /// automatically called when EOF is detected on a connection. - int remselcon(NetconP con); - - /// Set a function to be called periodically, or a time before return. - /// @param handler the function to be called. - /// - if it is 0, selectloop() will return after ms mS (and can be called - /// again - /// - if it is not 0, it will be called at ms mS intervals. If its return - /// value is <= 0, selectloop will return. - /// @param clp client data to be passed to handler at every call. - /// @param ms milliseconds interval between handler calls or - /// before return. Set to 0 for no periodic handler. - void setperiodichandler(int (*handler)(void *), void *clp, int ms); + /// Call from data handler: make doLoop() return @param value + void loopReturn(int value); + friend class Netcon; private: - // Set by client callback to tell selectloop to return. - bool m_selectloopDoReturn; - int m_selectloopReturnValue; - int m_placetostart; - - // Map of NetconP indexed by fd - std::map m_polldata; - - // The last time we did the periodic thing. Initialized by setperiodic() - struct timeval m_lasthdlcall; - // The call back function and its parameter - int (*m_periodichandler)(void *); - void *m_periodicparam; - // The periodic interval - int m_periodicmillis; - void periodictimeout(struct timeval *tv); - int maybecallperiodic(); + class Internal; + Internal *m; }; ///////////////////////