c1c26e
From f919e65e4a462b385a6daa6b7cccc6af1358cbcf Mon Sep 17 00:00:00 2001
c1c26e
From: Eduardo Otubo <otubo@redhat.com>
c1c26e
Date: Wed, 29 May 2019 13:41:47 +0200
c1c26e
Subject: [PATCH 3/5] Azure: Changes to the Hyper-V KVP Reporter
c1c26e
MIME-Version: 1.0
c1c26e
Content-Type: text/plain; charset=UTF-8
c1c26e
Content-Transfer-Encoding: 8bit
c1c26e
c1c26e
RH-Author: Eduardo Otubo <otubo@redhat.com>
c1c26e
Message-id: <20190529134149.842-4-otubo@redhat.com>
c1c26e
Patchwork-id: 88266
c1c26e
O-Subject: [RHEL-8.0.1/RHEL-8.1.0 cloud-init PATCHv2 3/5] Azure: Changes to the Hyper-V KVP Reporter
c1c26e
Bugzilla: 1691986
c1c26e
RH-Acked-by: Vitaly Kuznetsov <vkuznets@redhat.com>
c1c26e
RH-Acked-by: Cathy Avery <cavery@redhat.com>
c1c26e
c1c26e
From: Anh Vo <anhvo@microsoft.com>
c1c26e
commit 86674f013dfcea3c075ab41373ffb475881066f6
c1c26e
Author: Anh Vo <anhvo@microsoft.com>
c1c26e
Date:   Mon Apr 29 20:22:16 2019 +0000
c1c26e
c1c26e
    Azure: Changes to the Hyper-V KVP Reporter
c1c26e
c1c26e
     + Truncate KVP Pool file to prevent stale entries from
c1c26e
       being processed by the Hyper-V KVP reporter.
c1c26e
     + Drop filtering of KVPs as it is no longer needed.
c1c26e
     + Batch appending of existing KVP entries.
c1c26e
c1c26e
Signed-off-by: Eduardo Otubo <otubo@redhat.com>
c1c26e
Signed-off-by: Miroslav Rezanina <mrezanin@redhat.com>
c1c26e
---
c1c26e
 cloudinit/reporting/handlers.py          | 117 +++++++++++++++----------------
c1c26e
 tests/unittests/test_reporting_hyperv.py | 104 +++++++++++++--------------
c1c26e
 2 files changed, 106 insertions(+), 115 deletions(-)
c1c26e
 mode change 100644 => 100755 cloudinit/reporting/handlers.py
c1c26e
 mode change 100644 => 100755 tests/unittests/test_reporting_hyperv.py
c1c26e
c1c26e
diff --git a/cloudinit/reporting/handlers.py b/cloudinit/reporting/handlers.py
c1c26e
old mode 100644
c1c26e
new mode 100755
c1c26e
index 6d23558..10165ae
c1c26e
--- a/cloudinit/reporting/handlers.py
c1c26e
+++ b/cloudinit/reporting/handlers.py
c1c26e
@@ -5,7 +5,6 @@ import fcntl
c1c26e
 import json
c1c26e
 import six
c1c26e
 import os
c1c26e
-import re
c1c26e
 import struct
c1c26e
 import threading
c1c26e
 import time
c1c26e
@@ -14,6 +13,7 @@ from cloudinit import log as logging
c1c26e
 from cloudinit.registry import DictRegistry
c1c26e
 from cloudinit import (url_helper, util)
c1c26e
 from datetime import datetime
c1c26e
+from six.moves.queue import Empty as QueueEmptyError
c1c26e
 
c1c26e
 if six.PY2:
c1c26e
     from multiprocessing.queues import JoinableQueue as JQueue
c1c26e
@@ -129,24 +129,50 @@ class HyperVKvpReportingHandler(ReportingHandler):
c1c26e
     DESC_IDX_KEY = 'msg_i'
c1c26e
     JSON_SEPARATORS = (',', ':')
c1c26e
     KVP_POOL_FILE_GUEST = '/var/lib/hyperv/.kvp_pool_1'
c1c26e
+    _already_truncated_pool_file = False
c1c26e
 
c1c26e
     def __init__(self,
c1c26e
                  kvp_file_path=KVP_POOL_FILE_GUEST,
c1c26e
                  event_types=None):
c1c26e
         super(HyperVKvpReportingHandler, self).__init__()
c1c26e
         self._kvp_file_path = kvp_file_path
c1c26e
+        HyperVKvpReportingHandler._truncate_guest_pool_file(
c1c26e
+            self._kvp_file_path)
c1c26e
+
c1c26e
         self._event_types = event_types
c1c26e
         self.q = JQueue()
c1c26e
-        self.kvp_file = None
c1c26e
         self.incarnation_no = self._get_incarnation_no()
c1c26e
         self.event_key_prefix = u"{0}|{1}".format(self.EVENT_PREFIX,
c1c26e
                                                   self.incarnation_no)
c1c26e
-        self._current_offset = 0
c1c26e
         self.publish_thread = threading.Thread(
c1c26e
                 target=self._publish_event_routine)
c1c26e
         self.publish_thread.daemon = True
c1c26e
         self.publish_thread.start()
c1c26e
 
c1c26e
+    @classmethod
c1c26e
+    def _truncate_guest_pool_file(cls, kvp_file):
c1c26e
+        """
c1c26e
+        Truncate the pool file if it has not been truncated since boot.
c1c26e
+        This should be done exactly once for the file indicated by
c1c26e
+        KVP_POOL_FILE_GUEST constant above. This method takes a filename
c1c26e
+        so that we can use an arbitrary file during unit testing.
c1c26e
+        Since KVP is a best-effort telemetry channel we only attempt to
c1c26e
+        truncate the file once and only if the file has not been modified
c1c26e
+        since boot. Additional truncation can lead to loss of existing
c1c26e
+        KVPs.
c1c26e
+        """
c1c26e
+        if cls._already_truncated_pool_file:
c1c26e
+            return
c1c26e
+        boot_time = time.time() - float(util.uptime())
c1c26e
+        try:
c1c26e
+            if os.path.getmtime(kvp_file) < boot_time:
c1c26e
+                with open(kvp_file, "w"):
c1c26e
+                    pass
c1c26e
+        except (OSError, IOError) as e:
c1c26e
+            LOG.warning("failed to truncate kvp pool file, %s", e)
c1c26e
+        finally:
c1c26e
+            cls._already_truncated_pool_file = True
c1c26e
+
c1c26e
     def _get_incarnation_no(self):
c1c26e
         """
c1c26e
         use the time passed as the incarnation number.
c1c26e
@@ -162,20 +188,15 @@ class HyperVKvpReportingHandler(ReportingHandler):
c1c26e
 
c1c26e
     def _iterate_kvps(self, offset):
c1c26e
         """iterate the kvp file from the current offset."""
c1c26e
-        try:
c1c26e
-            with open(self._kvp_file_path, 'rb+') as f:
c1c26e
-                self.kvp_file = f
c1c26e
-                fcntl.flock(f, fcntl.LOCK_EX)
c1c26e
-                f.seek(offset)
c1c26e
+        with open(self._kvp_file_path, 'rb') as f:
c1c26e
+            fcntl.flock(f, fcntl.LOCK_EX)
c1c26e
+            f.seek(offset)
c1c26e
+            record_data = f.read(self.HV_KVP_RECORD_SIZE)
c1c26e
+            while len(record_data) == self.HV_KVP_RECORD_SIZE:
c1c26e
+                kvp_item = self._decode_kvp_item(record_data)
c1c26e
+                yield kvp_item
c1c26e
                 record_data = f.read(self.HV_KVP_RECORD_SIZE)
c1c26e
-                while len(record_data) == self.HV_KVP_RECORD_SIZE:
c1c26e
-                    self._current_offset += self.HV_KVP_RECORD_SIZE
c1c26e
-                    kvp_item = self._decode_kvp_item(record_data)
c1c26e
-                    yield kvp_item
c1c26e
-                    record_data = f.read(self.HV_KVP_RECORD_SIZE)
c1c26e
-                fcntl.flock(f, fcntl.LOCK_UN)
c1c26e
-        finally:
c1c26e
-            self.kvp_file = None
c1c26e
+            fcntl.flock(f, fcntl.LOCK_UN)
c1c26e
 
c1c26e
     def _event_key(self, event):
c1c26e
         """
c1c26e
@@ -207,23 +228,13 @@ class HyperVKvpReportingHandler(ReportingHandler):
c1c26e
 
c1c26e
         return {'key': k, 'value': v}
c1c26e
 
c1c26e
-    def _update_kvp_item(self, record_data):
c1c26e
-        if self.kvp_file is None:
c1c26e
-            raise ReportException(
c1c26e
-                "kvp file '{0}' not opened."
c1c26e
-                .format(self._kvp_file_path))
c1c26e
-        self.kvp_file.seek(-self.HV_KVP_RECORD_SIZE, 1)
c1c26e
-        self.kvp_file.write(record_data)
c1c26e
-
c1c26e
     def _append_kvp_item(self, record_data):
c1c26e
-        with open(self._kvp_file_path, 'rb+') as f:
c1c26e
+        with open(self._kvp_file_path, 'ab') as f:
c1c26e
             fcntl.flock(f, fcntl.LOCK_EX)
c1c26e
-            # seek to end of the file
c1c26e
-            f.seek(0, 2)
c1c26e
-            f.write(record_data)
c1c26e
+            for data in record_data:
c1c26e
+                f.write(data)
c1c26e
             f.flush()
c1c26e
             fcntl.flock(f, fcntl.LOCK_UN)
c1c26e
-            self._current_offset = f.tell()
c1c26e
 
c1c26e
     def _break_down(self, key, meta_data, description):
c1c26e
         del meta_data[self.MSG_KEY]
c1c26e
@@ -279,40 +290,26 @@ class HyperVKvpReportingHandler(ReportingHandler):
c1c26e
 
c1c26e
     def _publish_event_routine(self):
c1c26e
         while True:
c1c26e
+            items_from_queue = 0
c1c26e
             try:
c1c26e
                 event = self.q.get(block=True)
c1c26e
-                need_append = True
c1c26e
+                items_from_queue += 1
c1c26e
+                encoded_data = []
c1c26e
+                while event is not None:
c1c26e
+                    encoded_data += self._encode_event(event)
c1c26e
+                    try:
c1c26e
+                        # get all the rest of the events in the queue
c1c26e
+                        event = self.q.get(block=False)
c1c26e
+                        items_from_queue += 1
c1c26e
+                    except QueueEmptyError:
c1c26e
+                        event = None
c1c26e
                 try:
c1c26e
-                    if not os.path.exists(self._kvp_file_path):
c1c26e
-                        LOG.warning(
c1c26e
-                            "skip writing events %s to %s. file not present.",
c1c26e
-                            event.as_string(),
c1c26e
-                            self._kvp_file_path)
c1c26e
-                    encoded_event = self._encode_event(event)
c1c26e
-                    # for each encoded_event
c1c26e
-                    for encoded_data in (encoded_event):
c1c26e
-                        for kvp in self._iterate_kvps(self._current_offset):
c1c26e
-                            match = (
c1c26e
-                                re.match(
c1c26e
-                                    r"^{0}\|(\d+)\|.+"
c1c26e
-                                    .format(self.EVENT_PREFIX),
c1c26e
-                                    kvp['key']
c1c26e
-                                ))
c1c26e
-                            if match:
c1c26e
-                                match_groups = match.groups(0)
c1c26e
-                                if int(match_groups[0]) < self.incarnation_no:
c1c26e
-                                    need_append = False
c1c26e
-                                    self._update_kvp_item(encoded_data)
c1c26e
-                                    continue
c1c26e
-                        if need_append:
c1c26e
-                            self._append_kvp_item(encoded_data)
c1c26e
-                except IOError as e:
c1c26e
-                    LOG.warning(
c1c26e
-                        "failed posting event to kvp: %s e:%s",
c1c26e
-                        event.as_string(), e)
c1c26e
+                    self._append_kvp_item(encoded_data)
c1c26e
+                except (OSError, IOError) as e:
c1c26e
+                    LOG.warning("failed posting events to kvp, %s", e)
c1c26e
                 finally:
c1c26e
-                    self.q.task_done()
c1c26e
-
c1c26e
+                    for _ in range(items_from_queue):
c1c26e
+                        self.q.task_done()
c1c26e
             # when main process exits, q.get() will through EOFError
c1c26e
             # indicating we should exit this thread.
c1c26e
             except EOFError:
c1c26e
@@ -322,7 +319,7 @@ class HyperVKvpReportingHandler(ReportingHandler):
c1c26e
     # if the kvp pool already contains a chunk of data,
c1c26e
     # so defer it to another thread.
c1c26e
     def publish_event(self, event):
c1c26e
-        if (not self._event_types or event.event_type in self._event_types):
c1c26e
+        if not self._event_types or event.event_type in self._event_types:
c1c26e
             self.q.put(event)
c1c26e
 
c1c26e
     def flush(self):
c1c26e
diff --git a/tests/unittests/test_reporting_hyperv.py b/tests/unittests/test_reporting_hyperv.py
c1c26e
old mode 100644
c1c26e
new mode 100755
c1c26e
index 2e64c6c..d01ed5b
c1c26e
--- a/tests/unittests/test_reporting_hyperv.py
c1c26e
+++ b/tests/unittests/test_reporting_hyperv.py
c1c26e
@@ -1,10 +1,12 @@
c1c26e
 # This file is part of cloud-init. See LICENSE file for license information.
c1c26e
 
c1c26e
 from cloudinit.reporting import events
c1c26e
-from cloudinit.reporting import handlers
c1c26e
+from cloudinit.reporting.handlers import HyperVKvpReportingHandler
c1c26e
 
c1c26e
 import json
c1c26e
 import os
c1c26e
+import struct
c1c26e
+import time
c1c26e
 
c1c26e
 from cloudinit import util
c1c26e
 from cloudinit.tests.helpers import CiTestCase
c1c26e
@@ -13,7 +15,7 @@ from cloudinit.tests.helpers import CiTestCase
c1c26e
 class TestKvpEncoding(CiTestCase):
c1c26e
     def test_encode_decode(self):
c1c26e
         kvp = {'key': 'key1', 'value': 'value1'}
c1c26e
-        kvp_reporting = handlers.HyperVKvpReportingHandler()
c1c26e
+        kvp_reporting = HyperVKvpReportingHandler()
c1c26e
         data = kvp_reporting._encode_kvp_item(kvp['key'], kvp['value'])
c1c26e
         self.assertEqual(len(data), kvp_reporting.HV_KVP_RECORD_SIZE)
c1c26e
         decoded_kvp = kvp_reporting._decode_kvp_item(data)
c1c26e
@@ -26,57 +28,9 @@ class TextKvpReporter(CiTestCase):
c1c26e
         self.tmp_file_path = self.tmp_path('kvp_pool_file')
c1c26e
         util.ensure_file(self.tmp_file_path)
c1c26e
 
c1c26e
-    def test_event_type_can_be_filtered(self):
c1c26e
-        reporter = handlers.HyperVKvpReportingHandler(
c1c26e
-            kvp_file_path=self.tmp_file_path,
c1c26e
-            event_types=['foo', 'bar'])
c1c26e
-
c1c26e
-        reporter.publish_event(
c1c26e
-            events.ReportingEvent('foo', 'name', 'description'))
c1c26e
-        reporter.publish_event(
c1c26e
-            events.ReportingEvent('some_other', 'name', 'description3'))
c1c26e
-        reporter.q.join()
c1c26e
-
c1c26e
-        kvps = list(reporter._iterate_kvps(0))
c1c26e
-        self.assertEqual(1, len(kvps))
c1c26e
-
c1c26e
-        reporter.publish_event(
c1c26e
-            events.ReportingEvent('bar', 'name', 'description2'))
c1c26e
-        reporter.q.join()
c1c26e
-        kvps = list(reporter._iterate_kvps(0))
c1c26e
-        self.assertEqual(2, len(kvps))
c1c26e
-
c1c26e
-        self.assertIn('foo', kvps[0]['key'])
c1c26e
-        self.assertIn('bar', kvps[1]['key'])
c1c26e
-        self.assertNotIn('some_other', kvps[0]['key'])
c1c26e
-        self.assertNotIn('some_other', kvps[1]['key'])
c1c26e
-
c1c26e
-    def test_events_are_over_written(self):
c1c26e
-        reporter = handlers.HyperVKvpReportingHandler(
c1c26e
-            kvp_file_path=self.tmp_file_path)
c1c26e
-
c1c26e
-        self.assertEqual(0, len(list(reporter._iterate_kvps(0))))
c1c26e
-
c1c26e
-        reporter.publish_event(
c1c26e
-            events.ReportingEvent('foo', 'name1', 'description'))
c1c26e
-        reporter.publish_event(
c1c26e
-            events.ReportingEvent('foo', 'name2', 'description'))
c1c26e
-        reporter.q.join()
c1c26e
-        self.assertEqual(2, len(list(reporter._iterate_kvps(0))))
c1c26e
-
c1c26e
-        reporter2 = handlers.HyperVKvpReportingHandler(
c1c26e
-            kvp_file_path=self.tmp_file_path)
c1c26e
-        reporter2.incarnation_no = reporter.incarnation_no + 1
c1c26e
-        reporter2.publish_event(
c1c26e
-            events.ReportingEvent('foo', 'name3', 'description'))
c1c26e
-        reporter2.q.join()
c1c26e
-
c1c26e
-        self.assertEqual(2, len(list(reporter2._iterate_kvps(0))))
c1c26e
-
c1c26e
     def test_events_with_higher_incarnation_not_over_written(self):
c1c26e
-        reporter = handlers.HyperVKvpReportingHandler(
c1c26e
+        reporter = HyperVKvpReportingHandler(
c1c26e
             kvp_file_path=self.tmp_file_path)
c1c26e
-
c1c26e
         self.assertEqual(0, len(list(reporter._iterate_kvps(0))))
c1c26e
 
c1c26e
         reporter.publish_event(
c1c26e
@@ -86,7 +40,7 @@ class TextKvpReporter(CiTestCase):
c1c26e
         reporter.q.join()
c1c26e
         self.assertEqual(2, len(list(reporter._iterate_kvps(0))))
c1c26e
 
c1c26e
-        reporter3 = handlers.HyperVKvpReportingHandler(
c1c26e
+        reporter3 = HyperVKvpReportingHandler(
c1c26e
             kvp_file_path=self.tmp_file_path)
c1c26e
         reporter3.incarnation_no = reporter.incarnation_no - 1
c1c26e
         reporter3.publish_event(
c1c26e
@@ -95,7 +49,7 @@ class TextKvpReporter(CiTestCase):
c1c26e
         self.assertEqual(3, len(list(reporter3._iterate_kvps(0))))
c1c26e
 
c1c26e
     def test_finish_event_result_is_logged(self):
c1c26e
-        reporter = handlers.HyperVKvpReportingHandler(
c1c26e
+        reporter = HyperVKvpReportingHandler(
c1c26e
             kvp_file_path=self.tmp_file_path)
c1c26e
         reporter.publish_event(
c1c26e
             events.FinishReportingEvent('name2', 'description1',
c1c26e
@@ -105,7 +59,7 @@ class TextKvpReporter(CiTestCase):
c1c26e
 
c1c26e
     def test_file_operation_issue(self):
c1c26e
         os.remove(self.tmp_file_path)
c1c26e
-        reporter = handlers.HyperVKvpReportingHandler(
c1c26e
+        reporter = HyperVKvpReportingHandler(
c1c26e
             kvp_file_path=self.tmp_file_path)
c1c26e
         reporter.publish_event(
c1c26e
             events.FinishReportingEvent('name2', 'description1',
c1c26e
@@ -113,7 +67,7 @@ class TextKvpReporter(CiTestCase):
c1c26e
         reporter.q.join()
c1c26e
 
c1c26e
     def test_event_very_long(self):
c1c26e
-        reporter = handlers.HyperVKvpReportingHandler(
c1c26e
+        reporter = HyperVKvpReportingHandler(
c1c26e
             kvp_file_path=self.tmp_file_path)
c1c26e
         description = 'ab' * reporter.HV_KVP_EXCHANGE_MAX_VALUE_SIZE
c1c26e
         long_event = events.FinishReportingEvent(
c1c26e
@@ -132,3 +86,43 @@ class TextKvpReporter(CiTestCase):
c1c26e
             self.assertEqual(msg_slice['msg_i'], i)
c1c26e
             full_description += msg_slice['msg']
c1c26e
         self.assertEqual(description, full_description)
c1c26e
+
c1c26e
+    def test_not_truncate_kvp_file_modified_after_boot(self):
c1c26e
+        with open(self.tmp_file_path, "wb+") as f:
c1c26e
+            kvp = {'key': 'key1', 'value': 'value1'}
c1c26e
+            data = (struct.pack("%ds%ds" % (
c1c26e
+                    HyperVKvpReportingHandler.HV_KVP_EXCHANGE_MAX_KEY_SIZE,
c1c26e
+                    HyperVKvpReportingHandler.HV_KVP_EXCHANGE_MAX_VALUE_SIZE),
c1c26e
+                    kvp['key'].encode('utf-8'), kvp['value'].encode('utf-8')))
c1c26e
+            f.write(data)
c1c26e
+        cur_time = time.time()
c1c26e
+        os.utime(self.tmp_file_path, (cur_time, cur_time))
c1c26e
+
c1c26e
+        # reset this because the unit test framework
c1c26e
+        # has already polluted the class variable
c1c26e
+        HyperVKvpReportingHandler._already_truncated_pool_file = False
c1c26e
+
c1c26e
+        reporter = HyperVKvpReportingHandler(kvp_file_path=self.tmp_file_path)
c1c26e
+        kvps = list(reporter._iterate_kvps(0))
c1c26e
+        self.assertEqual(1, len(kvps))
c1c26e
+
c1c26e
+    def test_truncate_stale_kvp_file(self):
c1c26e
+        with open(self.tmp_file_path, "wb+") as f:
c1c26e
+            kvp = {'key': 'key1', 'value': 'value1'}
c1c26e
+            data = (struct.pack("%ds%ds" % (
c1c26e
+                HyperVKvpReportingHandler.HV_KVP_EXCHANGE_MAX_KEY_SIZE,
c1c26e
+                HyperVKvpReportingHandler.HV_KVP_EXCHANGE_MAX_VALUE_SIZE),
c1c26e
+                kvp['key'].encode('utf-8'), kvp['value'].encode('utf-8')))
c1c26e
+            f.write(data)
c1c26e
+
c1c26e
+        # set the time ways back to make it look like
c1c26e
+        # we had an old kvp file
c1c26e
+        os.utime(self.tmp_file_path, (1000000, 1000000))
c1c26e
+
c1c26e
+        # reset this because the unit test framework
c1c26e
+        # has already polluted the class variable
c1c26e
+        HyperVKvpReportingHandler._already_truncated_pool_file = False
c1c26e
+
c1c26e
+        reporter = HyperVKvpReportingHandler(kvp_file_path=self.tmp_file_path)
c1c26e
+        kvps = list(reporter._iterate_kvps(0))
c1c26e
+        self.assertEqual(0, len(kvps))
c1c26e
-- 
c1c26e
1.8.3.1
c1c26e