b677e7
From 395eb7753a9772f505102fbbe3ba3261b57abbe9 Mon Sep 17 00:00:00 2001
b677e7
From: Lennart Poettering <lennart@poettering.net>
b677e7
Date: Mon, 23 Nov 2020 18:02:40 +0100
b677e7
Subject: [PATCH] sd-event: add ability to ratelimit event sources
b677e7
b677e7
Let's add a concept of "rate limiting" to event sources: if specific event
b677e7
sources fire too often in some time interval, temporarily take them
b677e7
offline, and take them back online once the interval passed.
b677e7
b677e7
This is a simple scheme of avoiding starvation of event sources if some
b677e7
event source fires too often.
b677e7
b677e7
This introduces the new conceptual states of "offline" and "online" for
b677e7
event sources: an event source is "online" only when enabled *and* not
b677e7
ratelimited, and offline in all other cases. An event source that is
b677e7
online hence has its fds registered in the epoll, its signals in the
b677e7
signalfd and so on.
b677e7
b677e7
(cherry picked from commit b6d5481b3d9f7c9b1198ab54b54326ec73e855bf)
b677e7
b677e7
Related: #1819868
b677e7
---
b677e7
 src/basic/ratelimit.h              |   8 +
b677e7
 src/libsystemd/libsystemd.sym      |   7 +
b677e7
 src/libsystemd/sd-event/sd-event.c | 433 +++++++++++++++++++++++------
b677e7
 src/systemd/sd-event.h             |   3 +
b677e7
 4 files changed, 369 insertions(+), 82 deletions(-)
b677e7
b677e7
diff --git a/src/basic/ratelimit.h b/src/basic/ratelimit.h
b677e7
index de91def28d..0012b49935 100644
b677e7
--- a/src/basic/ratelimit.h
b677e7
+++ b/src/basic/ratelimit.h
b677e7
@@ -38,3 +38,11 @@ typedef struct RateLimit {
b677e7
         } while (false)
b677e7
 
b677e7
 bool ratelimit_below(RateLimit *r);
b677e7
+
b677e7
+static inline void ratelimit_reset(RateLimit *rl) {
b677e7
+        rl->num = rl->begin = 0;
b677e7
+}
b677e7
+
b677e7
+static inline bool ratelimit_configured(RateLimit *rl) {
b677e7
+        return rl->interval > 0 && rl->burst > 0;
b677e7
+}
b677e7
diff --git a/src/libsystemd/libsystemd.sym b/src/libsystemd/libsystemd.sym
b677e7
index 778e88a16c..149d2e7b82 100644
b677e7
--- a/src/libsystemd/libsystemd.sym
b677e7
+++ b/src/libsystemd/libsystemd.sym
b677e7
@@ -572,3 +572,10 @@ global:
b677e7
         sd_bus_enqueue_for_read;
b677e7
         sd_event_source_disable_unref;
b677e7
 } LIBSYSTEMD_238;
b677e7
+
b677e7
+LIBSYSTEMD_248 {
b677e7
+global:
b677e7
+        sd_event_source_set_ratelimit;
b677e7
+        sd_event_source_get_ratelimit;
b677e7
+        sd_event_source_is_ratelimited;
b677e7
+} LIBSYSTEMD_239;
b677e7
diff --git a/src/libsystemd/sd-event/sd-event.c b/src/libsystemd/sd-event/sd-event.c
b677e7
index d18ce28a92..be912d94e3 100644
b677e7
--- a/src/libsystemd/sd-event/sd-event.c
b677e7
+++ b/src/libsystemd/sd-event/sd-event.c
b677e7
@@ -19,6 +19,7 @@
b677e7
 #include "missing.h"
b677e7
 #include "prioq.h"
b677e7
 #include "process-util.h"
b677e7
+#include "ratelimit.h"
b677e7
 #include "set.h"
b677e7
 #include "signal-util.h"
b677e7
 #include "string-table.h"
b677e7
@@ -46,6 +47,7 @@ typedef enum EventSourceType {
b677e7
         _SOURCE_EVENT_SOURCE_TYPE_INVALID = -1
b677e7
 } EventSourceType;
b677e7
 
b677e7
+
b677e7
 static const char* const event_source_type_table[_SOURCE_EVENT_SOURCE_TYPE_MAX] = {
b677e7
         [SOURCE_IO] = "io",
b677e7
         [SOURCE_TIME_REALTIME] = "realtime",
b677e7
@@ -76,7 +78,25 @@ typedef enum WakeupType {
b677e7
         _WAKEUP_TYPE_INVALID = -1,
b677e7
 } WakeupType;
b677e7
 
b677e7
-#define EVENT_SOURCE_IS_TIME(t) IN_SET((t), SOURCE_TIME_REALTIME, SOURCE_TIME_BOOTTIME, SOURCE_TIME_MONOTONIC, SOURCE_TIME_REALTIME_ALARM, SOURCE_TIME_BOOTTIME_ALARM)
b677e7
+#define EVENT_SOURCE_IS_TIME(t)                 \
b677e7
+        IN_SET((t),                             \
b677e7
+               SOURCE_TIME_REALTIME,            \
b677e7
+               SOURCE_TIME_BOOTTIME,            \
b677e7
+               SOURCE_TIME_MONOTONIC,           \
b677e7
+               SOURCE_TIME_REALTIME_ALARM,      \
b677e7
+               SOURCE_TIME_BOOTTIME_ALARM)
b677e7
+
b677e7
+#define EVENT_SOURCE_CAN_RATE_LIMIT(t)          \
b677e7
+        IN_SET((t),                             \
b677e7
+               SOURCE_IO,                       \
b677e7
+               SOURCE_TIME_REALTIME,            \
b677e7
+               SOURCE_TIME_BOOTTIME,            \
b677e7
+               SOURCE_TIME_MONOTONIC,           \
b677e7
+               SOURCE_TIME_REALTIME_ALARM,      \
b677e7
+               SOURCE_TIME_BOOTTIME_ALARM,      \
b677e7
+               SOURCE_SIGNAL,                   \
b677e7
+               SOURCE_DEFER,                    \
b677e7
+               SOURCE_INOTIFY)
b677e7
 
b677e7
 struct inode_data;
b677e7
 
b677e7
@@ -96,6 +116,7 @@ struct sd_event_source {
b677e7
         bool pending:1;
b677e7
         bool dispatching:1;
b677e7
         bool floating:1;
b677e7
+        bool ratelimited:1;
b677e7
 
b677e7
         int64_t priority;
b677e7
         unsigned pending_index;
b677e7
@@ -107,6 +128,10 @@ struct sd_event_source {
b677e7
 
b677e7
         LIST_FIELDS(sd_event_source, sources);
b677e7
 
b677e7
+        RateLimit rate_limit;
b677e7
+
b677e7
+        /* These are primarily fields relevant for time event sources, but since any event source can
b677e7
+         * effectively become one when rate-limited, this is part of the common fields. */
b677e7
         unsigned earliest_index;
b677e7
         unsigned latest_index;
b677e7
 
b677e7
@@ -266,7 +291,7 @@ struct sd_event {
b677e7
         Hashmap *signal_data; /* indexed by priority */
b677e7
 
b677e7
         Hashmap *child_sources;
b677e7
-        unsigned n_enabled_child_sources;
b677e7
+        unsigned n_online_child_sources;
b677e7
 
b677e7
         Set *post_sources;
b677e7
 
b677e7
@@ -311,12 +336,23 @@ static thread_local sd_event *default_event = NULL;
b677e7
 static void source_disconnect(sd_event_source *s);
b677e7
 static void event_gc_inode_data(sd_event *e, struct inode_data *d);
b677e7
 
b677e7
+static bool event_source_is_online(sd_event_source *s) {
b677e7
+        assert(s);
b677e7
+        return s->enabled != SD_EVENT_OFF && !s->ratelimited;
b677e7
+}
b677e7
+
b677e7
+static bool event_source_is_offline(sd_event_source *s) {
b677e7
+        assert(s);
b677e7
+        return s->enabled == SD_EVENT_OFF || s->ratelimited;
b677e7
+}
b677e7
+
b677e7
 static sd_event *event_resolve(sd_event *e) {
b677e7
         return e == SD_EVENT_DEFAULT ? default_event : e;
b677e7
 }
b677e7
 
b677e7
 static int pending_prioq_compare(const void *a, const void *b) {
b677e7
         const sd_event_source *x = a, *y = b;
b677e7
+        int r;
b677e7
 
b677e7
         assert(x->pending);
b677e7
         assert(y->pending);
b677e7
@@ -327,23 +363,23 @@ static int pending_prioq_compare(const void *a, const void *b) {
b677e7
         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
b677e7
                 return 1;
b677e7
 
b677e7
+        /* Non rate-limited ones first. */
b677e7
+        r = CMP(!!x->ratelimited, !!y->ratelimited);
b677e7
+        if (r != 0)
b677e7
+                return r;
b677e7
+
b677e7
         /* Lower priority values first */
b677e7
-        if (x->priority < y->priority)
b677e7
-                return -1;
b677e7
-        if (x->priority > y->priority)
b677e7
-                return 1;
b677e7
+        r = CMP(x->priority, y->priority);
b677e7
+        if (r != 0)
b677e7
+                return r;
b677e7
 
b677e7
         /* Older entries first */
b677e7
-        if (x->pending_iteration < y->pending_iteration)
b677e7
-                return -1;
b677e7
-        if (x->pending_iteration > y->pending_iteration)
b677e7
-                return 1;
b677e7
-
b677e7
-        return 0;
b677e7
+        return CMP(x->pending_iteration, y->pending_iteration);
b677e7
 }
b677e7
 
b677e7
 static int prepare_prioq_compare(const void *a, const void *b) {
b677e7
         const sd_event_source *x = a, *y = b;
b677e7
+        int r;
b677e7
 
b677e7
         assert(x->prepare);
b677e7
         assert(y->prepare);
b677e7
@@ -354,29 +390,46 @@ static int prepare_prioq_compare(const void *a, const void *b) {
b677e7
         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
b677e7
                 return 1;
b677e7
 
b677e7
+        /* Non rate-limited ones first. */
b677e7
+        r = CMP(!!x->ratelimited, !!y->ratelimited);
b677e7
+        if (r != 0)
b677e7
+                return r;
b677e7
+
b677e7
         /* Move most recently prepared ones last, so that we can stop
b677e7
          * preparing as soon as we hit one that has already been
b677e7
          * prepared in the current iteration */
b677e7
-        if (x->prepare_iteration < y->prepare_iteration)
b677e7
-                return -1;
b677e7
-        if (x->prepare_iteration > y->prepare_iteration)
b677e7
-                return 1;
b677e7
+        r = CMP(x->prepare_iteration, y->prepare_iteration);
b677e7
+        if (r != 0)
b677e7
+                return r;
b677e7
 
b677e7
         /* Lower priority values first */
b677e7
-        if (x->priority < y->priority)
b677e7
-                return -1;
b677e7
-        if (x->priority > y->priority)
b677e7
-                return 1;
b677e7
+        return CMP(x->priority, y->priority);
b677e7
+}
b677e7
 
b677e7
-        return 0;
b677e7
+static usec_t time_event_source_next(const sd_event_source *s) {
b677e7
+        assert(s);
b677e7
+
b677e7
+        /* We have two kinds of event sources that have elapsation times associated with them: the actual
b677e7
+         * time based ones and the ones for which a ratelimit can be in effect (where we want to be notified
b677e7
+         * once the ratelimit time window ends). Let's return the next elapsing time depending on what we are
b677e7
+         * looking at here. */
b677e7
+
b677e7
+        if (s->ratelimited) { /* If rate-limited the next elapsation is when the ratelimit time window ends */
b677e7
+                assert(s->rate_limit.begin != 0);
b677e7
+                assert(s->rate_limit.interval != 0);
b677e7
+                return usec_add(s->rate_limit.begin, s->rate_limit.interval);
b677e7
+        }
b677e7
+
b677e7
+        /* Otherwise this must be a time event source, if not ratelimited */
b677e7
+        if (EVENT_SOURCE_IS_TIME(s->type))
b677e7
+                return s->time.next;
b677e7
+
b677e7
+        return USEC_INFINITY;
b677e7
 }
b677e7
 
b677e7
 static int earliest_time_prioq_compare(const void *a, const void *b) {
b677e7
         const sd_event_source *x = a, *y = b;
b677e7
 
b677e7
-        assert(EVENT_SOURCE_IS_TIME(x->type));
b677e7
-        assert(x->type == y->type);
b677e7
-
b677e7
         /* Enabled ones first */
b677e7
         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
b677e7
                 return -1;
b677e7
@@ -390,24 +443,30 @@ static int earliest_time_prioq_compare(const void *a, const void *b) {
b677e7
                 return 1;
b677e7
 
b677e7
         /* Order by time */
b677e7
-        if (x->time.next < y->time.next)
b677e7
-                return -1;
b677e7
-        if (x->time.next > y->time.next)
b677e7
-                return 1;
b677e7
-
b677e7
-        return 0;
b677e7
+        return CMP(time_event_source_next(x), time_event_source_next(y));
b677e7
 }
b677e7
 
b677e7
 static usec_t time_event_source_latest(const sd_event_source *s) {
b677e7
-        return usec_add(s->time.next, s->time.accuracy);
b677e7
+        assert(s);
b677e7
+
b677e7
+        if (s->ratelimited) { /* For ratelimited stuff the earliest and the latest time shall actually be the
b677e7
+                               * same, as we should avoid adding additional inaccuracy on an inaccuracy time
b677e7
+                               * window */
b677e7
+                assert(s->rate_limit.begin != 0);
b677e7
+                assert(s->rate_limit.interval != 0);
b677e7
+                return usec_add(s->rate_limit.begin, s->rate_limit.interval);
b677e7
+        }
b677e7
+
b677e7
+        /* Must be a time event source, if not ratelimited */
b677e7
+        if (EVENT_SOURCE_IS_TIME(s->type))
b677e7
+                return usec_add(s->time.next, s->time.accuracy);
b677e7
+
b677e7
+        return USEC_INFINITY;
b677e7
 }
b677e7
 
b677e7
 static int latest_time_prioq_compare(const void *a, const void *b) {
b677e7
         const sd_event_source *x = a, *y = b;
b677e7
 
b677e7
-        assert(EVENT_SOURCE_IS_TIME(x->type));
b677e7
-        assert(x->type == y->type);
b677e7
-
b677e7
         /* Enabled ones first */
b677e7
         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
b677e7
                 return -1;
b677e7
@@ -852,12 +911,12 @@ static void event_gc_signal_data(sd_event *e, const int64_t *priority, int sig)
b677e7
          * the signalfd for it. */
b677e7
 
b677e7
         if (sig == SIGCHLD &&
b677e7
-            e->n_enabled_child_sources > 0)
b677e7
+            e->n_online_child_sources > 0)
b677e7
                 return;
b677e7
 
b677e7
         if (e->signal_sources &&
b677e7
             e->signal_sources[sig] &&
b677e7
-            e->signal_sources[sig]->enabled != SD_EVENT_OFF)
b677e7
+            event_source_is_online(e->signal_sources[sig]))
b677e7
                 return;
b677e7
 
b677e7
         /*
b677e7
@@ -904,11 +963,17 @@ static void event_source_time_prioq_reshuffle(sd_event_source *s) {
b677e7
         struct clock_data *d;
b677e7
 
b677e7
         assert(s);
b677e7
-        assert(EVENT_SOURCE_IS_TIME(s->type));
b677e7
 
b677e7
         /* Called whenever the event source's timer ordering properties changed, i.e. time, accuracy,
b677e7
          * pending, enable state. Makes sure the two prioq's are ordered properly again. */
b677e7
-        assert_se(d = event_get_clock_data(s->event, s->type));
b677e7
+
b677e7
+        if (s->ratelimited)
b677e7
+                d = &s->event->monotonic;
b677e7
+        else {
b677e7
+                assert(EVENT_SOURCE_IS_TIME(s->type));
b677e7
+                assert_se(d = event_get_clock_data(s->event, s->type));
b677e7
+        }
b677e7
+
b677e7
         prioq_reshuffle(d->earliest, s, &s->earliest_index);
b677e7
         prioq_reshuffle(d->latest, s, &s->latest_index);
b677e7
         d->needs_rearm = true;
b677e7
@@ -949,12 +1014,18 @@ static void source_disconnect(sd_event_source *s) {
b677e7
         case SOURCE_TIME_BOOTTIME:
b677e7
         case SOURCE_TIME_MONOTONIC:
b677e7
         case SOURCE_TIME_REALTIME_ALARM:
b677e7
-        case SOURCE_TIME_BOOTTIME_ALARM: {
b677e7
-                struct clock_data *d;
b677e7
-                assert_se(d = event_get_clock_data(s->event, s->type));
b677e7
-                event_source_time_prioq_remove(s, d);
b677e7
+        case SOURCE_TIME_BOOTTIME_ALARM:
b677e7
+                /* Only remove this event source from the time event source here if it is not ratelimited. If
b677e7
+                 * it is ratelimited, we'll remove it below, separately. Why? Because the clock used might
b677e7
+                 * differ: ratelimiting always uses CLOCK_MONOTONIC, but timer events might use any clock */
b677e7
+
b677e7
+                if (!s->ratelimited) {
b677e7
+                        struct clock_data *d;
b677e7
+                        assert_se(d = event_get_clock_data(s->event, s->type));
b677e7
+                        event_source_time_prioq_remove(s, d);
b677e7
+                }
b677e7
+
b677e7
                 break;
b677e7
-        }
b677e7
 
b677e7
         case SOURCE_SIGNAL:
b677e7
                 if (s->signal.sig > 0) {
b677e7
@@ -969,9 +1040,9 @@ static void source_disconnect(sd_event_source *s) {
b677e7
 
b677e7
         case SOURCE_CHILD:
b677e7
                 if (s->child.pid > 0) {
b677e7
-                        if (s->enabled != SD_EVENT_OFF) {
b677e7
-                                assert(s->event->n_enabled_child_sources > 0);
b677e7
-                                s->event->n_enabled_child_sources--;
b677e7
+                        if (event_source_is_online(s)) {
b677e7
+                                assert(s->event->n_online_child_sources > 0);
b677e7
+                                s->event->n_online_child_sources--;
b677e7
                         }
b677e7
 
b677e7
                         (void) hashmap_remove(s->event->child_sources, PID_TO_PTR(s->child.pid));
b677e7
@@ -1037,6 +1108,9 @@ static void source_disconnect(sd_event_source *s) {
b677e7
         if (s->prepare)
b677e7
                 prioq_remove(s->event->prepare, s, &s->prepare_index);
b677e7
 
b677e7
+        if (s->ratelimited)
b677e7
+                event_source_time_prioq_remove(s, &s->event->monotonic);
b677e7
+
b677e7
         event = s->event;
b677e7
 
b677e7
         s->type = _SOURCE_EVENT_SOURCE_TYPE_INVALID;
b677e7
@@ -1458,11 +1532,11 @@ _public_ int sd_event_add_child(
b677e7
                 return r;
b677e7
         }
b677e7
 
b677e7
-        e->n_enabled_child_sources++;
b677e7
+        e->n_online_child_sources++;
b677e7
 
b677e7
         r = event_make_signal_data(e, SIGCHLD, NULL);
b677e7
         if (r < 0) {
b677e7
-                e->n_enabled_child_sources--;
b677e7
+                e->n_online_child_sources--;
b677e7
                 source_free(s);
b677e7
                 return r;
b677e7
         }
b677e7
@@ -2079,7 +2153,7 @@ _public_ int sd_event_source_set_io_fd(sd_event_source *s, int fd) {
b677e7
         if (s->io.fd == fd)
b677e7
                 return 0;
b677e7
 
b677e7
-        if (s->enabled == SD_EVENT_OFF) {
b677e7
+        if (event_source_is_offline(s)) {
b677e7
                 s->io.fd = fd;
b677e7
                 s->io.registered = false;
b677e7
         } else {
b677e7
@@ -2146,7 +2220,7 @@ _public_ int sd_event_source_set_io_events(sd_event_source *s, uint32_t events)
b677e7
         if (r < 0)
b677e7
                 return r;
b677e7
 
b677e7
-        if (s->enabled != SD_EVENT_OFF) {
b677e7
+        if (event_source_is_online(s)) {
b677e7
                 r = source_io_register(s, s->enabled, events);
b677e7
                 if (r < 0)
b677e7
                         return r;
b677e7
@@ -2249,7 +2323,7 @@ _public_ int sd_event_source_set_priority(sd_event_source *s, int64_t priority)
b677e7
 
b677e7
                 event_gc_inode_data(s->event, old_inode_data);
b677e7
 
b677e7
-        } else if (s->type == SOURCE_SIGNAL && s->enabled != SD_EVENT_OFF) {
b677e7
+        } else if (s->type == SOURCE_SIGNAL && event_source_is_online(s)) {
b677e7
                 struct signal_data *old, *d;
b677e7
 
b677e7
                 /* Move us from the signalfd belonging to the old
b677e7
@@ -2296,20 +2370,29 @@ _public_ int sd_event_source_get_enabled(sd_event_source *s, int *ret) {
b677e7
         return s->enabled != SD_EVENT_OFF;
b677e7
 }
b677e7
 
b677e7
-static int event_source_disable(sd_event_source *s) {
b677e7
+static int event_source_offline(
b677e7
+                sd_event_source *s,
b677e7
+                int enabled,
b677e7
+                bool ratelimited) {
b677e7
+
b677e7
+        bool was_offline;
b677e7
         int r;
b677e7
 
b677e7
         assert(s);
b677e7
-        assert(s->enabled != SD_EVENT_OFF);
b677e7
+        assert(enabled == SD_EVENT_OFF || ratelimited);
b677e7
 
b677e7
         /* Unset the pending flag when this event source is disabled */
b677e7
-        if (!IN_SET(s->type, SOURCE_DEFER, SOURCE_EXIT)) {
b677e7
+        if (s->enabled != SD_EVENT_OFF &&
b677e7
+            enabled == SD_EVENT_OFF &&
b677e7
+            !IN_SET(s->type, SOURCE_DEFER, SOURCE_EXIT)) {
b677e7
                 r = source_set_pending(s, false);
b677e7
                 if (r < 0)
b677e7
                         return r;
b677e7
         }
b677e7
 
b677e7
-        s->enabled = SD_EVENT_OFF;
b677e7
+        was_offline = event_source_is_offline(s);
b677e7
+        s->enabled = enabled;
b677e7
+        s->ratelimited = ratelimited;
b677e7
 
b677e7
         switch (s->type) {
b677e7
 
b677e7
@@ -2330,8 +2413,10 @@ static int event_source_disable(sd_event_source *s) {
b677e7
                 break;
b677e7
 
b677e7
         case SOURCE_CHILD:
b677e7
-                assert(s->event->n_enabled_child_sources > 0);
b677e7
-                s->event->n_enabled_child_sources--;
b677e7
+                if (!was_offline) {
b677e7
+                        assert(s->event->n_online_child_sources > 0);
b677e7
+                        s->event->n_online_child_sources--;
b677e7
+                }
b677e7
 
b677e7
                 event_gc_signal_data(s->event, &s->priority, SIGCHLD);
b677e7
                 break;
b677e7
@@ -2349,26 +2434,42 @@ static int event_source_disable(sd_event_source *s) {
b677e7
                 assert_not_reached("Wut? I shouldn't exist.");
b677e7
         }
b677e7
 
b677e7
-        return 0;
b677e7
+        return 1;
b677e7
 }
b677e7
 
b677e7
-static int event_source_enable(sd_event_source *s, int enable) {
b677e7
+static int event_source_online(
b677e7
+                sd_event_source *s,
b677e7
+                int enabled,
b677e7
+                bool ratelimited) {
b677e7
+
b677e7
+        bool was_online;
b677e7
         int r;
b677e7
 
b677e7
         assert(s);
b677e7
-        assert(IN_SET(enable, SD_EVENT_ON, SD_EVENT_ONESHOT));
b677e7
-        assert(s->enabled == SD_EVENT_OFF);
b677e7
+        assert(enabled != SD_EVENT_OFF || !ratelimited);
b677e7
 
b677e7
         /* Unset the pending flag when this event source is enabled */
b677e7
-        if (!IN_SET(s->type, SOURCE_DEFER, SOURCE_EXIT)) {
b677e7
+        if (s->enabled == SD_EVENT_OFF &&
b677e7
+            enabled != SD_EVENT_OFF &&
b677e7
+            !IN_SET(s->type, SOURCE_DEFER, SOURCE_EXIT)) {
b677e7
                 r = source_set_pending(s, false);
b677e7
                 if (r < 0)
b677e7
                         return r;
b677e7
         }
b677e7
 
b677e7
+        /* Are we really ready for onlining? */
b677e7
+        if (enabled == SD_EVENT_OFF || ratelimited) {
b677e7
+                /* Nope, we are not ready for onlining, then just update the precise state and exit */
b677e7
+                s->enabled = enabled;
b677e7
+                s->ratelimited = ratelimited;
b677e7
+                return 0;
b677e7
+        }
b677e7
+
b677e7
+        was_online = event_source_is_online(s);
b677e7
+
b677e7
         switch (s->type) {
b677e7
         case SOURCE_IO:
b677e7
-                r = source_io_register(s, enable, s->io.events);
b677e7
+                r = source_io_register(s, enabled, s->io.events);
b677e7
                 if (r < 0)
b677e7
                         return r;
b677e7
                 break;
b677e7
@@ -2386,13 +2487,13 @@ static int event_source_enable(sd_event_source *s, int enable) {
b677e7
                 r = event_make_signal_data(s->event, SIGCHLD, NULL);
b677e7
                 if (r < 0) {
b677e7
                         s->enabled = SD_EVENT_OFF;
b677e7
-                        s->event->n_enabled_child_sources--;
b677e7
+                        s->event->n_online_child_sources--;
b677e7
                         event_gc_signal_data(s->event, &s->priority, SIGCHLD);
b677e7
                         return r;
b677e7
                 }
b677e7
 
b677e7
-                s->event->n_enabled_child_sources++;
b677e7
-
b677e7
+                if (!was_online)
b677e7
+                        s->event->n_online_child_sources++;
b677e7
                 break;
b677e7
 
b677e7
         case SOURCE_TIME_REALTIME:
b677e7
@@ -2410,7 +2511,8 @@ static int event_source_enable(sd_event_source *s, int enable) {
b677e7
                 assert_not_reached("Wut? I shouldn't exist.");
b677e7
         }
b677e7
 
b677e7
-        s->enabled = enable;
b677e7
+        s->enabled = enabled;
b677e7
+        s->ratelimited = ratelimited;
b677e7
 
b677e7
         /* Non-failing operations below */
b677e7
         switch (s->type) {
b677e7
@@ -2430,7 +2532,7 @@ static int event_source_enable(sd_event_source *s, int enable) {
b677e7
                 break;
b677e7
         }
b677e7
 
b677e7
-        return 0;
b677e7
+        return 1;
b677e7
 }
b677e7
 
b677e7
 _public_ int sd_event_source_set_enabled(sd_event_source *s, int m) {
b677e7
@@ -2448,7 +2550,7 @@ _public_ int sd_event_source_set_enabled(sd_event_source *s, int m) {
b677e7
                 return 0;
b677e7
 
b677e7
         if (m == SD_EVENT_OFF)
b677e7
-                r = event_source_disable(s);
b677e7
+                r = event_source_offline(s, m, s->ratelimited);
b677e7
         else {
b677e7
                 if (s->enabled != SD_EVENT_OFF) {
b677e7
                         /* Switching from "on" to "oneshot" or back? If that's the case, we can take a shortcut, the
b677e7
@@ -2457,7 +2559,7 @@ _public_ int sd_event_source_set_enabled(sd_event_source *s, int m) {
b677e7
                         return 0;
b677e7
                 }
b677e7
 
b677e7
-                r = event_source_enable(s, m);
b677e7
+                r = event_source_online(s, m, s->ratelimited);
b677e7
         }
b677e7
         if (r < 0)
b677e7
                 return r;
b677e7
@@ -2605,6 +2707,96 @@ _public_ void *sd_event_source_set_userdata(sd_event_source *s, void *userdata)
b677e7
         return ret;
b677e7
 }
b677e7
 
b677e7
+static int event_source_enter_ratelimited(sd_event_source *s) {
b677e7
+        int r;
b677e7
+
b677e7
+        assert(s);
b677e7
+
b677e7
+        /* When an event source becomes ratelimited, we place it in the CLOCK_MONOTONIC priority queue, with
b677e7
+         * the end of the rate limit time window, much as if it was a timer event source. */
b677e7
+
b677e7
+        if (s->ratelimited)
b677e7
+                return 0; /* Already ratelimited, this is a NOP hence */
b677e7
+
b677e7
+        /* Make sure we can install a CLOCK_MONOTONIC event further down. */
b677e7
+        r = setup_clock_data(s->event, &s->event->monotonic, CLOCK_MONOTONIC);
b677e7
+        if (r < 0)
b677e7
+                return r;
b677e7
+
b677e7
+        /* Timer event sources are already using the earliest/latest queues for the timer scheduling. Let's
b677e7
+         * first remove them from the prioq appropriate for their own clock, so that we can use the prioq
b677e7
+         * fields of the event source then for adding it to the CLOCK_MONOTONIC prioq instead. */
b677e7
+        if (EVENT_SOURCE_IS_TIME(s->type))
b677e7
+                event_source_time_prioq_remove(s, event_get_clock_data(s->event, s->type));
b677e7
+
b677e7
+        /* Now, let's add the event source to the monotonic clock instead */
b677e7
+        r = event_source_time_prioq_put(s, &s->event->monotonic);
b677e7
+        if (r < 0)
b677e7
+                goto fail;
b677e7
+
b677e7
+        /* And let's take the event source officially offline */
b677e7
+        r = event_source_offline(s, s->enabled, /* ratelimited= */ true);
b677e7
+        if (r < 0) {
b677e7
+                event_source_time_prioq_remove(s, &s->event->monotonic);
b677e7
+                goto fail;
b677e7
+        }
b677e7
+
b677e7
+        event_source_pp_prioq_reshuffle(s);
b677e7
+
b677e7
+        log_debug("Event source %p (%s) entered rate limit state.", s, strna(s->description));
b677e7
+        return 0;
b677e7
+
b677e7
+fail:
b677e7
+        /* Reinstall time event sources in the priority queue as before. This shouldn't fail, since the queue
b677e7
+         * space for it should already be allocated. */
b677e7
+        if (EVENT_SOURCE_IS_TIME(s->type))
b677e7
+                assert_se(event_source_time_prioq_put(s, event_get_clock_data(s->event, s->type)) >= 0);
b677e7
+
b677e7
+        return r;
b677e7
+}
b677e7
+
b677e7
+static int event_source_leave_ratelimit(sd_event_source *s) {
b677e7
+        int r;
b677e7
+
b677e7
+        assert(s);
b677e7
+
b677e7
+        if (!s->ratelimited)
b677e7
+                return 0;
b677e7
+
b677e7
+        /* Let's take the event source out of the monotonic prioq first. */
b677e7
+        event_source_time_prioq_remove(s, &s->event->monotonic);
b677e7
+
b677e7
+        /* Let's then add the event source to its native clock prioq again — if this is a timer event source */
b677e7
+        if (EVENT_SOURCE_IS_TIME(s->type)) {
b677e7
+                r = event_source_time_prioq_put(s, event_get_clock_data(s->event, s->type));
b677e7
+                if (r < 0)
b677e7
+                        goto fail;
b677e7
+        }
b677e7
+
b677e7
+        /* Let's try to take it online again.  */
b677e7
+        r = event_source_online(s, s->enabled, /* ratelimited= */ false);
b677e7
+        if (r < 0) {
b677e7
+                /* Do something roughly sensible when this failed: undo the two prioq ops above */
b677e7
+                if (EVENT_SOURCE_IS_TIME(s->type))
b677e7
+                        event_source_time_prioq_remove(s, event_get_clock_data(s->event, s->type));
b677e7
+
b677e7
+                goto fail;
b677e7
+        }
b677e7
+
b677e7
+        event_source_pp_prioq_reshuffle(s);
b677e7
+        ratelimit_reset(&s->rate_limit);
b677e7
+
b677e7
+        log_debug("Event source %p (%s) left rate limit state.", s, strna(s->description));
b677e7
+        return 0;
b677e7
+
b677e7
+fail:
b677e7
+        /* Do something somewhat reasonable when we cannot move an event source out of ratelimited mode:
b677e7
+         * simply put it back in it, maybe we can then process it more successfully next iteration. */
b677e7
+        assert_se(event_source_time_prioq_put(s, &s->event->monotonic) >= 0);
b677e7
+
b677e7
+        return r;
b677e7
+}
b677e7
+
b677e7
 static usec_t sleep_between(sd_event *e, usec_t a, usec_t b) {
b677e7
         usec_t c;
b677e7
         assert(e);
b677e7
@@ -2703,7 +2895,7 @@ static int event_arm_timer(
b677e7
                 d->needs_rearm = false;
b677e7
 
b677e7
         a = prioq_peek(d->earliest);
b677e7
-        if (!a || a->enabled == SD_EVENT_OFF || a->time.next == USEC_INFINITY) {
b677e7
+        if (!a || a->enabled == SD_EVENT_OFF || time_event_source_next(a) == USEC_INFINITY) {
b677e7
 
b677e7
                 if (d->fd < 0)
b677e7
                         return 0;
b677e7
@@ -2723,7 +2915,7 @@ static int event_arm_timer(
b677e7
         b = prioq_peek(d->latest);
b677e7
         assert_se(b && b->enabled != SD_EVENT_OFF);
b677e7
 
b677e7
-        t = sleep_between(e, a->time.next, time_event_source_latest(b));
b677e7
+        t = sleep_between(e, time_event_source_next(a), time_event_source_latest(b));
b677e7
         if (d->next == t)
b677e7
                 return 0;
b677e7
 
b677e7
@@ -2802,10 +2994,22 @@ static int process_timer(
b677e7
 
b677e7
         for (;;) {
b677e7
                 s = prioq_peek(d->earliest);
b677e7
-                if (!s ||
b677e7
-                    s->time.next > n ||
b677e7
-                    s->enabled == SD_EVENT_OFF ||
b677e7
-                    s->pending)
b677e7
+                if (!s || time_event_source_next(s) > n)
b677e7
+                        break;
b677e7
+
b677e7
+                if (s->ratelimited) {
b677e7
+                        /* This is an event source whose ratelimit window has ended. Let's turn it on
b677e7
+                         * again. */
b677e7
+                        assert(s->ratelimited);
b677e7
+
b677e7
+                        r = event_source_leave_ratelimit(s);
b677e7
+                        if (r < 0)
b677e7
+                                return r;
b677e7
+
b677e7
+                        continue;
b677e7
+                }
b677e7
+
b677e7
+                if (s->enabled == SD_EVENT_OFF || s->pending)
b677e7
                         break;
b677e7
 
b677e7
                 r = source_set_pending(s, true);
b677e7
@@ -2851,7 +3055,7 @@ static int process_child(sd_event *e) {
b677e7
                 if (s->pending)
b677e7
                         continue;
b677e7
 
b677e7
-                if (s->enabled == SD_EVENT_OFF)
b677e7
+                if (event_source_is_offline(s))
b677e7
                         continue;
b677e7
 
b677e7
                 zero(s->child.siginfo);
b677e7
@@ -3024,7 +3228,7 @@ static int event_inotify_data_process(sd_event *e, struct inotify_data *d) {
b677e7
 
b677e7
                                 LIST_FOREACH(inotify.by_inode_data, s, inode_data->event_sources) {
b677e7
 
b677e7
-                                        if (s->enabled == SD_EVENT_OFF)
b677e7
+                                        if (event_source_is_offline(s))
b677e7
                                                 continue;
b677e7
 
b677e7
                                         r = source_set_pending(s, true);
b677e7
@@ -3060,7 +3264,7 @@ static int event_inotify_data_process(sd_event *e, struct inotify_data *d) {
b677e7
                          * sources if IN_IGNORED or IN_UNMOUNT is set. */
b677e7
                         LIST_FOREACH(inotify.by_inode_data, s, inode_data->event_sources) {
b677e7
 
b677e7
-                                if (s->enabled == SD_EVENT_OFF)
b677e7
+                                if (event_source_is_offline(s))
b677e7
                                         continue;
b677e7
 
b677e7
                                 if ((d->buffer.ev.mask & (IN_IGNORED|IN_UNMOUNT)) == 0 &&
b677e7
@@ -3099,6 +3303,7 @@ static int process_inotify(sd_event *e) {
b677e7
 }
b677e7
 
b677e7
 static int source_dispatch(sd_event_source *s) {
b677e7
+        _cleanup_(sd_event_unrefp) sd_event *saved_event = NULL;
b677e7
         EventSourceType saved_type;
b677e7
         int r = 0;
b677e7
 
b677e7
@@ -3109,6 +3314,20 @@ static int source_dispatch(sd_event_source *s) {
b677e7
          * the event. */
b677e7
         saved_type = s->type;
b677e7
 
b677e7
+        /* Similarly, store a reference to the event loop object, so that we can still access it after the
b677e7
+         * callback might have invalidated/disconnected the event source. */
b677e7
+        saved_event = sd_event_ref(s->event);
b677e7
+
b677e7
+        /* Check if we hit the ratelimit for this event source, if so, let's disable it. */
b677e7
+        assert(!s->ratelimited);
b677e7
+        if (!ratelimit_below(&s->rate_limit)) {
b677e7
+                r = event_source_enter_ratelimited(s);
b677e7
+                if (r < 0)
b677e7
+                        return r;
b677e7
+
b677e7
+                return 1;
b677e7
+        }
b677e7
+
b677e7
         if (!IN_SET(s->type, SOURCE_DEFER, SOURCE_EXIT)) {
b677e7
                 r = source_set_pending(s, false);
b677e7
                 if (r < 0)
b677e7
@@ -3235,7 +3454,7 @@ static int event_prepare(sd_event *e) {
b677e7
                 sd_event_source *s;
b677e7
 
b677e7
                 s = prioq_peek(e->prepare);
b677e7
-                if (!s || s->prepare_iteration == e->iteration || s->enabled == SD_EVENT_OFF)
b677e7
+                if (!s || s->prepare_iteration == e->iteration || event_source_is_offline(s))
b677e7
                         break;
b677e7
 
b677e7
                 s->prepare_iteration = e->iteration;
b677e7
@@ -3269,7 +3488,7 @@ static int dispatch_exit(sd_event *e) {
b677e7
         assert(e);
b677e7
 
b677e7
         p = prioq_peek(e->exit);
b677e7
-        if (!p || p->enabled == SD_EVENT_OFF) {
b677e7
+        if (!p || event_source_is_offline(p)) {
b677e7
                 e->state = SD_EVENT_FINISHED;
b677e7
                 return 0;
b677e7
         }
b677e7
@@ -3291,7 +3510,7 @@ static sd_event_source* event_next_pending(sd_event *e) {
b677e7
         if (!p)
b677e7
                 return NULL;
b677e7
 
b677e7
-        if (p->enabled == SD_EVENT_OFF)
b677e7
+        if (event_source_is_offline(p))
b677e7
                 return NULL;
b677e7
 
b677e7
         return p;
b677e7
@@ -3844,3 +4063,53 @@ _public_ int sd_event_source_get_destroy_callback(sd_event_source *s, sd_event_d
b677e7
 
b677e7
         return !!s->destroy_callback;
b677e7
 }
b677e7
+
b677e7
+_public_ int sd_event_source_set_ratelimit(sd_event_source *s, uint64_t interval, unsigned burst) {
b677e7
+        int r;
b677e7
+
b677e7
+        assert_return(s, -EINVAL);
b677e7
+
b677e7
+        /* Turning on ratelimiting on event source types that don't support it, is a loggable offense. Doing
b677e7
+         * so is a programming error. */
b677e7
+        assert_return(EVENT_SOURCE_CAN_RATE_LIMIT(s->type), -EDOM);
b677e7
+
b677e7
+        /* When ratelimiting is configured we'll always reset the rate limit state first and start fresh,
b677e7
+         * non-ratelimited. */
b677e7
+        r = event_source_leave_ratelimit(s);
b677e7
+        if (r < 0)
b677e7
+                return r;
b677e7
+
b677e7
+        RATELIMIT_INIT(s->rate_limit, interval, burst);
b677e7
+        return 0;
b677e7
+}
b677e7
+
b677e7
+_public_ int sd_event_source_get_ratelimit(sd_event_source *s, uint64_t *ret_interval, unsigned *ret_burst) {
b677e7
+        assert_return(s, -EINVAL);
b677e7
+
b677e7
+        /* Querying whether an event source has ratelimiting configured is not a loggable offense, hence
b677e7
+         * don't use assert_return(). Unlike turning on ratelimiting it's not really a programming error */
b677e7
+        if (!EVENT_SOURCE_CAN_RATE_LIMIT(s->type))
b677e7
+                return -EDOM;
b677e7
+
b677e7
+        if (!ratelimit_configured(&s->rate_limit))
b677e7
+                return -ENOEXEC;
b677e7
+
b677e7
+        if (ret_interval)
b677e7
+                *ret_interval = s->rate_limit.interval;
b677e7
+        if (ret_burst)
b677e7
+                *ret_burst = s->rate_limit.burst;
b677e7
+
b677e7
+        return 0;
b677e7
+}
b677e7
+
b677e7
+_public_ int sd_event_source_is_ratelimited(sd_event_source *s) {
b677e7
+        assert_return(s, -EINVAL);
b677e7
+
b677e7
+        if (!EVENT_SOURCE_CAN_RATE_LIMIT(s->type))
b677e7
+                return false;
b677e7
+
b677e7
+        if (!ratelimit_configured(&s->rate_limit))
b677e7
+                return false;
b677e7
+
b677e7
+        return s->ratelimited;
b677e7
+}
b677e7
diff --git a/src/systemd/sd-event.h b/src/systemd/sd-event.h
b677e7
index 9876be01c6..a17a9b3488 100644
b677e7
--- a/src/systemd/sd-event.h
b677e7
+++ b/src/systemd/sd-event.h
b677e7
@@ -144,6 +144,9 @@ int sd_event_source_get_child_pid(sd_event_source *s, pid_t *pid);
b677e7
 int sd_event_source_get_inotify_mask(sd_event_source *s, uint32_t *ret);
b677e7
 int sd_event_source_set_destroy_callback(sd_event_source *s, sd_event_destroy_t callback);
b677e7
 int sd_event_source_get_destroy_callback(sd_event_source *s, sd_event_destroy_t *ret);
b677e7
+int sd_event_source_set_ratelimit(sd_event_source *s, uint64_t interval_usec, unsigned burst);
b677e7
+int sd_event_source_get_ratelimit(sd_event_source *s, uint64_t *ret_interval_usec, unsigned *ret_burst);
b677e7
+int sd_event_source_is_ratelimited(sd_event_source *s);
b677e7
 
b677e7
 /* Define helpers so that __attribute__((cleanup(sd_event_unrefp))) and similar may be used. */
b677e7
 _SD_DEFINE_POINTER_CLEANUP_FUNC(sd_event, sd_event_unref);