From 97eff6cf6c9b58f8239b28be2f080e23c9da62c0 Mon Sep 17 00:00:00 2001
From: Ulrich Obergfell <uobergfe@redhat.com>
Date: Thu, 11 Jul 2019 14:33:55 +0200
Subject: [PATCH] fix deadlock in restore() function

When containerd starts up, the restore() function walks through the directory
hierarchy under /run/docker/libcontainerd/containerd and imports the state of
processes from files in /run/docker/libcontainerd/containerd/CONTAINERID and
in /run/docker/libcontainerd/containerd/CONTAINERID/PROCESSID. The restore()
function adds an ExitTask entry to the s.tasks queue for each process that is
no longer in state 'running'. The size of the s.tasks queue is hard-coded and
limited to 2048 (defaultBufferSize). If more than 2048 ExitTask entries need
to be added to the queue, the restore() function gets blocked (queue is full).
If this happens, containerd is in a kind of deadlock situation because the
handleTask() function (which would drain the ExitTask entries from the queue)
has not been started in a separate goroutine yet, and the main goroutine is
blocked in the restore() function (unable to start the handleTask() function).
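
Not part of this patch: a minimal Go sketch of the blocking pattern described
above, using a plain buffered channel in place of the s.tasks queue (all names
below are illustrative only).

    package main

    import "fmt"

    func main() {
        // Plays the role of s.tasks with defaultBufferSize = 2048.
        tasks := make(chan int, 2048)

        // "restore()" runs on the main goroutine and enqueues one entry per
        // process that is no longer running. The 2049th send blocks forever,
        // because the consumer loop below has not been started yet.
        for i := 0; i < 2049; i++ {
            tasks <- i
        }

        // Never reached when more than 2048 entries are enqueued above; the
        // Go runtime aborts with "all goroutines are asleep - deadlock!".
        go func() {
            for t := range tasks {
                fmt.Println("handled task", t)
            }
        }()
    }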

This patch introduces the dynamically allocated restoreExitTasks slice which
the restore() function uses to store the ExitTask entries separately instead
of adding them to the s.tasks queue. The task handler goroutine subsequently
drains all entries from restoreExitTasks first before it enters the loop that
handles entries from the s.tasks queue.
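
Not part of this patch: a sketch of the shape of the fix, again with
illustrative names only. Exited processes found during restore are recorded
in an unbounded slice, and the task handler goroutine drains that slice
before it starts consuming the bounded queue, so restore() can no longer
block regardless of how many processes have exited.

    package main

    import "fmt"

    // Stands in for restoreExitTasks: it grows as needed, so recording an
    // exit during restore never blocks.
    var restoredExits []int

    func main() {
        tasks := make(chan int, 2048) // stands in for s.tasks
        done := make(chan struct{})

        // "restore()": append to the slice instead of sending to the queue.
        for i := 0; i < 5000; i++ {
            restoredExits = append(restoredExits, i)
        }

        go func() {
            handled := 0
            // Drain the restored exits first ...
            for range restoredExits {
                handled++ // this is where s.handleTask(e) would run
            }
            // ... then enter the normal loop over the bounded queue.
            for range tasks {
                handled++
            }
            fmt.Println("handled", handled, "tasks")
            close(done)
        }()

        close(tasks) // only so that this sketch terminates
        <-done
    }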

Signed-off-by: Ulrich Obergfell <uobergfe@redhat.com>
---
 supervisor/supervisor.go | 35 +++++++++++++++++++++++++++++++++--
 1 file changed, 33 insertions(+), 2 deletions(-)

diff --git a/supervisor/supervisor.go b/supervisor/supervisor.go
index 8a26af0..d92de8a 100644
--- a/supervisor/supervisor.go
+++ b/supervisor/supervisor.go
@@ -18,6 +18,16 @@ const (
 	defaultBufferSize = 2048 // size of queue in eventloop
 )
 
+// Pointers to all ExitTask that are created by the restore() function are stored in this slice.
+var restoreExitTasks []*ExitTask
+
+func max(x, y int) int {
+	if x < y {
+		return y
+	}
+	return x
+}
+
 // New returns an initialized Process supervisor.
 func New(stateDir string, runtimeName, shimName string, runtimeArgs []string, timeout time.Duration, retainCount int) (*Supervisor, error) {
 	startTasks := make(chan *startTask, 10)
@@ -207,7 +217,9 @@ type eventV1 struct {
 // Events returns an event channel that external consumers can use to receive updates
 // on container events
 func (s *Supervisor) Events(from time.Time, storedOnly bool, id string) chan Event {
-	c := make(chan Event, defaultBufferSize)
+	var c chan Event
+
+	c = make(chan Event, defaultBufferSize)
 	if storedOnly {
 		defer s.Unsubscribe(c)
 	}
@@ -216,6 +228,9 @@ func (s *Supervisor) Events(from time.Time, storedOnly bool, id string) chan Eve
 	if !from.IsZero() {
 		// replay old event
 		s.eventLock.Lock()
+		close(c)
+		// Allocate a channel that has enough space for the entire event log.
+		c = make(chan Event, max(defaultBufferSize, len(s.eventLog)))
 		past := s.eventLog[:]
 		s.eventLock.Unlock()
 		for _, e := range past {
@@ -276,6 +291,21 @@ func (s *Supervisor) Start() error {
 		"cpus":        s.machine.Cpus,
 	}).Debug("containerd: supervisor running")
 	go func() {
+		if (len(restoreExitTasks) > 0) {
+			logrus.Infof("containerd: found %d exited processes after restart", len(restoreExitTasks))
+			//
+			// If the restore() function stored any ExitTask in the dedicated slice,
+			// then handle those tasks first. The purpose of the one second delay is
+			// to give dockerd a chance to establish its event stream connection to
+			// containerd. If the connection is established before the ExitTask are
+			// being handled, then dockerd can receive exit notifications directly
+			// (rather than having to replay the notifications from the event log).
+			//
+			time.Sleep(time.Second)
+			for _, e := range restoreExitTasks {
+				s.handleTask(e)
+			}
+		}
 		for i := range s.tasks {
 			s.handleTask(i)
 		}
@@ -385,7 +415,8 @@ func (s *Supervisor) restore() error {
 					Process: p,
 				}
 				e.WithContext(context.Background())
-				s.SendTask(e)
+				// Store pointer to ExitTask in dedicated slice.
+				restoreExitTasks = append(restoreExitTasks, e)
 			}
 		}
 	}
 	}