From: Jiri Vymazal Date: Mon, 28 Jun 2018 15:07:55 +0100 Subject: Imfile rewrite with symlink support This commit greatly refactors imfile internal workings. It changes the handling of inotify, FEN, and polling modes. Mostly unchanged is the processing of the way a file is read and state files are kept. This is about a 50% rewrite of the module. Polling, inotify, and FEN modes now use greatly unified code. Some differences still exists and may be changed with further commits. The internal handling of wildcards and file detection has been completely re-written from scratch. For example, previously when multi-level wildcards were used these were not reliably detected. The code also now provides much of the same functionality in all modes, most importantly wildcards are now also supported in polling mode. The refactoring sets ground for further enhancements and smaller refactorings. This commit provides the same feature set that imfile had previously. Some specific changes: bugfix: imfile did not pick up all files when not present at startup bugfix: directories only support "*" wildcard, no others bugfix: parameter "sortfiles" did only work in FEN mode provides the ability to dynamically add and remove files via multi-level wildcards the state file name currently has been changed to inode number We change it to json and also change the way it is stored and loaded. This sets base to additional improvements in imfile. When imfile rewrites state files, it does not truncate previous content. If the new content is smaller than the existing one, the existing part will not be overwritten, resulting in invalid json. That in turn can lead to some other failures. This introduces symlink detection and following as well as monitoring changes on them. stream/bugfix: memory leak on stream open if filename as already generated - this can happen if imfile reads a state file. On each open, memory for the file name can be lost. (cherry picked from commit a03dccf8484d621fe06cb2d11816fbe921751e54 - https://gitlab.cee.redhat.com/rsyslog/rsyslog) --- plugins/imfile/imfile.c | 2264 ++++++++++++++++++++++--------------------- runtime/msg.c | 22 ++++++++++++++++++++++ runtime/msg.h | 1 + runtime/stream.c | 136 ++++++++++++++++++++------- runtime/stream.h | 17 ++++++++++++++--- 5 files changed, 1303 insertions(+), 1137 deletitions(-) diff --git a/plugins/imfile/imfile.c b/plugins/imfile/imfile.c index b0bc860bcd16beaecd67ce1b7c61991356ea5471..f8225d7068d8fc98edde7bbed194be1105b1696b 100644 --- a/plugins/imfile/imfile.c +++ b/plugins/imfile/imfile.c @@ -35,6 +35,7 @@ #include #include #include +#include #include #ifdef HAVE_SYS_INOTIFY_H #include @@ -56,6 +57,7 @@ #include "stringbuf.h" #include "ruleset.h" #include "ratelimit.h" +#include "parserif.h" #include // TODO: fix via own module @@ -77,50 +81,19 @@ static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config para #define NUM_MULTISUB 1024 /* default max number of submits */ #define DFLT_PollInterval 10 - -#define INIT_FILE_TAB_SIZE 4 /* default file table size - is extended as needed, use 2^x value */ -#define INIT_FILE_IN_DIR_TAB_SIZE 1 /* initial size for "associated files tab" in directory table */ #define INIT_WDMAP_TAB_SIZE 1 /* default wdMap table size - is extended as needed, use 2^x value */ - #define ADD_METADATA_UNSPECIFIED -1 +#define CONST_LEN_CEE_COOKIE 5 +#define CONST_CEE_COOKIE "@cee:" + +/* If set to 1, fileTableDisplay will be compiled and used for debugging */ +#define ULTRA_DEBUG 0 + +/* Setting GLOB_BRACE to ZERO which disables support for GLOB_BRACE if not available on current platform */ +#ifndef GLOB_BRACE + #define GLOB_BRACE 0 +#endif -/* this structure is used in pure polling mode as well one of the support - * structures for inotify. - */ -typedef struct lstn_s { - struct lstn_s *next, *prev; - struct lstn_s *masterLstn;/* if dynamic file (via wildcard), this points to the configured - * master entry. For master entries, it is always NULL. Only - * dynamic files can be deleted from the "files" list. */ - uchar *pszFileName; - uchar *pszDirName; - uchar *pszBaseName; - uchar *pszTag; - size_t lenTag; - uchar *pszStateFile; /* file in which state between runs is to be stored (dynamic if NULL) */ - int readTimeout; - int iFacility; - int iSeverity; - int maxLinesAtOnce; - uint32_t trimLineOverBytes; - int nRecords; /**< How many records did we process before persisting the stream? */ - int iPersistStateInterval; /**< how often should state be persisted? (0=on close only) */ - strm_t *pStrm; /* its stream (NULL if not assigned) */ - sbool bRMStateOnDel; - sbool hasWildcard; - uint8_t readMode; /* which mode to use in ReadMulteLine call? */ - uchar *startRegex; /* regex that signifies end of message (NULL if unset) */ - regex_t end_preg; /* compiled version of startRegex */ - uchar *prevLineSegment; /* previous line segment (in regex mode) */ - sbool escapeLF; /* escape LF inside the MSG content? */ - sbool reopenOnTruncate; - sbool addMetadata; - sbool addCeeTag; - sbool freshStartTail; /* read from tail of file on fresh start? */ - ruleset_t *pRuleset; /* ruleset to bind listener to (use system default if unspecified) */ - ratelimit_t *ratelimiter; - multi_submit_t multiSub; -} lstn_t; static struct configSettings_s { uchar *pszFileName; @@ -138,9 +111,11 @@ static struct configSettings_s { struct instanceConf_s { uchar *pszFileName; + uchar *pszFileName_forOldStateFile; /* we unfortunately needs this to read old state files */ uchar *pszDirName; uchar *pszFileBaseName; uchar *pszTag; + size_t lenTag; uchar *pszStateFile; uchar *pszBindRuleset; int nMultiSub; @@ -151,11 +126,15 @@ struct instanceConf_s { sbool bRMStateOnDel; uint8_t readMode; uchar *startRegex; + regex_t end_preg; /* compiled version of startRegex */ + sbool discardTruncatedMsg; + sbool msgDiscardingError; sbool escapeLF; sbool reopenOnTruncate; sbool addCeeTag; sbool addMetadata; sbool freshStartTail; + sbool fileNotFoundError; int maxLinesAtOnce; uint32_t trimLineOverBytes; ruleset_t *pBindRuleset; /* ruleset to bind listener to (use system default if unspecified) */ @@ -163,9 +142,54 @@ struct instanceConf_s { }; +/* file system objects */ +typedef struct fs_edge_s fs_edge_t; +typedef struct fs_node_s fs_node_t; +typedef struct act_obj_s act_obj_t; +struct act_obj_s { + act_obj_t *prev; + act_obj_t *next; + fs_edge_t *edge; /* edge which this object belongs to */ + char *name; /* full path name of active object */ + char *basename; /* only basename */ //TODO: remove when refactoring rename support + char *source_name; /* if this object is target of a symlink, source_name is its name (else NULL) */ + //char *statefile; /* base name of state file (for move operations) */ + int wd; + time_t timeoutBase; /* what time to calculate the timeout against? */ + /* file dynamic data */ + int in_move; /* workaround for inotify move: if set, state file must not be deleted */ + ino_t ino; /* current inode nbr */ + strm_t *pStrm; /* its stream (NULL if not assigned) */ + int nRecords; /**< How many records did we process before persisting the stream? */ + ratelimit_t *ratelimiter; + multi_submit_t multiSub; + int is_symlink; +}; +struct fs_edge_s { + fs_node_t *parent; + fs_node_t *node; /* node this edge points to */ + fs_edge_t *next; + uchar *name; + uchar *path; + act_obj_t *active; + int is_file; + int ninst; /* nbr of instances in instarr */ + instanceConf_t **instarr; +}; +struct fs_node_s { + fs_edge_t *edges; + fs_node_t *root; +}; + + /* forward definitions */ -static rsRetVal persistStrmState(lstn_t *pInfo); +static rsRetVal persistStrmState(act_obj_t *); static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal); +static rsRetVal pollFile(act_obj_t *act); +static int getBasename(uchar *const __restrict__ basen, uchar *const __restrict__ path); +static void act_obj_unlink(act_obj_t *act); +static uchar * getStateFileName(const act_obj_t *, uchar *, const size_t); +static int getFullStateFileName(const uchar *const, uchar *const pszout, const size_t ilenout); #define OPMODE_POLLING 0 @@ -178,57 +200,23 @@ struct modConfData_s { int readTimeout; int timeoutGranularity; /* value in ms */ instanceConf_t *root, *tail; - lstn_t *pRootLstn; - lstn_t *pTailLstn; + fs_node_t *conf_tree; uint8_t opMode; sbool configSetViaV2Method; + sbool sortFiles; + sbool normalizePath; /* normalize file system pathes (all start with root dir) */ sbool haveReadTimeouts; /* use special processing if read timeouts exist */ + sbool bHadFileData; /* actually a global variable: + 1 - last call to pollFile() had data + 0 - last call to pollFile() had NO data + Must be manually reset to 0 if desired. Helper for + polling mode. + */ }; static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */ static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current load process */ #ifdef HAVE_INOTIFY_INIT -/* support for inotify mode */ - -/* we need to track directories */ -struct dirInfoFiles_s { /* associated files */ - lstn_t *pLstn; - int refcnt; /* due to inotify's async nature, we may have multiple - * references to a single file inside our cache - e.g. when - * inodes are removed, and the file name is re-created BUT another - * process (like rsyslogd ;)) holds open the old inode. - */ -}; -typedef struct dirInfoFiles_s dirInfoFiles_t; - -/* This structure is a dynamic table to track file entries */ -struct fileTable_s { - dirInfoFiles_t *listeners; - int currMax; - int allocMax; -}; -typedef struct fileTable_s fileTable_t; - -/* The dirs table (defined below) contains one entry for each directory that - * is to be monitored. For each directory, it contains array which point to - * the associated *active* files as well as *configured* files. Note that - * the configured files may currently not exist, but will be processed - * when they are created. - */ -struct dirInfo_s { - uchar *dirName; - fileTable_t active; /* associated active files */ - fileTable_t configured; /* associated configured files */ -}; -typedef struct dirInfo_s dirInfo_t; -static dirInfo_t *dirs = NULL; -static int allocMaxDirs; -static int currMaxDirs; -/* the following two macros are used to select the correct file table */ -#define ACTIVE_FILE 1 -#define CONFIGURED_FILE 0 - - /* We need to map watch descriptors to our actual objects. Unfortunately, the * inotify API does not provide us with any cookie, so a simple O(1) algorithm * cannot be done (what a shame...). We assume that maintaining the array is much @@ -238,9 +226,7 @@ static int currMaxDirs; */ struct wd_map_s { int wd; /* ascending sort key */ - lstn_t *pLstn; /* NULL, if this is a dir entry, otherwise pointer into listener(file) table */ - int dirIdx; /* index into dirs table, undefined if pLstn == NULL */ - time_t timeoutBase; /* what time to calculate the timeout against? */ + act_obj_t *act; /* point to related active object */ }; typedef struct wd_map_s wd_map_t; static wd_map_t *wdmap = NULL; @@ -257,6 +243,8 @@ static struct cnfparamdescr modpdescr[] = { { "pollinginterval", eCmdHdlrPositiveInt, 0 }, { "readtimeout", eCmdHdlrPositiveInt, 0 }, { "timeoutgranularity", eCmdHdlrPositiveInt, 0 }, + { "sortfiles", eCmdHdlrBinary, 0 }, + { "normalizepath", eCmdHdlrBinary, 0 }, { "mode", eCmdHdlrGetWord, 0 } }; static struct cnfparamblk modpblk = @@ -286,7 +274,8 @@ static struct cnfparamdescr inppdescr[] = { { "addceetag", eCmdHdlrBinary, 0 }, { "statefile", eCmdHdlrString, CNFPARAM_DEPRECATED }, { "readtimeout", eCmdHdlrPositiveInt, 0 }, - { "freshstarttail", eCmdHdlrBinary, 0} + { "freshstarttail", eCmdHdlrBinary, 0}, + { "filenotfounderror", eCmdHdlrBinary, 0} }; static struct cnfparamblk inppblk = { CNFPARAMBLK_VERSION, @@ -297,18 +286,106 @@ static struct cnfparamblk inppblk = #include "im-helper.h" /* must be included AFTER the type definitions! */ -#ifdef HAVE_INOTIFY_INIT -/* support for inotify mode */ +/* Support for "old cruft" state files will potentially become optional in the + * future (hopefully). To prepare so, we use conditional compilation with a + * fixed-true condition ;-) -- rgerhards, 2018-03-28 + * reason: https://github.com/rsyslog/rsyslog/issues/2231#issuecomment-376862280 + */ +#define ENABLE_V1_STATE_FILE_FORMAT_SUPPORT 1 +#ifdef ENABLE_V1_STATE_FILE_FORMAT_SUPPORT +static uchar * +OLD_getStateFileName(const instanceConf_t *const inst, + uchar *const __restrict__ buf, + const size_t lenbuf) +{ + DBGPRINTF("OLD_getStateFileName trying '%s'\n", inst->pszFileName_forOldStateFile); + snprintf((char*)buf, lenbuf - 1, "imfile-state:%s", inst->pszFileName_forOldStateFile); + buf[lenbuf-1] = '\0'; /* be on the safe side... */ + uchar *p = buf; + for( ; *p ; ++p) { + if(*p == '/') + *p = '-'; + } + return buf; +} -#if 0 /* enable if you need this for debugging */ +/* try to open an old-style state file for given file. If the state file does not + * exist or cannot be read, an error is returned. + */ +static rsRetVal +OLD_openFileWithStateFile(act_obj_t *const act) +{ + DEFiRet; + strm_t *psSF = NULL; + uchar pszSFNam[MAXFNAME]; + size_t lenSFNam; + struct stat stat_buf; + uchar statefile[MAXFNAME]; + const instanceConf_t *const inst = act->edge->instarr[0];// TODO: same file, multiple instances? + + uchar *const statefn = OLD_getStateFileName(inst, statefile, sizeof(statefile)); + DBGPRINTF("OLD_openFileWithStateFile: trying to open state for '%s', state file '%s'\n", + act->name, statefn); + + /* Get full path and file name */ + lenSFNam = getFullStateFileName(statefn, pszSFNam, sizeof(pszSFNam)); + + /* check if the file exists */ + if(stat((char*) pszSFNam, &stat_buf) == -1) { + if(errno == ENOENT) { + DBGPRINTF("OLD_openFileWithStateFile: NO state file (%s) exists for '%s'\n", + pszSFNam, act->name); + ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND); + } else { + char errStr[1024]; + rs_strerror_r(errno, errStr, sizeof(errStr)); + DBGPRINTF("OLD_openFileWithStateFile: error trying to access state " + "file for '%s':%s\n", act->name, errStr); + ABORT_FINALIZE(RS_RET_IO_ERROR); + } + } + + /* If we reach this point, we have a state file */ + + DBGPRINTF("old state file found - instantiating from it\n"); + CHKiRet(strm.Construct(&psSF)); + CHKiRet(strm.SettOperationsMode(psSF, STREAMMODE_READ)); + CHKiRet(strm.SetsType(psSF, STREAMTYPE_FILE_SINGLE)); + CHKiRet(strm.SetFName(psSF, pszSFNam, lenSFNam)); + CHKiRet(strm.SetFileNotFoundError(psSF, inst->fileNotFoundError)); + CHKiRet(strm.ConstructFinalize(psSF)); + + /* read back in the object */ + CHKiRet(obj.Deserialize(&act->pStrm, (uchar*) "strm", psSF, NULL, act)); + free(act->pStrm->pszFName); + CHKmalloc(act->pStrm->pszFName = ustrdup(act->name)); + + strm.CheckFileChange(act->pStrm); + CHKiRet(strm.SeekCurrOffs(act->pStrm)); + + /* we now persist the new state file and delete the old one, so we will + * never have to deal with the old one. */ + persistStrmState(act); + unlink((char*)pszSFNam); + +finalize_it: + if(psSF != NULL) + strm.Destruct(&psSF); + RETiRet; +} +#endif /* #ifdef ENABLE_V1_STATE_FILE_FORMAT_SUPPORT */ + + +#ifdef HAVE_INOTIFY_INIT +#if ULTRA_DEBUG == 1 static void -dbg_wdmapPrint(char *msg) +dbg_wdmapPrint(const char *msg) { int i; DBGPRINTF("%s\n", msg); for(i = 0 ; i < nWdmap ; ++i) - DBGPRINTF("wdmap[%d]: wd: %d, file %d, dir %d\n", i, - wdmap[i].wd, wdmap[i].fIdx, wdmap[i].dirIdx); + DBGPRINTF("wdmap[%d]: wd: %d, act %p, name: %s\n", + i, wdmap[i].wd, wdmap[i].act, wdmap[i].act->name); } #endif @@ -324,48 +401,10 @@ finalize_it: RETiRet; } -/* looks up a wdmap entry by dirIdx and returns it's index if found - * or -1 if not found. - */ -static int -wdmapLookupListner(lstn_t* pLstn) -{ - int i = 0; - int wd = -1; - /* Loop through */ - for(i = 0 ; i < nWdmap; ++i) { - if (wdmap[i].pLstn == pLstn) - wd = wdmap[i].wd; - } - - return wd; -} - -/* compare function for bsearch() */ -static int -wdmap_cmp(const void *k, const void *a) -{ - int key = *((int*) k); - wd_map_t *etry = (wd_map_t*) a; - if(key < etry->wd) - return -1; - else if(key > etry->wd) - return 1; - else - return 0; -} -/* looks up a wdmap entry and returns it's index if found - * or -1 if not found. - */ -static wd_map_t * -wdmapLookup(int wd) -{ - return bsearch(&wd, wdmap, nWdmap, sizeof(wd_map_t), wdmap_cmp); -} /* note: we search backwards, as inotify tends to return increasing wd's */ static rsRetVal -wdmapAdd(int wd, const int dirIdx, lstn_t *const pLstn) +wdmapAdd(int wd, act_obj_t *const act) { wd_map_t *newmap; int newmapsize; @@ -375,7 +414,7 @@ wdmapAdd(int wd, const int dirIdx, lstn_t *const pLstn) for(i = nWdmap-1 ; i >= 0 && wdmap[i].wd > wd ; --i) ; /* just scan */ if(i >= 0 && wdmap[i].wd == wd) { - DBGPRINTF("imfile: wd %d already in wdmap!\n", wd); + LogError(0, RS_RET_INTERNAL_ERROR, "imfile: wd %d already in wdmap!", wd); ABORT_FINALIZE(RS_RET_FILE_ALREADY_IN_TABLE); } ++i; @@ -392,17 +431,59 @@ wdmapAdd(int wd, const int dirIdx, lstn_t *const pLstn) memmove(wdmap + i + 1, wdmap + i, sizeof(wd_map_t) * (nWdmap - i)); } wdmap[i].wd = wd; - wdmap[i].dirIdx = dirIdx; - wdmap[i].pLstn = pLstn; + wdmap[i].act = act; ++nWdmap; - DBGPRINTF("imfile: enter into wdmap[%d]: wd %d, dir %d, lstn %s:%s\n",i,wd,dirIdx, - (pLstn == NULL) ? "DIRECTORY" : "FILE", - (pLstn == NULL) ? dirs[dirIdx].dirName : pLstn->pszFileName); + DBGPRINTF("add wdmap[%d]: wd %d, act obj %p, path %s\n", i, wd, act, act->name); finalize_it: RETiRet; } +/* return wd or -1 on error */ +static int +in_setupWatch(act_obj_t *const act, const int is_file) +{ + int wd = -1; + if(runModConf->opMode != OPMODE_INOTIFY) + goto done; + + wd = inotify_add_watch(ino_fd, act->name, + (is_file) ? IN_MODIFY|IN_DONT_FOLLOW : IN_CREATE|IN_DELETE|IN_MOVED_FROM|IN_MOVED_TO); + if(wd < 0) { /* There is high probability of selinux denial on top-level paths */ + if (errno != EACCES) + LogError(errno, RS_RET_IO_ERROR, "imfile: cannot watch object '%s'", act->name); + else + DBGPRINTF("Access denied when creating watch on '%s'\n", act->name); + goto done; + } + wdmapAdd(wd, act); + DBGPRINTF("in_setupWatch: watch %d added for %s(object %p)\n", wd, act->name, act); +done: return wd; +} + +/* compare function for bsearch() */ +static int +wdmap_cmp(const void *k, const void *a) +{ + int key = *((int*) k); + wd_map_t *etry = (wd_map_t*) a; + if(key < etry->wd) + return -1; + else if(key > etry->wd) + return 1; + else + return 0; +} +/* looks up a wdmap entry and returns it's index if found + * or -1 if not found. + */ +static wd_map_t * +wdmapLookup(int wd) +{ + return bsearch(&wd, wdmap, nWdmap, sizeof(wd_map_t), wdmap_cmp); +} + + static rsRetVal wdmapDel(const int wd) { @@ -427,46 +506,570 @@ finalize_it: RETiRet; } -#endif /* #if HAVE_INOTIFY_INIT */ +#endif // #ifdef HAVE_INOTIFY_INIT + +static void +fen_setupWatch(act_obj_t *const __attribute__((unused)) act) +{ + DBGPRINTF("fen_setupWatch: DUMMY CALLED - not on Solaris?\n"); +} + +static void +fs_node_print(const fs_node_t *const node, const int level) +{ + fs_edge_t *chld; + act_obj_t *act; + dbgprintf("node print[%2.2d]: %p edges:\n", level, node); + + for(chld = node->edges ; chld != NULL ; chld = chld->next) { + dbgprintf("node print[%2.2d]: child %p '%s' isFile %d, path: '%s'\n", + level, chld->node, chld->name, chld->is_file, chld->path); + for(int i = 0 ; i < chld->ninst ; ++i) { + dbgprintf("\tinst: %p\n", chld->instarr[i]); + } + for(act = chld->active ; act != NULL ; act = act->next) { + dbgprintf("\tact : %p\n", act); + dbgprintf("\tact : %p: name '%s', wd: %d\n", + act, act->name, act->wd); + } + } + for(chld = node->edges ; chld != NULL ; chld = chld->next) { + fs_node_print(chld->node, level+1); + } +} + +/* add a new file system object if it not yet exists, ignore call + * if it already does. + */ +static rsRetVal +act_obj_add(fs_edge_t *const edge, const char *const name, const int is_file, + const ino_t ino, const int is_symlink, const char *const source) +{ + act_obj_t *act; + char basename[MAXFNAME]; + DEFiRet; + + DBGPRINTF("act_obj_add: edge %p, name '%s' (source '%s')\n", edge, name, source? source : "---"); + for(act = edge->active ; act != NULL ; act = act->next) { + if(!strcmp(act->name, name)) { + if (!source || !act->source_name || !strcmp(act->source_name, source)) { + DBGPRINTF("active object '%s' already exists in '%s' - no need to add\n", + name, edge->path); + FINALIZE; + } + } + } + DBGPRINTF("add new active object '%s' in '%s'\n", name, edge->path); + CHKmalloc(act = calloc(sizeof(act_obj_t), 1)); + CHKmalloc(act->name = strdup(name)); + if (-1 == getBasename((uchar*)basename, (uchar*)name)) { + CHKmalloc(act->basename = strdup(name)); /* assume basename is same as name */ + } else { + CHKmalloc(act->basename = strdup(basename)); + } + act->edge = edge; + act->ino = ino; + act->is_symlink = is_symlink; + if (source) { /* we are target of symlink */ + CHKmalloc(act->source_name = strdup(source)); + } else { + act->source_name = NULL; + } + #ifdef HAVE_INOTIFY_INIT + act->wd = in_setupWatch(act, is_file); + #endif + fen_setupWatch(act); + if(is_file && !is_symlink) { + const instanceConf_t *const inst = edge->instarr[0];// TODO: same file, multiple instances? + CHKiRet(ratelimitNew(&act->ratelimiter, "imfile", name)); + CHKmalloc(act->multiSub.ppMsgs = MALLOC(inst->nMultiSub * sizeof(smsg_t *))); + act->multiSub.maxElem = inst->nMultiSub; + act->multiSub.nElem = 0; + pollFile(act); + } + + /* all well, add to active list */ + if(edge->active != NULL) { + edge->active->prev = act; + } + act->next = edge->active; + edge->active = act; +//dbgprintf("printout of fs tree after act_obj_add for '%s'\n", name); +//fs_node_print(runModConf->conf_tree, 0); +//dbg_wdmapPrint("wdmap after act_obj_add"); +finalize_it: + if(iRet != RS_RET_OK) { + if(act != NULL) { + free(act->name); + free(act); + } + } + RETiRet; +} + + +/* this walks an edges active list and detects and acts on any changes + * seen there. It does NOT detect newly appeared files, as they are not + * inside the active list! + */ +static void +detect_updates(fs_edge_t *const edge) +{ + act_obj_t *act; + struct stat fileInfo; + int restart = 0; + + for(act = edge->active ; act != NULL ; ) { + DBGPRINTF("detect_updates checking active obj '%s'\n", act->name); + const int r = lstat(act->name, &fileInfo); + if(r == -1) { /* object gone away? */ + DBGPRINTF("object gone away, unlinking: '%s'\n", act->name); + act_obj_unlink(act); + restart = 1; + break; + } + // TODO: add inode check for change notification! + + /* Note: active nodes may get deleted, so we need to do the + * pointer advancement at the end of the for loop! + */ + act = act->next; + } + if (restart) + detect_updates(edge); +} + + +/* check if active files need to be processed. This is only needed in + * polling mode. + */ +static void +poll_active_files(fs_edge_t *const edge) +{ + if( runModConf->opMode != OPMODE_POLLING + || !edge->is_file + || glbl.GetGlobalInputTermState() != 0) { + return; + } + + act_obj_t *act; + for(act = edge->active ; act != NULL ; act = act->next) { + fen_setupWatch(act); + DBGPRINTF("poll_active_files: polling '%s'\n", act->name); + pollFile(act); + } +} + +static rsRetVal +process_symlink(fs_edge_t *const chld, const char *symlink) +{ + DEFiRet; + char *target = NULL; + CHKmalloc(target = realpath(symlink, target)); + struct stat fileInfo; + if(lstat(target, &fileInfo) != 0) { + LogError(errno, RS_RET_ERR, "imfile: process_symlink cannot stat file '%s' - ignored", target); + FINALIZE; + } + const int is_file = (S_ISREG(fileInfo.st_mode)); + DBGPRINTF("process_symlink: found '%s', File: %d (config file: %d), symlink: %d\n", + target, is_file, chld->is_file, 0); + if (act_obj_add(chld, target, is_file, fileInfo.st_ino, 0, symlink) == RS_RET_OK) { + /* need to watch parent target as well for proper rotation support */ + uint idx = ustrlen(chld->active->name) - ustrlen(chld->active->basename); + if (idx) { /* basename is different from name */ + char parent[MAXFNAME]; + memcpy(parent, chld->active->name, idx-1); + parent[idx-1] = '\0'; + if(lstat(parent, &fileInfo) != 0) { + LogError(errno, RS_RET_ERR, + "imfile: process_symlink: cannot stat directory '%s' - ignored", parent); + FINALIZE; + } + DBGPRINTF("process_symlink: adding parent '%s' of target '%s'\n", parent, target); + act_obj_add(chld->parent->root->edges, parent, 0, fileInfo.st_ino, 0, NULL); + } + } + +finalize_it: + free(target); + RETiRet; +} + +static void +poll_tree(fs_edge_t *const chld) +{ + struct stat fileInfo; + glob_t files; + int issymlink; + DBGPRINTF("poll_tree: chld %p, name '%s', path: %s\n", chld, chld->name, chld->path); + detect_updates(chld); + const int ret = glob((char*)chld->path, runModConf->sortFiles|GLOB_BRACE, NULL, &files); + DBGPRINTF("poll_tree: glob returned %d\n", ret); + if(ret == 0) { + DBGPRINTF("poll_tree: processing %d files\n", (int) files.gl_pathc); + for(unsigned i = 0 ; i < files.gl_pathc ; i++) { + if(glbl.GetGlobalInputTermState() != 0) { + goto done; + } + char *const file = files.gl_pathv[i]; + if(lstat(file, &fileInfo) != 0) { + LogError(errno, RS_RET_ERR, + "imfile: poll_tree cannot stat file '%s' - ignored", file); + continue; + } + + if (S_ISLNK(fileInfo.st_mode)) { + rsRetVal slink_ret = process_symlink(chld, file); + if (slink_ret != RS_RET_OK) { + continue; + } + issymlink = 1; + } else { + issymlink = 0; + } + const int is_file = (S_ISREG(fileInfo.st_mode) || issymlink); + DBGPRINTF("poll_tree: found '%s', File: %d (config file: %d), symlink: %d\n", + file, is_file, chld->is_file, issymlink); + if(!is_file && S_ISREG(fileInfo.st_mode)) { + LogMsg(0, RS_RET_ERR, LOG_WARNING, + "imfile: '%s' is neither a regular file, symlink, nor a " + "directory - ignored", file); + continue; + } + if(chld->is_file != is_file) { + LogMsg(0, RS_RET_ERR, LOG_WARNING, + "imfile: '%s' is %s but %s expected - ignored", + file, (is_file) ? "FILE" : "DIRECTORY", + (chld->is_file) ? "FILE" : "DIRECTORY"); + continue; + } + act_obj_add(chld, file, is_file, fileInfo.st_ino, issymlink, NULL); + } + globfree(&files); + } + + poll_active_files(chld); + +done: return; +} + +#ifdef HAVE_INOTIFY_INIT // TODO: shouldn't we use that in polling as well? +static void +poll_timeouts(fs_edge_t *const edge) +{ + if(edge->is_file) { + act_obj_t *act; + for(act = edge->active ; act != NULL ; act = act->next) { + if(strmReadMultiLine_isTimedOut(act->pStrm)) { + DBGPRINTF("timeout occured on %s\n", act->name); + pollFile(act); + } + } + } +} +#endif + + +/* destruct a single act_obj object */ +static void +act_obj_destroy(act_obj_t *const act, const int is_deleted) +{ + uchar *statefn; + uchar statefile[MAXFNAME]; + uchar toDel[MAXFNAME]; + + if(act == NULL) + return; + + DBGPRINTF("act_obj_destroy: act %p '%s', (source '%s'), wd %d, pStrm %p, is_deleted %d, in_move %d\n", + act, act->name, act->source_name? act->source_name : "---", act->wd, act->pStrm, is_deleted, act->in_move); + if(act->is_symlink && is_deleted) { + act_obj_t *target_act; + for(target_act = act->edge->active ; target_act != NULL ; target_act = target_act->next) { + if(target_act->source_name && !strcmp(target_act->source_name, act->name)) { + DBGPRINTF("act_obj_destroy: unlinking slink target %s of %s " + "symlink\n", target_act->name, act->name); + act_obj_unlink(target_act); + break; + } + } + } + if(act->ratelimiter != NULL) { + ratelimitDestruct(act->ratelimiter); + } + if(act->pStrm != NULL) { + const instanceConf_t *const inst = act->edge->instarr[0];// TODO: same file, multiple instances? + pollFile(act); /* get any left-over data */ + if(inst->bRMStateOnDel) { + statefn = getStateFileName(act, statefile, sizeof(statefile)); + getFullStateFileName(statefn, toDel, sizeof(toDel)); + statefn = toDel; + } + persistStrmState(act); + strm.Destruct(&act->pStrm); + /* we delete state file after destruct in case strm obj initiated a write */ + if(is_deleted && !act->in_move && inst->bRMStateOnDel) { + DBGPRINTF("act_obj_destroy: deleting state file %s\n", statefn); + unlink((char*)statefn); + } + } + #ifdef HAVE_INOTIFY_INIT + if(act->wd != -1) { + wdmapDel(act->wd); + } + #endif + #if defined(OS_SOLARIS) && defined (HAVE_PORT_SOURCE_FILE) + if(act->pfinf != NULL) { + free(act->pfinf->fobj.fo_name); + free(act->pfinf); + } + #endif + free(act->basename); + free(act->source_name); + //free(act->statefile); + free(act->multiSub.ppMsgs); + #if defined(OS_SOLARIS) && defined (HAVE_PORT_SOURCE_FILE) + act->is_deleted = 1; + #else + free(act->name); + free(act); + #endif +} + +/* destroy complete act list starting at given node */ +static void +act_obj_destroy_all(act_obj_t *act) +{ + if(act == NULL) + return; + + DBGPRINTF("act_obj_destroy_all: act %p '%s', wd %d, pStrm %p\n", act, act->name, act->wd, act->pStrm); + while(act != NULL) { + act_obj_t *const toDel = act; + act = act->next; + act_obj_destroy(toDel, 0); + } +} + +#if 0 +/* debug: find if ptr is still present in list */ +static void +chk_active(const act_obj_t *act, const act_obj_t *const deleted) +{ + while(act != NULL) { + DBGPRINTF("chk_active %p vs %p\n", act, deleted); + if(act->prev == deleted) + DBGPRINTF("chk_active %p prev points to %p\n", act, deleted); + if(act->next == deleted) + DBGPRINTF("chk_active %p next points to %p\n", act, deleted); + act = act->next; + DBGPRINTF("chk_active next %p\n", act); + } +} +#endif + +/* unlink act object from linked list and then + * destruct it. + */ +static void +act_obj_unlink(act_obj_t *act) +{ + DBGPRINTF("act_obj_unlink %p: %s\n", act, act->name); + if(act->prev == NULL) { + act->edge->active = act->next; + } else { + act->prev->next = act->next; + } + if(act->next != NULL) { + act->next->prev = act->prev; + } + act_obj_destroy(act, 1); + act = NULL; +//dbgprintf("printout of fs tree post unlink\n"); +//fs_node_print(runModConf->conf_tree, 0); +//dbg_wdmapPrint("wdmap after"); +} + +static void +fs_node_destroy(fs_node_t *const node) +{ + fs_edge_t *edge; + DBGPRINTF("node destroy: %p edges:\n", node); + + for(edge = node->edges ; edge != NULL ; ) { + fs_node_destroy(edge->node); + fs_edge_t *const toDel = edge; + edge = edge->next; + act_obj_destroy_all(toDel->active); + free(toDel->name); + free(toDel->path); + free(toDel->instarr); + free(toDel); + } + free(node); +} + +static void +fs_node_walk(fs_node_t *const node, + void (*f_usr)(fs_edge_t*const)) +{ + DBGPRINTF("node walk: %p edges:\n", node); + + fs_edge_t *edge; + for(edge = node->edges ; edge != NULL ; edge = edge->next) { + DBGPRINTF("node walk: child %p '%s'\n", edge->node, edge->name); + f_usr(edge); + fs_node_walk(edge->node, f_usr); + } +} + + + +/* add a file system object to config tree (or update existing node with new monitor) + */ +static rsRetVal +fs_node_add(fs_node_t *const node, fs_node_t *const source, + const uchar *const toFind, + const size_t pathIdx, + instanceConf_t *const inst) +{ + DEFiRet; + fs_edge_t *newchld = NULL; + int i; + + DBGPRINTF("fs_node_add(%p, '%s') enter, idx %zd\n", + node, toFind+pathIdx, pathIdx); + assert(toFind[0] != '\0'); + for(i = pathIdx ; (toFind[i] != '\0') && (toFind[i] != '/') ; ++i) + /*JUST SKIP*/; + const int isFile = (toFind[i] == '\0') ? 1 : 0; + uchar ourPath[PATH_MAX]; + if(i == 0) { + ourPath[0] = '/'; + ourPath[1] = '\0'; + } else { + memcpy(ourPath, toFind, i); + ourPath[i] = '\0'; + } + const size_t nextPathIdx = i+1; + const size_t len = i - pathIdx; + uchar name[PATH_MAX]; + memcpy(name, toFind+pathIdx, len); + name[len] = '\0'; + DBGPRINTF("fs_node_add: name '%s'\n", name); node->root = source; + + fs_edge_t *chld; + for(chld = node->edges ; chld != NULL ; chld = chld->next) { + if(!ustrcmp(chld->name, name)) { + DBGPRINTF("fs_node_add(%p, '%s') found '%s'\n", chld->node, toFind, name); + /* add new instance */ + chld->ninst++; + CHKmalloc(chld->instarr = realloc(chld->instarr, sizeof(instanceConf_t*) * chld->ninst)); + chld->instarr[chld->ninst-1] = inst; + /* recurse */ + if(!isFile) { + CHKiRet(fs_node_add(chld->node, node, toFind, nextPathIdx, inst)); + } + FINALIZE; + } + } + + /* could not find node --> add it */ + DBGPRINTF("fs_node_add(%p, '%s') did not find '%s' - adding it\n", + node, toFind, name); + CHKmalloc(newchld = calloc(sizeof(fs_edge_t), 1)); + CHKmalloc(newchld->name = ustrdup(name)); + CHKmalloc(newchld->node = calloc(sizeof(fs_node_t), 1)); + CHKmalloc(newchld->path = ustrdup(ourPath)); + CHKmalloc(newchld->instarr = calloc(sizeof(instanceConf_t*), 1)); + newchld->instarr[0] = inst; + newchld->is_file = isFile; + newchld->ninst = 1; + newchld->parent = node; + + DBGPRINTF("fs_node_add(%p, '%s') returns %p\n", node, toFind, newchld->node); + + if(!isFile) { + CHKiRet(fs_node_add(newchld->node, node, toFind, nextPathIdx, inst)); + } + + /* link to list */ + newchld->next = node->edges; + node->edges = newchld; +finalize_it: + if(iRet != RS_RET_OK) { + if(newchld != NULL) { + free(newchld->name); + free(newchld->node); + free(newchld->path); + free(newchld->instarr); + free(newchld); + } + } + RETiRet; +} + +/* Helper function to combine statefile and workdir + * This function is guranteed to work only on config data and DOES NOT + * open or otherwise modify disk file state. + */ +static int +getFullStateFileName(const uchar *const pszstatefile, uchar *const pszout, const size_t ilenout) +{ + int lenout; + const uchar* pszworkdir; -/* this generates a state file name suitable for the current file. To avoid + /* Get Raw Workdir, if it is NULL we need to propper handle it */ + pszworkdir = glblGetWorkDirRaw(); + + /* Construct file name */ + lenout = snprintf((char*)pszout, ilenout, "%s/%s", + (char*) (pszworkdir == NULL ? "." : (char*) pszworkdir), (char*)pszstatefile); + + /* return out length */ + return lenout; +} + + +/* this generates a state file name suitable for the given file. To avoid * malloc calls, it must be passed a buffer which should be MAXFNAME large. * Note: the buffer is not necessarily populated ... always ONLY use the * RETURN VALUE! + * This function is guranteed to work only on config data and DOES NOT + * open or otherwise modify disk file state. */ static uchar * -getStateFileName(lstn_t *const __restrict__ pLstn, +getStateFileName(const act_obj_t *const act, uchar *const __restrict__ buf, const size_t lenbuf) { - uchar *ret; - if(pLstn->pszStateFile == NULL) { - snprintf((char*)buf, lenbuf - 1, "imfile-state:%s", pLstn->pszFileName); - buf[lenbuf-1] = '\0'; /* be on the safe side... */ - uchar *p = buf; - for( ; *p ; ++p) { - if(*p == '/') - *p = '-'; - } - ret = buf; - } else { - ret = pLstn->pszStateFile; - } - return ret; + DBGPRINTF("getStateFileName for '%s'\n", act->name); + snprintf((char*)buf, lenbuf - 1, "imfile-state:%lld", (long long) act->ino); + DBGPRINTF("getStateFileName: stat file name now is %s\n", buf); + return buf; } /* enqueue the read file line as a message. The provided string is - * not freed - thuis must be done by the caller. + * not freed - this must be done by the caller. */ -static rsRetVal enqLine(lstn_t *const __restrict__ pLstn, - cstr_t *const __restrict__ cstrLine) +#define MAX_OFFSET_REPRESENTATION_NUM_BYTES 20 +static rsRetVal +enqLine(act_obj_t *const act, + cstr_t *const __restrict__ cstrLine, + const int64 strtOffs) { DEFiRet; + const instanceConf_t *const inst = act->edge->instarr[0];// TODO: same file, multiple instances? smsg_t *pMsg; + uchar file_offset[MAX_OFFSET_REPRESENTATION_NUM_BYTES+1]; + const uchar *metadata_names[2] = {(uchar *)"filename",(uchar *)"fileoffset"} ; + const uchar *metadata_values[2] ; + const size_t msgLen = cstrLen(cstrLine); - if(rsCStrLen(cstrLine) == 0) { + if(msgLen == 0) { /* we do not process empty lines */ FINALIZE; } @@ -474,27 +1180,34 @@ static rsRetVal enqLine(lstn_t *const __restrict__ pLstn, CHKiRet(msgConstruct(&pMsg)); MsgSetFlowControlType(pMsg, eFLOWCTL_FULL_DELAY); MsgSetInputName(pMsg, pInputName); - if (pLstn->addCeeTag) { - size_t msgLen = cstrLen(cstrLine); - const char *const ceeToken = "@cee:"; - size_t ceeMsgSize = msgLen + strlen(ceeToken) +1; + if(inst->addCeeTag) { + /* Make sure we account for terminating null byte */ + size_t ceeMsgSize = msgLen + CONST_LEN_CEE_COOKIE + 1; char *ceeMsg; CHKmalloc(ceeMsg = MALLOC(ceeMsgSize)); - strcpy(ceeMsg, ceeToken); + strcpy(ceeMsg, CONST_CEE_COOKIE); strcat(ceeMsg, (char*)rsCStrGetSzStrNoNULL(cstrLine)); MsgSetRawMsg(pMsg, ceeMsg, ceeMsgSize); free(ceeMsg); } else { - MsgSetRawMsg(pMsg, (char*)rsCStrGetSzStrNoNULL(cstrLine), cstrLen(cstrLine)); + MsgSetRawMsg(pMsg, (char*)rsCStrGetSzStrNoNULL(cstrLine), msgLen); } MsgSetMSGoffs(pMsg, 0); /* we do not have a header... */ MsgSetHOSTNAME(pMsg, glbl.GetLocalHostName(), ustrlen(glbl.GetLocalHostName())); - MsgSetTAG(pMsg, pLstn->pszTag, pLstn->lenTag); - msgSetPRI(pMsg, pLstn->iFacility | pLstn->iSeverity); - MsgSetRuleset(pMsg, pLstn->pRuleset); - if(pLstn->addMetadata) - msgAddMetadata(pMsg, (uchar*)"filename", pLstn->pszFileName); - ratelimitAddMsg(pLstn->ratelimiter, &pLstn->multiSub, pMsg); + MsgSetTAG(pMsg, inst->pszTag, inst->lenTag); + msgSetPRI(pMsg, inst->iFacility | inst->iSeverity); + MsgSetRuleset(pMsg, inst->pBindRuleset); + if(inst->addMetadata) { + if (act->source_name) { + metadata_values[0] = (const uchar*)act->source_name; + } else { + metadata_values[0] = (const uchar*)act->name; + } + snprintf((char *)file_offset, MAX_OFFSET_REPRESENTATION_NUM_BYTES+1, "%lld", strtOffs); + metadata_values[1] = file_offset; + msgAddMultiMetadata(pMsg, metadata_names, metadata_values, 2); + } + ratelimitAddMsg(act->ratelimiter, &act->multiSub, pMsg); finalize_it: RETiRet; } @@ -504,70 +1213,89 @@ finalize_it: * exist or cannot be read, an error is returned. */ static rsRetVal -openFileWithStateFile(lstn_t *const __restrict__ pLstn) +openFileWithStateFile(act_obj_t *const act) { DEFiRet; - strm_t *psSF = NULL; uchar pszSFNam[MAXFNAME]; - size_t lenSFNam; - struct stat stat_buf; uchar statefile[MAXFNAME]; + int fd = -1; + const instanceConf_t *const inst = act->edge->instarr[0];// TODO: same file, multiple instances? - uchar *const statefn = getStateFileName(pLstn, statefile, sizeof(statefile)); - DBGPRINTF("imfile: trying to open state for '%s', state file '%s'\n", - pLstn->pszFileName, statefn); - /* Construct file name */ - lenSFNam = snprintf((char*)pszSFNam, sizeof(pszSFNam), "%s/%s", - (char*) glbl.GetWorkDir(), (char*)statefn); + uchar *const statefn = getStateFileName(act, statefile, sizeof(statefile)); + + getFullStateFileName(statefn, pszSFNam, sizeof(pszSFNam)); + DBGPRINTF("trying to open state for '%s', state file '%s'\n", act->name, pszSFNam); /* check if the file exists */ - if(stat((char*) pszSFNam, &stat_buf) == -1) { + fd = open((char*)pszSFNam, O_CLOEXEC | O_NOCTTY | O_RDONLY, 0600); + if(fd < 0) { if(errno == ENOENT) { - DBGPRINTF("imfile: NO state file exists for '%s'\n", pLstn->pszFileName); - ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND); + DBGPRINTF("NO state file (%s) exists for '%s' - trying to see if " + "old-style file exists\n", pszSFNam, act->name); + CHKiRet(OLD_openFileWithStateFile(act)); + FINALIZE; } else { - char errStr[1024]; - rs_strerror_r(errno, errStr, sizeof(errStr)); - DBGPRINTF("imfile: error trying to access state file for '%s':%s\n", - pLstn->pszFileName, errStr); + LogError(errno, RS_RET_IO_ERROR, + "imfile error trying to access state file for '%s'", + act->name); ABORT_FINALIZE(RS_RET_IO_ERROR); } } - /* If we reach this point, we have a state file */ + CHKiRet(strm.Construct(&act->pStrm)); - CHKiRet(strm.Construct(&psSF)); - CHKiRet(strm.SettOperationsMode(psSF, STREAMMODE_READ)); - CHKiRet(strm.SetsType(psSF, STREAMTYPE_FILE_SINGLE)); - CHKiRet(strm.SetFName(psSF, pszSFNam, lenSFNam)); - CHKiRet(strm.ConstructFinalize(psSF)); + struct json_object *jval; + struct json_object *json = fjson_object_from_fd(fd); + if(json == NULL) { + LogError(0, RS_RET_ERR, "imfile: error reading state file for '%s'", act->name); + } - /* read back in the object */ - CHKiRet(obj.Deserialize(&pLstn->pStrm, (uchar*) "strm", psSF, NULL, pLstn)); - DBGPRINTF("imfile: deserialized state file, state file base name '%s', " - "configured base name '%s'\n", pLstn->pStrm->pszFName, - pLstn->pszFileName); - if(ustrcmp(pLstn->pStrm->pszFName, pLstn->pszFileName)) { - errmsg.LogError(0, RS_RET_STATEFILE_WRONG_FNAME, "imfile: state file '%s' " - "contains file name '%s', but is used for file '%s'. State " - "file deleted, starting from begin of file.", - pszSFNam, pLstn->pStrm->pszFName, pLstn->pszFileName); + /* we access some data items a bit dirty, as we need to refactor the whole + * thing in any case - TODO + */ + /* Note: we ignore filname property - it is just an aid to the user. Most + * importantly it *is wrong* after a file move! + */ + fjson_object_object_get_ex(json, "prev_was_nl", &jval); + act->pStrm->bPrevWasNL = fjson_object_get_int(jval); - unlink((char*)pszSFNam); - ABORT_FINALIZE(RS_RET_STATEFILE_WRONG_FNAME); + fjson_object_object_get_ex(json, "curr_offs", &jval); + act->pStrm->iCurrOffs = fjson_object_get_int64(jval); + + fjson_object_object_get_ex(json, "strt_offs", &jval); + act->pStrm->strtOffs = fjson_object_get_int64(jval); + + fjson_object_object_get_ex(json, "prev_line_segment", &jval); + const uchar *const prev_line_segment = (const uchar*)fjson_object_get_string(jval); + if(jval != NULL) { + CHKiRet(rsCStrConstructFromszStr(&act->pStrm->prevLineSegment, prev_line_segment)); + cstrFinalize(act->pStrm->prevLineSegment); + uchar *ret = rsCStrGetSzStrNoNULL(act->pStrm->prevLineSegment); + DBGPRINTF("prev_line_segment present in state file 2, is: %s\n", ret); } - strm.CheckFileChange(pLstn->pStrm); - CHKiRet(strm.SeekCurrOffs(pLstn->pStrm)); + fjson_object_object_get_ex(json, "prev_msg_segment", &jval); + const uchar *const prev_msg_segment = (const uchar*)fjson_object_get_string(jval); + if(jval != NULL) { + CHKiRet(rsCStrConstructFromszStr(&act->pStrm->prevMsgSegment, prev_msg_segment)); + cstrFinalize(act->pStrm->prevMsgSegment); + uchar *ret = rsCStrGetSzStrNoNULL(act->pStrm->prevMsgSegment); + DBGPRINTF("prev_msg_segment present in state file 2, is: %s\n", ret); + } + fjson_object_put(json); - /* note: we do not delete the state file, so that the last position remains - * known even in the case that rsyslogd aborts for some reason (like powerfail) - */ + CHKiRet(strm.SetFName(act->pStrm, (uchar*)act->name, strlen(act->name))); + CHKiRet(strm.SettOperationsMode(act->pStrm, STREAMMODE_READ)); + CHKiRet(strm.SetsType(act->pStrm, STREAMTYPE_FILE_MONITOR)); + CHKiRet(strm.SetFileNotFoundError(act->pStrm, inst->fileNotFoundError)); + CHKiRet(strm.ConstructFinalize(act->pStrm)); -finalize_it: - if(psSF != NULL) - strm.Destruct(&psSF); + CHKiRet(strm.SeekCurrOffs(act->pStrm)); +finalize_it: + if(fd >= 0) { + close(fd); + } RETiRet; } @@ -576,30 +1304,32 @@ finalize_it: * checked before calling it. */ static rsRetVal -openFileWithoutStateFile(lstn_t *const __restrict__ pLstn) +openFileWithoutStateFile(act_obj_t *const act) { DEFiRet; struct stat stat_buf; - DBGPRINTF("imfile: clean startup withOUT state file for '%s'\n", pLstn->pszFileName); - if(pLstn->pStrm != NULL) - strm.Destruct(&pLstn->pStrm); - CHKiRet(strm.Construct(&pLstn->pStrm)); - CHKiRet(strm.SettOperationsMode(pLstn->pStrm, STREAMMODE_READ)); - CHKiRet(strm.SetsType(pLstn->pStrm, STREAMTYPE_FILE_MONITOR)); - CHKiRet(strm.SetFName(pLstn->pStrm, pLstn->pszFileName, strlen((char*) pLstn->pszFileName))); - CHKiRet(strm.ConstructFinalize(pLstn->pStrm)); + const instanceConf_t *const inst = act->edge->instarr[0];// TODO: same file, multiple instances? + + DBGPRINTF("clean startup withOUT state file for '%s'\n", act->name); + if(act->pStrm != NULL) + strm.Destruct(&act->pStrm); + CHKiRet(strm.Construct(&act->pStrm)); + CHKiRet(strm.SettOperationsMode(act->pStrm, STREAMMODE_READ)); + CHKiRet(strm.SetsType(act->pStrm, STREAMTYPE_FILE_MONITOR)); + CHKiRet(strm.SetFName(act->pStrm, (uchar*)act->name, strlen(act->name))); + CHKiRet(strm.SetFileNotFoundError(act->pStrm, inst->fileNotFoundError)); + CHKiRet(strm.ConstructFinalize(act->pStrm)); /* As a state file not exist, this is a fresh start. seek to file end * when freshStartTail is on. */ - if(pLstn->freshStartTail){ - if(stat((char*) pLstn->pszFileName, &stat_buf) != -1) { - pLstn->pStrm->iCurrOffs = stat_buf.st_size; - CHKiRet(strm.SeekCurrOffs(pLstn->pStrm)); + if(inst->freshStartTail){ + if(stat((char*) act->name, &stat_buf) != -1) { + act->pStrm->iCurrOffs = stat_buf.st_size; + CHKiRet(strm.SeekCurrOffs(act->pStrm)); } } - strmSetReadTimeout(pLstn->pStrm, pLstn->readTimeout); finalize_it: RETiRet; @@ -608,17 +1338,18 @@ finalize_it: * if so, reading it in. Processing continues from the last know location. */ static rsRetVal -openFile(lstn_t *const __restrict__ pLstn) +openFile(act_obj_t *const act) { DEFiRet; + const instanceConf_t *const inst = act->edge->instarr[0];// TODO: same file, multiple instances? - CHKiRet_Hdlr(openFileWithStateFile(pLstn)) { - CHKiRet(openFileWithoutStateFile(pLstn)); + CHKiRet_Hdlr(openFileWithStateFile(act)) { + CHKiRet(openFileWithoutStateFile(act)); } - DBGPRINTF("imfile: breopenOnTruncate %d for '%s'\n", - pLstn->reopenOnTruncate, pLstn->pszFileName); - CHKiRet(strm.SetbReopenOnTruncate(pLstn->pStrm, pLstn->reopenOnTruncate)); + DBGPRINTF("breopenOnTruncate %d for '%s'\n", inst->reopenOnTruncate, act->name); + CHKiRet(strm.SetbReopenOnTruncate(act->pStrm, inst->reopenOnTruncate)); + strmSetReadTimeout(act->pStrm, inst->readTimeout); finalize_it: RETiRet; @@ -638,58 +1369,72 @@ static void pollFileCancelCleanup(void *pArg) } -/* poll a file, need to check file rollover etc. open file if not open */ -#if !defined(_AIX) -#pragma GCC diagnostic ignored "-Wempty-body" -#endif +/* pollFile needs to be split due to the unfortunate pthread_cancel_push() macros. */ static rsRetVal -pollFile(lstn_t *pLstn, int *pbHadFileData) +pollFileReal(act_obj_t *act, cstr_t **pCStr) { - cstr_t *pCStr = NULL; + int64 strtOffs; DEFiRet; - - /* Note: we must do pthread_cleanup_push() immediately, because the POXIS macros - * otherwise do not work if I include the _cleanup_pop() inside an if... -- rgerhards, 2008-08-14 - */ - pthread_cleanup_push(pollFileCancelCleanup, &pCStr); int nProcessed = 0; - if(pLstn->pStrm == NULL) { - CHKiRet(openFile(pLstn)); /* open file */ + + DBGPRINTF("pollFileReal enter, pStrm %p, name '%s'\n", act->pStrm, act->name); + DBGPRINTF("pollFileReal enter, edge %p\n", act->edge); + DBGPRINTF("pollFileReal enter, edge->instarr %p\n", act->edge->instarr); + + instanceConf_t *const inst = act->edge->instarr[0];// TODO: same file, multiple instances? + + if(act->pStrm == NULL) { + CHKiRet(openFile(act)); /* open file */ } /* loop below will be exited when strmReadLine() returns EOF */ while(glbl.GetGlobalInputTermState() == 0) { - if(pLstn->maxLinesAtOnce != 0 && nProcessed >= pLstn->maxLinesAtOnce) + if(inst->maxLinesAtOnce != 0 && nProcessed >= inst->maxLinesAtOnce) break; - if(pLstn->startRegex == NULL) { - CHKiRet(strm.ReadLine(pLstn->pStrm, &pCStr, pLstn->readMode, pLstn->escapeLF, pLstn->trimLineOverBytes)); + if(inst->startRegex == NULL) { + CHKiRet(strm.ReadLine(act->pStrm, pCStr, inst->readMode, inst->escapeLF, + inst->trimLineOverBytes, &strtOffs)); } else { - CHKiRet(strmReadMultiLine(pLstn->pStrm, &pCStr, &pLstn->end_preg, pLstn->escapeLF)); + CHKiRet(strmReadMultiLine(act->pStrm, pCStr, &inst->end_preg, + inst->escapeLF, &strtOffs)); } ++nProcessed; - if(pbHadFileData != NULL) - *pbHadFileData = 1; /* this is just a flag, so set it and forget it */ - CHKiRet(enqLine(pLstn, pCStr)); /* process line */ - rsCStrDestruct(&pCStr); /* discard string (must be done by us!) */ - if(pLstn->iPersistStateInterval > 0 && pLstn->nRecords++ >= pLstn->iPersistStateInterval) { - persistStrmState(pLstn); - pLstn->nRecords = 0; + runModConf->bHadFileData = 1; /* this is just a flag, so set it and forget it */ + CHKiRet(enqLine(act, *pCStr, strtOffs)); /* process line */ + rsCStrDestruct(pCStr); /* discard string (must be done by us!) */ + if(inst->iPersistStateInterval > 0 && ++act->nRecords >= inst->iPersistStateInterval) { + persistStrmState(act); + act->nRecords = 0; } } finalize_it: - multiSubmitFlush(&pLstn->multiSub); - pthread_cleanup_pop(0); + multiSubmitFlush(&act->multiSub); - if(pCStr != NULL) { - rsCStrDestruct(&pCStr); + if(*pCStr != NULL) { + rsCStrDestruct(pCStr); } RETiRet; } -#if !defined(_AIX) -#pragma GCC diagnostic warning "-Wempty-body" -#endif + +/* poll a file, need to check file rollover etc. open file if not open */ +static rsRetVal +pollFile(act_obj_t *const act) +{ + cstr_t *pCStr = NULL; + DEFiRet; + if (act->is_symlink) { + FINALIZE; /* no reason to poll symlink file */ + } + /* Note: we must do pthread_cleanup_push() immediately, because the POSIX macros + * otherwise do not work if I include the _cleanup_pop() inside an if... -- rgerhards, 2008-08-14 + */ + pthread_cleanup_push(pollFileCancelCleanup, &pCStr); + iRet = pollFileReal(act, &pCStr); + pthread_cleanup_pop(0); +finalize_it: RETiRet; +} /* create input instance, set default parameters, and @@ -722,6 +1467,7 @@ createInstance(instanceConf_t **pinst) inst->addMetadata = ADD_METADATA_UNSPECIFIED; inst->addCeeTag = 0; inst->freshStartTail = 0; + inst->fileNotFoundError = 1; inst->readTimeout = loadModConf->readTimeout; /* node created, let's add to config */ @@ -767,19 +1513,11 @@ getBasename(uchar *const __restrict__ basen, uchar *const __restrict__ path) } /* this function checks instance parameters and does some required pre-processing - * (e.g. split filename in path and actual name) - * Note: we do NOT use dirname()/basename() as they have portability problems. */ static rsRetVal -checkInstance(instanceConf_t *inst) +checkInstance(instanceConf_t *const inst) { - char dirn[MAXFNAME]; - uchar basen[MAXFNAME]; - int i; - struct stat sb; - int r; - int eno; - char errStr[512]; + uchar curr_wd[MAXFNAME]; DEFiRet; /* this is primarily for the clang static analyzer, but also @@ -788,36 +1526,37 @@ checkInstance(instanceConf_t *inst) if(inst->pszFileName == NULL) ABORT_FINALIZE(RS_RET_INTERNAL_ERROR); - i = getBasename(basen, inst->pszFileName); - if (i == -1) { - errmsg.LogError(0, RS_RET_CONFIG_ERROR, "imfile: file path '%s' does not include a basename component", - inst->pszFileName); - ABORT_FINALIZE(RS_RET_CONFIG_ERROR); - } - - memcpy(dirn, inst->pszFileName, i); /* do not copy slash */ - dirn[i] = '\0'; - CHKmalloc(inst->pszFileBaseName = (uchar*) strdup((char*)basen)); - CHKmalloc(inst->pszDirName = (uchar*) strdup(dirn)); - - if(dirn[0] == '\0') { - dirn[0] = '/'; - dirn[1] = '\0'; - } - r = stat(dirn, &sb); - if(r != 0) { - eno = errno; - rs_strerror_r(eno, errStr, sizeof(errStr)); - errmsg.LogError(0, RS_RET_CONFIG_ERROR, "imfile warning: directory '%s': %s", - dirn, errStr); - ABORT_FINALIZE(RS_RET_CONFIG_ERROR); - } - if(!S_ISDIR(sb.st_mode)) { - errmsg.LogError(0, RS_RET_CONFIG_ERROR, "imfile warning: configured directory " - "'%s' is NOT a directory", dirn); - ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + CHKmalloc(inst->pszFileName_forOldStateFile = ustrdup(inst->pszFileName)); + if(loadModConf->normalizePath) { + if(inst->pszFileName[0] == '.' && inst->pszFileName[1] == '/') { + DBGPRINTF("imfile: removing heading './' from name '%s'\n", inst->pszFileName); + memmove(inst->pszFileName, inst->pszFileName+2, ustrlen(inst->pszFileName) - 1); + } + + if(inst->pszFileName[0] != '/') { + if(getcwd((char*)curr_wd, MAXFNAME) == NULL || curr_wd[0] != '/') { + LogError(errno, RS_RET_ERR, "imfile: error querying current working " + "directory - can not continue with %s", inst->pszFileName); + ABORT_FINALIZE(RS_RET_ERR); + } + const size_t len_curr_wd = ustrlen(curr_wd); + if(len_curr_wd + ustrlen(inst->pszFileName) + 1 >= MAXFNAME) { + LogError(0, RS_RET_ERR, "imfile: length of configured file and current " + "working directory exceeds permitted size - ignoring %s", + inst->pszFileName); + ABORT_FINALIZE(RS_RET_ERR); + } + curr_wd[len_curr_wd] = '/'; + strcpy((char*)curr_wd+len_curr_wd+1, (char*)inst->pszFileName); + free(inst->pszFileName); + CHKmalloc(inst->pszFileName = ustrdup(curr_wd)); + } } + dbgprintf("imfile: adding file monitor for '%s'\n", inst->pszFileName); + if(inst->pszTag != NULL) { + inst->lenTag = ustrlen(inst->pszTag); + } finalize_it: RETiRet; } @@ -869,140 +1608,14 @@ addInstance(void __attribute__((unused)) *pVal, uchar *pNewVal) inst->bRMStateOnDel = 0; inst->readTimeout = loadModConf->readTimeout; - CHKiRet(checkInstance(inst)); - - /* reset legacy system */ - cs.iPersistStateInterval = 0; - resetConfigVariables(NULL, NULL); /* values are both dummies */ - -finalize_it: - free(pNewVal); /* we do not need it, but we must free it! */ - RETiRet; -} - - -/* This adds a new listener object to the bottom of the list, but - * it does NOT initialize any data members except for the list - * pointers themselves. - */ -static rsRetVal -lstnAdd(lstn_t **newLstn) -{ - lstn_t *pLstn; - DEFiRet; - - CHKmalloc(pLstn = (lstn_t*) MALLOC(sizeof(lstn_t))); - if(runModConf->pRootLstn == NULL) { - runModConf->pRootLstn = pLstn; - pLstn->prev = NULL; - } else { - runModConf->pTailLstn->next = pLstn; - pLstn->prev = runModConf->pTailLstn; - } - runModConf->pTailLstn = pLstn; - pLstn->next = NULL; - *newLstn = pLstn; - -finalize_it: - RETiRet; -} - -/* delete a listener object */ -static void -lstnDel(lstn_t *pLstn) -{ - DBGPRINTF("imfile: lstnDel called for %s\n", pLstn->pszFileName); - if(pLstn->pStrm != NULL) { /* stream open? */ - persistStrmState(pLstn); - strm.Destruct(&(pLstn->pStrm)); - } - ratelimitDestruct(pLstn->ratelimiter); - free(pLstn->multiSub.ppMsgs); - free(pLstn->pszFileName); - free(pLstn->pszTag); - free(pLstn->pszStateFile); - free(pLstn->pszBaseName); - if(pLstn->startRegex != NULL) - regfree(&pLstn->end_preg); - - if(pLstn == runModConf->pRootLstn) - runModConf->pRootLstn = pLstn->next; - if(pLstn == runModConf->pTailLstn) - runModConf->pTailLstn = pLstn->prev; - if(pLstn->next != NULL) - pLstn->next->prev = pLstn->prev; - if(pLstn->prev != NULL) - pLstn->prev->next = pLstn->next; - free(pLstn); -} - -/* This function is called when a new listener shall be added. - * It also does some late stage error checking on the config - * and reports issues it finds. - */ -static rsRetVal -addListner(instanceConf_t *inst) -{ - DEFiRet; - lstn_t *pThis; - sbool hasWildcard; - - hasWildcard = containsGlobWildcard((char*)inst->pszFileBaseName); - if(hasWildcard) { - if(runModConf->opMode == OPMODE_POLLING) { - errmsg.LogError(0, RS_RET_IMFILE_WILDCARD, - "imfile: The to-be-monitored file \"%s\" contains " - "wildcards. This is not supported in " - "polling mode.", inst->pszFileName); - ABORT_FINALIZE(RS_RET_IMFILE_WILDCARD); - } else if(inst->pszStateFile != NULL) { - errmsg.LogError(0, RS_RET_IMFILE_WILDCARD, - "imfile: warning: it looks like to-be-monitored " - "file \"%s\" contains wildcards. This usually " - "does not work well with specifying a state file.", - inst->pszFileName); - } - } + CHKiRet(checkInstance(inst)); + + /* reset legacy system */ + cs.iPersistStateInterval = 0; + resetConfigVariables(NULL, NULL); /* values are both dummies */ - CHKiRet(lstnAdd(&pThis)); - pThis->hasWildcard = hasWildcard; - pThis->pszFileName = (uchar*) strdup((char*) inst->pszFileName); - pThis->pszDirName = inst->pszDirName; /* use memory from inst! */ - pThis->pszBaseName = (uchar*)strdup((char*)inst->pszFileBaseName); /* be consistent with expanded wildcards! */ - pThis->pszTag = (uchar*) strdup((char*) inst->pszTag); - pThis->lenTag = ustrlen(pThis->pszTag); - pThis->pszStateFile = inst->pszStateFile == NULL ? NULL : (uchar*) strdup((char*) inst->pszStateFile); - - CHKiRet(ratelimitNew(&pThis->ratelimiter, "imfile", (char*)inst->pszFileName)); - CHKmalloc(pThis->multiSub.ppMsgs = MALLOC(inst->nMultiSub * sizeof(smsg_t *))); - pThis->multiSub.maxElem = inst->nMultiSub; - pThis->multiSub.nElem = 0; - pThis->iSeverity = inst->iSeverity; - pThis->iFacility = inst->iFacility; - pThis->maxLinesAtOnce = inst->maxLinesAtOnce; - pThis->trimLineOverBytes = inst->trimLineOverBytes; - pThis->iPersistStateInterval = inst->iPersistStateInterval; - pThis->readMode = inst->readMode; - pThis->startRegex = inst->startRegex; /* no strdup, as it is read-only */ - if(pThis->startRegex != NULL) - if(regcomp(&pThis->end_preg, (char*)pThis->startRegex, REG_EXTENDED)) { - DBGPRINTF("imfile: error regex compile\n"); - ABORT_FINALIZE(RS_RET_ERR); - } - pThis->bRMStateOnDel = inst->bRMStateOnDel; - pThis->escapeLF = inst->escapeLF; - pThis->reopenOnTruncate = inst->reopenOnTruncate; - pThis->addMetadata = (inst->addMetadata == ADD_METADATA_UNSPECIFIED) ? - hasWildcard : inst->addMetadata; - pThis->addCeeTag = inst->addCeeTag; - pThis->readTimeout = inst->readTimeout; - pThis->freshStartTail = inst->freshStartTail; - pThis->pRuleset = inst->pBindRuleset; - pThis->nRecords = 0; - pThis->pStrm = NULL; - pThis->prevLineSegment = NULL; - pThis->masterLstn = NULL; /* we *are* a master! */ finalize_it: + free(pNewVal); /* we do not need it, but we must free it! */ RETiRet; } @@ -1055,6 +1668,8 @@ CODESTARTnewInpInst inst->addCeeTag = (sbool) pvals[i].val.d.n; } else if(!strcmp(inppblk.descr[i].name, "freshstarttail")) { inst->freshStartTail = (sbool) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "filenotfounderror")) { + inst->fileNotFoundError = (sbool) pvals[i].val.d.n; } else if(!strcmp(inppblk.descr[i].name, "escapelf")) { inst->escapeLF = (sbool) pvals[i].val.d.n; } else if(!strcmp(inppblk.descr[i].name, "reopenontruncate")) { @@ -1087,6 +1702,16 @@ CODESTARTnewInpInst "at the same time --- remove one of them"); ABORT_FINALIZE(RS_RET_PARAM_NOT_PERMITTED); } + + if(inst->startRegex != NULL) { + const int errcode = regcomp(&inst->end_preg, (char*)inst->startRegex, REG_EXTENDED); + if(errcode != 0) { + char errbuff[512]; + regerror(errcode, &inst->end_preg, errbuff, sizeof(errbuff)); + parser_errmsg("imfile: error in regex expansion: %s", errbuff); + ABORT_FINALIZE(RS_RET_ERR); + } + } if(inst->readTimeout != 0) loadModConf->haveReadTimeouts = 1; CHKiRet(checkInstance(inst)); @@ -1106,6 +1731,10 @@ CODESTARTbeginCnfLoad loadModConf->readTimeout = 0; /* default: no timeout */ loadModConf->timeoutGranularity = 1000; /* default: 1 second */ loadModConf->haveReadTimeouts = 0; /* default: no timeout */ + loadModConf->normalizePath = 1; + loadModConf->sortFiles = GLOB_NOSORT; + loadModConf->conf_tree = calloc(sizeof(fs_node_t), 1); + loadModConf->conf_tree->edges = NULL; bLegacyCnfModGlobalsPermitted = 1; /* init legacy config vars */ cs.pszFileName = NULL; @@ -1148,6 +1777,10 @@ CODESTARTsetModCnf } else if(!strcmp(modpblk.descr[i].name, "timeoutgranularity")) { /* note: we need ms, thus "* 1000" */ loadModConf->timeoutGranularity = (int) pvals[i].val.d.n * 1000; + } else if(!strcmp(modpblk.descr[i].name, "sortfiles")) { + loadModConf->sortFiles = ((sbool) pvals[i].val.d.n) ? 0 : GLOB_NOSORT; + } else if(!strcmp(modpblk.descr[i].name, "normalizepath")) { + loadModConf->normalizePath = (sbool) pvals[i].val.d.n; } else if(!strcmp(modpblk.descr[i].name, "mode")) { if(!es_strconstcmp(pvals[i].val.d.estr, "polling")) loadModConf->opMode = OPMODE_POLLING; @@ -1217,19 +1850,31 @@ BEGINactivateCnf instanceConf_t *inst; CODESTARTactivateCnf runModConf = pModConf; - runModConf->pRootLstn = NULL, - runModConf->pTailLstn = NULL; + if(runModConf->root == NULL) { + LogError(0, NO_ERRCODE, "imfile: no file monitors configured, " + "input not activated.\n"); + ABORT_FINALIZE(RS_RET_NO_RUN); + } for(inst = runModConf->root ; inst != NULL ; inst = inst->next) { - addListner(inst); + // TODO: provide switch to turn off this warning? + if(!containsGlobWildcard((char*)inst->pszFileName)) { + if(access((char*)inst->pszFileName, R_OK) != 0) { + LogError(errno, RS_RET_ERR, + "imfile: on startup file '%s' does not exist " + "but is configured in static file monitor - this " + "may indicate a misconfiguration. If the file " + "appears at a later time, it will automatically " + "be processed. Reason", inst->pszFileName); + } + } + fs_node_add(runModConf->conf_tree, NULL, inst->pszFileName, 0, inst); } - /* if we could not set up any listeners, there is no point in running... */ - if(runModConf->pRootLstn == 0) { - errmsg.LogError(0, NO_ERRCODE, "imfile: no file monitors could be started, " - "input not activated.\n"); - ABORT_FINALIZE(RS_RET_NO_RUN); + if(Debug) { + fs_node_print(runModConf->conf_tree, 0); } + finalize_it: ENDactivateCnf @@ -1237,14 +1882,20 @@ ENDactivateCnf BEGINfreeCnf instanceConf_t *inst, *del; CODESTARTfreeCnf + fs_node_destroy(pModConf->conf_tree); + //move_list_destruct(pModConf); for(inst = pModConf->root ; inst != NULL ; ) { + if(inst->startRegex != NULL) + regfree(&inst->end_preg); free(inst->pszBindRuleset); free(inst->pszFileName); - free(inst->pszDirName); - free(inst->pszFileBaseName); free(inst->pszTag); free(inst->pszStateFile); - free(inst->startRegex); + free(inst->pszFileName_forOldStateFile); + if(inst->startRegex != NULL) { + regfree(&inst->end_preg); + free(inst->startRegex); + } del = inst; inst = inst->next; free(del); @@ -1252,45 +1903,25 @@ CODESTARTfreeCnf ENDfreeCnf -/* Monitor files in traditional polling mode. - * - * We go through all files and remember if at least one had data. If so, we do - * another run (until no data was present in any file). Then we sleep for - * PollInterval seconds and restart the whole process. This ensures that as - * long as there is some data present, it will be processed at the fastest - * possible pace - probably important for busy systmes. If we monitor just a - * single file, the algorithm is slightly modified. In that case, the sleep - * hapens immediately. The idea here is that if we have just one file, we - * returned from the file processer because that file had no additional data. - * So even if we found some lines, it is highly unlikely to find a new one - * just now. Trying it would result in a performance-costly additional try - * which in the very, very vast majority of cases will never find any new - * lines. - * On spamming the main queue: keep in mind that it will automatically rate-limit - * ourselfes if we begin to overrun it. So we really do not need to care here. - */ +/* Monitor files in polling mode. */ static rsRetVal doPolling(void) { - int bHadFileData; /* were there at least one file with data during this run? */ DEFiRet; while(glbl.GetGlobalInputTermState() == 0) { + DBGPRINTF("doPolling: new poll run\n"); do { - lstn_t *pLstn; - bHadFileData = 0; - for(pLstn = runModConf->pRootLstn ; pLstn != NULL ; pLstn = pLstn->next) { - if(glbl.GetGlobalInputTermState() == 1) - break; /* terminate input! */ - pollFile(pLstn, &bHadFileData); - } - } while(bHadFileData == 1 && glbl.GetGlobalInputTermState() == 0); - /* warning: do...while()! */ + runModConf->bHadFileData = 0; + fs_node_walk(runModConf->conf_tree, poll_tree); + DBGPRINTF("doPolling: end poll walk, hadData %d\n", runModConf->bHadFileData); + } while(runModConf->bHadFileData); /* warning: do...while()! */ /* Note: the additional 10ns wait is vitally important. It guards rsyslog * against totally hogging the CPU if the users selects a polling interval * of 0 seconds. It doesn't hurt any other valid scenario. So do not remove. * rgerhards, 2008-02-14 */ + DBGPRINTF("doPolling: poll going to sleep\n"); if(glbl.GetGlobalInputTermState() == 0) srSleep(runModConf->iPollInterval, 10); } @@ -1298,631 +1929,122 @@ doPolling(void) RETiRet; } +#if defined(HAVE_INOTIFY_INIT) -#ifdef HAVE_INOTIFY_INIT -static rsRetVal -fileTableInit(fileTable_t *const __restrict__ tab, const int nelem) -{ - DEFiRet; - CHKmalloc(tab->listeners = malloc(sizeof(dirInfoFiles_t) * nelem)); - tab->allocMax = nelem; - tab->currMax = 0; -finalize_it: - RETiRet; -} -/* uncomment if needed static void -fileTableDisplay(fileTable_t *tab) +in_dbg_showEv(const struct inotify_event *ev) { - int f; - uchar *baseName; - DBGPRINTF("imfile: dirs.currMaxfiles %d\n", tab->currMax); - for(f = 0 ; f < tab->currMax ; ++f) { - baseName = tab->listeners[f].pLstn->pszBaseName; - DBGPRINTF("imfile: TABLE %p CONTENTS, %d->%p:'%s'\n", tab, f, tab->listeners[f].pLstn, (char*)baseName); - } -} -*/ - -static int -fileTableSearch(fileTable_t *const __restrict__ tab, uchar *const __restrict__ fn) -{ - int f; - uchar *baseName = NULL; - /* UNCOMMENT FOR DEBUG fileTableDisplay(tab); */ - for(f = 0 ; f < tab->currMax ; ++f) { - baseName = tab->listeners[f].pLstn->pszBaseName; - if(!fnmatch((char*)baseName, (char*)fn, FNM_PATHNAME | FNM_PERIOD)) - break; /* found */ - } - if(f == tab->currMax) - f = -1; - DBGPRINTF("imfile: fileTableSearch file '%s' - '%s', found:%d\n", fn, baseName, f); - return f; -} - -static int -fileTableSearchNoWildcard(fileTable_t *const __restrict__ tab, uchar *const __restrict__ fn) -{ - int f; - uchar *baseName = NULL; - /* UNCOMMENT FOR DEBUG fileTableDisplay(tab); */ - for(f = 0 ; f < tab->currMax ; ++f) { - baseName = tab->listeners[f].pLstn->pszBaseName; - if (strcmp((const char*)baseName, (const char*)fn) == 0) - break; /* found */ - } - if(f == tab->currMax) - f = -1; - DBGPRINTF("imfile: fileTableSearchNoWildcard file '%s' - '%s', found:%d\n", fn, baseName, f); - return f; -} - -/* add file to file table */ -static rsRetVal -fileTableAddFile(fileTable_t *const __restrict__ tab, lstn_t *const __restrict__ pLstn) -{ - int j; - DEFiRet; - /* UNCOMMENT FOR DEBUG fileTableDisplay(tab); */ - for(j = 0 ; j < tab->currMax && tab->listeners[j].pLstn != pLstn ; ++j) - ; /* just scan */ - if(j < tab->currMax) { - ++tab->listeners[j].refcnt; - DBGPRINTF("imfile: file '%s' already registered, refcnt now %d\n", - pLstn->pszFileName, tab->listeners[j].refcnt); - FINALIZE; + if(ev->mask & IN_IGNORED) { + dbgprintf("INOTIFY event: watch was REMOVED\n"); } - - if(tab->currMax == tab->allocMax) { - const int newMax = 2 * tab->allocMax; - dirInfoFiles_t *newListenerTab = realloc(tab->listeners, newMax * sizeof(dirInfoFiles_t)); - if(newListenerTab == NULL) { - errmsg.LogError(0, RS_RET_OUT_OF_MEMORY, - "cannot alloc memory to map directory/file relationship " - "for '%s' - ignoring", pLstn->pszFileName); - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - } - tab->listeners = newListenerTab; - tab->allocMax = newMax; - DBGPRINTF("imfile: increased dir table to %d entries\n", allocMaxDirs); + if(ev->mask & IN_MODIFY) { + dbgprintf("INOTIFY event: watch was MODIFID\n"); } - - tab->listeners[tab->currMax].pLstn = pLstn; - tab->listeners[tab->currMax].refcnt = 1; - tab->currMax++; -finalize_it: - RETiRet; -} - -/* delete a file from file table */ -static rsRetVal -fileTableDelFile(fileTable_t *const __restrict__ tab, lstn_t *const __restrict__ pLstn) -{ - int j; - DEFiRet; - - for(j = 0 ; j < tab->currMax && tab->listeners[j].pLstn != pLstn ; ++j) - ; /* just scan */ - if(j == tab->currMax) { - DBGPRINTF("imfile: no association for file '%s'\n", pLstn->pszFileName); - FINALIZE; + if(ev->mask & IN_ACCESS) { + dbgprintf("INOTIFY event: watch IN_ACCESS\n"); } - tab->listeners[j].refcnt--; - if(tab->listeners[j].refcnt == 0) { - /* we remove that entry (but we never shrink the table) */ - if(j < tab->currMax - 1) { - /* entry in middle - need to move others */ - memmove(tab->listeners+j, tab->listeners+j+1, - (tab->currMax -j-1) * sizeof(dirInfoFiles_t)); - } - --tab->currMax; + if(ev->mask & IN_ATTRIB) { + dbgprintf("INOTIFY event: watch IN_ATTRIB\n"); } -finalize_it: - RETiRet; -} -/* add entry to dirs array */ -static rsRetVal -dirsAdd(uchar *dirName) -{ - int newMax; - dirInfo_t *newDirTab; - DEFiRet; - - if(currMaxDirs == allocMaxDirs) { - newMax = 2 * allocMaxDirs; - newDirTab = realloc(dirs, newMax * sizeof(dirInfo_t)); - if(newDirTab == NULL) { - errmsg.LogError(0, RS_RET_OUT_OF_MEMORY, - "cannot alloc memory to monitor directory '%s' - ignoring", - dirName); - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - } - dirs = newDirTab; - allocMaxDirs = newMax; - DBGPRINTF("imfile: increased dir table to %d entries\n", allocMaxDirs); + if(ev->mask & IN_CLOSE_WRITE) { + dbgprintf("INOTIFY event: watch IN_CLOSE_WRITE\n"); } - - /* if we reach this point, there is space in the file table for the new entry */ - dirs[currMaxDirs].dirName = dirName; - CHKiRet(fileTableInit(&dirs[currMaxDirs].active, INIT_FILE_IN_DIR_TAB_SIZE)); - CHKiRet(fileTableInit(&dirs[currMaxDirs].configured, INIT_FILE_IN_DIR_TAB_SIZE)); - - ++currMaxDirs; - DBGPRINTF("imfile: added to dirs table: '%s'\n", dirName); -finalize_it: - RETiRet; -} - - -/* checks if a dir name is already inside the dirs array. If so, returns - * its index. If not present, -1 is returned. - */ -static int -dirsFindDir(uchar *dir) -{ - int i; - - for(i = 0 ; i < currMaxDirs && ustrcmp(dir, dirs[i].dirName) ; ++i) - ; /* just scan, all done in for() */ - if(i == currMaxDirs) - i = -1; - return i; -} - -static rsRetVal -dirsInit(void) -{ - instanceConf_t *inst; - DEFiRet; - - free(dirs); - CHKmalloc(dirs = malloc(sizeof(dirInfo_t) * INIT_FILE_TAB_SIZE)); - allocMaxDirs = INIT_FILE_TAB_SIZE; - currMaxDirs = 0; - - for(inst = runModConf->root ; inst != NULL ; inst = inst->next) { - if(dirsFindDir(inst->pszDirName) == -1) - dirsAdd(inst->pszDirName); + if(ev->mask & IN_CLOSE_NOWRITE) { + dbgprintf("INOTIFY event: watch IN_CLOSE_NOWRITE\n"); } - -finalize_it: - RETiRet; -} - -/* add file to directory (create association) - * fIdx is index into file table, all other information is pulled from that table. - * bActive is 1 if the file is to be added to active set, else zero - */ -static rsRetVal -dirsAddFile(lstn_t *__restrict__ pLstn, const int bActive) -{ - int dirIdx; - dirInfo_t *dir; - DEFiRet; - - dirIdx = dirsFindDir(pLstn->pszDirName); - if(dirIdx == -1) { - errmsg.LogError(0, RS_RET_INTERNAL_ERROR, "imfile: could not find " - "directory '%s' in dirs array - ignoring", - pLstn->pszDirName); - FINALIZE; + if(ev->mask & IN_CREATE) { + dbgprintf("INOTIFY event: file was CREATED: %s\n", ev->name); } - - dir = dirs + dirIdx; - CHKiRet(fileTableAddFile((bActive ? &dir->active : &dir->configured), pLstn)); - DBGPRINTF("imfile: associated file [%s] to directory %d[%s], Active = %d\n", - pLstn->pszFileName, dirIdx, dir->dirName, bActive); - /* UNCOMMENT FOR DEBUG fileTableDisplay(bActive ? &dir->active : &dir->configured); */ -finalize_it: - RETiRet; -} - - -static void -in_setupDirWatch(const int dirIdx) -{ - int wd; - wd = inotify_add_watch(ino_fd, (char*)dirs[dirIdx].dirName, IN_CREATE|IN_DELETE|IN_MOVED_FROM); - if(wd < 0) { - DBGPRINTF("imfile: could not create dir watch for '%s'\n", - dirs[dirIdx].dirName); - goto done; + if(ev->mask & IN_DELETE) { + dbgprintf("INOTIFY event: watch IN_DELETE\n"); } - wdmapAdd(wd, dirIdx, NULL); - DBGPRINTF("imfile: watch %d added for dir %s\n", wd, dirs[dirIdx].dirName); -done: return; -} - -/* Setup a new file watch for a known active file. It must already have - * been entered into the correct tables. - * Note: we need to try to read this file, as it may already contain data this - * needs to be processed, and we won't get an event for that as notifications - * happen only for things after the watch has been activated. - * Note: newFileName is NULL for configured files, and non-NULL for dynamically - * detected files (e.g. wildcards!) - */ -static void -startLstnFile(lstn_t *const __restrict__ pLstn) -{ - rsRetVal localRet; - const int wd = inotify_add_watch(ino_fd, (char*)pLstn->pszFileName, IN_MODIFY); - if(wd < 0) { - char errStr[512]; - rs_strerror_r(errno, errStr, sizeof(errStr)); - DBGPRINTF("imfile: could not create file table entry for '%s' - " - "not processing it now: %s\n", - pLstn->pszFileName, errStr); - goto done; + if(ev->mask & IN_DELETE_SELF) { + dbgprintf("INOTIFY event: watch IN_DELETE_SELF\n"); } - if((localRet = wdmapAdd(wd, -1, pLstn)) != RS_RET_OK) { - DBGPRINTF("imfile: error %d adding file to wdmap, ignoring\n", localRet); - goto done; + if(ev->mask & IN_MOVE_SELF) { + dbgprintf("INOTIFY event: watch IN_MOVE_SELF\n"); } - DBGPRINTF("imfile: watch %d added for file %s\n", wd, pLstn->pszFileName); - dirsAddFile(pLstn, ACTIVE_FILE); - pollFile(pLstn, NULL); -done: return; -} - -/* Duplicate an existing listener. This is called when a new file is to - * be monitored due to wildcard detection. Returns the new pLstn in - * the ppExisting parameter. - */ -static rsRetVal -lstnDup(lstn_t **ppExisting, uchar *const __restrict__ newname) -{ - DEFiRet; - lstn_t *const existing = *ppExisting; - lstn_t *pThis; - - CHKiRet(lstnAdd(&pThis)); - pThis->pszDirName = existing->pszDirName; /* read-only */ - pThis->pszBaseName = (uchar*)strdup((char*)newname); - if(asprintf((char**)&pThis->pszFileName, "%s/%s", (char*)pThis->pszDirName, (char*)newname) == -1) { - DBGPRINTF("imfile/lstnDup: asprintf failed, malfunction can happen\n"); - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - } - pThis->pszTag = (uchar*) strdup((char*) existing->pszTag); - pThis->lenTag = ustrlen(pThis->pszTag); - pThis->pszStateFile = existing->pszStateFile == NULL ? NULL : (uchar*) strdup((char*) existing->pszStateFile); - - CHKiRet(ratelimitNew(&pThis->ratelimiter, "imfile", (char*)pThis->pszFileName)); - pThis->multiSub.maxElem = existing->multiSub.maxElem; - pThis->multiSub.nElem = 0; - CHKmalloc(pThis->multiSub.ppMsgs = MALLOC(pThis->multiSub.maxElem * sizeof(smsg_t*))); - pThis->iSeverity = existing->iSeverity; - pThis->iFacility = existing->iFacility; - pThis->maxLinesAtOnce = existing->maxLinesAtOnce; - pThis->trimLineOverBytes = existing->trimLineOverBytes; - pThis->iPersistStateInterval = existing->iPersistStateInterval; - pThis->readMode = existing->readMode; - pThis->startRegex = existing->startRegex; /* no strdup, as it is read-only */ - if(pThis->startRegex != NULL) // TODO: make this a single function with better error handling - if(regcomp(&pThis->end_preg, (char*)pThis->startRegex, REG_EXTENDED)) { - DBGPRINTF("imfile: error regex compile\n"); - ABORT_FINALIZE(RS_RET_ERR); - } - pThis->bRMStateOnDel = existing->bRMStateOnDel; - pThis->hasWildcard = existing->hasWildcard; - pThis->escapeLF = existing->escapeLF; - pThis->reopenOnTruncate = existing->reopenOnTruncate; - pThis->addMetadata = existing->addMetadata; - pThis->addCeeTag = existing->addCeeTag; - pThis->readTimeout = existing->readTimeout; - pThis->freshStartTail = existing->freshStartTail; - pThis->pRuleset = existing->pRuleset; - pThis->nRecords = 0; - pThis->pStrm = NULL; - pThis->prevLineSegment = NULL; - pThis->masterLstn = existing; - *ppExisting = pThis; -finalize_it: - RETiRet; -} - -/* Setup a new file watch for dynamically discovered files (via wildcards). - * Note: we need to try to read this file, as it may already contain data this - * needs to be processed, and we won't get an event for that as notifications - * happen only for things after the watch has been activated. - */ -static void -in_setupFileWatchDynamic(lstn_t *pLstn, uchar *const __restrict__ newBaseName) -{ - char fullfn[MAXFNAME]; - struct stat fileInfo; - snprintf(fullfn, MAXFNAME, "%s/%s", pLstn->pszDirName, newBaseName); - if(stat(fullfn, &fileInfo) != 0) { - char errStr[1024]; - rs_strerror_r(errno, errStr, sizeof(errStr)); - DBGPRINTF("imfile: ignoring file '%s' cannot stat(): %s\n", - fullfn, errStr); - goto done; + if(ev->mask & IN_MOVED_FROM) { + dbgprintf("INOTIFY event: watch IN_MOVED_FROM, cookie %u, name '%s'\n", ev->cookie, ev->name); } - - if(S_ISDIR(fileInfo.st_mode)) { - DBGPRINTF("imfile: ignoring directory '%s'\n", fullfn); - goto done; + if(ev->mask & IN_MOVED_TO) { + dbgprintf("INOTIFY event: watch IN_MOVED_TO, cookie %u, name '%s'\n", ev->cookie, ev->name); } - - if(lstnDup(&pLstn, newBaseName) != RS_RET_OK) - goto done; - - startLstnFile(pLstn); -done: return; -} - -/* Setup a new file watch for static (configured) files. - * Note: we need to try to read this file, as it may already contain data this - * needs to be processed, and we won't get an event for that as notifications - * happen only for things after the watch has been activated. - */ -static void -in_setupFileWatchStatic(lstn_t *pLstn) -{ - DBGPRINTF("imfile: adding file '%s' to configured table\n", - pLstn->pszFileName); - dirsAddFile(pLstn, CONFIGURED_FILE); - - if(pLstn->hasWildcard) { - DBGPRINTF("imfile: file '%s' has wildcard, doing initial " - "expansion\n", pLstn->pszFileName); - glob_t files; - const int ret = glob((char*)pLstn->pszFileName, - GLOB_MARK|GLOB_NOSORT|GLOB_BRACE, NULL, &files); - if(ret == 0) { - for(unsigned i = 0 ; i < files.gl_pathc ; i++) { - uchar basen[MAXFNAME]; - uchar *const file = (uchar*)files.gl_pathv[i]; - if(file[strlen((char*)file)-1] == '/') - continue;/* we cannot process subdirs! */ - getBasename(basen, file); - in_setupFileWatchDynamic(pLstn, basen); - } - globfree(&files); - } - } else { - /* Duplicate static object as well, otherwise the configobject could be deleted later! */ - if(lstnDup(&pLstn, pLstn->pszBaseName) != RS_RET_OK) { - DBGPRINTF("imfile: in_setupFileWatchStatic failed to duplicate listener for '%s'\n", pLstn->pszFileName); - goto done; - } - startLstnFile(pLstn); + if(ev->mask & IN_OPEN) { + dbgprintf("INOTIFY event: watch IN_OPEN\n"); } -done: return; -} - -/* setup our initial set of watches, based on user config */ -static void -in_setupInitialWatches(void) -{ - int i; - for(i = 0 ; i < currMaxDirs ; ++i) { - in_setupDirWatch(i); - } - lstn_t *pLstn; - for(pLstn = runModConf->pRootLstn ; pLstn != NULL ; pLstn = pLstn->next) { - if(pLstn->masterLstn == NULL) { - /* we process only static (master) entries */ - in_setupFileWatchStatic(pLstn); - } + if(ev->mask & IN_ISDIR) { + dbgprintf("INOTIFY event: watch IN_ISDIR\n"); } } -static void -in_dbg_showEv(struct inotify_event *ev) -{ - if(ev->mask & IN_IGNORED) { - DBGPRINTF("INOTIFY event: watch was REMOVED\n"); - } else if(ev->mask & IN_MODIFY) { - DBGPRINTF("INOTIFY event: watch was MODIFID\n"); - } else if(ev->mask & IN_ACCESS) { - DBGPRINTF("INOTIFY event: watch IN_ACCESS\n"); - } else if(ev->mask & IN_ATTRIB) { - DBGPRINTF("INOTIFY event: watch IN_ATTRIB\n"); - } else if(ev->mask & IN_CLOSE_WRITE) { - DBGPRINTF("INOTIFY event: watch IN_CLOSE_WRITE\n"); - } else if(ev->mask & IN_CLOSE_NOWRITE) { - DBGPRINTF("INOTIFY event: watch IN_CLOSE_NOWRITE\n"); - } else if(ev->mask & IN_CREATE) { - DBGPRINTF("INOTIFY event: file was CREATED: %s\n", ev->name); - } else if(ev->mask & IN_DELETE) { - DBGPRINTF("INOTIFY event: watch IN_DELETE\n"); - } else if(ev->mask & IN_DELETE_SELF) { - DBGPRINTF("INOTIFY event: watch IN_DELETE_SELF\n"); - } else if(ev->mask & IN_MOVE_SELF) { - DBGPRINTF("INOTIFY event: watch IN_MOVE_SELF\n"); - } else if(ev->mask & IN_MOVED_FROM) { - DBGPRINTF("INOTIFY event: watch IN_MOVED_FROM\n"); - } else if(ev->mask & IN_MOVED_TO) { - DBGPRINTF("INOTIFY event: watch IN_MOVED_TO\n"); - } else if(ev->mask & IN_OPEN) { - DBGPRINTF("INOTIFY event: watch IN_OPEN\n"); - } else if(ev->mask & IN_ISDIR) { - DBGPRINTF("INOTIFY event: watch IN_ISDIR\n"); - } else { - DBGPRINTF("INOTIFY event: unknown mask code %8.8x\n", ev->mask); - } -} - -/* inotify told us that a file's wd was closed. We now need to remove - * the file from our internal structures. Remember that a different inode - * with the same name may already be in processing. - */ static void -in_removeFile(const int dirIdx, - lstn_t *const __restrict__ pLstn) +in_handleFileEvent(struct inotify_event *ev, const wd_map_t *const etry) { - uchar statefile[MAXFNAME]; - uchar toDel[MAXFNAME]; - int bDoRMState; - int wd; - uchar *statefn; - DBGPRINTF("imfile: remove listener '%s', dirIdx %d\n", - pLstn->pszFileName, dirIdx); - if(pLstn->bRMStateOnDel) { - statefn = getStateFileName(pLstn, statefile, sizeof(statefile)); - snprintf((char*)toDel, sizeof(toDel), "%s/%s", - glbl.GetWorkDir(), (char*)statefn); - bDoRMState = 1; + if(ev->mask & IN_MODIFY) { + DBGPRINTF("fs_node_notify_file_update: act->name '%s'\n", etry->act->name); + pollFile(etry->act); } else { - bDoRMState = 0; - } - pollFile(pLstn, NULL); /* one final try to gather data */ - /* delete listener data */ - DBGPRINTF("imfile: DELETING listener data for '%s' - '%s'\n", pLstn->pszBaseName, pLstn->pszFileName); - lstnDel(pLstn); - fileTableDelFile(&dirs[dirIdx].active, pLstn); - if(bDoRMState) { - DBGPRINTF("imfile: unlinking '%s'\n", toDel); - if(unlink((char*)toDel) != 0) { - char errStr[1024]; - rs_strerror_r(errno, errStr, sizeof(errStr)); - errmsg.LogError(0, RS_RET_ERR, "imfile: could not remove state " - "file \"%s\": %s", toDel, errStr); - } + DBGPRINTF("got non-expected inotify event:\n"); + in_dbg_showEv(ev); } - wd = wdmapLookupListner(pLstn); - wdmapDel(wd); } -static void -in_handleDirEventCREATE(struct inotify_event *ev, const int dirIdx) -{ - lstn_t *pLstn; - int ftIdx; - ftIdx = fileTableSearch(&dirs[dirIdx].active, (uchar*)ev->name); - if(ftIdx >= 0) { - pLstn = dirs[dirIdx].active.listeners[ftIdx].pLstn; - } else { - DBGPRINTF("imfile: file '%s' not active in dir '%s'\n", - ev->name, dirs[dirIdx].dirName); - ftIdx = fileTableSearch(&dirs[dirIdx].configured, (uchar*)ev->name); - if(ftIdx == -1) { - DBGPRINTF("imfile: file '%s' not associated with dir '%s'\n", - ev->name, dirs[dirIdx].dirName); - goto done; - } - pLstn = dirs[dirIdx].configured.listeners[ftIdx].pLstn; - } - DBGPRINTF("imfile: file '%s' associated with dir '%s'\n", ev->name, dirs[dirIdx].dirName); - in_setupFileWatchDynamic(pLstn, (uchar*)ev->name); -done: return; -} -/* note: we need to care only for active files in the DELETE case. - * Two reasons: a) if this is a configured file, it should be active - * b) if not for some reason, there still is nothing we can do against - * it, and trying to process a *deleted* file really makes no sense - * (remeber we don't have it open, so it actually *is gone*). +/* workaround for IN_MOVED: walk active list and prevent state file deletion of + * IN_MOVED_IN active object + * TODO: replace by a more generic solution. */ static void -in_handleDirEventDELETE(struct inotify_event *const ev, const int dirIdx) -{ - const int ftIdx = fileTableSearch(&dirs[dirIdx].active, (uchar*)ev->name); - if(ftIdx == -1) { - DBGPRINTF("imfile: deleted file '%s' not active in dir '%s'\n", - ev->name, dirs[dirIdx].dirName); - goto done; - } - DBGPRINTF("imfile: imfile delete processing for '%s'\n", - dirs[dirIdx].active.listeners[ftIdx].pLstn->pszFileName); - in_removeFile(dirIdx, dirs[dirIdx].active.listeners[ftIdx].pLstn); -done: return; -} - -static void -in_handleDirEvent(struct inotify_event *const ev, const int dirIdx) +flag_in_move(fs_edge_t *const edge, const char *name_moved) { - DBGPRINTF("imfile: handle dir event for %s\n", dirs[dirIdx].dirName); - if((ev->mask & IN_CREATE)) { - in_handleDirEventCREATE(ev, dirIdx); - } else if((ev->mask & IN_DELETE)) { - in_handleDirEventDELETE(ev, dirIdx); - } else { - DBGPRINTF("imfile: got non-expected inotify event:\n"); - in_dbg_showEv(ev); - } -} + act_obj_t *act; - -static void -in_handleFileEvent(struct inotify_event *ev, const wd_map_t *const etry) -{ - if(ev->mask & IN_MODIFY) { - pollFile(etry->pLstn, NULL); - } else { - DBGPRINTF("imfile: got non-expected inotify event:\n"); - in_dbg_showEv(ev); + for(act = edge->active ; act != NULL ; act = act->next) { + DBGPRINTF("checking active object %s\n", act->basename); + if(!strcmp(act->basename, name_moved)){ + DBGPRINTF("found file\n"); + act->in_move = 1; + break; + } else { + DBGPRINTF("name check fails, '%s' != '%s'\n", act->basename, name_moved); + } } } static void in_processEvent(struct inotify_event *ev) { - wd_map_t *etry; - lstn_t *pLstn; - int iRet; - int ftIdx; - int wd; - if(ev->mask & IN_IGNORED) { - goto done; - } else if(ev->mask & IN_MOVED_FROM) { - /* Find wd entry and remove it */ - etry = wdmapLookup(ev->wd); - if(etry != NULL) { - ftIdx = fileTableSearchNoWildcard(&dirs[etry->dirIdx].active, (uchar*)ev->name); - DBGPRINTF("imfile: IN_MOVED_FROM Event (ftIdx=%d, name=%s)\n", ftIdx, ev->name); - if(ftIdx >= 0) { - /* Find listener and wd table index*/ - pLstn = dirs[etry->dirIdx].active.listeners[ftIdx].pLstn; - wd = wdmapLookupListner(pLstn); - - /* Remove file from inotify watch */ - iRet = inotify_rm_watch(ino_fd, wd); /* Note this will TRIGGER IN_IGNORED Event! */ - if (iRet != 0) { - DBGPRINTF("imfile: inotify_rm_watch error %d (ftIdx=%d, wd=%d, name=%s)\n", errno, ftIdx, wd, ev->name); - } else { - DBGPRINTF("imfile: inotify_rm_watch successfully removed file from watch (ftIdx=%d, wd=%d, name=%s)\n", ftIdx, wd, ev->name); - } - in_removeFile(etry->dirIdx, pLstn); - DBGPRINTF("imfile: IN_MOVED_FROM Event file removed file (wd=%d, name=%s)\n", wd, ev->name); - } - } + DBGPRINTF("imfile: got IN_IGNORED event\n"); goto done; } - etry = wdmapLookup(ev->wd); + + DBGPRINTF("in_processEvent process Event %x for %s\n", ev->mask, ev->name); + const wd_map_t *const etry = wdmapLookup(ev->wd); if(etry == NULL) { - DBGPRINTF("imfile: could not lookup wd %d\n", ev->wd); + LogMsg(0, RS_RET_INTERNAL_ERROR, LOG_WARNING, "imfile: internal error? " + "inotify provided watch descriptor %d which we could not find " + "in our tables - ignored", ev->wd); goto done; } - if(etry->pLstn == NULL) { /* directory? */ - in_handleDirEvent(ev, etry->dirIdx); + DBGPRINTF("in_processEvent process Event %x is_file %d, act->name '%s'\n", + ev->mask, etry->act->edge->is_file, etry->act->name); + + if((ev->mask & IN_MOVED_FROM)) { + flag_in_move(etry->act->edge->node->edges, ev->name); + } + if(ev->mask & (IN_MOVED_FROM | IN_MOVED_TO)) { + fs_node_walk(etry->act->edge->node, poll_tree); + } else if(etry->act->edge->is_file && !(etry->act->is_symlink)) { + in_handleFileEvent(ev, etry); // esentially poll_file()! } else { - in_handleFileEvent(ev, etry); + fs_node_walk(etry->act->edge->node, poll_tree); } done: return; } -static void -in_do_timeout_processing(void) -{ - int i; - DBGPRINTF("imfile: readTimeouts are configured, checking if some apply\n"); - - for(i = 0 ; i < nWdmap ; ++i) { - dbgprintf("imfile: wdmap %d, plstn %p\n", i, wdmap[i].pLstn); - lstn_t *const pLstn = wdmap[i].pLstn; - if(pLstn != NULL && strmReadMultiLine_isTimedOut(pLstn->pStrm)) { - dbgprintf("imfile: wdmap %d, timeout occured\n", i); - pollFile(pLstn, NULL); - } - } - -} - /* Monitor files in inotify mode */ #if !defined(_AIX) @@ -1940,14 +2062,16 @@ do_inotify(void) DEFiRet; CHKiRet(wdmapInit()); - CHKiRet(dirsInit()); ino_fd = inotify_init(); - if(ino_fd < 0) { - errmsg.LogError(1, RS_RET_INOTIFY_INIT_FAILED, "imfile: Init inotify instance failed "); - return RS_RET_INOTIFY_INIT_FAILED; - } - DBGPRINTF("imfile: inotify fd %d\n", ino_fd); - in_setupInitialWatches(); + if(ino_fd < 0) { + LogError(errno, RS_RET_INOTIFY_INIT_FAILED, "imfile: Init inotify " + "instance failed "); + return RS_RET_INOTIFY_INIT_FAILED; + } + DBGPRINTF("inotify fd %d\n", ino_fd); + + /* do watch initialization */ + fs_node_walk(runModConf->conf_tree, poll_tree); while(glbl.GetGlobalInputTermState() == 0) { if(runModConf->haveReadTimeouts) { @@ -1959,7 +2083,8 @@ do_inotify(void) r = poll(&pollfd, 1, runModConf->timeoutGranularity); } while(r == -1 && errno == EINTR); if(r == 0) { - in_do_timeout_processing(); + DBGPRINTF("readTimeouts are configured, checking if some apply\n"); + fs_node_walk(runModConf->conf_tree, poll_timeouts); continue; } else if (r == -1) { char errStr[1024]; @@ -2035,49 +2160,96 @@ CODESTARTwillRun CHKiRet(prop.Construct(&pInputName)); CHKiRet(prop.SetString(pInputName, UCHAR_CONSTANT("imfile"), sizeof("imfile") - 1)); CHKiRet(prop.ConstructFinalize(pInputName)); - finalize_it: ENDwillRun +// TODO: refactor this into a generically-usable "atomic file creation" utility for +// all kinds of "state files" +static rsRetVal +atomicWriteStateFile(const char *fn, const char *content) +{ + DEFiRet; + const int fd = open(fn, O_CLOEXEC | O_NOCTTY | O_WRONLY | O_CREAT | O_TRUNC, 0600); + if(fd < 0) { + LogError(errno, RS_RET_IO_ERROR, "imfile: cannot open state file '%s' for " + "persisting file state - some data will probably be duplicated " + "on next startup", fn); + ABORT_FINALIZE(RS_RET_IO_ERROR); + } + + const size_t toWrite = strlen(content); + const ssize_t w = write(fd, content, toWrite); + if(w != (ssize_t) toWrite) { + LogError(errno, RS_RET_IO_ERROR, "imfile: partial write to state file '%s' " + "this may cause trouble in the future. We will try to delete the " + "state file, as this provides most consistent state", fn); + unlink(fn); + ABORT_FINALIZE(RS_RET_IO_ERROR); + } + +finalize_it: + if(fd >= 0) { + close(fd); + } + RETiRet; +} + + /* This function persists information for a specific file being monitored. * To do so, it simply persists the stream object. We do NOT abort on error * iRet as that makes matters worse (at least we can try persisting the others...). * rgerhards, 2008-02-13 */ static rsRetVal -persistStrmState(lstn_t *pLstn) +persistStrmState(act_obj_t *const act) { DEFiRet; - strm_t *psSF = NULL; /* state file (stream) */ - size_t lenDir; uchar statefile[MAXFNAME]; + uchar statefname[MAXFNAME]; + + uchar *const statefn = getStateFileName(act, statefile, sizeof(statefile)); + getFullStateFileName(statefn, statefname, sizeof(statefname)); + DBGPRINTF("persisting state for '%s', state file '%s'\n", act->name, statefname); + + struct json_object *jval = NULL; + struct json_object *json = NULL; + CHKmalloc(json = json_object_new_object()); + jval = json_object_new_string((char*) act->name); + json_object_object_add(json, "filename", jval); + jval = json_object_new_int(strmGetPrevWasNL(act->pStrm)); + json_object_object_add(json, "prev_was_nl", jval); + + /* we access some data items a bit dirty, as we need to refactor the whole + * thing in any case - TODO + */ + jval = json_object_new_int64(act->pStrm->iCurrOffs); + json_object_object_add(json, "curr_offs", jval); + jval = json_object_new_int64(act->pStrm->strtOffs); + json_object_object_add(json, "strt_offs", jval); - uchar *const statefn = getStateFileName(pLstn, statefile, sizeof(statefile)); - DBGPRINTF("imfile: persisting state for '%s' to file '%s'\n", - pLstn->pszFileName, statefn); - CHKiRet(strm.Construct(&psSF)); - lenDir = ustrlen(glbl.GetWorkDir()); - if(lenDir > 0) - CHKiRet(strm.SetDir(psSF, glbl.GetWorkDir(), lenDir)); - CHKiRet(strm.SettOperationsMode(psSF, STREAMMODE_WRITE_TRUNC)); - CHKiRet(strm.SetsType(psSF, STREAMTYPE_FILE_SINGLE)); - CHKiRet(strm.SetFName(psSF, statefn, strlen((char*) statefn))); - CHKiRet(strm.ConstructFinalize(psSF)); + const uchar *const prevLineSegment = strmGetPrevLineSegment(act->pStrm); + if(prevLineSegment != NULL) { + jval = json_object_new_string((const char*) prevLineSegment); + json_object_object_add(json, "prev_line_segment", jval); + } - CHKiRet(strm.Serialize(pLstn->pStrm, psSF)); - CHKiRet(strm.Flush(psSF)); + const uchar *const prevMsgSegment = strmGetPrevMsgSegment(act->pStrm); + if(prevMsgSegment != NULL) { + jval = json_object_new_string((const char*) prevMsgSegment); + json_object_object_add(json, "prev_msg_segment", jval); + } - CHKiRet(strm.Destruct(&psSF)); + const char *jstr = json_object_to_json_string_ext(json, JSON_C_TO_STRING_SPACED); -finalize_it: - if(psSF != NULL) - strm.Destruct(&psSF); + CHKiRet(atomicWriteStateFile((const char*)statefname, jstr)); + json_object_put(json); +finalize_it: if(iRet != RS_RET_OK) { errmsg.LogError(0, iRet, "imfile: could not persist state " "file %s - data may be repeated on next " "startup. Is WorkDirectory set?", - statefn); + statefname); } RETiRet; @@ -2089,11 +2261,6 @@ finalize_it: */ BEGINafterRun CODESTARTafterRun - while(runModConf->pRootLstn != NULL) { - /* Note: lstnDel() reasociates root! */ - lstnDel(runModConf->pRootLstn); - } - if(pInputName != NULL) prop.Destruct(&pInputName); ENDafterRun @@ -2118,12 +2285,6 @@ CODESTARTmodExit objRelease(prop, CORE_COMPONENT); objRelease(ruleset, CORE_COMPONENT); #ifdef HAVE_INOTIFY_INIT - /* we use these vars only in inotify mode */ - if(dirs != NULL) { - free(dirs->active.listeners); - free(dirs->configured.listeners); - free(dirs); - } free(wdmap); #endif ENDmodExit diff --git a/runtime/msg.c b/runtime/msg.c index a885d2368bbaeea90a6e92dc0d569d169b1dd2e5..f45d6175283097974023905fc072508a18a8270a 100644 --- a/runtime/msg.c +++ b/runtime/msg.c @@ -4890,6 +4890,28 @@ finalize_it: RETiRet; } +rsRetVal +msgAddMultiMetadata(smsg_t *const __restrict__ pMsg, + const uchar ** __restrict__ metaname, + const uchar ** __restrict__ metaval, + const int count) +{ + DEFiRet; + int i = 0 ; + struct json_object *const json = json_object_new_object(); + CHKmalloc(json); + for ( i = 0 ; i < count ; i++ ) { + struct json_object *const jval = json_object_new_string((char*)metaval[i]); + if(jval == NULL) { + json_object_put(json); + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + } + json_object_object_add(json, (const char *const)metaname[i], jval); + } + iRet = msgAddJSON(pMsg, (uchar*)"!metadata", json, 0, 0); +finalize_it: + RETiRet; +} static struct json_object * jsonDeepCopy(struct json_object *src) diff --git a/runtime/msg.h b/runtime/msg.h index 6521e19b28b013f0d06e357bdb0f33a94dab638b..0e92da43398156f4871b2e567a242cb089f67a08 100644 --- a/runtime/msg.h +++ b/runtime/msg.h @@ -195,6 +195,7 @@ int getPRIi(const smsg_t * const pM); void getRawMsg(smsg_t *pM, uchar **pBuf, int *piLen); rsRetVal msgAddJSON(smsg_t *pM, uchar *name, struct json_object *json, int force_reset, int sharedReference); rsRetVal msgAddMetadata(smsg_t *msg, uchar *metaname, uchar *metaval); +rsRetVal msgAddMultiMetadata(smsg_t *msg, const uchar **metaname, const uchar **metaval, const int count); rsRetVal MsgGetSeverity(smsg_t *pThis, int *piSeverity); rsRetVal MsgDeserialize(smsg_t *pMsg, strm_t *pStrm); rsRetVal MsgSetPropsViaJSON(smsg_t *__restrict__ const pMsg, const uchar *__restrict__ const json); diff --git a/runtime/stream.c b/runtime/stream.c index 701144c0e39d6fbcf9dd63fe60421e1dcd6f01c6..fb1ff11d1890bbaee107658dd3568c2bc67c223d 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -91,6 +91,41 @@ static rsRetVal strmSeekCurrOffs(strm_t *pThis); /* methods */ +/* note: this may return NULL if not line segment is currently set */ +// TODO: due to the cstrFinalize() this is not totally clean, albeit for our +// current use case it does not hurt -- refactor! rgerhards, 2018-03-27 +const uchar * +strmGetPrevLineSegment(strm_t *const pThis) +{ + const uchar *ret = NULL; + if(pThis->prevLineSegment != NULL) { + cstrFinalize(pThis->prevLineSegment); + ret = rsCStrGetSzStrNoNULL(pThis->prevLineSegment); + } + return ret; +} +/* note: this may return NULL if not line segment is currently set */ +// TODO: due to the cstrFinalize() this is not totally clean, albeit for our +// current use case it does not hurt -- refactor! rgerhards, 2018-03-27 +const uchar * +strmGetPrevMsgSegment(strm_t *const pThis) +{ + const uchar *ret = NULL; + if(pThis->prevMsgSegment != NULL) { + cstrFinalize(pThis->prevMsgSegment); + ret = rsCStrGetSzStrNoNULL(pThis->prevMsgSegment); + } + return ret; +} + + +int +strmGetPrevWasNL(const strm_t *const pThis) +{ + return pThis->bPrevWasNL; +} + + /* output (current) file name for debug log purposes. Falls back to various * levels of impreciseness if more precise name is not known. */ @@ -242,17 +277,18 @@ doPhysOpen(strm_t *pThis) } pThis->fd = open((char*)pThis->pszCurrFName, iFlags | O_LARGEFILE, pThis->tOpenMode); + const int errno_save = errno; /* dbgprintf can mangle it! */ DBGPRINTF("file '%s' opened as #%d with mode %d\n", pThis->pszCurrFName, pThis->fd, (int) pThis->tOpenMode); if(pThis->fd == -1) { - char errStr[1024]; - int err = errno; - rs_strerror_r(err, errStr, sizeof(errStr)); - DBGOPRINT((obj_t*) pThis, "open error %d, file '%s': %s\n", errno, pThis->pszCurrFName, errStr); - if(err == ENOENT) - ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND); - else - ABORT_FINALIZE(RS_RET_FILE_OPEN_ERROR); + const rsRetVal errcode = (errno_save == ENOENT) + ? RS_RET_FILE_NOT_FOUND : RS_RET_FILE_OPEN_ERROR; + if(pThis->fileNotFoundError) { + LogError(errno_save, errcode, "file '%s': open error", pThis->pszCurrFName); + } else { + DBGPRINTF("file '%s': open error", pThis->pszCurrFName); + } + ABORT_FINALIZE(errcode); } if(pThis->tOperationsMode == STREAMMODE_READ) { @@ -344,6 +380,8 @@ static rsRetVal strmOpenFile(strm_t *pThis) if(pThis->fd != -1) ABORT_FINALIZE(RS_RET_OK); + + free(pThis->pszCurrFName); pThis->pszCurrFName = NULL; /* used to prevent mem leak in case of error */ if(pThis->pszFName == NULL) @@ -733,11 +771,11 @@ static rsRetVal strmUnreadChar(strm_t *pThis, uchar c) * a line, but following lines that are indented are part of the same log entry */ static rsRetVal -strmReadLine(strm_t *pThis, cstr_t **ppCStr, uint8_t mode, sbool bEscapeLF, uint32_t trimLineOverBytes) +strmReadLine(strm_t *pThis, cstr_t **ppCStr, uint8_t mode, sbool bEscapeLF, + uint32_t trimLineOverBytes, int64 *const strtOffs) { uchar c; uchar finished; - rsRetVal readCharRet; DEFiRet; ASSERT(pThis != NULL); @@ -756,12 +794,7 @@ strmReadLine(strm_t *pThis, cstr_t **ppCStr, uint8_t mode, sbool bEscapeLF, uint if(mode == 0) { while(c != '\n') { CHKiRet(cstrAppendChar(*ppCStr, c)); - readCharRet = strmReadChar(pThis, &c); - if((readCharRet == RS_RET_TIMED_OUT) || - (readCharRet == RS_RET_EOF) ) { /* end reached without \n? */ - CHKiRet(rsCStrConstructFromCStr(&pThis->prevLineSegment, *ppCStr)); - } - CHKiRet(readCharRet); + CHKiRet(strmReadChar(pThis, &c)); } if (trimLineOverBytes > 0 && (uint32_t) cstrLen(*ppCStr) > trimLineOverBytes) { /* Truncate long line at trimLineOverBytes position */ @@ -850,12 +883,19 @@ strmReadLine(strm_t *pThis, cstr_t **ppCStr, uint8_t mode, sbool bEscapeLF, uint } finalize_it: - if(iRet != RS_RET_OK && *ppCStr != NULL) { - if(cstrLen(*ppCStr) > 0) { - /* we may have an empty string in an unsuccsfull poll or after restart! */ - rsCStrConstructFromCStr(&pThis->prevLineSegment, *ppCStr); + if(iRet == RS_RET_OK) { + if(strtOffs != NULL) { + *strtOffs = pThis->strtOffs; + } + pThis->strtOffs = pThis->iCurrOffs; /* we are at begin of next line */ + } else { + if(*ppCStr != NULL) { + if(cstrLen(*ppCStr) > 0) { + /* we may have an empty string in an unsuccesfull poll or after restart! */ + rsCStrConstructFromCStr(&pThis->prevLineSegment, *ppCStr); + } + cstrDestruct(ppCStr); } - cstrDestruct(ppCStr); } RETiRet; @@ -882,7 +922,8 @@ strmReadMultiLine_isTimedOut(const strm_t *const __restrict__ pThis) * added 2015-05-12 rgerhards */ rsRetVal -strmReadMultiLine(strm_t *pThis, cstr_t **ppCStr, regex_t *preg, const sbool bEscapeLF) +strmReadMultiLine(strm_t *pThis, cstr_t **ppCStr, regex_t *preg, const sbool bEscapeLF, + int64 *const strtOffs) { uchar c; uchar finished = 0; @@ -946,16 +987,24 @@ strmReadMultiLine(strm_t *pThis, cstr_t **ppCStr, regex_t *preg, const sbool bEs } while(finished == 0); finalize_it: - if( pThis->readTimeout - && (iRet != RS_RET_OK) - && (pThis->prevMsgSegment != NULL) - && (tCurr > pThis->lastRead + pThis->readTimeout)) { - CHKiRet(rsCStrConstructFromCStr(ppCStr, pThis->prevMsgSegment)); - cstrDestruct(&pThis->prevMsgSegment); - pThis->lastRead = tCurr; - dbgprintf("stream: generated msg based on timeout: %s\n", cstrGetSzStrNoNULL(*ppCStr)); - FINALIZE; - iRet = RS_RET_OK; + *strtOffs = pThis->strtOffs; + if(thisLine != NULL) { + cstrDestruct(&thisLine); + } + if(iRet == RS_RET_OK) { + pThis->strtOffs = pThis->iCurrOffs; /* we are at begin of next line */ + } else { + if( pThis->readTimeout + && (pThis->prevMsgSegment != NULL) + && (tCurr > pThis->lastRead + pThis->readTimeout)) { + CHKiRet(rsCStrConstructFromCStr(ppCStr, pThis->prevMsgSegment)); + cstrDestruct(&pThis->prevMsgSegment); + pThis->lastRead = tCurr; + pThis->strtOffs = pThis->iCurrOffs; /* we are at begin of next line */ + dbgprintf("stream: generated msg based on timeout: %s\n", cstrGetSzStrNoNULL(*ppCStr)); + FINALIZE; + iRet = RS_RET_OK; + } } RETiRet; } @@ -974,7 +1023,10 @@ BEGINobjConstruct(strm) /* be sure to specify the object type also in END macro! pThis->pszSizeLimitCmd = NULL; pThis->prevLineSegment = NULL; pThis->prevMsgSegment = NULL; + pThis->strtOffs = 0; + pThis->ignoringMsg = 0; pThis->bPrevWasNL = 0; + pThis->fileNotFoundError = 1; ENDobjConstruct(strm) @@ -1686,7 +1738,7 @@ static rsRetVal strmSeek(strm_t *pThis, off64_t offs) DBGPRINTF("strmSeek: error %lld seeking to offset %lld\n", i, (long long) offs); ABORT_FINALIZE(RS_RET_IO_ERROR); } - pThis->iCurrOffs = offs; /* we are now at *this* offset */ + pThis->strtOffs = pThis->iCurrOffs = offs; /* we are now at *this* offset */ pThis->iBufPtr = 0; /* buffer invalidated */ finalize_it: @@ -1738,7 +1790,7 @@ strmMultiFileSeek(strm_t *pThis, unsigned int FNum, off64_t offs, off64_t *bytes } else { *bytesDel = 0; } - pThis->iCurrOffs = offs; + pThis->strtOffs = pThis->iCurrOffs = offs; finalize_it: RETiRet; @@ -1763,7 +1815,7 @@ static rsRetVal strmSeekCurrOffs(strm_t *pThis) /* As the cryprov may use CBC or similiar things, we need to read skip data */ targetOffs = pThis->iCurrOffs; - pThis->iCurrOffs = 0; + pThis->strtOffs = pThis->iCurrOffs = 0; DBGOPRINT((obj_t*) pThis, "encrypted, doing skip read of %lld bytes\n", (long long) targetOffs); while(targetOffs != pThis->iCurrOffs) { @@ -1935,6 +1987,12 @@ static rsRetVal strmSetiMaxFiles(strm_t *pThis, int iNewVal) return RS_RET_OK; } +static rsRetVal strmSetFileNotFoundError(strm_t *pThis, int pFileNotFoundError) +{ + pThis->fileNotFoundError = pFileNotFoundError; + return RS_RET_OK; +} + /* set the stream's file prefix * The passed-in string is duplicated. So if the caller does not need @@ -2076,6 +2134,9 @@ static rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm) l = pThis->inode; objSerializeSCALAR_VAR(pStrm, inode, INT64, l); + l = pThis->strtOffs; + objSerializeSCALAR_VAR(pStrm, strtOffs, INT64, l); + if(pThis->prevLineSegment != NULL) { cstrFinalize(pThis->prevLineSegment); objSerializePTR(pStrm, prevLineSegment, CSTR); @@ -2188,8 +2249,12 @@ static rsRetVal strmSetProperty(strm_t *pThis, var_t *pProp) pThis->iCurrOffs = pProp->val.num; } else if(isProp("inode")) { pThis->inode = (ino_t) pProp->val.num; + } else if(isProp("strtOffs")) { + pThis->strtOffs = pProp->val.num; } else if(isProp("iMaxFileSize")) { CHKiRet(strmSetiMaxFileSize(pThis, pProp->val.num)); + } else if(isProp("fileNotFoundError")) { + CHKiRet(strmSetFileNotFoundError(pThis, pProp->val.num)); } else if(isProp("iMaxFiles")) { CHKiRet(strmSetiMaxFiles(pThis, pProp->val.num)); } else if(isProp("iFileNumDigits")) { @@ -2253,6 +2318,7 @@ CODESTARTobjQueryInterface(strm) pIf->WriteChar = strmWriteChar; pIf->WriteLong = strmWriteLong; pIf->SetFName = strmSetFName; + pIf->SetFileNotFoundError = strmSetFileNotFoundError; pIf->SetDir = strmSetDir; pIf->Flush = strmFlush; pIf->RecordBegin = strmRecordBegin; diff --git a/runtime/stream.h b/runtime/stream.h index 1eee34979db34620b82e6351111864645187b035..bcb81a14f60f9effa52fffa42d18d66c484ae86d 100644 --- a/runtime/stream.h +++ b/runtime/stream.h @@ -159,6 +159,10 @@ typedef struct strm_s { sbool bIsTTY; /* is this a tty file? */ cstr_t *prevLineSegment; /* for ReadLine, previous, unprocessed part of file */ cstr_t *prevMsgSegment; /* for ReadMultiLine, previous, yet unprocessed part of msg */ + int64 strtOffs; /* start offset in file for current line/msg */ + int fileNotFoundError; + int noRepeatedErrorOutput; /* if a file is missing the Error is only given once */ + int ignoringMsg; } strm_t; @@ -174,6 +178,7 @@ BEGINinterface(strm) /* name must also be changed in ENDinterface macro! */ rsRetVal (*Write)(strm_t *const pThis, const uchar *const pBuf, size_t lenBuf); rsRetVal (*WriteChar)(strm_t *pThis, uchar c); rsRetVal (*WriteLong)(strm_t *pThis, long i); + rsRetVal (*SetFileNotFoundError)(strm_t *pThis, int pFileNotFoundError); rsRetVal (*SetFName)(strm_t *pThis, uchar *pszPrefix, size_t iLenPrefix); rsRetVal (*SetDir)(strm_t *pThis, uchar *pszDir, size_t iLenDir); rsRetVal (*Flush)(strm_t *pThis); @@ -198,7 +203,8 @@ BEGINinterface(strm) /* name must also be changed in ENDinterface macro! */ INTERFACEpropSetMeth(strm, iFlushInterval, int); INTERFACEpropSetMeth(strm, pszSizeLimitCmd, uchar*); /* v6 added */ - rsRetVal (*ReadLine)(strm_t *pThis, cstr_t **ppCStr, uint8_t mode, sbool bEscapeLF, uint32_t trimLineOverBytes); + rsRetVal (*ReadLine)(strm_t *pThis, cstr_t **ppCStr, uint8_t mode, sbool bEscapeLF, + uint32_t trimLineOverBytes, int64 *const strtOffs); /* v7 added 2012-09-14 */ INTERFACEpropSetMeth(strm, bVeryReliableZip, int); /* v8 added 2013-03-21 */ @@ -207,19 +213,24 @@ BEGINinterface(strm) /* name must also be changed in ENDinterface macro! */ INTERFACEpropSetMeth(strm, cryprov, cryprov_if_t*); INTERFACEpropSetMeth(strm, cryprovData, void*); ENDinterface(strm) -#define strmCURR_IF_VERSION 12 /* increment whenever you change the interface structure! */ +#define strmCURR_IF_VERSION 13 /* increment whenever you change the interface structure! */ /* V10, 2013-09-10: added new parameter bEscapeLF, changed mode to uint8_t (rgerhards) */ /* V11, 2015-12-03: added new parameter bReopenOnTruncate */ /* V12, 2015-12-11: added new parameter trimLineOverBytes, changed mode to uint32_t */ +/* V13, 2017-09-06: added new parameter strtoffs to ReadLine() */ #define strmGetCurrFileNum(pStrm) ((pStrm)->iCurrFNum) /* prototypes */ PROTOTYPEObjClassInit(strm); rsRetVal strmMultiFileSeek(strm_t *pThis, unsigned int fileNum, off64_t offs, off64_t *bytesDel); -rsRetVal strmReadMultiLine(strm_t *pThis, cstr_t **ppCStr, regex_t *preg, sbool bEscapeLF); +rsRetVal strmReadMultiLine(strm_t *pThis, cstr_t **ppCStr, regex_t *preg, + sbool bEscapeLF, int64 *const strtOffs); int strmReadMultiLine_isTimedOut(const strm_t *const __restrict__ pThis); void strmDebugOutBuf(const strm_t *const pThis); void strmSetReadTimeout(strm_t *const __restrict__ pThis, const int val); +const uchar * strmGetPrevLineSegment(strm_t *const pThis); +const uchar * strmGetPrevMsgSegment(strm_t *const pThis); +int strmGetPrevWasNL(const strm_t *const pThis); #endif /* #ifndef STREAM_H_INCLUDED */