1ff636
From 47ac92420da9ecbffaf3aa0046d170be358639a2 Mon Sep 17 00:00:00 2001
1ff636
From: =?UTF-8?q?Zbigniew=20J=C4=99drzejewski-Szmek?= <zbyszek@in.waw.pl>
1ff636
Date: Fri, 13 Mar 2015 00:02:28 -0400
1ff636
Subject: [PATCH] journal-remote: process events without delay
1ff636
1ff636
journal-remote buffers input, and then parses it handling one journal entry at a time.
1ff636
It was possible for useful data to be left in the buffer after some entries were
1ff636
processesed. But all data would be already read from the fd, so there would be
1ff636
no reason for the event loop to call the handler again. After some new data came in,
1ff636
the handler would be called again, and would then process the "old" data in the buffer.
1ff636
1ff636
Fix this by enabling a handler wherever we process input data and do not exhaust data
1ff636
from the input buffer (i.e. when EAGAIN was not encountered). The handler runs until
1ff636
we encounter EAGAIN.
1ff636
1ff636
Looping over the input data is done in this roundabout way to allow the event loop
1ff636
to dispatch other events in the meanwhile. If the loop was inside the handler, a
1ff636
source which produced data fast enough could completely monopolize the process.
1ff636
1ff636
https://bugs.freedesktop.org/show_bug.cgi?id=89516
1ff636
(cherry picked from commit 043945b93824e33e040954612aaa934cd1a43a1b)
1ff636
---
1ff636
 src/journal-remote/journal-remote-parse.c |  1 +
1ff636
 src/journal-remote/journal-remote-parse.h |  1 +
1ff636
 src/journal-remote/journal-remote.c       | 65 +++++++++++++++++++++++++++----
1ff636
 3 files changed, 59 insertions(+), 8 deletions(-)
1ff636
1ff636
diff --git a/src/journal-remote/journal-remote-parse.c b/src/journal-remote/journal-remote-parse.c
181b3f
index 6c096de03..7e6295435 100644
1ff636
--- a/src/journal-remote/journal-remote-parse.c
1ff636
+++ b/src/journal-remote/journal-remote-parse.c
1ff636
@@ -41,6 +41,7 @@ void source_free(RemoteSource *source) {
1ff636
         writer_unref(source->writer);
1ff636
 
1ff636
         sd_event_source_unref(source->event);
1ff636
+        sd_event_source_unref(source->buffer_event);
1ff636
 
1ff636
         free(source);
1ff636
 }
1ff636
diff --git a/src/journal-remote/journal-remote-parse.h b/src/journal-remote/journal-remote-parse.h
181b3f
index 22db55091..06a50296a 100644
1ff636
--- a/src/journal-remote/journal-remote-parse.h
1ff636
+++ b/src/journal-remote/journal-remote-parse.h
1ff636
@@ -54,6 +54,7 @@ typedef struct RemoteSource {
1ff636
         Writer *writer;
1ff636
 
1ff636
         sd_event_source *event;
1ff636
+        sd_event_source *buffer_event;
1ff636
 } RemoteSource;
1ff636
 
1ff636
 RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer);
1ff636
diff --git a/src/journal-remote/journal-remote.c b/src/journal-remote/journal-remote.c
181b3f
index d1486e7cd..b7cc6d717 100644
1ff636
--- a/src/journal-remote/journal-remote.c
1ff636
+++ b/src/journal-remote/journal-remote.c
1ff636
@@ -289,6 +289,8 @@ static int dispatch_raw_source_event(sd_event_source *event,
1ff636
                                      int fd,
1ff636
                                      uint32_t revents,
1ff636
                                      void *userdata);
1ff636
+static int dispatch_raw_source_until_block(sd_event_source *event,
1ff636
+                                           void *userdata);
1ff636
 static int dispatch_blocking_source_event(sd_event_source *event,
1ff636
                                           void *userdata);
1ff636
 static int dispatch_raw_connection_event(sd_event_source *event,
1ff636
@@ -376,8 +378,15 @@ static int add_source(RemoteServer *s, int fd, char* name, bool own_name) {
1ff636
 
1ff636
         r = sd_event_add_io(s->events, &source->event,
1ff636
                             fd, EPOLLIN|EPOLLRDHUP|EPOLLPRI,
1ff636
-                            dispatch_raw_source_event, s);
1ff636
-        if (r == -EPERM) {
1ff636
+                            dispatch_raw_source_event, source);
1ff636
+        if (r == 0) {
1ff636
+                /* Add additional source for buffer processing. It will be
1ff636
+                 * enabled later. */
1ff636
+                r = sd_event_add_defer(s->events, &source->buffer_event,
1ff636
+                                       dispatch_raw_source_until_block, source);
1ff636
+                if (r == 0)
1ff636
+                        sd_event_source_set_enabled(source->buffer_event, SD_EVENT_OFF);
1ff636
+        } else if (r == -EPERM) {
1ff636
                 log_debug("Falling back to sd_event_add_defer for fd:%d (%s)", fd, name);
1ff636
                 r = sd_event_add_defer(s->events, &source->event,
1ff636
                                        dispatch_blocking_source_event, source);
1ff636
@@ -997,15 +1006,18 @@ static void server_destroy(RemoteServer *s) {
1ff636
  **********************************************************************
1ff636
  **********************************************************************/
1ff636
 
1ff636
-static int dispatch_raw_source_event(sd_event_source *event,
1ff636
-                                     int fd,
1ff636
-                                     uint32_t revents,
1ff636
-                                     void *userdata) {
1ff636
+static int handle_raw_source(sd_event_source *event,
1ff636
+                             int fd,
1ff636
+                             uint32_t revents,
1ff636
+                             RemoteServer *s) {
1ff636
 
1ff636
-        RemoteServer *s = userdata;
1ff636
         RemoteSource *source;
1ff636
         int r;
1ff636
 
1ff636
+        /* Returns 1 if there might be more data pending,
1ff636
+         * 0 if data is currently exhausted, negative on error.
1ff636
+         */
1ff636
+
1ff636
         assert(fd >= 0 && fd < (ssize_t) s->sources_size);
1ff636
         source = s->sources[fd];
1ff636
         assert(source->fd == fd);
1ff636
@@ -1036,11 +1048,48 @@ static int dispatch_raw_source_event(sd_event_source *event,
1ff636
                 return 1;
1ff636
 }
1ff636
 
1ff636
+static int dispatch_raw_source_until_block(sd_event_source *event,
1ff636
+                                           void *userdata) {
1ff636
+        RemoteSource *source = userdata;
1ff636
+        int r;
1ff636
+
1ff636
+        /* Make sure event stays around even if source is destroyed */
1ff636
+        sd_event_source_ref(event);
1ff636
+
1ff636
+        r = handle_raw_source(event, source->fd, EPOLLIN, server);
1ff636
+        if (r != 1)
1ff636
+                /* No more data for now */
1ff636
+                sd_event_source_set_enabled(event, SD_EVENT_OFF);
1ff636
+
1ff636
+        sd_event_source_unref(event);
1ff636
+
1ff636
+        return r;
1ff636
+}
1ff636
+
1ff636
+static int dispatch_raw_source_event(sd_event_source *event,
1ff636
+                                     int fd,
1ff636
+                                     uint32_t revents,
1ff636
+                                     void *userdata) {
1ff636
+        RemoteSource *source = userdata;
1ff636
+        int r;
1ff636
+
1ff636
+        assert(source->event);
1ff636
+        assert(source->buffer_event);
1ff636
+
1ff636
+        r = handle_raw_source(event, fd, EPOLLIN, server);
1ff636
+        if (r == 1)
1ff636
+                /* Might have more data. We need to rerun the handler
1ff636
+                 * until we are sure the buffer is exhausted. */
1ff636
+                sd_event_source_set_enabled(source->buffer_event, SD_EVENT_ON);
1ff636
+
1ff636
+        return r;
1ff636
+}
1ff636
+
1ff636
 static int dispatch_blocking_source_event(sd_event_source *event,
1ff636
                                           void *userdata) {
1ff636
         RemoteSource *source = userdata;
1ff636
 
1ff636
-        return dispatch_raw_source_event(event, source->fd, EPOLLIN, server);
1ff636
+        return handle_raw_source(event, source->fd, EPOLLIN, server);
1ff636
 }
1ff636
 
1ff636
 static int accept_connection(const char* type, int fd,