execcmd: type cleanup; make sure that duplex/filter mode (not used by recoll) works by closing the outgoing pipe at end of data

--HG--
branch : WINDOWSPORT
This commit is contained in:
Jean-Francois Dockes 2015-09-08 07:56:50 +02:00
parent a29af22b15
commit fa556413f3
2 changed files with 83 additions and 77 deletions

View File

@ -110,7 +110,7 @@ static void stringToTokens(const string& str, vector<string>& tokens,
} }
} }
} }
#endif // RECOLL_DATADIR #endif // BUILDING_RECOLL
class ExecCmd::Internal { class ExecCmd::Internal {
public: public:
@ -127,13 +127,13 @@ public:
bool m_killRequest; bool m_killRequest;
int m_timeoutMs; int m_timeoutMs;
int m_rlimit_as_mbytes; int m_rlimit_as_mbytes;
std::string m_stderrFile; string m_stderrFile;
// Pipe for data going to the command // Pipe for data going to the command
int m_pipein[2]; int m_pipein[2];
NetconP m_tocmd; STD_SHARED_PTR<NetconCli> m_tocmd;
// Pipe for data coming out // Pipe for data coming out
int m_pipeout[2]; int m_pipeout[2];
NetconP m_fromcmd; STD_SHARED_PTR<NetconCli> m_fromcmd;
// Subprocess id // Subprocess id
pid_t m_pid; pid_t m_pid;
// Saved sigmask // Saved sigmask
@ -278,42 +278,46 @@ static void msleep(int millis)
* during method executions */ * during method executions */
class ExecCmdRsrc { class ExecCmdRsrc {
public: public:
ExecCmdRsrc(ExecCmd *parent) : m_parent(parent), m_active(true) {} ExecCmdRsrc(ExecCmd::Internal *parent)
void inactivate() {m_active = false;} : m_parent(parent), m_active(true) {
}
void inactivate() {
m_active = false;
}
~ExecCmdRsrc() { ~ExecCmdRsrc() {
if (!m_active || !m_parent) if (!m_active || !m_parent)
return; return;
LOGDEB1(("~ExecCmdRsrc: working. mypid: %d\n", (int)getpid())); LOGDEB1(("~ExecCmdRsrc: working. mypid: %d\n", (int)getpid()));
// Better to close the descs first in case the child is waiting in read // Better to close the descs first in case the child is waiting in read
if (m_parent->m->m_pipein[0] >= 0) if (m_parent->m_pipein[0] >= 0)
close(m_parent->m->m_pipein[0]); close(m_parent->m_pipein[0]);
if (m_parent->m->m_pipein[1] >= 0) if (m_parent->m_pipein[1] >= 0)
close(m_parent->m->m_pipein[1]); close(m_parent->m_pipein[1]);
if (m_parent->m->m_pipeout[0] >= 0) if (m_parent->m_pipeout[0] >= 0)
close(m_parent->m->m_pipeout[0]); close(m_parent->m_pipeout[0]);
if (m_parent->m->m_pipeout[1] >= 0) if (m_parent->m_pipeout[1] >= 0)
close(m_parent->m->m_pipeout[1]); close(m_parent->m_pipeout[1]);
// It's apparently possible for m_pid to be > 0 and getpgid to fail. In // It's apparently possible for m_pid to be > 0 and getpgid to fail. In
// this case, we have to conclude that the child process does // this case, we have to conclude that the child process does
// not exist. Not too sure what causes this, but the previous code // not exist. Not too sure what causes this, but the previous code
// definitely tried to call killpg(-1,) from time to time. // definitely tried to call killpg(-1,) from time to time.
pid_t grp; pid_t grp;
if (m_parent->m->m_pid > 0 && (grp = getpgid(m_parent->m->m_pid)) > 0) { if (m_parent->m_pid > 0 && (grp = getpgid(m_parent->m_pid)) > 0) {
LOGDEB(("ExecCmd: killpg(%d, SIGTERM)\n", grp)); LOGDEB(("ExecCmd: killpg(%d, SIGTERM)\n", grp));
int ret = killpg(grp, SIGTERM); int ret = killpg(grp, SIGTERM);
if (ret == 0) { if (ret == 0) {
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
msleep(i == 0 ? 5 : (i == 1 ? 100 : 2000)); msleep(i == 0 ? 5 : (i == 1 ? 100 : 2000));
int status; int status;
(void)waitpid(m_parent->m->m_pid, &status, WNOHANG); (void)waitpid(m_parent->m_pid, &status, WNOHANG);
if (kill(m_parent->m->m_pid, 0) != 0) if (kill(m_parent->m_pid, 0) != 0)
break; break;
if (i == 2) { if (i == 2) {
LOGDEB(("ExecCmd: killpg(%d, SIGKILL)\n", grp)); LOGDEB(("ExecCmd: killpg(%d, SIGKILL)\n", grp));
killpg(grp, SIGKILL); killpg(grp, SIGKILL);
(void)waitpid(m_parent->m->m_pid, &status, WNOHANG); (void)waitpid(m_parent->m_pid, &status, WNOHANG);
} }
} }
} else { } else {
@ -321,19 +325,19 @@ public:
grp, errno)); grp, errno));
} }
} }
m_parent->m->m_tocmd.reset(); m_parent->m_tocmd.reset();
m_parent->m->m_fromcmd.reset(); m_parent->m_fromcmd.reset();
pthread_sigmask(SIG_UNBLOCK, &m_parent->m->m_blkcld, 0); pthread_sigmask(SIG_UNBLOCK, &m_parent->m_blkcld, 0);
m_parent->m->reset(); m_parent->reset();
} }
private: private:
ExecCmd *m_parent; ExecCmd::Internal *m_parent;
bool m_active; bool m_active;
}; };
ExecCmd::~ExecCmd() ExecCmd::~ExecCmd()
{ {
ExecCmdRsrc(this); ExecCmdRsrc(this->m);
if (m) if (m)
delete m; delete m;
} }
@ -480,7 +484,7 @@ int ExecCmd::startExec(const string &cmd, const vector<string>& args,
} }
// The resource manager ensures resources are freed if we return early // The resource manager ensures resources are freed if we return early
ExecCmdRsrc e(this); ExecCmdRsrc e(this->m);
if (has_input && pipe(m->m_pipein) < 0) { if (has_input && pipe(m->m_pipein) < 0) {
LOGERR(("ExecCmd::startExec: pipe(2) failed. errno %d\n", errno)); LOGERR(("ExecCmd::startExec: pipe(2) failed. errno %d\n", errno));
@ -665,14 +669,14 @@ int ExecCmd::startExec(const string &cmd, const vector<string>& args,
m->m_pipein[0] = -1; m->m_pipein[0] = -1;
NetconCli *iclicon = new NetconCli(); NetconCli *iclicon = new NetconCli();
iclicon->setconn(m->m_pipein[1]); iclicon->setconn(m->m_pipein[1]);
m->m_tocmd = NetconP(iclicon); m->m_tocmd = STD_SHARED_PTR<NetconCli>(iclicon);
} }
if (has_output) { if (has_output) {
close(m->m_pipeout[1]); close(m->m_pipeout[1]);
m->m_pipeout[1] = -1; m->m_pipeout[1] = -1;
NetconCli *oclicon = new NetconCli(); NetconCli *oclicon = new NetconCli();
oclicon->setconn(m->m_pipeout[0]); oclicon->setconn(m->m_pipeout[0]);
m->m_fromcmd = NetconP(oclicon); m->m_fromcmd = STD_SHARED_PTR<NetconCli>(oclicon);
} }
/* Don't want to undo what we just did ! */ /* Don't want to undo what we just did ! */
@ -684,28 +688,38 @@ int ExecCmd::startExec(const string &cmd, const vector<string>& args,
// Netcon callback. Send data to the command's input // Netcon callback. Send data to the command's input
class ExecWriter : public NetconWorker { class ExecWriter : public NetconWorker {
public: public:
ExecWriter(const string *input, ExecCmdProvide *provide) ExecWriter(const string *input, ExecCmdProvide *provide,
: m_input(input), m_cnt(0), m_provide(provide) ExecCmd::Internal *parent)
{} : m_cmd(parent), m_input(input), m_cnt(0), m_provide(provide) {
}
void shutdown() {
close(m_cmd->m_pipein[1]);
m_cmd->m_pipein[1] = -1;
m_cmd->m_tocmd.reset();
}
virtual int data(NetconData *con, Netcon::Event reason) virtual int data(NetconData *con, Netcon::Event reason)
{ {
if (!m_input) return -1; if (!m_input)
return -1;
LOGDEB1(("ExecWriter: input m_cnt %d input length %d\n", m_cnt, LOGDEB1(("ExecWriter: input m_cnt %d input length %d\n", m_cnt,
m_input->length())); m_input->length()));
if (m_cnt >= m_input->length()) { if (m_cnt >= m_input->length()) {
// Fd ready for more but we got none. // Fd ready for more but we got none. Try to get data, else
if (m_provide) { // shutdown;
m_provide->newData(); if (!m_provide) {
if (m_input->empty()) { shutdown();
return 0;
} else {
m_cnt = 0;
}
LOGDEB2(("ExecWriter: provide m_cnt %d input length %d\n",
m_cnt, m_input->length()));
} else {
return 0; return 0;
} }
m_provide->newData();
if (m_input->empty()) {
shutdown();
return 0;
} else {
// Ready with new buffer, reset use count
m_cnt = 0;
}
LOGDEB2(("ExecWriter: provide m_cnt %d input length %d\n",
m_cnt, m_input->length()));
} }
int ret = con->send(m_input->c_str() + m_cnt, int ret = con->send(m_input->c_str() + m_cnt,
m_input->length() - m_cnt); m_input->length() - m_cnt);
@ -718,6 +732,7 @@ public:
return ret; return ret;
} }
private: private:
ExecCmd::Internal *m_cmd;
const string *m_input; const string *m_input;
unsigned int m_cnt; // Current offset inside m_input unsigned int m_cnt; // Current offset inside m_input
ExecCmdProvide *m_provide; ExecCmdProvide *m_provide;
@ -758,13 +773,13 @@ int ExecCmd::doexec(const string &cmd, const vector<string>& args,
} }
// Cleanup in case we return early // Cleanup in case we return early
ExecCmdRsrc e(this); ExecCmdRsrc e(this->m);
SelectLoop myloop; SelectLoop myloop;
int ret = 0; int ret = 0;
if (input || output) { if (input || output) {
// Setup output // Setup output
if (output) { if (output) {
NetconCli *oclicon = dynamic_cast<NetconCli *>(m->m_fromcmd.get()); NetconCli *oclicon = m->m_fromcmd.get();
if (!oclicon) { if (!oclicon) {
LOGERR(("ExecCmd::doexec: no connection from command\n")); LOGERR(("ExecCmd::doexec: no connection from command\n"));
return -1; return -1;
@ -777,13 +792,13 @@ int ExecCmd::doexec(const string &cmd, const vector<string>& args,
} }
// Setup input // Setup input
if (input) { if (input) {
NetconCli *iclicon = dynamic_cast<NetconCli *>(m->m_tocmd.get()); NetconCli *iclicon = m->m_tocmd.get();
if (!iclicon) { if (!iclicon) {
LOGERR(("ExecCmd::doexec: no connection from command\n")); LOGERR(("ExecCmd::doexec: no connection from command\n"));
return -1; return -1;
} }
iclicon->setcallback(STD_SHARED_PTR<NetconWorker> iclicon->setcallback(STD_SHARED_PTR<NetconWorker>
(new ExecWriter(input, m->m_provide))); (new ExecWriter(input, m->m_provide, m)));
myloop.addselcon(m->m_tocmd, Netcon::NETCONPOLL_WRITE); myloop.addselcon(m->m_tocmd, Netcon::NETCONPOLL_WRITE);
// Give up ownership // Give up ownership
m->m_tocmd.reset(); m->m_tocmd.reset();
@ -829,7 +844,7 @@ int ExecCmd::doexec(const string &cmd, const vector<string>& args,
int ExecCmd::send(const string& data) int ExecCmd::send(const string& data)
{ {
NetconCli *con = dynamic_cast<NetconCli *>(m->m_tocmd.get()); NetconCli *con = m->m_tocmd.get();
if (con == 0) { if (con == 0) {
LOGERR(("ExecCmd::send: outpipe is closed\n")); LOGERR(("ExecCmd::send: outpipe is closed\n"));
return -1; return -1;
@ -850,7 +865,7 @@ int ExecCmd::send(const string& data)
int ExecCmd::receive(string& data, int cnt) int ExecCmd::receive(string& data, int cnt)
{ {
NetconCli *con = dynamic_cast<NetconCli *>(m->m_fromcmd.get()); NetconCli *con = m->m_fromcmd.get();
if (con == 0) { if (con == 0) {
LOGERR(("ExecCmd::receive: inpipe is closed\n")); LOGERR(("ExecCmd::receive: inpipe is closed\n"));
return -1; return -1;
@ -877,7 +892,7 @@ int ExecCmd::receive(string& data, int cnt)
int ExecCmd::getline(string& data) int ExecCmd::getline(string& data)
{ {
NetconCli *con = dynamic_cast<NetconCli *>(m->m_fromcmd.get()); NetconCli *con = m->m_fromcmd.get();
if (con == 0) { if (con == 0) {
LOGERR(("ExecCmd::receive: inpipe is closed\n")); LOGERR(("ExecCmd::receive: inpipe is closed\n"));
return -1; return -1;
@ -898,7 +913,7 @@ int ExecCmd::getline(string& data)
// Wait for command status and clean up all resources. // Wait for command status and clean up all resources.
int ExecCmd::wait() int ExecCmd::wait()
{ {
ExecCmdRsrc e(this); ExecCmdRsrc e(this->m);
int status = -1; int status = -1;
if (!m->m_killRequest && m->m_pid > 0) { if (!m->m_killRequest && m->m_pid > 0) {
if (waitpid(m->m_pid, &status, 0) < 0) { if (waitpid(m->m_pid, &status, 0) < 0) {
@ -914,7 +929,7 @@ int ExecCmd::wait()
bool ExecCmd::maybereap(int *status) bool ExecCmd::maybereap(int *status)
{ {
ExecCmdRsrc e(this); ExecCmdRsrc e(this->m);
*status = -1; *status = -1;
if (m->m_pid <= 0) { if (m->m_pid <= 0) {
@ -1089,16 +1104,15 @@ using namespace std;
// Testing the rclexecm protocol outside of recoll. Here we use the // Testing the rclexecm protocol outside of recoll. Here we use the
// rcldoc.py filter, you can try with rclaudio too, adjust the file arg // rcldoc.py filter, you can try with rclaudio too, adjust the file arg
// accordingly // accordingly
static const string tstcmd("/home/dockes/projets/fulltext/win-recoll/src/filters/rcldoc.py"); bool exercise_mhexecm(const string& cmdstr, const string& mimetype,
static const string mimetype("text/html"); vector<string>& files)
bool exercise_mhexecm(vector<string>& files)
{ {
ExecCmd cmd; ExecCmd cmd;
vector<string>myparams; vector<string> myparams;
if (cmd.startExec(tstcmd, myparams, 1, 1) < 0) { if (cmd.startExec(cmdstr, myparams, 1, 1) < 0) {
cerr << "startExec " << tstcmd << " failed. Missing command?\n"; cerr << "startExec " << cmdstr << " failed. Missing command?\n";
return false; return false;
} }
@ -1189,7 +1203,9 @@ static char usage [] =
" -o : command produces output\n" " -o : command produces output\n"
" If -i is set, we send /etc/group contents to whatever command is run\n" " If -i is set, we send /etc/group contents to whatever command is run\n"
" If -o is set, we print whatever comes out\n" " If -o is set, we print whatever comes out\n"
"trexecmd -m <file.doc> [...]: test execm:\n" "trexecmd -m <filter> <mimetype> <file> [file ...]: test execm:\n"
" <filter> should be the path to an execm filter\n"
" <mimetype> the type of the file parameters\n"
"trexecmd -w cmd : do the 'which' thing\n" "trexecmd -w cmd : do the 'which' thing\n"
; ;
@ -1208,7 +1224,8 @@ static int op_flags;
#define OPT_m 0x40 #define OPT_m 0x40
#define OPT_o 0x80 #define OPT_o 0x80
// Data sink for data coming out of the command. We also use it to set a cancellation after a moment. // Data sink for data coming out of the command. We also use it to set
// a cancellation after a moment.
class MEAdv : public ExecCmdAdvise { class MEAdv : public ExecCmdAdvise {
public: public:
void newData(int cnt) { void newData(int cnt) {
@ -1325,22 +1342,12 @@ int main(int argc, char *argv[])
} }
return 1; return 1;
} else if (op_flags & OPT_m) { } else if (op_flags & OPT_m) {
l.insert(l.begin(), arg1); if (l.size() < 2)
return exercise_mhexecm(l) ? 0 : 1; Usage();
string mimetype = l[0];
l.erase(l.begin());
return exercise_mhexecm(arg1, mimetype, l) ? 0 : 1;
} else { } else {
if ((op_flags& OPT_i) && (op_flags & OPT_o)) {
// It's a chance we don't use this in recoll because there
// is a bug: the input (to cmd) fd is not closed at the
// end of input so the command blocks and we deadlock
// (until a possible cancellation). We need to add code to
// actually signal the end of input (provide::newData()
// must not be void, but for example return -1 at eof, or
// if it is not set we do it when the input string is
// empty), and actually close the descriptor. Netcon won't
// do it for us, this has to be done in execcmd.
cerr << "Can't set -i and -o at the moment\n";
return 1;
}
// Default: execute command line arguments // Default: execute command line arguments
ExecCmd mexec; ExecCmd mexec;

View File

@ -188,9 +188,8 @@ class ExecCmd {
*/ */
static bool backtick(const std::vector<std::string> cmd, std::string& out); static bool backtick(const std::vector<std::string> cmd, std::string& out);
friend class ExecCmdRsrc;
private:
class Internal; class Internal;
private:
Internal *m; Internal *m;
/* Copyconst and assignment are private and forbidden */ /* Copyconst and assignment are private and forbidden */
ExecCmd(const ExecCmd &) {} ExecCmd(const ExecCmd &) {}