daandemeyer / rpms / systemd

Forked from rpms/systemd 2 years ago
Clone
ecbff1
From 32244f8d21a3e06f6519c47234289da696f6b151 Mon Sep 17 00:00:00 2001
ecbff1
From: Lennart Poettering <lennart@poettering.net>
ecbff1
Date: Sun, 8 Oct 2017 09:05:59 +0200
ecbff1
Subject: [PATCH] journald: make maximum size of stream log lines configurable
ecbff1
 and bump it to 48K (#6838)
ecbff1
ecbff1
This adds a new setting LineMax= to journald.conf, and sets it by
ecbff1
default to 48K. When we convert stream-based stdout/stderr logging into
ecbff1
record-based log entries, read up to the specified amount of bytes
ecbff1
before forcing a line-break.
ecbff1
ecbff1
This also makes three related changes:
ecbff1
ecbff1
- When a NUL byte is read we'll not recognize this as alternative line
ecbff1
  break, instead of silently dropping everything after it. (see #4863)
ecbff1
ecbff1
- The reason for a line-break is now encoded in the log record, if it
ecbff1
  wasn't a plain newline. Specifically, we distuingish "nul",
ecbff1
  "line-max" and "eof", for line breaks due to NUL byte, due to the
ecbff1
  maximum line length as configured with LineMax= or due to end of
ecbff1
  stream. This data is stored in the new implicit _LINE_BREAK= field.
ecbff1
  It's not synthesized for plain \n line breaks.
ecbff1
ecbff1
- A randomized 128bit ID is assigned to each log stream.
ecbff1
ecbff1
With these three changes in place it's (mostly) possible to reconstruct
ecbff1
the original byte streams from log data, as (most) of the context of
ecbff1
the conversion from the byte stream to log records is saved now. (So,
ecbff1
the only bits we still drop are empty lines. Which might be something to
ecbff1
look into in a future change, and which is outside of the scope of this
ecbff1
work)
ecbff1
ecbff1
Fixes: https://bugs.freedesktop.org/show_bug.cgi?id=86465
ecbff1
See: #4863
ecbff1
Replaces: #4875
ecbff1
ecbff1
(cherry picked from commit ec20fe5ffb8a00469bab209fff6c069bb93c6db2)
ecbff1
ecbff1
Resolves: #1442262
ecbff1
ecbff1
[msekleta: I had to manually rewrite upstream commit, because git
ecbff1
did very poor job merging old and new code and identified a lot of merge
ecbff1
conflicts in a code that was totally unrelated.]
ecbff1
---
ecbff1
 man/journald.conf.xml            |  18 ++++++
ecbff1
 man/systemd.journal-fields.xml   |  22 ++++++++
ecbff1
 src/journal/journald-gperf.gperf |   1 +
ecbff1
 src/journal/journald-server.c    |  68 +++++++++++++++++++++++
ecbff1
 src/journal/journald-server.h    |   3 +
ecbff1
 src/journal/journald-stream.c    | 116 ++++++++++++++++++++++++++++++++-------
ecbff1
 src/journal/journald.conf        |   1 +
ecbff1
 7 files changed, 209 insertions(+), 20 deletions(-)
ecbff1
ecbff1
diff --git a/man/journald.conf.xml b/man/journald.conf.xml
ecbff1
index 46a498b67..e2d6a1225 100644
ecbff1
--- a/man/journald.conf.xml
ecbff1
+++ b/man/journald.conf.xml
ecbff1
@@ -354,6 +354,24 @@
ecbff1
         <filename>/dev/console</filename>.</para></listitem>
ecbff1
       </varlistentry>
ecbff1
 
ecbff1
+      <varlistentry>
ecbff1
+        <term><varname>LineMax=</varname></term>
ecbff1
+
ecbff1
+        <listitem><para>The maximum line length to permit when converting stream logs into record logs. When a systemd
ecbff1
+        unit's standard output/error are connected to the journal via a stream socket, the data read is split into
ecbff1
+        individual log records at newline (<literal>\n</literal>, ASCII 10) and NUL characters. If no such delimiter is
ecbff1
+        read for the specified number of bytes a hard log record boundary is artifically inserted, breaking up overly
ecbff1
+        long lines into multiple log records. Selecting overly large values increases the possible memory usage of the
ecbff1
+        Journal daemon for each stream client, as in the worst case the journal daemon needs to buffer the specified
ecbff1
+        number of bytes in memory before it can flush a new log record to disk. Also note that permitting overly large
ecbff1
+        line maximum line lengths affects compatibility with traditional log protocols as log records might not fit
ecbff1
+        anymore into a single <constant>AF_UNIX</constant> or <constant>AF_INET</constant> datagram. Takes a size in
ecbff1
+        bytes. If the value is suffixed with K, M, G or T, the specified size is parsed as Kilobytes, Megabytes,
ecbff1
+        Gigabytes, or Terabytes (with the base 1024), respectively. Defaults to 48K, which is relatively large but
ecbff1
+        still small enough so that log records likely fit into network datagrams along with extra room for
ecbff1
+        metadata. Note that values below 79 are not accepted and will be bumped to 79.</para></listitem>
ecbff1
+      </varlistentry>
ecbff1
+
ecbff1
     </variablelist>
ecbff1
 
ecbff1
   </refsect1>
ecbff1
diff --git a/man/systemd.journal-fields.xml b/man/systemd.journal-fields.xml
ecbff1
index 7d6c5c715..a53f8def2 100644
ecbff1
--- a/man/systemd.journal-fields.xml
ecbff1
+++ b/man/systemd.journal-fields.xml
ecbff1
@@ -311,6 +311,28 @@
ecbff1
           </variablelist>
ecbff1
         </listitem>
ecbff1
       </varlistentry>
ecbff1
+      <varlistentry>
ecbff1
+        <term><varname>_STREAM_ID=</varname></term>
ecbff1
+        <listitem>
ecbff1
+          <para>Only applies to <literal>_TRANSPORT=stream</literal> records: specifies a randomized 128bit ID assigned
ecbff1
+          to the stream connection when it was first created. This ID is useful to reconstruct individual log streams
ecbff1
+          from the log records: all log records carrying the same stream ID originate from the same stream.</para>
ecbff1
+        </listitem>
ecbff1
+      </varlistentry>
ecbff1
+      <varlistentry>
ecbff1
+        <term><varname>_LINE_BREAK=</varname></term>
ecbff1
+        <listitem>
ecbff1
+          <para>Only applies to <literal>_TRANSPORT=stream</literal> records: indicates that the log message in the
ecbff1
+          standard output/error stream was not terminated with a normal newline character (<literal>\n</literal>,
ecbff1
+          i.e. ASCII 10). Specifically, when set this field is one of <option>nul</option> (in case the line was
ecbff1
+          terminated by a NUL byte), <option>line-max</option> (in case the maximum log line length was reached, as
ecbff1
+          configured with <varname>LineMax=</varname> in
ecbff1
+          <citerefentry><refentrytitle>journald.conf</refentrytitle><manvolnum>5</manvolnum></citerefentry>) or
ecbff1
+          <option>eof</option> (if this was the last log record of a stream and the stream ended without a final
ecbff1
+          newline character). Note that this record is not generated when a normal newline character was used for
ecbff1
+          marking the log line end.</para>
ecbff1
+        </listitem>
ecbff1
+      </varlistentry>
ecbff1
     </variablelist>
ecbff1
   </refsect1>
ecbff1
 
ecbff1
diff --git a/src/journal/journald-gperf.gperf b/src/journal/journald-gperf.gperf
ecbff1
index 74554c1c3..73327c10e 100644
ecbff1
--- a/src/journal/journald-gperf.gperf
ecbff1
+++ b/src/journal/journald-gperf.gperf
ecbff1
@@ -40,3 +40,4 @@ Journal.MaxLevelKMsg,       config_parse_log_level,  0, offsetof(Server, max_lev
ecbff1
 Journal.MaxLevelConsole,    config_parse_log_level,  0, offsetof(Server, max_level_console)
ecbff1
 Journal.MaxLevelWall,       config_parse_log_level,  0, offsetof(Server, max_level_wall)
ecbff1
 Journal.SplitMode,          config_parse_split_mode, 0, offsetof(Server, split_mode)
ecbff1
+Journal.LineMax,            config_parse_line_max,   0, offsetof(Server, line_max)
ecbff1
\ No newline at end of file
ecbff1
diff --git a/src/journal/journald-server.c b/src/journal/journald-server.c
ecbff1
index f6f8c30eb..daeecd519 100644
ecbff1
--- a/src/journal/journald-server.c
ecbff1
+++ b/src/journal/journald-server.c
ecbff1
@@ -67,6 +67,10 @@
ecbff1
 
ecbff1
 #define RECHECK_AVAILABLE_SPACE_USEC (30*USEC_PER_SEC)
ecbff1
 
ecbff1
+/* Pick a good default that is likely to fit into AF_UNIX and AF_INET SOCK_DGRAM datagrams, and even leaves some room
ecbff1
++ * for a bit of additional metadata. */
ecbff1
+#define DEFAULT_LINE_MAX (48*1024)
ecbff1
+
ecbff1
 static const char* const storage_table[_STORAGE_MAX] = {
ecbff1
         [STORAGE_AUTO] = "auto",
ecbff1
         [STORAGE_VOLATILE] = "volatile",
ecbff1
@@ -83,9 +87,71 @@ static const char* const split_mode_table[_SPLIT_MAX] = {
ecbff1
         [SPLIT_NONE] = "none",
ecbff1
 };
ecbff1
 
ecbff1
+
ecbff1
 DEFINE_STRING_TABLE_LOOKUP(split_mode, SplitMode);
ecbff1
 DEFINE_CONFIG_PARSE_ENUM(config_parse_split_mode, split_mode, SplitMode, "Failed to parse split mode setting");
ecbff1
 
ecbff1
+int config_parse_line_max(
ecbff1
+                const char* unit,
ecbff1
+                const char *filename,
ecbff1
+                unsigned line,
ecbff1
+                const char *section,
ecbff1
+                unsigned section_line,
ecbff1
+                const char *lvalue,
ecbff1
+                int ltype,
ecbff1
+                const char *rvalue,
ecbff1
+                void *data,
ecbff1
+                void *userdata) {
ecbff1
+
ecbff1
+        size_t *sz = data;
ecbff1
+        int r;
ecbff1
+
ecbff1
+        assert(filename);
ecbff1
+        assert(lvalue);
ecbff1
+        assert(rvalue);
ecbff1
+        assert(data);
ecbff1
+
ecbff1
+        if (isempty(rvalue))
ecbff1
+                /* Empty assignment means default */
ecbff1
+                *sz = DEFAULT_LINE_MAX;
ecbff1
+        else {
ecbff1
+                uint64_t v;
ecbff1
+                off_t u;
ecbff1
+
ecbff1
+                r = parse_size(rvalue, 1024, &u);
ecbff1
+                if (r < 0) {
ecbff1
+                        log_syntax(unit, LOG_ERR, filename, line, r, "Failed to parse LineMax= value, ignoring: %s", rvalue);
ecbff1
+                        return 0;
ecbff1
+                }
ecbff1
+
ecbff1
+                /* Backport note */
ecbff1
+                /* Upstream ditched use of off_t however our parse_size implementation still takes off_t*
ecbff1
+                 * as an argument. Since we compile with -Werror, we have two choices, either disable sign-compare
ecbff1
+                 * warning or do this casting so we don't have to change rest of the code. I think it is
ecbff1
+                 * better to do cast here instead of rewriting the code so it deals with off_t instead of
ecbff1
+                 * uint64_t. Doing conversion off_t -> uint64_t is something that we should think about. */
ecbff1
+                v = (uint64_t) u;
ecbff1
+
ecbff1
+                if (v < 79) {
ecbff1
+                        /* Why specify 79 here as minimum line length? Simply, because the most common traditional
ecbff1
+                         * terminal size is 80ch, and it might make sense to break one character before the natural
ecbff1
+                         * line break would occur on that. */
ecbff1
+                        log_syntax(unit, LOG_WARNING, filename, line, 0, "LineMax= too small, clamping to 79: %s", rvalue);
ecbff1
+                        *sz = 79;
ecbff1
+                } else if (v > (uint64_t) (SSIZE_MAX-1)) {
ecbff1
+                        /* So, why specify SSIZE_MAX-1 here? Because that's one below the largest size value read()
ecbff1
+                         * can return, and we need one extra byte for the trailing NUL byte. Of course IRL such large
ecbff1
+                         * memory allocations will fail anyway, hence this limit is mostly theoretical anyway, as we'll
ecbff1
+                         * fail much earlier anyway. */
ecbff1
+                        log_syntax(unit, LOG_WARNING, filename, line, 0, "LineMax= too large, clamping to %" PRIu64 ": %s", (uint64_t) (SSIZE_MAX-1), rvalue);
ecbff1
+                        *sz = SSIZE_MAX-1;
ecbff1
+                } else
ecbff1
+                        *sz = (size_t) v;
ecbff1
+        }
ecbff1
+
ecbff1
+        return 0;
ecbff1
+}
ecbff1
+
ecbff1
 static uint64_t available_space(Server *s, bool verbose) {
ecbff1
         char ids[33];
ecbff1
         _cleanup_free_ char *p = NULL;
ecbff1
@@ -1518,6 +1584,8 @@ int server_init(Server *s) {
ecbff1
         s->max_level_console = LOG_INFO;
ecbff1
         s->max_level_wall = LOG_EMERG;
ecbff1
 
ecbff1
+        s->line_max = DEFAULT_LINE_MAX;
ecbff1
+
ecbff1
         memset(&s->system_metrics, 0xFF, sizeof(s->system_metrics));
ecbff1
         memset(&s->runtime_metrics, 0xFF, sizeof(s->runtime_metrics));
ecbff1
 
ecbff1
diff --git a/src/journal/journald-server.h b/src/journal/journald-server.h
ecbff1
index 7a456c2d5..b29410778 100644
ecbff1
--- a/src/journal/journald-server.h
ecbff1
+++ b/src/journal/journald-server.h
ecbff1
@@ -143,6 +143,8 @@ typedef struct Server {
ecbff1
 
ecbff1
         /* Cached cgroup root, so that we don't have to query that all the time */
ecbff1
         char *cgroup_root;
ecbff1
+
ecbff1
+        size_t line_max;
ecbff1
 } Server;
ecbff1
 
ecbff1
 #define N_IOVEC_META_FIELDS 20
ecbff1
@@ -157,6 +159,7 @@ void server_driver_message(Server *s, sd_id128_t message_id, const char *format,
ecbff1
 const struct ConfigPerfItem* journald_gperf_lookup(const char *key, unsigned length);
ecbff1
 
ecbff1
 int config_parse_storage(const char *unit, const char *filename, unsigned line, const char *section, unsigned section_line, const char *lvalue, int ltype, const char *rvalue, void *data, void *userdata);
ecbff1
+int config_parse_line_max(const char *unit, const char *filename, unsigned line, const char *section, unsigned section_line, const char *lvalue, int ltype, const char *rvalue, void *data, void *userdata);
ecbff1
 
ecbff1
 const char *storage_to_string(Storage s) _const_;
ecbff1
 Storage storage_from_string(const char *s) _pure_;
ecbff1
diff --git a/src/journal/journald-stream.c b/src/journal/journald-stream.c
ecbff1
index 15c9110c0..4d6b7ad18 100644
ecbff1
--- a/src/journal/journald-stream.c
ecbff1
+++ b/src/journal/journald-stream.c
ecbff1
@@ -53,6 +53,16 @@ typedef enum StdoutStreamState {
ecbff1
         STDOUT_STREAM_RUNNING
ecbff1
 } StdoutStreamState;
ecbff1
 
ecbff1
+/* The different types of log record terminators: a real \n was read, a NUL character was read, the maximum line length
ecbff1
+ * was reached, or the end of the stream was reached */
ecbff1
+
ecbff1
+typedef enum LineBreak {
ecbff1
+        LINE_BREAK_NEWLINE,
ecbff1
+        LINE_BREAK_NUL,
ecbff1
+        LINE_BREAK_LINE_MAX,
ecbff1
+        LINE_BREAK_EOF,
ecbff1
+} LineBreak;
ecbff1
+
ecbff1
 struct StdoutStream {
ecbff1
         Server *server;
ecbff1
         StdoutStreamState state;
ecbff1
@@ -71,14 +81,17 @@ struct StdoutStream {
ecbff1
 
ecbff1
         bool fdstore:1;
ecbff1
 
ecbff1
-        char buffer[LINE_MAX+1];
ecbff1
+        char *buffer;
ecbff1
         size_t length;
ecbff1
+        size_t allocated;
ecbff1
 
ecbff1
         sd_event_source *event_source;
ecbff1
 
ecbff1
         char *state_file;
ecbff1
 
ecbff1
         LIST_FIELDS(StdoutStream, stdout_stream);
ecbff1
+
ecbff1
+        char id_field[sizeof("_STREAM_ID=")-1 + SD_ID128_STRING_MAX];
ecbff1
 };
ecbff1
 
ecbff1
 void stdout_stream_free(StdoutStream *s) {
ecbff1
@@ -101,6 +114,7 @@ void stdout_stream_free(StdoutStream *s) {
ecbff1
         free(s->identifier);
ecbff1
         free(s->unit_id);
ecbff1
         free(s->state_file);
ecbff1
+        free(s->buffer);
ecbff1
 
ecbff1
         free(s);
ecbff1
 }
ecbff1
@@ -151,12 +165,14 @@ static int stdout_stream_save(StdoutStream *s) {
ecbff1
                 "LEVEL_PREFIX=%i\n"
ecbff1
                 "FORWARD_TO_SYSLOG=%i\n"
ecbff1
                 "FORWARD_TO_KMSG=%i\n"
ecbff1
-                "FORWARD_TO_CONSOLE=%i\n",
ecbff1
+                "FORWARD_TO_CONSOLE=%i\n"
ecbff1
+                "STREAM_ID=%s\n",
ecbff1
                 s->priority,
ecbff1
                 s->level_prefix,
ecbff1
                 s->forward_to_syslog,
ecbff1
                 s->forward_to_kmsg,
ecbff1
-                s->forward_to_console);
ecbff1
+                s->forward_to_console,
ecbff1
+                s->id_field + strlen("_STREAM_ID="));
ecbff1
 
ecbff1
         if (!isempty(s->identifier)) {
ecbff1
                 _cleanup_free_ char *escaped;
ecbff1
@@ -211,8 +227,8 @@ finish:
ecbff1
         return r;
ecbff1
 }
ecbff1
 
ecbff1
-static int stdout_stream_log(StdoutStream *s, const char *p) {
ecbff1
-        struct iovec iovec[N_IOVEC_META_FIELDS + 5];
ecbff1
+static int stdout_stream_log(StdoutStream *s, const char *p, LineBreak line_break) {
ecbff1
+        struct iovec iovec[N_IOVEC_META_FIELDS + 7];
ecbff1
         int priority;
ecbff1
         char syslog_priority[] = "PRIORITY=\0";
ecbff1
         char syslog_facility[sizeof("SYSLOG_FACILITY=")-1 + DECIMAL_STR_MAX(int) + 1];
ecbff1
@@ -245,6 +261,8 @@ static int stdout_stream_log(StdoutStream *s, const char *p) {
ecbff1
 
ecbff1
         IOVEC_SET_STRING(iovec[n++], "_TRANSPORT=stdout");
ecbff1
 
ecbff1
+        IOVEC_SET_STRING(iovec[n++], s->id_field);
ecbff1
+
ecbff1
         syslog_priority[strlen("PRIORITY=")] = '0' + LOG_PRI(priority);
ecbff1
         IOVEC_SET_STRING(iovec[n++], syslog_priority);
ecbff1
 
ecbff1
@@ -259,6 +277,18 @@ static int stdout_stream_log(StdoutStream *s, const char *p) {
ecbff1
                         IOVEC_SET_STRING(iovec[n++], syslog_identifier);
ecbff1
         }
ecbff1
 
ecbff1
+        if (line_break != LINE_BREAK_NEWLINE) {
ecbff1
+                const char *c;
ecbff1
+
ecbff1
+                /* If this log message was generated due to an uncommon line break then mention this in the log
ecbff1
+                 * entry */
ecbff1
+
ecbff1
+                c =     line_break == LINE_BREAK_NUL ?      "_LINE_BREAK=nul" :
ecbff1
+                        line_break == LINE_BREAK_LINE_MAX ? "_LINE_BREAK=line-max" :
ecbff1
+                                                            "_LINE_BREAK=eof";
ecbff1
+                IOVEC_SET_STRING(iovec[n++], c);
ecbff1
+        }
ecbff1
+
ecbff1
         message = strappend("MESSAGE=", p);
ecbff1
         if (message)
ecbff1
                 IOVEC_SET_STRING(iovec[n++], message);
ecbff1
@@ -268,12 +298,18 @@ static int stdout_stream_log(StdoutStream *s, const char *p) {
ecbff1
         return 0;
ecbff1
 }
ecbff1
 
ecbff1
-static int stdout_stream_line(StdoutStream *s, char *p) {
ecbff1
+static int stdout_stream_line(StdoutStream *s, char *p, LineBreak line_break) {
ecbff1
         int r;
ecbff1
 
ecbff1
         assert(s);
ecbff1
         assert(p);
ecbff1
 
ecbff1
+        /* line breaks by NUL, line max length or EOF are not permissible during the negotiation part of the protocol */
ecbff1
+        if (line_break != LINE_BREAK_NEWLINE && s->state != STDOUT_STREAM_RUNNING) {
ecbff1
+                log_warning("Control protocol line not properly terminated.");
ecbff1
+                return -EINVAL;
ecbff1
+        }
ecbff1
+
ecbff1
         p = strstrip(p);
ecbff1
 
ecbff1
         switch (s->state) {
ecbff1
@@ -362,7 +398,7 @@ static int stdout_stream_line(StdoutStream *s, char *p) {
ecbff1
                 return 0;
ecbff1
 
ecbff1
         case STDOUT_STREAM_RUNNING:
ecbff1
-                return stdout_stream_log(s, p);
ecbff1
+                return stdout_stream_log(s, p, line_break);
ecbff1
         }
ecbff1
 
ecbff1
         assert_not_reached("Unknown stream state");
ecbff1
@@ -378,21 +414,32 @@ static int stdout_stream_scan(StdoutStream *s, bool force_flush) {
ecbff1
         p = s->buffer;
ecbff1
         remaining = s->length;
ecbff1
         for (;;) {
ecbff1
-                char *end;
ecbff1
+                LineBreak line_break;
ecbff1
                 size_t skip;
ecbff1
 
ecbff1
-                end = memchr(p, '\n', remaining);
ecbff1
-                if (end)
ecbff1
-                        skip = end - p + 1;
ecbff1
-                else if (remaining >= sizeof(s->buffer) - 1) {
ecbff1
-                        end = p + sizeof(s->buffer) - 1;
ecbff1
+                char *end1, *end2;
ecbff1
+
ecbff1
+                end1 = memchr(p, '\n', remaining);
ecbff1
+                end2 = memchr(p, 0, end1 ? (size_t) (end1 - p) : remaining);
ecbff1
+
ecbff1
+                if (end2) {
ecbff1
+                        /* We found a NUL terminator */
ecbff1
+                        skip = end2 - p + 1;
ecbff1
+                        line_break = LINE_BREAK_NUL;
ecbff1
+                } else if (end1) {
ecbff1
+                        /* We found a \n terminator */
ecbff1
+                        *end1 = 0;
ecbff1
+                        skip = end1 - p + 1;
ecbff1
+                        line_break = LINE_BREAK_NEWLINE;
ecbff1
+                } else if (remaining >= s->server->line_max) {
ecbff1
+                        /* Force a line break after the maximum line length */
ecbff1
+                        *(p + s->server->line_max) = 0;
ecbff1
                         skip = remaining;
ecbff1
+                        line_break = LINE_BREAK_LINE_MAX;
ecbff1
                 } else
ecbff1
                         break;
ecbff1
 
ecbff1
-                *end = 0;
ecbff1
-
ecbff1
-                r = stdout_stream_line(s, p);
ecbff1
+                r = stdout_stream_line(s, p, line_break);
ecbff1
                 if (r < 0)
ecbff1
                         return r;
ecbff1
 
ecbff1
@@ -402,7 +449,7 @@ static int stdout_stream_scan(StdoutStream *s, bool force_flush) {
ecbff1
 
ecbff1
         if (force_flush && remaining > 0) {
ecbff1
                 p[remaining] = 0;
ecbff1
-                r = stdout_stream_line(s, p);
ecbff1
+                r = stdout_stream_line(s, p, LINE_BREAK_EOF);
ecbff1
                 if (r < 0)
ecbff1
                         return r;
ecbff1
 
ecbff1
@@ -420,6 +467,7 @@ static int stdout_stream_scan(StdoutStream *s, bool force_flush) {
ecbff1
 
ecbff1
 static int stdout_stream_process(sd_event_source *es, int fd, uint32_t revents, void *userdata) {
ecbff1
         StdoutStream *s = userdata;
ecbff1
+        size_t limit;
ecbff1
         ssize_t l;
ecbff1
         int r;
ecbff1
 
ecbff1
@@ -430,9 +478,20 @@ static int stdout_stream_process(sd_event_source *es, int fd, uint32_t revents,
ecbff1
                 goto terminate;
ecbff1
         }
ecbff1
 
ecbff1
-        l = read(s->fd, s->buffer+s->length, sizeof(s->buffer)-1-s->length);
ecbff1
-        if (l < 0) {
ecbff1
+        /* If the buffer is full already (discounting the extra NUL we need), add room for another 1K */
ecbff1
+        if (s->length + 1 >= s->allocated) {
ecbff1
+                if (!GREEDY_REALLOC(s->buffer, s->allocated, s->length + 1 + 1024)) {
ecbff1
+                        log_oom();
ecbff1
+                        goto terminate;
ecbff1
+                }
ecbff1
+        }
ecbff1
+
ecbff1
+        /* Try to make use of the allocated buffer in full, but never read more than the configured line size. Also,
ecbff1
+         * always leave room for a terminating NUL we might need to add. */
ecbff1
+        limit = MIN(s->allocated - 1, s->server->line_max);
ecbff1
 
ecbff1
+        l = read(s->fd, s->buffer + s->length, limit - s->length);
ecbff1
+        if (l < 0) {
ecbff1
                 if (errno == EAGAIN)
ecbff1
                         return 0;
ecbff1
 
ecbff1
@@ -459,11 +518,16 @@ terminate:
ecbff1
 
ecbff1
 static int stdout_stream_install(Server *s, int fd, StdoutStream **ret) {
ecbff1
         _cleanup_(stdout_stream_freep) StdoutStream *stream = NULL;
ecbff1
+        sd_id128_t id;
ecbff1
         int r;
ecbff1
 
ecbff1
         assert(s);
ecbff1
         assert(fd >= 0);
ecbff1
 
ecbff1
+        r = sd_id128_randomize(&id;;
ecbff1
+        if (r < 0)
ecbff1
+                return log_error_errno(r, "Failed to generate stream ID: %m");
ecbff1
+
ecbff1
         stream = new0(StdoutStream, 1);
ecbff1
         if (!stream)
ecbff1
                 return log_oom();
ecbff1
@@ -471,6 +535,8 @@ static int stdout_stream_install(Server *s, int fd, StdoutStream **ret) {
ecbff1
         stream->fd = -1;
ecbff1
         stream->priority = LOG_INFO;
ecbff1
 
ecbff1
+        xsprintf(stream->id_field, "_STREAM_ID=" SD_ID128_FORMAT_STR, SD_ID128_FORMAT_VAL(id));
ecbff1
+
ecbff1
         r = getpeercred(fd, &stream->ucred);
ecbff1
         if (r < 0)
ecbff1
                 return log_error_errno(r, "Failed to determine peer credentials: %m");
ecbff1
@@ -545,7 +611,8 @@ static int stdout_stream_load(StdoutStream *stream, const char *fname) {
ecbff1
                 *level_prefix = NULL,
ecbff1
                 *forward_to_syslog = NULL,
ecbff1
                 *forward_to_kmsg = NULL,
ecbff1
-                *forward_to_console = NULL;
ecbff1
+                *forward_to_console = NULL,
ecbff1
+                *stream_id = NULL;
ecbff1
         int r;
ecbff1
 
ecbff1
         assert(stream);
ecbff1
@@ -565,6 +632,7 @@ static int stdout_stream_load(StdoutStream *stream, const char *fname) {
ecbff1
                            "FORWARD_TO_CONSOLE", &forward_to_console,
ecbff1
                            "IDENTIFIER", &stream->identifier,
ecbff1
                            "UNIT", &stream->unit_id,
ecbff1
+                           "STREAM_ID", &stream_id,
ecbff1
                            NULL);
ecbff1
         if (r < 0)
ecbff1
                 return log_error_errno(r, "Failed to read: %s", stream->state_file);
ecbff1
@@ -601,6 +669,14 @@ static int stdout_stream_load(StdoutStream *stream, const char *fname) {
ecbff1
                         stream->forward_to_console = r;
ecbff1
         }
ecbff1
 
ecbff1
+        if (stream_id) {
ecbff1
+                sd_id128_t id;
ecbff1
+
ecbff1
+                r = sd_id128_from_string(stream_id, &id;;
ecbff1
+                if (r >= 0)
ecbff1
+                        xsprintf(stream->id_field, "_STREAM_ID=" SD_ID128_FORMAT_STR, SD_ID128_FORMAT_VAL(id));
ecbff1
+        }
ecbff1
+
ecbff1
         return 0;
ecbff1
 }
ecbff1
 
ecbff1
diff --git a/src/journal/journald.conf b/src/journal/journald.conf
ecbff1
index 3907dfb7f..5355ec2b2 100644
ecbff1
--- a/src/journal/journald.conf
ecbff1
+++ b/src/journal/journald.conf
ecbff1
@@ -37,3 +37,4 @@
ecbff1
 #MaxLevelKMsg=notice
ecbff1
 #MaxLevelConsole=info
ecbff1
 #MaxLevelWall=emerg
ecbff1
+#LineMax=48K