Blob Blame History Raw
From 493279b790a8cdace8ccbc2c5136985e820dd2fa Mon Sep 17 00:00:00 2001
From: Jiri Vymazal <jvymazal@redhat.com>
Date: Thu, 6 Aug 2020 16:21:03 +0000
Subject: [PATCH] imudp & nsdsel_ptcp: replace select() by poll()

select() was used when epoll was not available. This could cause issues
if fds >= 1024 were created during startup, as a standard fd_set cannot
hold them. This is extremely unlikely and we have never seen any such
report in practice. Nevertheless, it is better to have this fixed.
Performance will probably also improve in most cases.

Note this is not a replacement for the epoll drivers, but a general
stability improvement when epoll() is not available for some reason.

closes https://github.com/rsyslog/rsyslog/issues/2615
closes https://github.com/rsyslog/rsyslog/issues/1728
closes https://github.com/rsyslog/rsyslog/issues/1459
---
 plugins/imudp/imudp.c |  73 ++++++++++++++++++++++++++-----------------
 runtime/nsdsel_ptcp.c | 142 ++++++++++++++++++++----------------------
 runtime/nsdsel_ptcp.h |  14 ++---
 runtime/rsyslog.h     |   1 +
 4 files changed, 116 insertions(+), 114 deletions(-)

diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c
index 4303f0b952..24d90ab665 100644
--- a/plugins/imudp/imudp.c
+++ b/plugins/imudp/imudp.c
@@ -34,6 +34,7 @@
 #include <sys/socket.h>
 #include <pthread.h>
 #include <signal.h>
+#include <poll.h>
 #ifdef HAVE_SYS_EPOLL_H
 #	include <sys/epoll.h>
 #endif
@@ -836,61 +837,93 @@ rcvMainLoop(struct wrkrInfo_s *const __restrict__ pWrkr)
 }
 #else /* #if HAVE_EPOLL_CREATE1 */
-/* this is the code for the select() interface */
+/* This is the code for the poll() interface. It replaces the former select()
+ * based loop and is compiled when HAVE_EPOLL_CREATE1 is not set.
+ *
+ * Receive loop of an imudp worker: builds one pollfd entry per listener that
+ * has a valid socket, then repeatedly waits in poll() and passes every
+ * listener with pending input to processSocket().
+ *
+ * pWrkr is handed through to processSocket() unchanged.
+ *
+ * The loop is left via "break" when input termination is flagged right after
+ * poll(), with RS_RET_FORCE_TERM when it is flagged while dispatching, or via
+ * CHKmalloc() if the pollfd array cannot be allocated.
+ */
-static rsRetVal
+static rsRetVal
 rcvMainLoop(struct wrkrInfo_s *const __restrict__ pWrkr)
 {
 	DEFiRet;
-	int maxfds;
 	int nfds;
-	fd_set readfds;
 	struct sockaddr_storage frominetPrev;
 	int bIsPermitted;
+	int i = 0;
 	struct lstn_s *lstn;
 
+	DBGPRINTF("imudp uses poll() [ex-select]\n");
 	/* start "name caching" algo by making sure the previous system indicator
-	 * is invalidated.
-	 */
+	 * is invalidated. */
 	bIsPermitted = 0;
 	memset(&frominetPrev, 0, sizeof(frominetPrev));
-	DBGPRINTF("imudp uses select()\n");
 
-	while(1) {
-		/* Add the Unix Domain Sockets to the list of read descriptors.
-		 */
-	        maxfds = 0;
-	        FD_ZERO(&readfds);
-
-		/* Add the UDP listen sockets to the list of read descriptors. */
-		for(lstn = lcnfRoot ; lstn != NULL ; lstn = lstn->next) {
-			if (lstn->sock != -1) {
-				if(Debug)
-					net.debugListenInfo(lstn->sock, (char*)"UDP");
-				FD_SET(lstn->sock, &readfds);
-				if(lstn->sock>maxfds) maxfds=lstn->sock;
+	/* setup poll() subsystem: count the listeners that have a valid socket */
+	int nfd = 0;
+	for(lstn = lcnfRoot ; lstn != NULL ; lstn = lstn->next) {
+		if(lstn->sock != -1) {
+			if(Debug) {
+				net.debugListenInfo(lstn->sock, (char*)"UDP");
 			}
+			++nfd;
 		}
-		if(Debug) {
-			dbgprintf("--------imUDP calling select, active file descriptors (max %d): ", maxfds);
-			for (nfds = 0; nfds <= maxfds; ++nfds)
-				if(FD_ISSET(nfds, &readfds))
-					dbgprintf("%d ", nfds);
-			dbgprintf("\n");
+	}
+	/* calloc(0, ...) may return NULL, which CHKmalloc() would misreport as
+	 * out-of-memory; always request at least one element. */
+	struct pollfd *const pollfds = calloc(nfd > 0 ? nfd : 1, sizeof(struct pollfd));
+	CHKmalloc(pollfds);
+
+	/* fill the array; only listeners with a valid socket get an entry */
+	for(lstn = lcnfRoot ; lstn != NULL ; lstn = lstn->next) {
+		if (lstn->sock != -1) {
+			assert(i < nfd);
+			pollfds[i].fd = lstn->sock;
+			pollfds[i].events = POLLIN;
+			++i;
 		}
+	}
 
-		/* wait for io to become ready */
-		nfds = select(maxfds+1, (fd_set *) &readfds, NULL, NULL, NULL);
+	while(1) {
+		DBGPRINTF("--------imudp calling poll() on %d fds\n", nfd);
+		nfds = poll(pollfds, nfd, -1);
 		if(glbl.GetGlobalInputTermState() == 1)
 			break; /* terminate input! */
 
+		if(nfds < 0) {
+			if(errno == EINTR) {
+				DBGPRINTF("imudp: EINTR occured\n");
+			} else {
+				LogMsg(errno, RS_RET_POLL_ERR, LOG_WARNING, "imudp: poll "
+					"system call failed, may cause further troubles");
+			}
+			nfds = 0;
+		}
+
+		i = 0;
 		for(lstn = lcnfRoot ; nfds && lstn != NULL ; lstn = lstn->next) {
-			if(FD_ISSET(lstn->sock, &readfds)) {
+			if(glbl.GetGlobalInputTermState() == 1)
+				ABORT_FINALIZE(RS_RET_FORCE_TERM); /* terminate input! */
+			if(lstn->sock == -1)
+				continue; /* never got a pollfds slot; keeps i in sync with the array */
+			assert(i < nfd);
+			if(pollfds[i].revents & POLLIN) {
 		       		processSocket(pWrkr, lstn, &frominetPrev, &bIsPermitted);
-			--nfds; /* indicate we have processed one descriptor */
+				--nfds; /* indicate we have processed one descriptor */
 			}
+			++i;
 	       }
 	       /* end of a run, back to loop for next recv() */
 	}
 
+finalize_it:
+	free(pollfds); /* also reached after "break"; free(NULL) is a no-op */
 	RETiRet;
 }
 #endif /* #if HAVE_EPOLL_CREATE1 */
diff --git a/runtime/nsdsel_ptcp.c b/runtime/nsdsel_ptcp.c
index e2cfca7caa..0ea6af4a1d 100644
--- a/runtime/nsdsel_ptcp.c
+++ b/runtime/nsdsel_ptcp.c
@@ -2,7 +2,7 @@
  *
  * An implementation of the nsd select() interface for plain tcp sockets.
  * 
- * Copyright 2008 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2008-2018 Rainer Gerhards and Adiscon GmbH.
  *
  * This file is part of the rsyslog runtime library.
  *
@@ -38,70 +38,61 @@
 #include "nsdsel_ptcp.h"
 #include "unlimited_select.h"
 
+#define FDSET_INCREMENT 1024 /* increment for struct pollfds array allocation */
 /* static data */
 DEFobjStaticHelpers
-DEFobjCurrIf(errmsg)
 DEFobjCurrIf(glbl)
 
 
-/* Standard-Constructor
- */
+/* Standard-Constructor: no fds in use yet, room for FDSET_INCREMENT pollfd entries */
 BEGINobjConstruct(nsdsel_ptcp) /* be sure to specify the object type also in END macro! */
-	pThis->maxfds = 0;
-#ifdef USE_UNLIMITED_SELECT
-        pThis->pReadfds = calloc(1, glbl.GetFdSetSize());
-        pThis->pWritefds = calloc(1, glbl.GetFdSetSize());
-#else
-	FD_ZERO(&pThis->readfds);
-	FD_ZERO(&pThis->writefds);
-#endif
+	pThis->currfds = 0; /* number of pollfd entries in use */
+	pThis->maxfds = FDSET_INCREMENT; /* must match the number of entries allocated below */
+        CHKmalloc(pThis->fds = calloc(FDSET_INCREMENT, sizeof(struct pollfd)));
+finalize_it:
 ENDobjConstruct(nsdsel_ptcp)
 
 
 /* destructor for the nsdsel_ptcp object */
 BEGINobjDestruct(nsdsel_ptcp) /* be sure to specify the object type also in END and CODESTART macros! */
 CODESTARTobjDestruct(nsdsel_ptcp)
-#ifdef USE_UNLIMITED_SELECT
-	freeFdSet(pThis->pReadfds);
-	freeFdSet(pThis->pWritefds);
-#endif
+	free(pThis->fds); /* pollfd array allocated by the constructor, possibly regrown by Add() */
 ENDobjDestruct(nsdsel_ptcp)
 
 
 /* Add a socket to the select set */
-static rsRetVal
-Add(nsdsel_t *pNsdsel, nsd_t *pNsd, nsdsel_waitOp_t waitOp)
+static rsRetVal
+Add(nsdsel_t *const pNsdsel, nsd_t *const pNsd, const nsdsel_waitOp_t waitOp)
 {
 	DEFiRet;
-	nsdsel_ptcp_t *pThis = (nsdsel_ptcp_t*) pNsdsel;
-	nsd_ptcp_t *pSock = (nsd_ptcp_t*) pNsd;
-#ifdef USE_UNLIMITED_SELECT
-        fd_set *pReadfds = pThis->pReadfds;
-        fd_set *pWritefds = pThis->pWritefds;
-#else
-        fd_set *pReadfds = &pThis->readfds;
-        fd_set *pWritefds = &pThis->writefds;
-#endif
-
+	nsdsel_ptcp_t *const pThis = (nsdsel_ptcp_t*) pNsdsel;
+	const nsd_ptcp_t *const pSock = (nsd_ptcp_t*) pNsd;
 	ISOBJ_TYPE_assert(pSock, nsd_ptcp);
 	ISOBJ_TYPE_assert(pThis, nsdsel_ptcp);
 
+	if(pThis->currfds == pThis->maxfds) { /* array is full: grow it by FDSET_INCREMENT entries */
+		struct pollfd *newfds; /* temporary, so pThis->fds stays valid if realloc() fails */
+		CHKmalloc(newfds = realloc(pThis->fds,
+			sizeof(struct pollfd) * (pThis->maxfds + FDSET_INCREMENT)));
+		pThis->maxfds += FDSET_INCREMENT;
+		pThis->fds = newfds;
+	}
+
 	switch(waitOp) {
 		case NSDSEL_RD:
-			FD_SET(pSock->sock, pReadfds);
+			pThis->fds[pThis->currfds].events = POLLIN; /* wait for readable */
 			break;
 		case NSDSEL_WR:
-			FD_SET(pSock->sock, pWritefds);
+			pThis->fds[pThis->currfds].events = POLLOUT; /* wait for writable */
 			break;
 		case NSDSEL_RDWR:
-			FD_SET(pSock->sock, pReadfds);
-			FD_SET(pSock->sock, pWritefds);
+			pThis->fds[pThis->currfds].events = POLLIN | POLLOUT; /* wait for either */
 			break;
 	}
+	pThis->fds[pThis->currfds].fd = pSock->sock; /* events of this slot were set by the switch above */
+	++pThis->currfds; /* the slot is now in use */
 
-	if(pSock->sock > pThis->maxfds)
-		pThis->maxfds = pSock->sock;
-
+finalize_it:
 	RETiRet;
 }
 
@@ -109,71 +100,77 @@ Add(nsdsel_t *pNsdsel, nsd_t *pNsd, nsdsel_waitOp_t waitOp)
 /* perform the select()  piNumReady returns how many descriptors are ready for IO 
  * TODO: add timeout!
  */
-static rsRetVal
-Select(nsdsel_t *pNsdsel, int *piNumReady)
+static rsRetVal
+Select(nsdsel_t *const pNsdsel, int *const piNumReady)
 {
 	DEFiRet;
-	int i;
 	nsdsel_ptcp_t *pThis = (nsdsel_ptcp_t*) pNsdsel;
-#ifdef USE_UNLIMITED_SELECT
-        fd_set *pReadfds = pThis->pReadfds;
-        fd_set *pWritefds = pThis->pWritefds;
-#else
-        fd_set *pReadfds = &pThis->readfds;
-        fd_set *pWritefds = &pThis->writefds;
-#endif
 
 	ISOBJ_TYPE_assert(pThis, nsdsel_ptcp);
 	assert(piNumReady != NULL);
 
-	if(Debug) { // TODO: debug setting!
-		// TODO: name in dbgprintf!
-		dbgprintf("--------<NSDSEL_PTCP> calling select, active fds (max %d): ", pThis->maxfds);
-		for(i = 0; i <= pThis->maxfds; ++i)
-			if(FD_ISSET(i, pReadfds) || FD_ISSET(i, pWritefds))
-				dbgprintf("%d ", i);
+	assert(pThis->currfds >= 1); /* callers are expected to Add() at least one socket first */
+	if(Debug) {
+		dbgprintf("--------<NSDSEL_PTCP> calling poll, active fds (%d): ", pThis->currfds);
+		for(uint32_t i = 0; i < pThis->currfds; ++i) /* only slots 0..currfds-1 are in use */
+			dbgprintf("%d ", pThis->fds[i].fd);
 		dbgprintf("\n");
 	}
 
 	/* now do the select */
-	*piNumReady = select(pThis->maxfds+1, pReadfds, pWritefds, NULL, NULL);
+	*piNumReady = poll(pThis->fds, pThis->currfds, -1); /* -1: block until an event or a signal */
+	if(*piNumReady < 0) {
+		if(errno == EINTR) {
+			DBGPRINTF("nsdsel_ptcp received EINTR\n");
+		} else {
+			LogMsg(errno, RS_RET_POLL_ERR, LOG_WARNING,
+				"nsdsel_ptcp: poll system call failed, may cause further troubles");
+		}
+		*piNumReady = 0; /* report "nothing ready" instead of passing the error on */
+	}
 
 	RETiRet;
 }
 
 
 /* check if a socket is ready for IO */
-static rsRetVal
-IsReady(nsdsel_t *pNsdsel, nsd_t *pNsd, nsdsel_waitOp_t waitOp, int *pbIsReady)
+static rsRetVal
+IsReady(nsdsel_t *const pNsdsel, nsd_t *const pNsd, const nsdsel_waitOp_t waitOp, int *const pbIsReady)
 {
 	DEFiRet;
-	nsdsel_ptcp_t *pThis = (nsdsel_ptcp_t*) pNsdsel;
-	nsd_ptcp_t *pSock = (nsd_ptcp_t*) pNsd;
-#ifdef USE_UNLIMITED_SELECT
-        fd_set *pReadfds = pThis->pReadfds;
-        fd_set *pWritefds = pThis->pWritefds;
-#else
-        fd_set *pReadfds = &pThis->readfds;
-        fd_set *pWritefds = &pThis->writefds;
-#endif
-
+	const nsdsel_ptcp_t *const pThis = (nsdsel_ptcp_t*) pNsdsel;
+	const nsd_ptcp_t *const pSock = (nsd_ptcp_t*) pNsd;
 	ISOBJ_TYPE_assert(pThis, nsdsel_ptcp);
 	ISOBJ_TYPE_assert(pSock, nsd_ptcp);
 	assert(pbIsReady != NULL);
+	const int sock = pSock->sock;
 
+	uint32_t idx;
+	for(idx = 0 ; idx < pThis->currfds ; ++idx) { // linear scan; TODO: consider doing a binary search
+		if(pThis->fds[idx].fd == sock)
+			break;
+	}
+	if(idx >= pThis->currfds) { /* loop ran to the end: socket was never Add()ed */
+		LogMsg(0, RS_RET_INTERNAL_ERROR, LOG_ERR,
+			"nsdsel_ptcp: could not find socket %d which should be present", sock);
+		ABORT_FINALIZE(RS_RET_INTERNAL_ERROR);
+	}
+
+	const short revent = pThis->fds[idx].revents; /* events poll() reported for this socket */
+	assert(!(revent & POLLNVAL)); /* POLLNVAL would mean the fd value was invalid */
 	switch(waitOp) {
 		case NSDSEL_RD:
-			*pbIsReady = FD_ISSET(pSock->sock, pReadfds);
+			*pbIsReady = revent & POLLIN; /* non-zero (not necessarily 1) when ready */
 			break;
 		case NSDSEL_WR:
-			*pbIsReady = FD_ISSET(pSock->sock, pWritefds);
+			*pbIsReady = revent & POLLOUT;
 			break;
 		case NSDSEL_RDWR:
-			*pbIsReady =   FD_ISSET(pSock->sock, pReadfds)
-				     | FD_ISSET(pSock->sock, pWritefds);
+			*pbIsReady = revent & (POLLIN | POLLOUT);
 			break;
 	}
 
+finalize_it:
 	RETiRet;
 }
 
@@ -208,7 +205,6 @@ BEGINObjClassExit(nsdsel_ptcp, OBJ_IS_CORE_MODULE) /* CHANGE class also in END M
 CODESTARTObjClassExit(nsdsel_ptcp)
 	/* release objects we no longer need */
 	objRelease(glbl, CORE_COMPONENT);
-	objRelease(errmsg, CORE_COMPONENT);
 ENDObjClassExit(nsdsel_ptcp)
 
 
@@ -217,11 +213,5 @@ ENDObjClassExit(nsdsel_ptcp)
  * rgerhards, 2008-02-19
  */
 BEGINObjClassInit(nsdsel_ptcp, 1, OBJ_IS_CORE_MODULE) /* class, version */
-	/* request objects we use */
-	CHKiRet(objUse(errmsg, CORE_COMPONENT));
 	CHKiRet(objUse(glbl, CORE_COMPONENT));
-
-	/* set our own handlers */
 ENDObjClassInit(nsdsel_ptcp)
-/* vi:set ai:
- */
diff --git a/runtime/nsdsel_ptcp.h b/runtime/nsdsel_ptcp.h
index f9ec8210c2..87270c9f9f 100644
--- a/runtime/nsdsel_ptcp.h
+++ b/runtime/nsdsel_ptcp.h
@@ -24,20 +24,16 @@
 #ifndef INCLUDED_NSDSEL_PTCP_H
 #define INCLUDED_NSDSEL_PTCP_H
 
+#include <poll.h>
 #include "nsd.h"
 typedef nsdsel_if_t nsdsel_ptcp_if_t; /* we just *implement* this interface */
 
 /* the nsdsel_ptcp object */
 struct nsdsel_ptcp_s {
-	BEGINobjInstance;	/* Data to implement generic object - MUST be the first data element! */
-	int maxfds;
-#ifdef USE_UNLIMITED_SELECT
-	fd_set *pReadfds;
-	fd_set *pWritefds;
-#else
-	fd_set readfds;
-	fd_set writefds;
-#endif
+	BEGINobjInstance;	/* generic object data - MUST remain the first data element! */
+	uint32_t maxfds;	/* number of pollfd entries allocated in fds */
+	uint32_t currfds;	/* number of entries of fds currently in use */
+	struct pollfd *fds;	/* array passed to poll(); grown in FDSET_INCREMENT steps */
 };
 
 /* interface is defined in nsd.h, we just implement it! */
diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h
index 9b40ba2159..e0dbab9af2 100644
--- a/runtime/rsyslog.h
+++ b/runtime/rsyslog.h
@@ -478,6 +478,7 @@
 	RS_RET_FILE_CHOWN_ERROR = -2434, /**< error during chown() */
 	RS_RET_RENAME_TMP_QI_ERROR = -2435, /**< renaming temporary .qi file failed */
 	RS_RET_ERR_SETENV = -2436, /**< error setting an environment variable */
+	RS_RET_POLL_ERR = -2444, /**< error in poll() system call */
 
 	/* RainerScript error messages (range 1000.. 1999) */
 	RS_RET_SYSVAR_NOT_FOUND = 1001, /**< system variable could not be found (maybe misspelled) */