From 3e924bdfa5b094942b450a908196a73a9fa3edfe Mon Sep 17 00:00:00 2001 From: Daniel Madej Date: Mon, 7 Oct 2024 11:01:57 +0200 Subject: [PATCH 01/11] Declarative duts_config On editing duts_config replace configured duts instead of adding all duts from modified config. This allows for both adding and removing duts. Current behavior led to duplicates if config contained old duts in addition to new ones. Signed-off-by: Daniel Madej --- runnerd | 39 ++++++++++++++++++++++++++------------- 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/runnerd b/runnerd index a0bb92b..5fd7b65 100755 --- a/runnerd +++ b/runnerd @@ -121,38 +121,51 @@ class DutsManager: base_dut_config_path = "configs/base_dut_config.yml" def __init__(self, duts_config_file): + self.duts_config = None self.duts_config_file = duts_config_file self.base_dut_config = ConfigFile(self.base_dut_config_path).load() self.duts_queue = Queue() - self.duts = [] + self.duts = {} def collect_duts(self): if not self.duts_config_file.need_reload(): return self.duts_config = self.duts_config_file.load() - self.duts = [] + ip_list = [dut["ip"] for dut in self.duts_config['duts']] + for ip in self.duts.copy(): + # remove duts deleted from duts_config_file + if ip not in ip_list: + del self.duts[ip] for config in self.duts_config['duts']: - dut = Dut({**self.base_dut_config, **config}) - self.duts.append(dut) - self.duts_queue.put(dut) + ip = config["ip"] + if ip in self.duts: + # update config for existing dut + self.duts[ip].config = {**self.base_dut_config, **config} + else: + # add dut + dut = Dut({**self.base_dut_config, **config}) + self.duts[config["ip"]] = dut + self.mark_free(dut) def get_free_dut(self, dut_filter=None): filtered_duts = [] - found = False + dut = None for _ in self.duts: - dut = self.duts_queue.get() - if not dut_filter or dut_filter(dut): - found = True + free_dut = self.duts_queue.get() + if not dut_filter or dut_filter(free_dut): + dut = free_dut break - filtered_duts.append(dut) - map(self.duts_queue.put, filtered_duts) - if not found: + if free_dut.ip in self.duts: + filtered_duts.append(free_dut) + map(self.mark_free, filtered_duts) + if not dut: raise Exception("No DUT matching to filter!") return dut def mark_free(self, dut): - self.duts_queue.put(dut) + if dut.ip in self.duts: + self.duts_queue.put(dut) class TestManager: From f460b9a9972c2051ff942ebaa29dd93bf9a47373 Mon Sep 17 00:00:00 2001 From: Daniel Madej Date: Tue, 1 Oct 2024 11:25:25 +0200 Subject: [PATCH 02/11] Reformatting Signed-off-by: Daniel Madej --- common.py | 6 +++--- jogger | 47 +++++++++++++++++++++++++---------------------- requirements.txt | 1 + runnerd | 17 +++++++++++------ 4 files changed, 40 insertions(+), 31 deletions(-) diff --git a/common.py b/common.py index 488fca2..758d570 100644 --- a/common.py +++ b/common.py @@ -1,5 +1,6 @@ # # Copyright(c) 2023 Intel Corporation +# Copyright(c) 2024 Huawei Technologies # SPDX-License-Identifier: BSD-3-Clause # @@ -10,9 +11,7 @@ import hashlib import json import os -import random import re -import sys import time import yaml @@ -173,7 +172,7 @@ def duration(self): start_time = self['start-timestamp'] end_time = self.get('end-timestamp', time.time()) return timedelta(seconds=int(end_time-start_time)) - except: + except KeyError: return timedelta(0) def __eq__(self, other): @@ -196,6 +195,7 @@ def new(cls, test_case, data={}): **data }) + class JournalParser: def __init__(self, journal_file): self.journal_file = journal_file diff --git a/jogger b/jogger index 06a503e..89a34fa 100755 --- a/jogger +++ b/jogger @@ -1,10 +1,11 @@ #!/usr/bin/env python3 # # Copyright(c) 2023 Intel Corporation +# Copyright(c) 2024 Huawei Technologies # SPDX-License-Identifier: BSD-3-Clause # -from common import ConfigFile, JournalFile, StatusFile, TestCase, TestEvent, JournalParser +from common import JournalFile, StatusFile, TestCase, TestEvent, JournalParser from datetime import datetime from functools import reduce @@ -12,7 +13,6 @@ from tabulate import tabulate from tempfile import NamedTemporaryFile import argparse import daemon -import hashlib import json import os import shutil @@ -27,19 +27,19 @@ def error(*args, **kwargs): class Printer: @staticmethod def red(string): - return "\033[0;31m"+string+"\033[0m" + return "\033[0;31m" + string + "\033[0m" @staticmethod def green(string): - return "\033[0;32m"+string+"\033[0m" + return "\033[0;32m" + string + "\033[0m" @staticmethod def yellow(string): - return "\033[0;33m"+string+"\033[0m" + return "\033[0;33m" + string + "\033[0m" @staticmethod def blue(string): - return "\033[0;34m"+string+"\033[0m" + return "\033[0;34m" + string + "\033[0m" class DataPrinter: @@ -47,7 +47,7 @@ class DataPrinter: if output_format not in ['table', 'json']: raise ValueError(f"Invalid output format '{output_format}'") self.output_format = output_format - self.caption = None + self.captions = None self.data = None def setCaptions(self, captions): @@ -188,7 +188,7 @@ class ScopeHandler: ) def run(self, req): - tests = reduce(lambda acc, sha: acc+self.__tests_by_sha(sha), req, []) + tests = reduce(lambda acc, sha: acc + self.__tests_by_sha(sha), req, []) test_events = [] with self.journal_file.record() as journal: for test_case in tests: @@ -238,7 +238,6 @@ class ScopeHandler: for res in results: results_dict[res['sha']] = res - scope_status = [] for test_case in tests: queued_events = filter( lambda e: e['test-case'] == test_case, @@ -303,7 +302,6 @@ class ScopeHandler: ) - class TestSelector: def __init__(self, tests): self.tests = tests @@ -325,6 +323,7 @@ class TestSelector: tmpf.seek(0) return [line.split()[0] for line in tmpf.readlines()] + usage = """%(prog)s command [args] Supported commands: @@ -355,13 +354,15 @@ class SuperRunnerCli: self.scope_handler = ScopeHandler() getattr(self, command)(f"{parser.prog} {args.command}", argv[1:]) - def __color_result(self, string): + @staticmethod + def __color_result(string): return { 'PASSED': Printer.green, 'FAILED': Printer.red, }.get(string, Printer.yellow)(string) - def __print_test_events(self, args, test_events): + @staticmethod + def __print_test_events(args, test_events): p = ListPrinter(args.format) id_field = ('id', 'sha')[args.long] test_field = ('function', 'test')[args.long] @@ -383,16 +384,17 @@ class SuperRunnerCli: }) p.print() - def init(self, prog, argv): + @staticmethod + def init(prog, argv): parser = argparse.ArgumentParser( prog=prog, description="Initialize new test scope" ) - args = parser.parse_args(argv) + _ = parser.parse_args(argv) runner_path = os.getenv('RUNNER_PATH') if not runner_path: - error("Enrvironment variable 'RUNNER_PATH' is not set!") + error("Environment variable 'RUNNER_PATH' is not set!") exit(1) if os.path.isdir("meta"): error("Existing runner scope found in this directory!") @@ -426,7 +428,7 @@ class SuperRunnerCli: 'sha': test_case['sha'], 'function': test_case.function(), 'test': test_case.test() - }) + }) p.print() def run(self, prog, argv): @@ -487,7 +489,7 @@ class SuperRunnerCli: args = parser.parse_args(argv) test_event = self.scope_handler.test_event_by_sha({'sha': args.id}) - deleted_event = self.scope_handler.delete({'test-event': test_event}) + self.scope_handler.delete({'test-event': test_event}) def queue(self, prog, argv): parser = argparse.ArgumentParser( @@ -556,8 +558,8 @@ class SuperRunnerCli: last_event_result = self.__color_result(last_event['result']) last_event_id = last_event['sha'][:16] last_event_sha = last_event['sha'] - last_event_date = datetime.fromtimestamp(last_event['end-timestamp']) \ - .strftime("%Y-%m-%d %H:%M:%S") + last_event_date = (datetime.fromtimestamp(last_event['end-timestamp']) + .strftime("%Y-%m-%d %H:%M:%S")) except: last_event_result = last_event_id = last_event_sha = last_event_date = "" p.addEntry({ @@ -571,7 +573,7 @@ class SuperRunnerCli: 'last-result-id': last_event_id, 'last-result-sha': last_event_sha, 'last-result-date': last_event_date - }) + }) p.print() def results(self, prog, argv): @@ -620,7 +622,7 @@ class SuperRunnerCli: 'test': test_case.test(), 'result': self.__color_result(test_event['result']), 'duration': test_event.duration() - }) + }) p.print() def show(self, prog, argv): @@ -701,7 +703,8 @@ class SuperRunnerCli: with daemon.DaemonContext(): webbrowser.open_new_tab(os.path.join(test_case['logs'], "main.html")) - def stdout(self, prog, argv): + @staticmethod + def stdout(prog, argv): parser = argparse.ArgumentParser( prog=prog, description="Show pytest standard output on selected DUT" diff --git a/requirements.txt b/requirements.txt index 3992629..4870803 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,4 @@ python-daemon>=2.2.4 setproctitle>=1.1.10 tabulate>=0.8.7 watchdog>=0.10.3 +pyyaml>=5.4 diff --git a/runnerd b/runnerd index 5fd7b65..5e41313 100755 --- a/runnerd +++ b/runnerd @@ -1,6 +1,7 @@ #!/usr/bin/env python3 # # Copyright(c) 2023 Intel Corporation +# Copyright(c) 2024 Huawei Technologies # SPDX-License-Identifier: BSD-3-Clause # @@ -25,7 +26,6 @@ import os import sys import time - logging.basicConfig( filename="runnerd.log", level=logging.ERROR, @@ -39,6 +39,7 @@ scriptdir = os.path.dirname(__file__) class EventHandler(FileSystemEventHandler): def __init__(self, callback): self.callback = callback + def on_modified(self, event): self.callback() @@ -95,7 +96,7 @@ class Dut: def run_in_thread(self, test_event, on_complete): self.config['meta'] = { **test_event, - 'test-case' : {**test_event['test-case']} + 'test-case': {**test_event['test-case']} } tmp_conf_file = NamedTemporaryFile(prefix="dut_config_", suffix=".yml") ConfigFile(tmp_conf_file.name).save(self.config) @@ -104,9 +105,9 @@ class Dut: test_case = test_event['test-case'] log.info(f"Start {test_case} @ {self.ip}") runner = PytestRunner( - test_dir = test_case['dir'], - log_dir = self.log_path, - stdout_path = self.stdout_path + test_dir=test_case['dir'], + log_dir=self.log_path, + stdout_path=self.stdout_path ) runner.run(tmp_conf_file.name, test_case) log.info(f"Complete {test_case} @ {self.ip}") @@ -188,7 +189,7 @@ class TestManager: def __load_progress(self): self.test_events_complete.clear() with self.progress_file.edit() as progress: - if not 'test-events' in progress: + if 'test-events' not in progress: return progress['test-events'] = list(filter( lambda entry: entry['status'] == "complete", @@ -283,6 +284,7 @@ class MasterRunner: test_event['end-timestamp'] = time.time() self.test_manager.mark_complete(test_event) self.results_collector.collect() + while True: self.duts_manager.collect_duts() log.debug("Looking for free DUT... ") @@ -301,6 +303,7 @@ class ScopeParser: scope_config_path = "configs/scope_config.yml" def __init__(self): + self.observer = None self.scope_config_file = ConfigFile(self.scope_config_path) self.scope_file = StatusFile("meta/scope.json") @@ -346,6 +349,7 @@ class ScopeParser: def stop(self): self.observer.join() + def daemonize(enabled): if enabled: print("Starting daemon") @@ -355,6 +359,7 @@ def daemonize(enabled): print("Starting in non-daemon mode") return contextlib.suppress() + if __name__ == '__main__': setproctitle('runnerd') parser = argparse.ArgumentParser() From df5b5dfa8fe745c1a99110e1c31642503d85402f Mon Sep 17 00:00:00 2001 From: Daniel Madej Date: Tue, 8 Oct 2024 10:36:38 +0200 Subject: [PATCH 03/11] Remove unused filtering + change DUT reconfiguration Signed-off-by: Daniel Madej --- runnerd | 36 +++++++++++++++++------------------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/runnerd b/runnerd index 5e41313..211100f 100755 --- a/runnerd +++ b/runnerd @@ -140,33 +140,27 @@ class DutsManager: del self.duts[ip] for config in self.duts_config['duts']: ip = config["ip"] - if ip in self.duts: - # update config for existing dut - self.duts[ip].config = {**self.base_dut_config, **config} - else: - # add dut - dut = Dut({**self.base_dut_config, **config}) - self.duts[config["ip"]] = dut - self.mark_free(dut) - - def get_free_dut(self, dut_filter=None): - filtered_duts = [] + add_dut = ip not in self.duts + self.duts[ip] = Dut({**self.base_dut_config, **config}) + if add_dut: + self.mark_free(self.duts[ip]) + + def get_free_dut(self): dut = None for _ in self.duts: free_dut = self.duts_queue.get() - if not dut_filter or dut_filter(free_dut): + if free_dut.ip in self.duts: dut = free_dut break - if free_dut.ip in self.duts: - filtered_duts.append(free_dut) - map(self.mark_free, filtered_duts) if not dut: - raise Exception("No DUT matching to filter!") + raise LookupError() return dut def mark_free(self, dut): + # DUTs can be removed from list or updated during test execution + # do not re-queue removed DUT or queue DUT by IP to get current DUT entry if dut.ip in self.duts: - self.duts_queue.put(dut) + self.duts_queue.put(self.duts[dut.ip]) class TestManager: @@ -288,7 +282,11 @@ class MasterRunner: while True: self.duts_manager.collect_duts() log.debug("Looking for free DUT... ") - dut = self.duts_manager.get_free_dut() + try: + dut = self.duts_manager.get_free_dut() + except LookupError: + log.debug("No available DUTs found!") + continue log.debug(f"Found DUT {dut.ip}") log.debug("Looking for next test... ") test_event = self.test_manager.get_next_test() @@ -365,7 +363,7 @@ if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument('--debug', action='store_true') parser.add_argument('--no-daemon', action='store_true') - parser.add_argument('--version', action='version', version='Superrunner4000 v0.2') + parser.add_argument('--version', action='version', version='Superrunner4000 v0.3') args = parser.parse_args() log.setLevel((logging.INFO, logging.DEBUG)[args.debug]) From a18eec0bd2b80c79ba9dc865f001977adf6e5e3c Mon Sep 17 00:00:00 2001 From: Daniel Madej Date: Tue, 8 Oct 2024 14:08:00 +0200 Subject: [PATCH 04/11] DUTs collecting and logging improvements Signed-off-by: Daniel Madej --- runnerd | 52 ++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 36 insertions(+), 16 deletions(-) diff --git a/runnerd b/runnerd index 211100f..7c978a1 100755 --- a/runnerd +++ b/runnerd @@ -26,10 +26,12 @@ import os import sys import time +log_format='%(asctime)s %(levelname)-8s %(message)s' + logging.basicConfig( filename="runnerd.log", level=logging.ERROR, - format='%(asctime)s %(levelname)-8s %(message)s', + format=log_format, datefmt='%Y-%m-%d %H:%M:%S') log = logging.getLogger("runnerd") @@ -122,6 +124,7 @@ class DutsManager: base_dut_config_path = "configs/base_dut_config.yml" def __init__(self, duts_config_file): + self.lock = Lock() self.duts_config = None self.duts_config_file = duts_config_file self.base_dut_config = ConfigFile(self.base_dut_config_path).load() @@ -129,21 +132,31 @@ class DutsManager: self.duts = {} def collect_duts(self): - if not self.duts_config_file.need_reload(): - return + with self.lock: + if not self.duts_config_file.need_reload(): + return - self.duts_config = self.duts_config_file.load() - ip_list = [dut["ip"] for dut in self.duts_config['duts']] - for ip in self.duts.copy(): - # remove duts deleted from duts_config_file - if ip not in ip_list: - del self.duts[ip] - for config in self.duts_config['duts']: - ip = config["ip"] - add_dut = ip not in self.duts - self.duts[ip] = Dut({**self.base_dut_config, **config}) - if add_dut: - self.mark_free(self.duts[ip]) + self.duts_config = self.duts_config_file.load() + configured_duts = self.duts_config.get('duts', []) + if configured_duts is None: + log.warning("Incorrect DUTs config!") + return + ip_list = [dut["ip"] for dut in self.duts_config["duts"]] + for ip in self.duts.copy(): + # remove duts deleted from duts_config_file + if ip not in ip_list: + log.debug(f"Removing DUT {ip} from scope") + del self.duts[ip] + if ip_list: + for config in self.duts_config['duts']: + ip = config["ip"] + add_dut = ip not in self.duts + self.duts[ip] = Dut({**self.base_dut_config, **config}) + if add_dut: + log.debug(f"Adding DUT {ip} to scope") + self.mark_free(self.duts[ip]) + else: + log.warning("DUT list is empty!") def get_free_dut(self): dut = None @@ -274,6 +287,7 @@ class MasterRunner: def run(self): def test_run_complete(dut, test_event): + self.duts_manager.collect_duts() self.duts_manager.mark_free(dut) test_event['end-timestamp'] = time.time() self.test_manager.mark_complete(test_event) @@ -281,11 +295,13 @@ class MasterRunner: while True: self.duts_manager.collect_duts() + if not self.duts_manager.duts: + continue log.debug("Looking for free DUT... ") try: dut = self.duts_manager.get_free_dut() except LookupError: - log.debug("No available DUTs found!") + log.warning("No available DUTs found!") continue log.debug(f"Found DUT {dut.ip}") log.debug("Looking for next test... ") @@ -367,6 +383,10 @@ if __name__ == '__main__': args = parser.parse_args() log.setLevel((logging.INFO, logging.DEBUG)[args.debug]) + if args.no_daemon: + handler = logging.StreamHandler() + handler.setFormatter(logging.Formatter(log_format)) + log.addHandler(handler) if not os.path.isdir("meta"): print("Scope is not initialized in this directory!") From 09d9efd10c677c088e3e313a6af5c3dbcb7166fd Mon Sep 17 00:00:00 2001 From: Daniel Madej Date: Tue, 8 Oct 2024 14:52:24 +0200 Subject: [PATCH 05/11] More changes to collecting DUTs Handle FileNotFoundError (when overwriting config file) Look for DUT only if queue is not empty Signed-off-by: Daniel Madej --- common.py | 7 ++++++- runnerd | 22 +++++++++++++++------- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/common.py b/common.py index 758d570..0cfbf75 100644 --- a/common.py +++ b/common.py @@ -28,7 +28,12 @@ def __access(self): self.last_modify = os.path.getmtime(self.path) def need_reload(self): - return self.last_modify != os.path.getmtime(self.path) + try: + return self.last_modify != os.path.getmtime(self.path) + except FileNotFoundError: + # can occur on config file overwrite + self.last_modify = 0 + raise def load(self): with meta_lock: diff --git a/runnerd b/runnerd index 7c978a1..6529302 100755 --- a/runnerd +++ b/runnerd @@ -133,9 +133,15 @@ class DutsManager: def collect_duts(self): with self.lock: - if not self.duts_config_file.need_reload(): - return - + while True: + try: + if not self.duts_config_file.need_reload(): + return + else: + break + except FileNotFoundError: + pass + log.info("Reloading DUTs config...") self.duts_config = self.duts_config_file.load() configured_duts = self.duts_config.get('duts', []) if configured_duts is None: @@ -160,7 +166,7 @@ class DutsManager: def get_free_dut(self): dut = None - for _ in self.duts: + while self.duts_queue.qsize(): free_dut = self.duts_queue.get() if free_dut.ip in self.duts: dut = free_dut @@ -287,6 +293,8 @@ class MasterRunner: def run(self): def test_run_complete(dut, test_event): + # Sometimes mark_free is quicker than reaching collect_duts in the loop below what + # results in queuing next test on removed DUT. collect_duts here prevents that. self.duts_manager.collect_duts() self.duts_manager.mark_free(dut) test_event['end-timestamp'] = time.time() @@ -295,16 +303,16 @@ class MasterRunner: while True: self.duts_manager.collect_duts() - if not self.duts_manager.duts: + if not self.duts_manager.duts_queue.qsize(): continue - log.debug("Looking for free DUT... ") + log.debug("Looking for a free DUT... ") try: dut = self.duts_manager.get_free_dut() except LookupError: log.warning("No available DUTs found!") continue log.debug(f"Found DUT {dut.ip}") - log.debug("Looking for next test... ") + log.debug("Looking for the next test... ") test_event = self.test_manager.get_next_test() log.debug(f"Found test {test_event}") test_event['ip'] = dut.ip From 97b0cfb1b5e7e43648be3e972492bb37efeae458 Mon Sep 17 00:00:00 2001 From: Daniel Madej Date: Wed, 9 Oct 2024 08:28:13 +0200 Subject: [PATCH 06/11] Refactor Removed redundant code, clarified lock, added logging for missing DUT config file Signed-off-by: Daniel Madej --- runnerd | 49 +++++++++++++++++++++++-------------------------- 1 file changed, 23 insertions(+), 26 deletions(-) diff --git a/runnerd b/runnerd index 6529302..569fdb1 100755 --- a/runnerd +++ b/runnerd @@ -124,7 +124,7 @@ class DutsManager: base_dut_config_path = "configs/base_dut_config.yml" def __init__(self, duts_config_file): - self.lock = Lock() + self.dut_dict_lock = Lock() self.duts_config = None self.duts_config_file = duts_config_file self.base_dut_config = ConfigFile(self.base_dut_config_path).load() @@ -132,15 +132,20 @@ class DutsManager: self.duts = {} def collect_duts(self): - with self.lock: - while True: - try: - if not self.duts_config_file.need_reload(): - return - else: - break - except FileNotFoundError: - pass + start_time = time.time() + while True: + try: + if not self.duts_config_file.need_reload(): + return + else: + break + except FileNotFoundError: + if time.time() - start_time > 5: + log.warning("DUT configuration file not found!") + start_time = time.time() + pass + + with self.dut_dict_lock: log.info("Reloading DUTs config...") self.duts_config = self.duts_config_file.load() configured_duts = self.duts_config.get('duts', []) @@ -165,21 +170,16 @@ class DutsManager: log.warning("DUT list is empty!") def get_free_dut(self): - dut = None - while self.duts_queue.qsize(): - free_dut = self.duts_queue.get() - if free_dut.ip in self.duts: - dut = free_dut - break - if not dut: - raise LookupError() - return dut + free_dut = self.duts_queue.get() + if free_dut.ip in self.duts: + return free_dut def mark_free(self, dut): - # DUTs can be removed from list or updated during test execution - # do not re-queue removed DUT or queue DUT by IP to get current DUT entry - if dut.ip in self.duts: - self.duts_queue.put(self.duts[dut.ip]) + with self.dut_dict_lock: + # DUTs can be removed from list or updated during test execution + # do not re-queue removed DUT or queue DUT by IP to get current DUT entry + if dut.ip in self.duts: + self.duts_queue.put(self.duts[dut.ip]) class TestManager: @@ -293,9 +293,6 @@ class MasterRunner: def run(self): def test_run_complete(dut, test_event): - # Sometimes mark_free is quicker than reaching collect_duts in the loop below what - # results in queuing next test on removed DUT. collect_duts here prevents that. - self.duts_manager.collect_duts() self.duts_manager.mark_free(dut) test_event['end-timestamp'] = time.time() self.test_manager.mark_complete(test_event) From 7dfb52b78c9a7e9d6239df59ec1686421f843e68 Mon Sep 17 00:00:00 2001 From: Daniel Madej Date: Wed, 9 Oct 2024 09:46:17 +0200 Subject: [PATCH 07/11] Prevent scheduling tests on the same DUT Check if DUT is in use before adding it to queue Prevent duplicated entries in queue Handle faulty config load Signed-off-by: Daniel Madej --- common.py | 8 ++++-- runnerd | 77 ++++++++++++++++++++++++++++++++++++++++--------------- 2 files changed, 63 insertions(+), 22 deletions(-) diff --git a/common.py b/common.py index 0cfbf75..d7e4b35 100644 --- a/common.py +++ b/common.py @@ -37,9 +37,13 @@ def need_reload(self): def load(self): with meta_lock: - self.__access() with open(self.path, 'r') as conf: - return yaml.safe_load(conf) + # config can be None if file is being overwritten at read time + # don't update last modify time - let the file be re-read + config = yaml.safe_load(conf) + if config: + self.__access() + return config def save(self, data): with meta_lock: diff --git a/runnerd b/runnerd index 569fdb1..b1566b5 100755 --- a/runnerd +++ b/runnerd @@ -8,7 +8,7 @@ from common import ConfigFile, JournalFile, StatusFile, TestCase, TestEvent, JournalParser from pathlib import Path -from queue import Queue +from queue import Queue, Empty from setproctitle import setproctitle from subprocess import Popen, PIPE from tempfile import NamedTemporaryFile @@ -124,12 +124,13 @@ class DutsManager: base_dut_config_path = "configs/base_dut_config.yml" def __init__(self, duts_config_file): - self.dut_dict_lock = Lock() + self.duts_lock = Lock() self.duts_config = None self.duts_config_file = duts_config_file self.base_dut_config = ConfigFile(self.base_dut_config_path).load() - self.duts_queue = Queue() + self.duts_queue = DutQueue() self.duts = {} + self.duts_in_use = set() def collect_duts(self): start_time = time.time() @@ -145,43 +146,78 @@ class DutsManager: start_time = time.time() pass - with self.dut_dict_lock: - log.info("Reloading DUTs config...") + with self.duts_lock: + log.debug("Reloading DUTs config...") self.duts_config = self.duts_config_file.load() + if self.duts_config is None: + log.error("Error while loading DUTs config!") + return configured_duts = self.duts_config.get('duts', []) if configured_duts is None: - log.warning("Incorrect DUTs config!") + log.warning("Check DUTs config!") return ip_list = [dut["ip"] for dut in self.duts_config["duts"]] for ip in self.duts.copy(): # remove duts deleted from duts_config_file if ip not in ip_list: - log.debug(f"Removing DUT {ip} from scope") + log.info(f"Removing DUT {ip} from scope (currently running test " + "will NOT be terminated)") del self.duts[ip] if ip_list: for config in self.duts_config['duts']: ip = config["ip"] add_dut = ip not in self.duts + # add new or update config self.duts[ip] = Dut({**self.base_dut_config, **config}) if add_dut: - log.debug(f"Adding DUT {ip} to scope") - self.mark_free(self.duts[ip]) + log.info(f"Adding DUT {ip} to scope") + if ip not in self.duts_in_use: + self.duts_queue.put(self.duts[ip]) + else: + log.debug(f"DUT {ip} is already in use - skip adding to queue") else: log.warning("DUT list is empty!") + return + log.debug("DUTs config reloaded") def get_free_dut(self): - free_dut = self.duts_queue.get() - if free_dut.ip in self.duts: - return free_dut + with self.duts_lock: + while True: + # raise Empty if queue is empty + dut = self.duts_queue.get(block=False) + if dut.ip in self.duts: + self.duts_in_use.add(dut.ip) + return dut def mark_free(self, dut): - with self.dut_dict_lock: - # DUTs can be removed from list or updated during test execution - # do not re-queue removed DUT or queue DUT by IP to get current DUT entry + with self.duts_lock: + self.duts_in_use.remove(dut.ip) + # re-queue only currently configured DUTs if dut.ip in self.duts: + # use current DUT entry (might be updated in the meantime) self.duts_queue.put(self.duts[dut.ip]) +class DutQueue(Queue): + """Queue with unique DUT entries""" + def __init__(self): + super().__init__() + self.dut_set = set() + self.set_lock = Lock() + + def put(self, dut: Dut, block=True, timeout=None): + with self.set_lock: + if dut.ip not in self.dut_set: + self.dut_set.add(dut.ip) + super().put(dut, block, timeout) + + def get(self, block=True, timeout=None) -> Dut: + with self.set_lock: + dut = super().get(block, timeout) + self.dut_set.remove(dut.ip) + return dut + + class TestManager: def __init__(self): self.lock = Lock() @@ -302,14 +338,14 @@ class MasterRunner: self.duts_manager.collect_duts() if not self.duts_manager.duts_queue.qsize(): continue - log.debug("Looking for a free DUT... ") + log.debug("Looking for a free DUT...") try: dut = self.duts_manager.get_free_dut() - except LookupError: - log.warning("No available DUTs found!") + except Empty: + log.debug("No free DUTs at the moment! Waiting...") continue log.debug(f"Found DUT {dut.ip}") - log.debug("Looking for the next test... ") + log.debug("Looking for the next test...") test_event = self.test_manager.get_next_test() log.debug(f"Found test {test_event}") test_event['ip'] = dut.ip @@ -327,7 +363,7 @@ class ScopeParser: self.scope_file = StatusFile("meta/scope.json") def __parse_config(self): - log.debug("Reloading scope config") + log.debug("Reloading scope config...") scope_config = self.scope_config_file.load() scope = self.scope_file.load() pytest_runner = PytestRunner(test_dir=scope_config['tests_path']) @@ -352,6 +388,7 @@ class ScopeParser: pytest_options )) test_cases = list(dict.fromkeys(test_cases)) + log.debug("Scope config reloaded") with self.scope_file.edit() as scope: scope['scope'] = scope_config['scope'] From a768e7a06d217d290f145f23b603d1e6bfba7c2b Mon Sep 17 00:00:00 2001 From: Daniel Madej Date: Wed, 9 Oct 2024 12:28:39 +0200 Subject: [PATCH 08/11] Changed the order of looking for tests and DUTs Acquire test and then look for DUT to run it on Signed-off-by: Daniel Madej --- runnerd | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/runnerd b/runnerd index b1566b5..d53a04e 100755 --- a/runnerd +++ b/runnerd @@ -335,23 +335,25 @@ class MasterRunner: self.results_collector.collect() while True: - self.duts_manager.collect_duts() - if not self.duts_manager.duts_queue.qsize(): - continue - log.debug("Looking for a free DUT...") - try: - dut = self.duts_manager.get_free_dut() - except Empty: - log.debug("No free DUTs at the moment! Waiting...") - continue - log.debug(f"Found DUT {dut.ip}") log.debug("Looking for the next test...") test_event = self.test_manager.get_next_test() log.debug(f"Found test {test_event}") - test_event['ip'] = dut.ip - test_event['start-timestamp'] = time.time() - self.test_manager.mark_started(test_event) - dut.run_test(test_event, test_run_complete) + while True: + self.duts_manager.collect_duts() + if not self.duts_manager.duts_queue.qsize(): + continue + log.debug("Looking for a free DUT...") + try: + dut = self.duts_manager.get_free_dut() + except Empty: + log.debug("No free DUTs at the moment! Waiting...") + continue + log.debug(f"Found DUT {dut.ip}") + test_event['ip'] = dut.ip + test_event['start-timestamp'] = time.time() + self.test_manager.mark_started(test_event) + dut.run_test(test_event, test_run_complete) + break class ScopeParser: From 6fc3986ec9a19c74a3947aa8a10224a6c60eec73 Mon Sep 17 00:00:00 2001 From: Daniel Madej Date: Wed, 9 Oct 2024 13:32:27 +0200 Subject: [PATCH 09/11] Collect DUTs in a separate thread Signed-off-by: Daniel Madej --- runnerd | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/runnerd b/runnerd index d53a04e..e78e2b1 100755 --- a/runnerd +++ b/runnerd @@ -334,12 +334,18 @@ class MasterRunner: self.test_manager.mark_complete(test_event) self.results_collector.collect() + def collect_duts(): + while True: + self.duts_manager.collect_duts() + + collector_thread = Thread(target=collect_duts) + collector_thread.start() + while True: log.debug("Looking for the next test...") test_event = self.test_manager.get_next_test() log.debug(f"Found test {test_event}") while True: - self.duts_manager.collect_duts() if not self.duts_manager.duts_queue.qsize(): continue log.debug("Looking for a free DUT...") From 0373fe1cd5aa17a333549ec8070c1a30bfe39985 Mon Sep 17 00:00:00 2001 From: Daniel Madej Date: Thu, 10 Oct 2024 08:38:46 +0200 Subject: [PATCH 10/11] Simplify getting DUTs With separate collection thread we can get rid of double loop. Signed-off-by: Daniel Madej --- runnerd | 37 ++++++++++++++----------------------- 1 file changed, 14 insertions(+), 23 deletions(-) diff --git a/runnerd b/runnerd index e78e2b1..0b8d19d 100755 --- a/runnerd +++ b/runnerd @@ -8,7 +8,7 @@ from common import ConfigFile, JournalFile, StatusFile, TestCase, TestEvent, JournalParser from pathlib import Path -from queue import Queue, Empty +from queue import Queue from setproctitle import setproctitle from subprocess import Popen, PIPE from tempfile import NamedTemporaryFile @@ -26,7 +26,7 @@ import os import sys import time -log_format='%(asctime)s %(levelname)-8s %(message)s' +log_format = '%(asctime)s %(levelname)-8s %(message)s' logging.basicConfig( filename="runnerd.log", @@ -154,7 +154,7 @@ class DutsManager: return configured_duts = self.duts_config.get('duts', []) if configured_duts is None: - log.warning("Check DUTs config!") + log.error("Cannot read DUTs - check DUTs config file!") return ip_list = [dut["ip"] for dut in self.duts_config["duts"]] for ip in self.duts.copy(): @@ -181,10 +181,9 @@ class DutsManager: log.debug("DUTs config reloaded") def get_free_dut(self): - with self.duts_lock: - while True: - # raise Empty if queue is empty - dut = self.duts_queue.get(block=False) + while True: + dut = self.duts_queue.get() + with self.duts_lock: if dut.ip in self.duts: self.duts_in_use.add(dut.ip) return dut @@ -212,8 +211,8 @@ class DutQueue(Queue): super().put(dut, block, timeout) def get(self, block=True, timeout=None) -> Dut: + dut = super().get(block, timeout) with self.set_lock: - dut = super().get(block, timeout) self.dut_set.remove(dut.ip) return dut @@ -345,21 +344,13 @@ class MasterRunner: log.debug("Looking for the next test...") test_event = self.test_manager.get_next_test() log.debug(f"Found test {test_event}") - while True: - if not self.duts_manager.duts_queue.qsize(): - continue - log.debug("Looking for a free DUT...") - try: - dut = self.duts_manager.get_free_dut() - except Empty: - log.debug("No free DUTs at the moment! Waiting...") - continue - log.debug(f"Found DUT {dut.ip}") - test_event['ip'] = dut.ip - test_event['start-timestamp'] = time.time() - self.test_manager.mark_started(test_event) - dut.run_test(test_event, test_run_complete) - break + log.debug("Looking for a free DUT...") + dut = self.duts_manager.get_free_dut() + log.debug(f"Found DUT {dut.ip}") + test_event['ip'] = dut.ip + test_event['start-timestamp'] = time.time() + self.test_manager.mark_started(test_event) + dut.run_test(test_event, test_run_complete) class ScopeParser: From c820db184c543cb89b90a2e8e44823c0a745c596 Mon Sep 17 00:00:00 2001 From: Daniel Madej Date: Thu, 10 Oct 2024 15:50:04 +0200 Subject: [PATCH 11/11] Unify termination behaviour Daemonized and non-daemonized runner will now behave the same after receiving a SIGTERM or SIGINT signal and close all the processes in the process group. The behaviour was different also for Ctrl+C (affected process group) and sending SIGINT through kill command (affected only the main process). Signed-off-by: Daniel Madej --- runnerd | 60 ++++++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 49 insertions(+), 11 deletions(-) diff --git a/runnerd b/runnerd index 0b8d19d..9835af1 100755 --- a/runnerd +++ b/runnerd @@ -4,11 +4,13 @@ # Copyright(c) 2024 Huawei Technologies # SPDX-License-Identifier: BSD-3-Clause # +import signal +import threading from common import ConfigFile, JournalFile, StatusFile, TestCase, TestEvent, JournalParser from pathlib import Path -from queue import Queue +from queue import Queue, Empty from setproctitle import setproctitle from subprocess import Popen, PIPE from tempfile import NamedTemporaryFile @@ -37,6 +39,9 @@ log = logging.getLogger("runnerd") scriptdir = os.path.dirname(__file__) +child_pids = [] +pid_lock = Lock() + class EventHandler(FileSystemEventHandler): def __init__(self, callback): @@ -67,8 +72,12 @@ class PytestRunner: cmd += f"\"{test_case}\"" out = open(self.stdout_path, "w") if self.stdout_path else PIPE - process = Popen(cmd, shell=True, stdout=out, stderr=out) + with pid_lock: + process = Popen(cmd, shell=True, stdout=out, stderr=out) + child_pids.append(process.pid) process.wait() + with pid_lock: + child_pids.remove(process.pid) if self.stdout_path: out.close() @@ -180,9 +189,12 @@ class DutsManager: return log.debug("DUTs config reloaded") - def get_free_dut(self): - while True: - dut = self.duts_queue.get() + def get_free_dut(self, working: Event): + while working.is_set(): + try: + dut = self.duts_queue.get(block=False) + except Empty: + continue with self.duts_lock: if dut.ip in self.duts: self.duts_in_use.add(dut.ip) @@ -325,6 +337,7 @@ class MasterRunner: self.test_manager = TestManager() self.results_collector = ResultsCollector() self.results_collector.collect() + self.working = threading.Event() def run(self): def test_run_complete(dut, test_event): @@ -334,24 +347,32 @@ class MasterRunner: self.results_collector.collect() def collect_duts(): - while True: + while self.working.is_set(): self.duts_manager.collect_duts() collector_thread = Thread(target=collect_duts) + + self.working.set() + collector_thread.start() - while True: + while self.working.is_set(): log.debug("Looking for the next test...") test_event = self.test_manager.get_next_test() log.debug(f"Found test {test_event}") log.debug("Looking for a free DUT...") - dut = self.duts_manager.get_free_dut() + dut = self.duts_manager.get_free_dut(self.working) + if dut is None: + # when execution is interrupted + break log.debug(f"Found DUT {dut.ip}") test_event['ip'] = dut.ip test_event['start-timestamp'] = time.time() self.test_manager.mark_started(test_event) dut.run_test(test_event, test_run_complete) + collector_thread.join() + class ScopeParser: scope_config_path = "configs/scope_config.yml" @@ -409,8 +430,16 @@ def daemonize(enabled): if enabled: print("Starting daemon") logger_files = [handler.stream.fileno() for handler in logging.root.handlers] - return daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=logger_files) + signal_map = { + signal.SIGINT: signal_handler, + signal.SIGTERM: signal_handler, + } + return daemon.DaemonContext( + working_directory=os.getcwd(), files_preserve=logger_files, signal_map=signal_map, + ) else: + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) print("Starting in non-daemon mode") return contextlib.suppress() @@ -436,12 +465,21 @@ if __name__ == '__main__': try: lock = filelock.FileLock("meta/daemon.lock").acquire(timeout=0.01) except filelock.Timeout: - print("Another instance of 'runnerd' demon is active!") + print("Another instance of 'runnerd' daemon is active!") exit(1) print("Preparing scope") scope_parser = ScopeParser() scope_parser.start() + + runner = MasterRunner() + + def signal_handler(signum, frame): + log.debug(f"Signal {signum} - terminate execution") + runner.working.clear() + with pid_lock: + for pid in child_pids: + os.kill(pid, signal.SIGTERM) + with daemonize(not args.no_daemon): - runner = MasterRunner() runner.run()