Add kqueue-based version of netcon selectloop to work around select() issues on dragonfly bsd

This commit is contained in:
Jean-Francois Dockes 2017-06-30 15:24:42 +02:00
parent dadf448902
commit e5a72da783
4 changed files with 323 additions and 108 deletions

View File

@ -30,6 +30,9 @@
/* Define to 1 if you have the <inttypes.h> header file. */
#undef HAVE_INTTYPES_H
/* Define to 1 if you have the `kqueue' function. */
#undef HAVE_KQUEUE
/* Define to 1 if you have the `dl' library (-ldl). */
#undef HAVE_LIBDL
@ -45,8 +48,8 @@
/* Define to 1 if you have the `mkdtemp' function. */
#undef HAVE_MKDTEMP
/* Define to 1 if you have the `posix_spawn,' function. */
#undef HAVE_POSIX_SPAWN_
/* Define to 1 if you have the `posix_spawn' function. */
#undef HAVE_POSIX_SPAWN
/* Define to 1 if you have the `setrlimit' function. */
#undef HAVE_SETRLIMIT

View File

@ -37,7 +37,7 @@ AC_SYS_LARGEFILE
# OpenBSD needs sys/param.h for mount.h to compile
AC_CHECK_HEADERS([sys/param.h, spawn.h])
AC_CHECK_FUNCS([posix_spawn, setrlimit])
AC_CHECK_FUNCS([posix_spawn setrlimit kqueue])
if test "x$ac_cv_func_posix_spawn" = xyes; then :
AC_ARG_ENABLE(posix_spawn,

View File

@ -29,6 +29,7 @@
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <stdint.h>
#ifdef _AIX
#include <strings.h>
@ -36,13 +37,17 @@
#include <unistd.h>
#include <fcntl.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <netdb.h>
#ifdef HAVE_KQUEUE
#include <sys/types.h>
#include <sys/event.h>
#include <vector>
#endif
#include <map>
@ -55,7 +60,7 @@ using namespace std;
#endif
// Size of path buffer in sockaddr_un (AF_UNIX socket
// addr). Mysteriously it's 108 (explicit value) under linux, no
// addr). Mysteriously it is 108 (explicit value) under linux, no
// define accessible. Let's take a little margin as it appears that
// some systems use 92. I believe we could also malloc a variable size
// struct but why bother.
@ -81,8 +86,8 @@ static const int zero = 0;
#define freeZ(X) if (X) {free(X);X=0;}
#endif
#define MILLIS(OLD, NEW) ( (long)(((NEW).tv_sec - (OLD).tv_sec) * 1000 + \
((NEW).tv_usec - (OLD).tv_usec) / 1000))
#define MILLIS(OLD, NEW) ( (uint64_t((NEW).tv_sec) - (OLD).tv_sec) * 1000 + \
((NEW).tv_usec - (OLD).tv_usec) / 1000 )
// Static method
// Simplified interface to 'select()'. Only use one fd, for either
@ -109,24 +114,87 @@ int Netcon::select1(int fd, int timeo, int write)
return ret;
}
///////////////////////////////////////////
// SelectLoop
class SelectLoop::Internal {
public:
Internal() {
#ifdef HAVE_KQUEUE
if ((kq = kqueue()) == -1) {
LOGSYSERR("Netcon::selectloop", "kqueue", "");
}
#endif
}
~Internal() {
#ifdef HAVE_KQUEUE
if (kq >= 0)
close(kq);
#endif
}
// Set by client callback to tell selectloop to return.
bool selectloopDoReturn{false};
int selectloopReturnValue{0};
int placetostart{0};
// Map of NetconP indexed by fd
map<int, NetconP> polldata;
#ifdef HAVE_KQUEUE
int kq{-1};
#endif
// The last time we did the periodic thing. Initialized by setperiodic()
struct timeval lasthdlcall;
// The call back function and its parameter
int (*periodichandler)(void *){0};
void *periodicparam{0};
// The periodic interval
int periodicmillis{0};
void periodictimeout(struct timeval *tv);
void periodictimeout(struct timespec *ts);
int maybecallperiodic();
int setselevents(int fd, int events);
int setselevents(NetconP& con, int events);
};
SelectLoop::SelectLoop()
{
m = new Internal;
}
SelectLoop::~SelectLoop()
{
delete m;
}
void SelectLoop::loopReturn(int value)
{
m->selectloopDoReturn = true;
m->selectloopReturnValue = value;
}
void SelectLoop::setperiodichandler(int (*handler)(void *), void *p, int ms)
{
m_periodichandler = handler;
m_periodicparam = p;
m_periodicmillis = ms;
if (m_periodicmillis > 0) {
gettimeofday(&m_lasthdlcall, 0);
m->periodichandler = handler;
m->periodicparam = p;
m->periodicmillis = ms;
if (m->periodicmillis > 0) {
gettimeofday(&m->lasthdlcall, 0);
}
}
// Compute the appropriate timeout so that the select call returns in
// time to call the periodic routine.
void SelectLoop::periodictimeout(struct timeval *tv)
void SelectLoop::Internal::periodictimeout(struct timeval *tv)
{
// If periodic not set, the select call times out and we loop
// after a very long time (we'd need to pass NULL to select for an
// infinite wait, and I'm too lazy to handle it)
if (m_periodicmillis <= 0) {
if (periodicmillis <= 0) {
tv->tv_sec = 10000;
tv->tv_usec = 0;
return;
@ -134,7 +202,7 @@ void SelectLoop::periodictimeout(struct timeval *tv)
struct timeval mtv;
gettimeofday(&mtv, 0);
int millis = m_periodicmillis - MILLIS(m_lasthdlcall, mtv);
int millis = periodicmillis - MILLIS(lasthdlcall, mtv);
// millis <= 0 means we should have already done the thing. *dont* set the
// tv to 0, which means no timeout at all !
@ -145,20 +213,31 @@ void SelectLoop::periodictimeout(struct timeval *tv)
tv->tv_usec = (millis % 1000) * 1000;
}
// Check if it's time to call the handler. selectloop will return to
// caller if it or we return 0
int SelectLoop::maybecallperiodic()
void SelectLoop::Internal::periodictimeout(struct timespec *ts)
{
if (m_periodicmillis <= 0) {
struct timeval tv;
periodictimeout(&tv);
ts->tv_sec = tv.tv_sec;
ts->tv_nsec = tv.tv_usec * 1000;
}
// Check if it's time to call the handler. selectloop will return to
// caller if either we or the handler return 0
int SelectLoop::Internal::maybecallperiodic()
{
if (periodicmillis <= 0) {
return 1;
}
struct timeval mtv;
gettimeofday(&mtv, 0);
int millis = m_periodicmillis - MILLIS(m_lasthdlcall, mtv);
int millis = periodicmillis - MILLIS(lasthdlcall, mtv);
if (millis <= 0) {
gettimeofday(&m_lasthdlcall, 0);
if (m_periodichandler) {
return m_periodichandler(m_periodicparam);
lasthdlcall = mtv;
if (periodichandler) {
return periodichandler(periodicparam);
} else {
return 0;
}
@ -166,14 +245,17 @@ int SelectLoop::maybecallperiodic()
return 1;
}
#ifndef HAVE_KQUEUE
int SelectLoop::doLoop()
{
for (;;) {
if (m_selectloopDoReturn) {
m_selectloopDoReturn = false;
if (m->selectloopDoReturn) {
m->selectloopDoReturn = false;
LOGDEB("Netcon::selectloop: returning on request\n");
return m_selectloopReturnValue;
return m->selectloopReturnValue;
}
int nfds;
fd_set rd, wd;
FD_ZERO(&rd);
@ -182,10 +264,9 @@ int SelectLoop::doLoop()
// Walk the netcon map and set up the read and write fd_sets
// for select()
nfds = 0;
for (map<int, NetconP>::iterator it = m_polldata.begin();
it != m_polldata.end(); it++) {
NetconP& pll = it->second;
int fd = it->first;
for (auto& entry : m->polldata) {
NetconP& pll = entry.second;
int fd = entry.first;
LOGDEB2("Selectloop: fd " << fd << " flags 0x" <<
pll->m_wantedEvents << "\n");
if (pll->m_wantedEvents & Netcon::NETCONPOLL_READ) {
@ -206,7 +287,7 @@ int SelectLoop::doLoop()
// Just in case there would still be open fds in there
// (with no r/w flags set). Should not be needed, but safer
m_polldata.clear();
m->polldata.clear();
LOGDEB1("Netcon::selectloop: no fds\n");
return 0;
}
@ -216,7 +297,7 @@ int SelectLoop::doLoop()
// Compute the next timeout according to what might need to be
// done apart from waiting for data
struct timeval tv;
periodictimeout(&tv);
m->periodictimeout(&tv);
// Wait for something to happen
int ret = select(nfds, &rd, &wd, 0, &tv);
LOGDEB2("Netcon::selectloop: nfds " << nfds <<
@ -225,7 +306,7 @@ int SelectLoop::doLoop()
LOGSYSERR("Netcon::selectloop", "select", "");
return -1;
}
if (m_periodicmillis > 0 && maybecallperiodic() <= 0) {
if (m->periodicmillis > 0 && m->maybecallperiodic() <= 0) {
return 1;
}
@ -242,12 +323,12 @@ int SelectLoop::doLoop()
// map may change between 2 sweeps, so that we'd have to be smart
// with the iterator. As the cost per unused fd is low (just 2 bit
// flag tests), we keep it like this for now
if (m_placetostart >= nfds) {
m_placetostart = 0;
if (m->placetostart >= nfds) {
m->placetostart = 0;
}
int i, fd;
int activefds = 0;
for (i = 0, fd = m_placetostart; i < nfds; i++, fd++) {
for (i = 0, fd = m->placetostart; i < nfds; i++, fd++) {
if (fd >= nfds) {
fd = 0;
}
@ -263,8 +344,8 @@ int SelectLoop::doLoop()
continue;
}
map<int, NetconP>::iterator it = m_polldata.find(fd);
if (it == m_polldata.end()) {
auto it = m->polldata.find(fd);
if (it == m->polldata.end()) {
// This should never happen, because we only set our
// own fds in the mask !
LOGERR("Netcon::selectloop: fd " << fd << " not found\n");
@ -272,7 +353,7 @@ int SelectLoop::doLoop()
}
activefds++;
// Next start will be one beyond last serviced (modulo nfds)
m_placetostart = fd + 1;
m->placetostart = fd + 1;
NetconP& pll = it->second;
if (canread && pll->cando(Netcon::NETCONPOLL_READ) <= 0) {
@ -285,7 +366,7 @@ int SelectLoop::doLoop()
(Netcon::NETCONPOLL_WRITE | Netcon::NETCONPOLL_READ))) {
LOGDEB0("Netcon::selectloop: fd " << it->first << " has 0x"
<< it->second->m_wantedEvents << " mask, erasing\n");
m_polldata.erase(it);
m->polldata.erase(it);
}
} // fd sweep
@ -299,7 +380,155 @@ int SelectLoop::doLoop()
return -1;
}
// Add a connection to the monitored set.
#else // -> Using kqueue: use select()
int SelectLoop::doLoop()
{
for (;;) {
if (m->selectloopDoReturn) {
m->selectloopDoReturn = false;
LOGDEB("Netcon::selectloop: returning on request\n");
return m->selectloopReturnValue;
}
// Check that we do have something to wait for.
int nfds = 0;
for (auto& entry : m->polldata) {
NetconP& pll = entry.second;
if (pll->m_wantedEvents & Netcon::NETCONPOLL_READ) {
nfds++;
} else if (pll->m_wantedEvents & Netcon::NETCONPOLL_WRITE) {
nfds++;
}
}
if (nfds == 0) {
// This should never happen in a server as we should at least
// always monitor the main listening server socket. For a
// client, it's up to client code to avoid or process this
// condition.
// Just in case there would still be open fds in there
// (with no r/w flags set). Should not be needed, but safer
m->polldata.clear();
LOGDEB1("Netcon::selectloop: no fds\n");
return 0;
}
// Compute the next timeout according to what might need to be
// done apart from waiting for data
struct timespec ts;
m->periodictimeout(&ts);
// Wait for something to happen
vector<struct kevent> events;
events.resize(nfds);
LOGDEB("Netcon::selectloop: kevent(), nfds = " << nfds << "\n");
int ret = kevent(m->kq, 0, 0, &events[0], events.size(), &ts);
LOGDEB("Netcon::selectloop: nfds " << nfds <<
" kevent returns " << ret << "\n");
if (ret < 0) {
LOGSYSERR("Netcon::selectloop", "kevent", "");
return -1;
}
if (m->periodicmillis > 0 && m->maybecallperiodic() <= 0) {
return 1;
}
if (ret == 0) {
// Timeout, do it again.
continue;
}
for (int i = 0; i < ret; i++) {
struct kevent& ev = events[i];
if (ev.flags & EV_ERROR) {
LOGSYSERR("Netcon::selectLoop", "kevent", "");
LOGERR("Netcon::selectLoop: event error: " <<
strerror(ev.data));
return -1;
}
int canread = ev.filter == EVFILT_READ;
int canwrite = ev.filter == EVFILT_WRITE;
bool none = !canread && !canwrite;
LOGDEB("Netcon::selectloop: fd " << int(ev.ident) << " " <<
(none ? "blocked" : "can") << " " <<
(canread ? "read" : "") << " " <<
(canwrite ? "write" : "") << "\n");
if (none) {
LOGERR("Kevent returned unknown filter " << ev.filter <<endl);
continue;
}
auto it = m->polldata.find(int(ev.ident));
if (it == m->polldata.end()) {
LOGERR("Netcon::selectloop: fd " << int(ev.ident) <<
" not found\n");
continue;
}
NetconP& pll = it->second;
if (canread && pll->cando(Netcon::NETCONPOLL_READ) <= 0) {
pll->setselevents(pll->getselevents() &
~Netcon::NETCONPOLL_READ);
}
if (canwrite && pll->cando(Netcon::NETCONPOLL_WRITE) <= 0) {
pll->setselevents(pll->getselevents() &
~Netcon::NETCONPOLL_WRITE);
}
if (!(pll->getselevents() &
(Netcon::NETCONPOLL_WRITE | Netcon::NETCONPOLL_READ))) {
LOGDEB0("Netcon::selectloop: fd " << it->first << " has 0x"
<< it->second->getselevents() << " mask, erasing\n");
m->polldata.erase(it);
}
} // fd sweep
} // forever loop
LOGERR("SelectLoop::doLoop: got out of loop !\n");
return -1;
}
#endif // kqueue version
int SelectLoop::Internal::setselevents(int fd, int events)
{
#ifdef HAVE_KQUEUE
auto it = polldata.find(fd);
if (it == polldata.end()) {
return -1;
}
return setselevents(it->second, events);
#endif
return 0;
}
int SelectLoop::Internal::setselevents(NetconP& con, int events)
{
#ifdef HAVE_KQUEUE
struct kevent event;
if (events & Netcon::NETCONPOLL_READ) {
EV_SET(&event, con->m_fd, EVFILT_READ, EV_ADD, 0, 0, 0);
if(kevent(kq, &event, 1, 0, 0, 0) < 0) {
LOGSYSERR("SelectLoop::addselcon", "kevent", "");
return -1;
}
} else {
EV_SET(&event, con->m_fd, EVFILT_READ, EV_DELETE, 0, 0, 0);
kevent(kq, &event, 1, 0, 0, 0);
}
if (events & Netcon::NETCONPOLL_WRITE) {
EV_SET(&event, con->m_fd, EVFILT_WRITE, EV_ADD, 0, 0, 0);
if(kevent(kq, &event, 1, 0, 0, 0) < 0) {
LOGSYSERR("SelectLoop::addselcon", "kevent", "");
return -1;
}
} else {
EV_SET(&event, con->m_fd, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
kevent(kq, &event, 1, 0, 0, 0);
}
#endif
return 0;
}
// Add a connection to the monitored set. This can be used to change
// the event flags too (won't add duplicates)
int SelectLoop::addselcon(NetconP con, int events)
{
if (!con) {
@ -307,10 +536,10 @@ int SelectLoop::addselcon(NetconP con, int events)
}
LOGDEB1("Netcon::addselcon: fd " << con->m_fd << "\n");
con->set_nonblock(1);
con->setselevents(events);
m_polldata[con->m_fd] = con;
con->m_wantedEvents = events;
m->polldata[con->m_fd] = con;
con->setloop(this);
return 0;
return m->setselevents(con, events);
}
// Remove a connection from the monitored set.
@ -320,14 +549,15 @@ int SelectLoop::remselcon(NetconP con)
return -1;
}
LOGDEB1("Netcon::remselcon: fd " << con->m_fd << "\n");
map<int, NetconP>::iterator it = m_polldata.find(con->m_fd);
if (it == m_polldata.end()) {
m->setselevents(con, 0);
auto it = m->polldata.find(con->m_fd);
if (it == m->polldata.end()) {
LOGDEB1("Netcon::remselcon: con not found for fd " <<
con->m_fd << "\n");
return -1;
}
con->setloop(0);
m_polldata.erase(it);
m->polldata.erase(it);
return 0;
}
@ -394,6 +624,15 @@ int Netcon::set_nonblock(int onoff)
return flags;
}
int Netcon::setselevents(int events)
{
m_wantedEvents = events;
if (m_loop) {
m_loop->m->setselevents(m_fd, events);
}
return m_wantedEvents;
}
/////////////////////////////////////////////////////////////////////
// Data socket (NetconData) methods
@ -651,7 +890,7 @@ int NetconData::cando(Netcon::Event reason)
return 0;
}
}
clearselevents(NETCONPOLL_WRITE);
m_wantedEvents &= ~NETCONPOLL_WRITE;
return 1;
}

View File

@ -23,9 +23,8 @@
#endif
#include <sys/time.h>
#include <map>
#include <string>
#include <string>
#include <memory>
/// A set of classes to manage client-server communication over a
@ -85,21 +84,11 @@ public:
/// Decide what events the connection will be looking for
/// (NETCONPOLL_READ, NETCONPOLL_WRITE)
int setselevents(int evs) {
return m_wantedEvents = evs;
}
int setselevents(int evs);
/// Retrieve the connection's currently monitored set of events
int getselevents() {
return m_wantedEvents;
}
/// Add events to current set
int addselevents(int evs) {
return m_wantedEvents |= evs;
}
/// Clear events from current set
int clearselevents(int evs) {
return m_wantedEvents &= ~evs;
}
friend class SelectLoop;
SelectLoop *getloop() {
@ -115,7 +104,7 @@ protected:
int m_fd;
bool m_ownfd;
int m_didtimo;
// Used when part of the selectloop map.
// Used when part of the selectloop.
short m_wantedEvents;
SelectLoop *m_loop;
// Method called by the selectloop when something can be done with a netcon
@ -130,16 +119,34 @@ protected:
/// The selectloop interface is used to implement parallel servers.
// The select loop mechanism allows several netcons to be used for io
// in a program without blocking as long as there is data to be read
// or written. In a multithread program which is also using select, it
// would typically make sense to have one SelectLoop active per
// or written. In a multithread program, if each thread needs
// non-blocking IO it may make sense to have one SelectLoop active per
// thread.
class SelectLoop {
public:
SelectLoop()
: m_selectloopDoReturn(false), m_selectloopReturnValue(0),
m_placetostart(0),
m_periodichandler(0), m_periodicparam(0), m_periodicmillis(0) {
}
SelectLoop();
SelectLoop(const SelectLoop&) = delete;
SelectLoop& operator=(const SelectLoop&) = delete;
~SelectLoop();
/// Add a connection to be monitored (this will usually be called
/// from the server's listen connection's accept callback)
int addselcon(NetconP con, int events);
/// Remove a connection from the monitored set. This is
/// automatically called when EOF is detected on a connection.
int remselcon(NetconP con);
/// Set a function to be called periodically, or a time before return.
/// @param handler the function to be called.
/// - if it is 0, doLoop() will return after ms mS (and can be called
/// again)
/// - if it is not 0, it will be called at ms mS intervals. If its return
/// value is <= 0, selectloop will return.
/// @param clp client data to be passed to handler at every call.
/// @param ms milliseconds interval between handler calls or
/// before return. Set to 0 for no periodic handler.
void setperiodichandler(int (*handler)(void *), void *clp, int ms);
/// Loop waiting for events on the connections and call the
/// cando() method on the object when something happens (this will in
@ -149,47 +156,13 @@ public:
/// timeout (should call back in after processing)
int doLoop();
/// Call from data handler: make selectloop return the param value
void loopReturn(int value) {
m_selectloopDoReturn = true;
m_selectloopReturnValue = value;
}
/// Add a connection to be monitored (this will usually be called
/// from the server's listen connection's accept callback)
int addselcon(NetconP con, int events);
/// Remove a connection from the monitored set. This is
/// automatically called when EOF is detected on a connection.
int remselcon(NetconP con);
/// Set a function to be called periodically, or a time before return.
/// @param handler the function to be called.
/// - if it is 0, selectloop() will return after ms mS (and can be called
/// again
/// - if it is not 0, it will be called at ms mS intervals. If its return
/// value is <= 0, selectloop will return.
/// @param clp client data to be passed to handler at every call.
/// @param ms milliseconds interval between handler calls or
/// before return. Set to 0 for no periodic handler.
void setperiodichandler(int (*handler)(void *), void *clp, int ms);
/// Call from data handler: make doLoop() return @param value
void loopReturn(int value);
friend class Netcon;
private:
// Set by client callback to tell selectloop to return.
bool m_selectloopDoReturn;
int m_selectloopReturnValue;
int m_placetostart;
// Map of NetconP indexed by fd
std::map<int, NetconP> m_polldata;
// The last time we did the periodic thing. Initialized by setperiodic()
struct timeval m_lasthdlcall;
// The call back function and its parameter
int (*m_periodichandler)(void *);
void *m_periodicparam;
// The periodic interval
int m_periodicmillis;
void periodictimeout(struct timeval *tv);
int maybecallperiodic();
class Internal;
Internal *m;
};
///////////////////////