diff --git a/common.py b/common.py index 488fca2..d7e4b35 100644 --- a/common.py +++ b/common.py @@ -1,5 +1,6 @@ # # Copyright(c) 2023 Intel Corporation +# Copyright(c) 2024 Huawei Technologies # SPDX-License-Identifier: BSD-3-Clause # @@ -10,9 +11,7 @@ import hashlib import json import os -import random import re -import sys import time import yaml @@ -29,13 +28,22 @@ def __access(self): self.last_modify = os.path.getmtime(self.path) def need_reload(self): - return self.last_modify != os.path.getmtime(self.path) + try: + return self.last_modify != os.path.getmtime(self.path) + except FileNotFoundError: + # can occur on config file overwrite + self.last_modify = 0 + raise def load(self): with meta_lock: - self.__access() with open(self.path, 'r') as conf: - return yaml.safe_load(conf) + # config can be None if file is being overwritten at read time + # don't update last modify time - let the file be re-read + config = yaml.safe_load(conf) + if config: + self.__access() + return config def save(self, data): with meta_lock: @@ -173,7 +181,7 @@ def duration(self): start_time = self['start-timestamp'] end_time = self.get('end-timestamp', time.time()) return timedelta(seconds=int(end_time-start_time)) - except: + except KeyError: return timedelta(0) def __eq__(self, other): @@ -196,6 +204,7 @@ def new(cls, test_case, data={}): **data }) + class JournalParser: def __init__(self, journal_file): self.journal_file = journal_file diff --git a/jogger b/jogger index 06a503e..89a34fa 100755 --- a/jogger +++ b/jogger @@ -1,10 +1,11 @@ #!/usr/bin/env python3 # # Copyright(c) 2023 Intel Corporation +# Copyright(c) 2024 Huawei Technologies # SPDX-License-Identifier: BSD-3-Clause # -from common import ConfigFile, JournalFile, StatusFile, TestCase, TestEvent, JournalParser +from common import JournalFile, StatusFile, TestCase, TestEvent, JournalParser from datetime import datetime from functools import reduce @@ -12,7 +13,6 @@ from tabulate import tabulate from tempfile import NamedTemporaryFile import argparse import daemon -import hashlib import json import os import shutil @@ -27,19 +27,19 @@ def error(*args, **kwargs): class Printer: @staticmethod def red(string): - return "\033[0;31m"+string+"\033[0m" + return "\033[0;31m" + string + "\033[0m" @staticmethod def green(string): - return "\033[0;32m"+string+"\033[0m" + return "\033[0;32m" + string + "\033[0m" @staticmethod def yellow(string): - return "\033[0;33m"+string+"\033[0m" + return "\033[0;33m" + string + "\033[0m" @staticmethod def blue(string): - return "\033[0;34m"+string+"\033[0m" + return "\033[0;34m" + string + "\033[0m" class DataPrinter: @@ -47,7 +47,7 @@ class DataPrinter: if output_format not in ['table', 'json']: raise ValueError(f"Invalid output format '{output_format}'") self.output_format = output_format - self.caption = None + self.captions = None self.data = None def setCaptions(self, captions): @@ -188,7 +188,7 @@ class ScopeHandler: ) def run(self, req): - tests = reduce(lambda acc, sha: acc+self.__tests_by_sha(sha), req, []) + tests = reduce(lambda acc, sha: acc + self.__tests_by_sha(sha), req, []) test_events = [] with self.journal_file.record() as journal: for test_case in tests: @@ -238,7 +238,6 @@ class ScopeHandler: for res in results: results_dict[res['sha']] = res - scope_status = [] for test_case in tests: queued_events = filter( lambda e: e['test-case'] == test_case, @@ -303,7 +302,6 @@ class ScopeHandler: ) - class TestSelector: def __init__(self, tests): self.tests = tests @@ -325,6 +323,7 @@ class TestSelector: tmpf.seek(0) return [line.split()[0] for line in tmpf.readlines()] + usage = """%(prog)s command [args] Supported commands: @@ -355,13 +354,15 @@ class SuperRunnerCli: self.scope_handler = ScopeHandler() getattr(self, command)(f"{parser.prog} {args.command}", argv[1:]) - def __color_result(self, string): + @staticmethod + def __color_result(string): return { 'PASSED': Printer.green, 'FAILED': Printer.red, }.get(string, Printer.yellow)(string) - def __print_test_events(self, args, test_events): + @staticmethod + def __print_test_events(args, test_events): p = ListPrinter(args.format) id_field = ('id', 'sha')[args.long] test_field = ('function', 'test')[args.long] @@ -383,16 +384,17 @@ class SuperRunnerCli: }) p.print() - def init(self, prog, argv): + @staticmethod + def init(prog, argv): parser = argparse.ArgumentParser( prog=prog, description="Initialize new test scope" ) - args = parser.parse_args(argv) + _ = parser.parse_args(argv) runner_path = os.getenv('RUNNER_PATH') if not runner_path: - error("Enrvironment variable 'RUNNER_PATH' is not set!") + error("Environment variable 'RUNNER_PATH' is not set!") exit(1) if os.path.isdir("meta"): error("Existing runner scope found in this directory!") @@ -426,7 +428,7 @@ class SuperRunnerCli: 'sha': test_case['sha'], 'function': test_case.function(), 'test': test_case.test() - }) + }) p.print() def run(self, prog, argv): @@ -487,7 +489,7 @@ class SuperRunnerCli: args = parser.parse_args(argv) test_event = self.scope_handler.test_event_by_sha({'sha': args.id}) - deleted_event = self.scope_handler.delete({'test-event': test_event}) + self.scope_handler.delete({'test-event': test_event}) def queue(self, prog, argv): parser = argparse.ArgumentParser( @@ -556,8 +558,8 @@ class SuperRunnerCli: last_event_result = self.__color_result(last_event['result']) last_event_id = last_event['sha'][:16] last_event_sha = last_event['sha'] - last_event_date = datetime.fromtimestamp(last_event['end-timestamp']) \ - .strftime("%Y-%m-%d %H:%M:%S") + last_event_date = (datetime.fromtimestamp(last_event['end-timestamp']) + .strftime("%Y-%m-%d %H:%M:%S")) except: last_event_result = last_event_id = last_event_sha = last_event_date = "" p.addEntry({ @@ -571,7 +573,7 @@ class SuperRunnerCli: 'last-result-id': last_event_id, 'last-result-sha': last_event_sha, 'last-result-date': last_event_date - }) + }) p.print() def results(self, prog, argv): @@ -620,7 +622,7 @@ class SuperRunnerCli: 'test': test_case.test(), 'result': self.__color_result(test_event['result']), 'duration': test_event.duration() - }) + }) p.print() def show(self, prog, argv): @@ -701,7 +703,8 @@ class SuperRunnerCli: with daemon.DaemonContext(): webbrowser.open_new_tab(os.path.join(test_case['logs'], "main.html")) - def stdout(self, prog, argv): + @staticmethod + def stdout(prog, argv): parser = argparse.ArgumentParser( prog=prog, description="Show pytest standard output on selected DUT" diff --git a/requirements.txt b/requirements.txt index 3992629..4870803 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,4 @@ python-daemon>=2.2.4 setproctitle>=1.1.10 tabulate>=0.8.7 watchdog>=0.10.3 +pyyaml>=5.4 diff --git a/runnerd b/runnerd index a0bb92b..9835af1 100755 --- a/runnerd +++ b/runnerd @@ -1,13 +1,16 @@ #!/usr/bin/env python3 # # Copyright(c) 2023 Intel Corporation +# Copyright(c) 2024 Huawei Technologies # SPDX-License-Identifier: BSD-3-Clause # +import signal +import threading from common import ConfigFile, JournalFile, StatusFile, TestCase, TestEvent, JournalParser from pathlib import Path -from queue import Queue +from queue import Queue, Empty from setproctitle import setproctitle from subprocess import Popen, PIPE from tempfile import NamedTemporaryFile @@ -25,20 +28,25 @@ import os import sys import time +log_format = '%(asctime)s %(levelname)-8s %(message)s' logging.basicConfig( filename="runnerd.log", level=logging.ERROR, - format='%(asctime)s %(levelname)-8s %(message)s', + format=log_format, datefmt='%Y-%m-%d %H:%M:%S') log = logging.getLogger("runnerd") scriptdir = os.path.dirname(__file__) +child_pids = [] +pid_lock = Lock() + class EventHandler(FileSystemEventHandler): def __init__(self, callback): self.callback = callback + def on_modified(self, event): self.callback() @@ -64,8 +72,12 @@ class PytestRunner: cmd += f"\"{test_case}\"" out = open(self.stdout_path, "w") if self.stdout_path else PIPE - process = Popen(cmd, shell=True, stdout=out, stderr=out) + with pid_lock: + process = Popen(cmd, shell=True, stdout=out, stderr=out) + child_pids.append(process.pid) process.wait() + with pid_lock: + child_pids.remove(process.pid) if self.stdout_path: out.close() @@ -95,7 +107,7 @@ class Dut: def run_in_thread(self, test_event, on_complete): self.config['meta'] = { **test_event, - 'test-case' : {**test_event['test-case']} + 'test-case': {**test_event['test-case']} } tmp_conf_file = NamedTemporaryFile(prefix="dut_config_", suffix=".yml") ConfigFile(tmp_conf_file.name).save(self.config) @@ -104,9 +116,9 @@ class Dut: test_case = test_event['test-case'] log.info(f"Start {test_case} @ {self.ip}") runner = PytestRunner( - test_dir = test_case['dir'], - log_dir = self.log_path, - stdout_path = self.stdout_path + test_dir=test_case['dir'], + log_dir=self.log_path, + stdout_path=self.stdout_path ) runner.run(tmp_conf_file.name, test_case) log.info(f"Complete {test_case} @ {self.ip}") @@ -121,38 +133,100 @@ class DutsManager: base_dut_config_path = "configs/base_dut_config.yml" def __init__(self, duts_config_file): + self.duts_lock = Lock() + self.duts_config = None self.duts_config_file = duts_config_file self.base_dut_config = ConfigFile(self.base_dut_config_path).load() - self.duts_queue = Queue() - self.duts = [] + self.duts_queue = DutQueue() + self.duts = {} + self.duts_in_use = set() def collect_duts(self): - if not self.duts_config_file.need_reload(): - return + start_time = time.time() + while True: + try: + if not self.duts_config_file.need_reload(): + return + else: + break + except FileNotFoundError: + if time.time() - start_time > 5: + log.warning("DUT configuration file not found!") + start_time = time.time() + pass + + with self.duts_lock: + log.debug("Reloading DUTs config...") + self.duts_config = self.duts_config_file.load() + if self.duts_config is None: + log.error("Error while loading DUTs config!") + return + configured_duts = self.duts_config.get('duts', []) + if configured_duts is None: + log.error("Cannot read DUTs - check DUTs config file!") + return + ip_list = [dut["ip"] for dut in self.duts_config["duts"]] + for ip in self.duts.copy(): + # remove duts deleted from duts_config_file + if ip not in ip_list: + log.info(f"Removing DUT {ip} from scope (currently running test " + "will NOT be terminated)") + del self.duts[ip] + if ip_list: + for config in self.duts_config['duts']: + ip = config["ip"] + add_dut = ip not in self.duts + # add new or update config + self.duts[ip] = Dut({**self.base_dut_config, **config}) + if add_dut: + log.info(f"Adding DUT {ip} to scope") + if ip not in self.duts_in_use: + self.duts_queue.put(self.duts[ip]) + else: + log.debug(f"DUT {ip} is already in use - skip adding to queue") + else: + log.warning("DUT list is empty!") + return + log.debug("DUTs config reloaded") - self.duts_config = self.duts_config_file.load() - self.duts = [] - for config in self.duts_config['duts']: - dut = Dut({**self.base_dut_config, **config}) - self.duts.append(dut) - self.duts_queue.put(dut) - - def get_free_dut(self, dut_filter=None): - filtered_duts = [] - found = False - for _ in self.duts: - dut = self.duts_queue.get() - if not dut_filter or dut_filter(dut): - found = True - break - filtered_duts.append(dut) - map(self.duts_queue.put, filtered_duts) - if not found: - raise Exception("No DUT matching to filter!") - return dut + def get_free_dut(self, working: Event): + while working.is_set(): + try: + dut = self.duts_queue.get(block=False) + except Empty: + continue + with self.duts_lock: + if dut.ip in self.duts: + self.duts_in_use.add(dut.ip) + return dut def mark_free(self, dut): - self.duts_queue.put(dut) + with self.duts_lock: + self.duts_in_use.remove(dut.ip) + # re-queue only currently configured DUTs + if dut.ip in self.duts: + # use current DUT entry (might be updated in the meantime) + self.duts_queue.put(self.duts[dut.ip]) + + +class DutQueue(Queue): + """Queue with unique DUT entries""" + def __init__(self): + super().__init__() + self.dut_set = set() + self.set_lock = Lock() + + def put(self, dut: Dut, block=True, timeout=None): + with self.set_lock: + if dut.ip not in self.dut_set: + self.dut_set.add(dut.ip) + super().put(dut, block, timeout) + + def get(self, block=True, timeout=None) -> Dut: + dut = super().get(block, timeout) + with self.set_lock: + self.dut_set.remove(dut.ip) + return dut class TestManager: @@ -175,7 +249,7 @@ class TestManager: def __load_progress(self): self.test_events_complete.clear() with self.progress_file.edit() as progress: - if not 'test-events' in progress: + if 'test-events' not in progress: return progress['test-events'] = list(filter( lambda entry: entry['status'] == "complete", @@ -263,6 +337,7 @@ class MasterRunner: self.test_manager = TestManager() self.results_collector = ResultsCollector() self.results_collector.collect() + self.working = threading.Event() def run(self): def test_run_complete(dut, test_event): @@ -270,29 +345,45 @@ class MasterRunner: test_event['end-timestamp'] = time.time() self.test_manager.mark_complete(test_event) self.results_collector.collect() - while True: - self.duts_manager.collect_duts() - log.debug("Looking for free DUT... ") - dut = self.duts_manager.get_free_dut() - log.debug(f"Found DUT {dut.ip}") - log.debug("Looking for next test... ") + + def collect_duts(): + while self.working.is_set(): + self.duts_manager.collect_duts() + + collector_thread = Thread(target=collect_duts) + + self.working.set() + + collector_thread.start() + + while self.working.is_set(): + log.debug("Looking for the next test...") test_event = self.test_manager.get_next_test() log.debug(f"Found test {test_event}") + log.debug("Looking for a free DUT...") + dut = self.duts_manager.get_free_dut(self.working) + if dut is None: + # when execution is interrupted + break + log.debug(f"Found DUT {dut.ip}") test_event['ip'] = dut.ip test_event['start-timestamp'] = time.time() self.test_manager.mark_started(test_event) dut.run_test(test_event, test_run_complete) + collector_thread.join() + class ScopeParser: scope_config_path = "configs/scope_config.yml" def __init__(self): + self.observer = None self.scope_config_file = ConfigFile(self.scope_config_path) self.scope_file = StatusFile("meta/scope.json") def __parse_config(self): - log.debug("Reloading scope config") + log.debug("Reloading scope config...") scope_config = self.scope_config_file.load() scope = self.scope_file.load() pytest_runner = PytestRunner(test_dir=scope_config['tests_path']) @@ -317,6 +408,7 @@ class ScopeParser: pytest_options )) test_cases = list(dict.fromkeys(test_cases)) + log.debug("Scope config reloaded") with self.scope_file.edit() as scope: scope['scope'] = scope_config['scope'] @@ -333,24 +425,38 @@ class ScopeParser: def stop(self): self.observer.join() + def daemonize(enabled): if enabled: print("Starting daemon") logger_files = [handler.stream.fileno() for handler in logging.root.handlers] - return daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=logger_files) + signal_map = { + signal.SIGINT: signal_handler, + signal.SIGTERM: signal_handler, + } + return daemon.DaemonContext( + working_directory=os.getcwd(), files_preserve=logger_files, signal_map=signal_map, + ) else: + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) print("Starting in non-daemon mode") return contextlib.suppress() + if __name__ == '__main__': setproctitle('runnerd') parser = argparse.ArgumentParser() parser.add_argument('--debug', action='store_true') parser.add_argument('--no-daemon', action='store_true') - parser.add_argument('--version', action='version', version='Superrunner4000 v0.2') + parser.add_argument('--version', action='version', version='Superrunner4000 v0.3') args = parser.parse_args() log.setLevel((logging.INFO, logging.DEBUG)[args.debug]) + if args.no_daemon: + handler = logging.StreamHandler() + handler.setFormatter(logging.Formatter(log_format)) + log.addHandler(handler) if not os.path.isdir("meta"): print("Scope is not initialized in this directory!") @@ -359,12 +465,21 @@ if __name__ == '__main__': try: lock = filelock.FileLock("meta/daemon.lock").acquire(timeout=0.01) except filelock.Timeout: - print("Another instance of 'runnerd' demon is active!") + print("Another instance of 'runnerd' daemon is active!") exit(1) print("Preparing scope") scope_parser = ScopeParser() scope_parser.start() + + runner = MasterRunner() + + def signal_handler(signum, frame): + log.debug(f"Signal {signum} - terminate execution") + runner.working.clear() + with pid_lock: + for pid in child_pids: + os.kill(pid, signal.SIGTERM) + with daemonize(not args.no_daemon): - runner = MasterRunner() runner.run()