diff --git a/.git_archival.txt b/.git_archival.txt new file mode 100644 index 00000000..8fb235d7 --- /dev/null +++ b/.git_archival.txt @@ -0,0 +1,4 @@ +node: $Format:%H$ +node-date: $Format:%cI$ +describe-name: $Format:%(describe:tags=true,match=*[0-9]*)$ +ref-names: $Format:%D$ diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 00000000..00a7b00c --- /dev/null +++ b/.gitattributes @@ -0,0 +1 @@ +.git_archival.txt export-subst diff --git a/.github/codecov.yml b/.github/codecov.yml new file mode 100644 index 00000000..e1f43427 --- /dev/null +++ b/.github/codecov.yml @@ -0,0 +1,3 @@ +codecov: + notify: + after_n_builds: 4 diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 00000000..49d74ba5 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,32 @@ +name: CI + +on: + workflow_dispatch: + inputs: + upload-wheel: + type: boolean + required: false + default: false + description: Upload wheel as an artifact + pull_request: + push: + branches: [ main ] + +permissions: + contents: read + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + tests: + uses: ./.github/workflows/step_test.yml + secrets: + CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} + + build-wheel: + uses: ./.github/workflows/step_build-wheel.yml + needs: [ tests ] + with: + upload: ${{ inputs.upload-wheel || false }} diff --git a/.github/workflows/pre-commit.yml b/.github/workflows/pre-commit.yml deleted file mode 100644 index 9981cdf0..00000000 --- a/.github/workflows/pre-commit.yml +++ /dev/null @@ -1,14 +0,0 @@ -name: pre-commit - -on: - pull_request: - push: - branches: [master] - -jobs: - pre-commit: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v2 - - uses: actions/setup-python@v2 - - uses: pre-commit/action@v2.0.2 diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index b4ca4e41..d7a7fa3e 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -10,17 +10,30 @@ on: required: true jobs: - release: + build-wheel: + name: Build wheel runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 if: ${{ github.event_name == 'release' }} - - uses: actions/checkout@v2 - if: ${{ github.event_name == 'workflow_dispatch' }} + - uses: actions/checkout@v3 with: ref: ${{ github.event.inputs.ref }} - - name: Create dist - run: make wheel + if: ${{ github.event_name == 'workflow_dispatch' }} + - name: Build package + run: pipx run build + - uses: actions/upload-artifact@v3 + with: + path: dist/* + release: + name: Prepare release + runs-on: ubuntu-latest + needs: [build-wheel] + steps: + - uses: actions/download-artifact@v3 + with: + name: artifact + path: dist - name: Publish to PyPI uses: pypa/gh-action-pypi-publish@release/v1 with: diff --git a/.github/workflows/step_build-wheel.yml b/.github/workflows/step_build-wheel.yml new file mode 100644 index 00000000..4f80a0f1 --- /dev/null +++ b/.github/workflows/step_build-wheel.yml @@ -0,0 +1,23 @@ +on: + workflow_call: + inputs: + upload: + required: false + type: boolean + default: true + description: Upload wheel as artifact + +permissions: + contents: read + +jobs: + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: Build package + run: pipx run build + - uses: actions/upload-artifact@v3 + with: + path: dist/* + if: ${{ inputs.upload }} diff --git a/.github/workflows/step_test.yml b/.github/workflows/step_test.yml new file mode 100644 index 00000000..0805ad31 --- /dev/null +++ b/.github/workflows/step_test.yml @@ -0,0 +1,42 @@ +on: + workflow_call: + secrets: + CODECOV_TOKEN: + required: false + description: Codecov token + +permissions: + contents: read + +jobs: + pre-commit: + name: Run pre-commit + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: actions/setup-python@v4 + - uses: pre-commit/action@v3.0.0 + + checks: + name: + Check 🐍 ${{ matrix.python-version }} + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + python-version: ["3.9", "3.10", "3.11", "3.12"] + steps: + - uses: actions/checkout@v3 + - uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + allow-prereleases: true + + - name: Install package + run: pip install -e.[tests-cov] + - name: Test package + run: pytest --cov --cov-report=xml + - name: Upload coverage report + uses: codecov/codecov-action@v3 + with: + name: python-${{ matrix.python-version }} diff --git a/.packit.yaml b/.packit.yaml index 00ab97ec..a98966b2 100644 --- a/.packit.yaml +++ b/.packit.yaml @@ -5,29 +5,31 @@ synced_files: upstream_package_name: fmf downstream_package_name: fmf +# Epel9 fails to build with dynamic version. Need to create archive with PKG-INFO +# F37 works with setuptools_scm 7.0 actions: create-archive: - - make tarball + - "python3 -m build --sdist --outdir ." + - "sh -c 'echo fmf-$(hatch version).tar.gz'" get-current-version: - - make version + - "hatch version" srpm_build_deps: - - make - - python3-docutils + - python3-build + - hatch + - python3-hatch-vcs jobs: - job: copr_build trigger: pull_request targets: - fedora-all - - epel-8 - epel-9 - job: tests trigger: pull_request targets: - fedora-all - - epel-8 - epel-9 - job: copr_build @@ -35,7 +37,6 @@ jobs: branch: main targets: - fedora-all - - epel-8 - epel-9 list_on_homepage: True preserve_project: True diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 1ec413a8..ed530795 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -38,3 +38,11 @@ repos: - id: end-of-file-fixer - id: mixed-line-ending - id: trailing-whitespace + + - repo: https://github.com/pre-commit/mirrors-mypy + rev: "v1.4.1" + hooks: + - id: mypy + files: ^(fmf) + additional_dependencies: + - types-jsonschema diff --git a/.readthedocs.yaml b/.readthedocs.yaml index af717e0c..a2dfdff0 100644 --- a/.readthedocs.yaml +++ b/.readthedocs.yaml @@ -1,5 +1,11 @@ # Config for building https://fmf.readthedocs.io/ version: 2 +build: + os: ubuntu-22.04 + tools: + python: "3.11" +sphinx: + configuration: docs/conf.py python: install: - method: pip diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index 536c8264..00000000 --- a/.travis.yml +++ /dev/null @@ -1,18 +0,0 @@ -language: python -python: - - "2.7" - - "3.6" - - "3.7" -before_install: - - "pip install -U pip setuptools virtualenv coveralls PyYAML" -script: - - "coverage run --source=bin,fmf -m py.test $CAPTURE tests" -after_success: - - coveralls - - coverage report - -# Hint: To enable more detailed logging for debugging purposes, -# define the following variables in the Travis CI web interface -# -# CAPTURE=--capture=no -# DEBUG=5 diff --git a/MANIFEST.in b/MANIFEST.in deleted file mode 100644 index b4ac02bd..00000000 --- a/MANIFEST.in +++ /dev/null @@ -1 +0,0 @@ -include fmf.spec diff --git a/Makefile b/Makefile index 9ee405b3..5063369b 100644 --- a/Makefile +++ b/Makefile @@ -1,12 +1,10 @@ # Prepare variables TMP = $(CURDIR)/tmp -VERSION = $(shell grep ^Version fmf.spec | sed 's/.* //') -COMMIT = $(shell git rev-parse --short HEAD) -REPLACE_VERSION = "s/running from the source/$(VERSION) ($(COMMIT))/" +VERSION = $(hatch version) PACKAGE = fmf-$(VERSION) FILES = LICENSE README.rst \ - Makefile fmf.spec setup.py \ - examples fmf bin tests + Makefile fmf.spec pyproject.toml \ + examples fmf tests # Define special targets all: docs packages @@ -19,11 +17,11 @@ tmp: # Run the test suite, optionally with coverage test: tmp - pytest tests/unit -c tests/unit/pytest.ini + pytest tests/unit smoke: tmp - pytest tests/unit/test_smoke.py -c tests/unit/pytest.ini + pytest tests/unit/test_smoke.py coverage: tmp - coverage run --source=fmf,bin -m py.test -c tests/unit/pytest.ini tests + coverage run --source=fmf -m py.test tests coverage report coverage annotate @@ -42,12 +40,9 @@ source: clean tmp mkdir -p $(TMP)/SOURCES mkdir -p $(TMP)/$(PACKAGE) cp -a $(FILES) $(TMP)/$(PACKAGE) - sed -i $(REPLACE_VERSION) $(TMP)/$(PACKAGE)/fmf/__init__.py tarball: source man cd $(TMP) && tar cfz SOURCES/$(PACKAGE).tar.gz $(PACKAGE) @echo ./tmp/SOURCES/$(PACKAGE).tar.gz -version: - @echo "$(VERSION)" rpm: tarball rpmbuild --define '_topdir $(TMP)' -bb fmf.spec srpm: tarball @@ -57,8 +52,7 @@ packages: rpm srpm # Python packaging wheel: - python setup.py bdist_wheel - python3 setup.py bdist_wheel + python3 -m build upload: twine upload dist/*.whl diff --git a/bin/fmf b/bin/fmf deleted file mode 100755 index 0b8a45df..00000000 --- a/bin/fmf +++ /dev/null @@ -1,40 +0,0 @@ -#!/usr/bin/python -# coding: utf-8 - -# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -# -# fmf - Flexible Metadata Format -# Author: Petr Šplíchal -# -# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -# -# Copyright (c) 2018 Red Hat, Inc. -# -# This program is free software: you can redistribute it and/or -# modify it under the terms of the GNU General Public License as -# published by the Free Software Foundation, either version 2 of -# the License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be -# useful, but WITHOUT ANY WARRANTY; without even the implied -# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR -# PURPOSE. See the GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see http://www.gnu.org/licenses/. -# -# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -import sys - -import fmf.base -import fmf.cli -import fmf.utils - -try: - fmf.cli.main() -except fmf.utils.GeneralError as error: - if "--debug" in sys.argv: - raise - fmf.utils.log.error(error) - raise SystemExit(1) diff --git a/fmf.spec b/fmf.spec index 8bfdcaf6..ed57f674 100644 --- a/fmf.spec +++ b/fmf.spec @@ -1,16 +1,19 @@ Name: fmf -Version: 1.2.1 -Release: 1%{?dist} +Version: 0.0.0 +Release: %autorelease Summary: Flexible Metadata Format License: GPLv2+ BuildArch: noarch -URL: https://github.com/psss/fmf -Source0: https://github.com/psss/fmf/releases/download/%{version}/fmf-%{version}.tar.gz +URL: https://github.com/teemtee/fmf +Source0: https://github.com/teemtee/fmf/releases/download/%{version}/fmf-%{version}.tar.gz # Main fmf package requires the Python module -Requires: python%{python3_pkgversion}-%{name} == %{version}-%{release} +BuildRequires: python3-devel +BuildRequires: python3dist(docutils) +BuildRequires: git-core +Requires: python3-fmf == %{version}-%{release} %description The fmf Python module and command line tool implement a flexible @@ -20,22 +23,12 @@ with support for inheritance and elasticity it provides an efficient way to organize data into well-sized text documents. This package contains the command line tool. -%?python_enable_dependency_generator - -%package -n python%{python3_pkgversion}-%{name} +%package -n python3-fmf Summary: %{summary} -BuildRequires: python%{python3_pkgversion}-devel -BuildRequires: python%{python3_pkgversion}-setuptools -BuildRequires: python%{python3_pkgversion}-pytest -BuildRequires: python%{python3_pkgversion}-ruamel-yaml -BuildRequires: python%{python3_pkgversion}-filelock -BuildRequires: python%{python3_pkgversion}-jsonschema -BuildRequires: git-core -%{?python_provide:%python_provide python%{python3_pkgversion}-%{name}} Requires: git-core -%description -n python%{python3_pkgversion}-%{name} +%description -n python3-fmf The fmf Python module and command line tool implement a flexible format for defining metadata in plain text files which can be stored close to the source code. Thanks to hierarchical structure @@ -45,21 +38,31 @@ This package contains the Python 3 module. %prep -%autosetup +%autosetup -n fmf-%{version} + + +%generate_buildrequires +%pyproject_buildrequires -x tests %{?epel:-w} %build -%py3_build +%pyproject_wheel +cp docs/header.txt man.rst +tail -n+7 README.rst >> man.rst +rst2man man.rst > fmf.1 %install -%py3_install +%pyproject_install +%pyproject_save_files fmf + mkdir -p %{buildroot}%{_mandir}/man1 install -pm 644 fmf.1* %{buildroot}%{_mandir}/man1 %check -%{__python3} -m pytest -vv -c tests/unit/pytest.ini -m 'not web' +%pytest -vv \ + -m 'not web' %{!?_licensedir:%global license %%doc} @@ -70,10 +73,12 @@ install -pm 644 fmf.1* %{buildroot}%{_mandir}/man1 %doc README.rst examples %license LICENSE -%files -n python%{python3_pkgversion}-%{name} -%{python3_sitelib}/%{name}/ -%{python3_sitelib}/%{name}-*.egg-info +%files -n python3-fmf -f %{pyproject_files} +# Epel9 does not tag the license file in pyproject_files as a license. Manually install it in this case +%if 0%{?el9} %license LICENSE +%endif +%doc README.rst %changelog diff --git a/fmf/__init__.py b/fmf/__init__.py index bdca5f5d..12af8b89 100644 --- a/fmf/__init__.py +++ b/fmf/__init__.py @@ -1,14 +1,17 @@ """ Flexible Metadata Format """ -# Version is replaced before building the package -__version__ = 'running from the source' +from __future__ import annotations + +import importlib.metadata + +from fmf.base import Tree +from fmf.context import Context +from fmf.utils import filter + +__version__ = importlib.metadata.version("fmf") __all__ = [ "Context", "Tree", "filter", ] - -from fmf.base import Tree -from fmf.context import Context -from fmf.utils import filter diff --git a/fmf/base.py b/fmf/base.py index c9c7a454..d6216b16 100644 --- a/fmf/base.py +++ b/fmf/base.py @@ -1,11 +1,28 @@ """ Base Metadata Classes """ +from __future__ import annotations + import copy import os import re import subprocess +import sys +from collections.abc import (Generator, ItemsView, Iterator, KeysView, Mapping, + ValuesView) from io import open from pprint import pformat as pretty +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + # TODO: py3.10: typing.Optional, typing.Union -> '|' operator + from typing import Any, Optional, Union + + from _typeshed import StrPath + + if sys.version_info >= (3, 10): + from typing import Self, TypeAlias + else: + from typing_extensions import TypeAlias, Self import jsonschema from ruamel.yaml import YAML @@ -24,15 +41,44 @@ MAIN = "main" + SUFFIX IGNORED_DIRECTORIES = ['/dev', '/proc', '/sys'] +if TYPE_CHECKING: + # TypeHints + RawDataType: TypeAlias = Union[None, int, float, str, bool] + ListDataType: TypeAlias = list[Union[RawDataType, 'ListDataType', 'DictDataType']] + DictDataType: TypeAlias = dict[str, Union[RawDataType, ListDataType, 'DictDataType']] + # Equivalent to: + # JSON: TypeAlias = dict[str, "JSON"] | list["JSON"] | str | int | float | bool | None + DataType: TypeAlias = Union[RawDataType, ListDataType, DictDataType] + TreeData: TypeAlias = dict[str, DataType] + TreeDataPath: TypeAlias = Union[TreeData, str] # Either TreeData or path + JsonSchema: TypeAlias = Mapping[str, Any] + # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Metadata # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +# Cannot specify class Tree(Mapping[str, Tree | DataType]]): +# This has a different .get method interface incompatible with mypy class Tree: """ Metadata Tree """ - - def __init__(self, data, name=None, parent=None): + parent: Optional[Tree] + children: dict[str, Tree] + data: TreeData + sources: list[str] + root: Optional[str] + version: int + original_data: TreeData + name: str + _commit: Optional[Union[str, bool]] + _raw_data: TreeData + _updated: bool + _directives: TreeData + _symlinkdirs: list[str] + + def __init__(self, data: Optional[TreeDataPath], + name: Optional[str] = None, + parent: Optional[Tree] = None): """ Initialize metadata tree from directory path or data dictionary @@ -41,7 +87,7 @@ def __init__(self, data, name=None, parent=None): """ # Bail out if no data and no parent given - if not data and not parent: + if not data and parent is None: raise utils.GeneralError( "No data or parent provided to initialize the tree.") @@ -73,11 +119,13 @@ def __init__(self, data, name=None, parent=None): if self.parent is None: self.name = "/" if not isinstance(data, dict): + assert data is not None self._initialize(path=data) data = self.root # Handle child node creation else: self.root = self.parent.root + assert name is not None self.name = os.path.join(self.parent.name, name) # Update data from a dictionary (handle empty nodes) @@ -92,10 +140,21 @@ def __init__(self, data, name=None, parent=None): if self.parent is None: self.inherit() - log.debug("New tree '{0}' created.".format(self)) + log.debug(f"New tree '{self}' created.") + + @property + def rel_name(self) -> str: + if self.parent is None: + assert self.name == "/" + return "/" + parent_name = self.parent.name + if not parent_name.endswith('/'): + parent_name = f"{parent_name}/" + assert parent_name in self.name + return self.name.removeprefix(parent_name) @property - def commit(self): + def commit(self) -> Union[str, bool]: """ Commit hash if tree grows under a git repo, False otherwise @@ -116,15 +175,16 @@ def commit(self): output, _ = utils.run( ['git', 'rev-parse', '--verify', 'HEAD'], cwd=self.root) self._commit = output.strip() + return self._commit except subprocess.CalledProcessError: self._commit = False - return self._commit + return self._commit def __str__(self): """ Use tree name as identifier """ return self.name - def _initialize(self, path): + def _initialize(self, path: str) -> None: """ Find metadata tree root, detect format version """ # Find the tree root root = os.path.abspath(path) @@ -132,72 +192,75 @@ def _initialize(self, path): while ".fmf" not in next(os.walk(root))[1]: if root == "/": raise utils.RootError( - "Unable to find tree root for '{0}'.".format( - os.path.abspath(path))) + f"Unable to find tree root for '{os.path.abspath(path)}'.") root = os.path.abspath(os.path.join(root, os.pardir)) except StopIteration: - raise utils.FileError("Invalid directory path: {0}".format(root)) - log.info("Root directory found: {0}".format(root)) + raise utils.FileError(f"Invalid directory path: {root}") + log.info(f"Root directory found: {root}") self.root = root # Detect format version try: with open(os.path.join(self.root, ".fmf", "version")) as version: self.version = int(version.read()) - log.info("Format version detected: {0}".format(self.version)) + log.info(f"Format version detected: {self.version}") except IOError as error: raise utils.FormatError( - "Unable to detect format version: {0}".format(error)) + f"Unable to detect format version: {error}") except ValueError: raise utils.FormatError("Invalid version format") - def _merge_plus(self, data, key, value, prepend=False): + def _merge_plus(self, data: TreeData, key: str, + value: DataType, prepend: bool = False) -> None: """ Handle extending attributes using the '+' suffix """ - # Nothing to do if key not in parent - if key not in data: - data[key] = value - return - # Use the special merge for merging dictionaries - if type(data[key]) == type(value) == dict: - self._merge_special(data[key], value) - return - # Attempt to apply the plus operator try: + # Nothing to do if key not in parent + if key not in data: + data[key] = value + return + # Use the special merge for merging dictionaries + data_val = data[key] + if type(data_val) == type(value) == dict: + self._merge_special(data_val, value) + data[key] = data_val + return + # Attempt to apply the plus operator if prepend: - data[key] = value + data[key] + data_val = value + data_val # type: ignore else: - data[key] = data[key] + value - except TypeError as error: - raise utils.MergeError( - "MergeError: Key '{0}' in {1} ({2}).".format( - key, self.name, str(error))) + data_val = data_val + value # type: ignore + data[key] = data_val + except TypeError as err: + raise utils.MergeError(f"MergeError: Key '{key}' in {self.name}.") from err - def _merge_minus(self, data, key, value): + def _merge_minus(self, data: TreeData, key: str, value: DataType) -> None: """ Handle reducing attributes using the '-' suffix """ - # Cannot reduce attribute if key is not present in parent - if key not in data: - data[key] = value - raise utils.MergeError( - "MergeError: Key '{0}' in {1} (not inherited).".format( - key, self.name)) - # Subtract numbers - if type(data[key]) == type(value) in [int, float]: - data[key] = data[key] - value - # Replace matching regular expression with empty string - elif type(data[key]) == type(value) == type(""): - data[key] = re.sub(value, '', data[key]) - # Remove given values from the parent list - elif type(data[key]) == type(value) == list: - data[key] = [item for item in data[key] if item not in value] - # Remove given key from the parent dictionary - elif isinstance(data[key], dict) and isinstance(value, list): - for item in value: - data[key].pop(item, None) - else: - raise utils.MergeError( - "MergeError: Key '{0}' in {1} (wrong type).".format( - key, self.name)) + try: + # Cannot reduce attribute if key is not present in parent + if key not in data: + data[key] = value + raise utils.MergeError(f"MergeError: Key '{key}' in {self.name} (not inherited).") + # Subtract numbers + data_val = data[key] + if type(data_val) == type(value) in [int, float]: + data_val -= value # type: ignore + # Replace matching regular expression with empty string + elif type(data_val) == type(value) == str: + data_val = re.sub(value, '', data_val) + # Remove given values from the parent list + elif isinstance(data_val, list) and isinstance(value, list): + data_val = [item for item in data_val if item not in value] + # Remove given key from the parent dictionary + elif isinstance(data_val, dict) and isinstance(value, list): + for item in value: + assert isinstance(item, str) + data_val.pop(item, None) + else: + raise TypeError(f"Incompatible types: {type(data_val)} - {type(value)}") + data[key] = data_val + except TypeError as err: + raise utils.MergeError(f"MergeError: Key '{key}' in {self.name}.") from err - def _merge_special(self, data, source): + def _merge_special(self, data: TreeData, source: TreeData) -> None: """ Merge source dict into data, handle special suffixes """ for key, value in sorted(source.items()): # Handle special attribute merging @@ -211,10 +274,10 @@ def _merge_special(self, data, source): else: data[key] = value - def _process_directives(self, directives): + def _process_directives(self, directives: TreeData) -> None: """ Check and process special fmf directives """ - def check(value, type_, name=None): + def check(value: DataType, type_: type, name: Optional[str] = None) -> None: """ Check for correct type """ if not isinstance(value, type_): name = f" '{name}'" if name else "" @@ -239,22 +302,21 @@ def check(value, type_, name=None): self._directives.update(directives) @staticmethod - def init(path): + def init(path: str) -> str: """ Create metadata tree root under given path """ root = os.path.abspath(os.path.join(path, ".fmf")) if os.path.exists(root): - raise utils.FileError("{0} '{1}' already exists.".format( - "Directory" if os.path.isdir(root) else "File", root)) + raise utils.FileError( + f"{'Directory' if os.path.isdir(root) else 'File'} '{root}' already exists.") try: os.makedirs(root) with open(os.path.join(root, "version"), "w") as version: - version.write("{0}\n".format(utils.VERSION)) + version.write(f"{utils.VERSION}\n") except OSError as error: - raise utils.FileError("Failed to create '{}': {}.".format( - root, error)) + raise utils.FileError(f"Failed to create '{root}': {error}.") return root - def merge(self, parent=None): + def merge(self, parent: Optional[Tree] = None) -> None: """ Merge parent data """ # Check parent, append source files if parent is None: @@ -270,19 +332,19 @@ def merge(self, parent=None): self._merge_special(data, self.data) self.data = data - def inherit(self): + def inherit(self) -> None: """ Apply inheritance """ # Preserve original data and merge parent # (original data needed for custom inheritance extensions) self.original_data = self.data self.merge() - log.debug("Data for '{0}' inherited.".format(self)) + log.debug(f"Data for '{self}' inherited.") log.data(pretty(self.data)) # Apply inheritance to all children for child in self.children.values(): child.inherit() - def update(self, data): + def update(self, data: Optional[TreeData]) -> None: """ Update metadata, handle virtual hierarchy """ # Make a note that the data dictionary has been updated # None is handled in the same way as an empty dictionary @@ -294,7 +356,7 @@ def update(self, data): # Handle fmf directives first try: directives = data.pop("/") - self._process_directives(directives) + self._process_directives(directives) # type: ignore except KeyError: pass @@ -313,14 +375,18 @@ def update(self, data): name = match.groups()[0] value = {match.groups()[1]: value} # Update existing child or create a new one + assert isinstance(value, dict) or isinstance(value, str) or value is None self.child(name, value) # Update regular attributes else: self.data[key] = value - log.debug("Data for '{0}' updated.".format(self)) + log.debug(f"Data for '{self}' updated.") log.data(pretty(self.data)) - def adjust(self, context, key='adjust', undecided='skip'): + def adjust(self, + context: fmf.context.Context, + key: str = 'adjust', + undecided: str = 'skip') -> None: """ Adjust tree data based on provided context and rules @@ -339,20 +405,20 @@ class describing the environment context. By default, the key # Check context sanity if not isinstance(context, fmf.context.Context): raise utils.GeneralError( - "Invalid adjust context: '{}'.".format(type(context).__name__)) + f"Invalid adjust context: '{type(context).__name__}'.") # Adjust rules should be a dictionary or a list of dictionaries try: rules = copy.deepcopy(self.data[key]) - log.debug("Applying adjust rules for '{}'.".format(self)) - log.data(rules) + log.debug(f"Applying adjust rules for '{self}'.") + log.data(str(rules)) if isinstance(rules, dict): rules = [rules] if not isinstance(rules, list): raise utils.FormatError( - "Invalid adjust rule format in '{}'. " - "Should be a dictionary or a list of dictionaries, " - "got '{}'.".format(self.name, type(rules).__name__)) + f"Invalid adjust rule format in '{self.name}'. " + f"Should be a dictionary or a list of dictionaries, " + f"got '{type(rules).__name__}'.") except KeyError: rules = [] @@ -369,12 +435,13 @@ class describing the environment context. By default, the key except KeyError: condition = True + assert isinstance(condition, str) or isinstance(condition, bool) # The optional 'continue' key should be a bool continue_ = rule.pop('continue', True) if not isinstance(continue_, bool): raise utils.FormatError( - "The 'continue' value should be bool, " - "got '{}'.".format(continue_)) + f"The 'continue' value should be bool, " + f"got '{continue_}'.") # The 'because' key is reserved for optional comments (ignored) rule.pop('because', None) @@ -395,14 +462,15 @@ class describing the environment context. By default, the key raise else: raise utils.GeneralError( - "Invalid value for the 'undecided' parameter. Should " - "be 'skip' or 'raise', got '{}'.".format(undecided)) + f"Invalid value for the 'undecided' parameter. Should " + f"be 'skip' or 'raise', got '{undecided}'.") # Adjust all child nodes as well for child in self.children.values(): child.adjust(context, key, undecided) - def get(self, name=None, default=None): + def get(self, name: Optional[Union[list[str], str]] = None, + default: DataType = None) -> DataType: """ Get attribute value or return default @@ -426,12 +494,13 @@ def get(self, name=None, default=None): data = self.data try: for key in name: - data = data[key] + data = data[key] # type: ignore except KeyError: return default return data - def child(self, name, data, source=None): + def child(self, name: str, data: Optional[TreeDataPath], + source: Optional[str] = None) -> None: """ Create or update child with given data """ try: # Update data from a dictionary (handle empty nodes) @@ -445,9 +514,13 @@ def child(self, name, data, source=None): # Save source file if source is not None: self.children[name].sources.append(source) - self.children[name]._raw_data = copy.deepcopy(data) + if data is None: + self.children[name]._raw_data = {} + else: + assert isinstance(data, dict) + self.children[name]._raw_data = copy.deepcopy(data) - def grow(self, path): + def grow(self, path: str) -> None: """ Grow the metadata tree for the given directory path @@ -458,14 +531,13 @@ def grow(self, path): if path != '/': path = path.rstrip("/") if path in IGNORED_DIRECTORIES: # pragma: no cover - log.debug("Ignoring '{0}' (special directory).".format(path)) + log.debug(f"Ignoring '{path}' (special directory).") return - log.info("Walking through directory {0}".format( - os.path.abspath(path))) + log.info(f"Walking through directory {os.path.abspath(path)}") try: dirpath, dirnames, filenames = next(os.walk(path)) except StopIteration: - log.debug("Skipping '{0}' (not accessible).".format(path)) + log.debug(f"Skipping '{path}' (not accessible).") return # Investigate main.fmf as the first file (for correct inheritance) filenames = sorted( @@ -479,7 +551,7 @@ def grow(self, path): if filename.startswith("."): continue fullpath = os.path.abspath(os.path.join(dirpath, filename)) - log.info("Checking file {0}".format(fullpath)) + log.info(f"Checking file {fullpath}") try: with open(fullpath, encoding='utf-8') as datafile: # Workadound ruamel s390x read issue - fmf/issues/164 @@ -509,14 +581,14 @@ def grow(self, path): # more than one node fullpath = os.path.realpath(fulldir) if fullpath in self._symlinkdirs: - log.debug("Not entering symlink loop {}".format(fulldir)) + log.debug(f"Not entering symlink loop {fulldir}") continue else: self._symlinkdirs.append(fullpath) # Ignore metadata subtrees if os.path.isdir(os.path.join(path, dirname, SUFFIX)): - log.debug("Ignoring metadata tree '{0}'.".format(dirname)) + log.debug(f"Ignoring metadata tree '{dirname}'.") continue self.child(dirname, os.path.join(path, dirname)) # Ignore directories with no metadata (remove all child nodes which @@ -525,9 +597,9 @@ def grow(self, path): child = self.children[name] if not child.children and not child._updated: del self.children[name] - log.debug("Empty tree '{0}' removed.".format(child.name)) + log.debug(f"Empty tree '{child.name}' removed.") - def climb(self, whole=False): + def climb(self, whole: bool = False) -> Iterator[Tree]: """ Climb through the tree (iterate leaf/all nodes) """ if whole or not self.children: yield self @@ -535,15 +607,19 @@ def climb(self, whole=False): for node in child.climb(whole): yield node - def find(self, name): + def find(self, name: str) -> Optional[Tree]: """ Find node with given name """ for node in self.climb(whole=True): if node.name == name: return node return None - def prune(self, whole=False, keys=None, names=None, filters=None, - conditions=None, sources=None): + def prune(self, whole: bool = False, + keys: Optional[list[str]] = None, + names: Optional[list[str]] = None, + filters: Optional[list[str]] = None, + conditions: Optional[list[str]] = None, + sources: Optional[list[str]] = None) -> Iterator[Tree]: """ Filter tree nodes based on given criteria """ keys = keys or [] names = names or [] @@ -551,8 +627,9 @@ def prune(self, whole=False, keys=None, names=None, filters=None, conditions = conditions or [] # Expand paths to absolute + sources_set = set() if sources: - sources = {os.path.abspath(src) for src in sources} + sources_set = {os.path.abspath(src) for src in sources} for node in self.climb(whole): # Select only nodes with key content @@ -563,7 +640,7 @@ def prune(self, whole=False, keys=None, names=None, filters=None, [re.search(name, node.name) for name in names]): continue # Select nodes defined by any of the source files - if sources and not sources.intersection(node.sources): + if sources_set and not sources_set.intersection(node.sources): continue # Apply filters and conditions if given try: @@ -579,41 +656,43 @@ def prune(self, whole=False, keys=None, names=None, filters=None, # All criteria met, thus yield the node yield node - def show(self, brief=False, formatting=None, values=None): + def show( + self, + brief: bool = False, + formatting: Optional[str] = None, + values: Optional[list[str]] = None) -> str: """ Show metadata """ values = values or [] # Custom formatting if formatting is not None: formatting = re.sub("\\\\n", "\n", formatting) - name = self.name # noqa: F841 - data = self.data # noqa: F841 - root = self.root # noqa: F841 + name = self.name # noqa: F841 + data = self.data # noqa: F841 + root = self.root # noqa: F841 sources = self.sources # noqa: F841 evaluated = [] - for value in values: - evaluated.append(eval(value)) + for str_v in values: + evaluated.append(eval(str_v)) return formatting.format(*evaluated) # Show the name output = utils.color(self.name, 'red') if brief or not self.data: - return output + "\n" + return f"{output}\n" # List available attributes - for key, value in sorted(self.data.items()): - output += "\n{0}: ".format(utils.color(key, 'green')) - if isinstance(value, type("")): - output += value.rstrip("\n") - elif isinstance(value, list) and all( - [isinstance(item, type("")) for item in value]): - output += utils.listed(value) + for key, val in sorted(self.data.items()): + output += f"\n{utils.color(key, 'green')}: " + if isinstance(val, str): + output += val.rstrip("\n") + elif isinstance(val, list) and all(isinstance(item, str) for item in val): + output += utils.listed(val) # type: ignore else: - output += pretty(value) - output - return output + "\n" + output += pretty(val) + return f"{output}\n" @staticmethod - def node(reference): + def node(reference: TreeData) -> Tree: """ Return Tree node referenced by the fmf identifier @@ -632,23 +711,23 @@ def node(reference): # Fetch remote git repository if 'url' in reference: tree = utils.fetch_tree( - reference.get('url'), - reference.get('ref'), - reference.get('path', '.').lstrip('/')) + str(reference.get('url')), + reference.get('ref'), # type: ignore + str(reference.get('path', '.')).lstrip('/')) # Use local files else: - root = reference.get('path', '.') + root = str(reference.get('path', '.')) if not root.startswith('/') and root != '.': raise utils.ReferenceError( 'Relative path "%s" specified.' % root) tree = Tree(root) - found_node = tree.find(reference.get('name', '/')) + found_node = tree.find(str(reference.get('name', '/'))) if found_node is None: - raise utils.ReferenceError( - "No tree node found for '{0}' reference".format(reference)) + raise utils.ReferenceError(f"No tree node found for '{reference}' reference") + assert isinstance(found_node, Tree) return found_node - def copy(self): + def copy(self) -> Tree: """ Create and return a deep copy of the node and its subtree @@ -663,7 +742,10 @@ def copy(self): self.parent = duplicate.parent = original_parent return duplicate - def validate(self, schema, schema_store=None): + def validate(self, + schema: JsonSchema, + schema_store: Optional[dict[str, + Any]] = None) -> utils.JsonSchemaValidationResult: """ Validate node data with given JSON Schema and schema references. @@ -698,14 +780,14 @@ def validate(self, schema, schema_store=None): # Schema file is invalid except ( - jsonschema.exceptions.SchemaError, - jsonschema.exceptions.RefResolutionError, - jsonschema.exceptions.UnknownType + jsonschema.exceptions.SchemaError, + jsonschema.exceptions.RefResolutionError, + jsonschema.exceptions.UnknownType ) as error: raise utils.JsonSchemaError( f'Errors found in provided schema: {error}') - def _locate_raw_data(self): + def _locate_raw_data(self) -> tuple[TreeData, TreeData, str]: """ Detect location of raw data from which the node has been created @@ -721,7 +803,7 @@ def _locate_raw_data(self): """ # List of node names in the virtual hierarchy - hierarchy = list() + hierarchy: list[str] = [] # Find the closest parent with raw data defined node = self @@ -743,16 +825,17 @@ def _locate_raw_data(self): for key in hierarchy: # Create a virtual hierarchy level if missing if key not in node_data: - node_data[key] = dict() + node_data[key] = {} # Initialize as an empty dict if leaf node is empty if node_data[key] is None: - node_data[key] = dict() - node_data = node_data[key] + node_data[key] = {} + assert isinstance(node_data, dict) + node_data = node_data[key] # type: ignore # The full raw data were read from the last source return node_data, full_data, node.sources[-1] - def __enter__(self): + def __enter__(self) -> TreeData: """ Experimental: Modify metadata and store changes to disk @@ -783,7 +866,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): with open(source, "w", encoding='utf-8') as file: file.write(dict_to_yaml(full_data)) - def __getitem__(self, key): + def __getitem__(self, key: str) -> Union[DataType, Tree]: """ Dictionary method to get child node or data item @@ -794,3 +877,81 @@ def __getitem__(self, key): return self.children[key[1:]] else: return self.data[key] + + def __len__(self) -> int: + return len(self.children) + len(self.data) + + def __iter__(self) -> Iterator[str]: + for c in self.children: + yield f"/{c}" + for d in self.data: + yield d + + def __contains__(self, item: str) -> bool: + if item.startswith("/"): + return item[1:] in self.children + else: + return item in self.data + + def keys(self) -> KeysView[str]: + return self.data.keys() + + def values(self) -> ValuesView[DataType]: + return self.data.values() + + def items(self) -> ItemsView[str, DataType]: + return self.data.items() + + def __fspath__(self) -> str: + path = self.name + parent = self.parent + while parent is not None: + path = f"{parent}/{path}" + return path + + def __truediv__(self: Self, key: StrPath) -> Self: + # TODO: Properly resolve path navigation e.g. /path/../other_path + key_str = str(key) + if not key_str.startswith('/'): + key_str = f"/{key_str}" + child = self[key_str] + assert isinstance(child, self.__class__) + return child + + def __rtruediv__(self: Self, key: StrPath) -> Self: + # TODO: Implement if virtual tree is possible + raise NotImplementedError + + def joinpath(self: Self, *other: StrPath) -> Self: + # TODO: Properly resolve path navigation e.g. /path/../other_path + str_path = '/'.join(list(*other)) + str_path = f"/{str_path}" + child = self[str_path] + assert isinstance(child, self.__class__) + return child + + def iterdir(self) -> Generator[Tree, None, None]: + yield from self.children.values() + + def walk(self, top_down: bool = True) -> Iterator[tuple[Self, list[str], list[str]]]: + paths: list[Union[Self, tuple[Self, list[str], list[str]]]] = [self] + while paths: + path = paths.pop() + if isinstance(path, tuple): + yield path + continue + + branch_names: list[str] = [] + leaf_names: list[str] = [] + for child in path.children.values(): + if child.children: + branch_names.append(child.rel_name) + else: + leaf_names.append(child.rel_name) + + if top_down: + yield path, branch_names, leaf_names + else: + paths.append((path, branch_names, leaf_names)) + assert isinstance(path, Tree) + paths += [path[f"/{b}"] for b in reversed(branch_names)] # type: ignore diff --git a/fmf/cli.py b/fmf/cli.py index 7fd1804d..67e57470 100644 --- a/fmf/cli.py +++ b/fmf/cli.py @@ -21,6 +21,7 @@ import os.path import shlex import sys +from typing import Optional import fmf import fmf.utils as utils @@ -32,8 +33,9 @@ class Parser: """ Command line options parser """ + arguments: list[str] - def __init__(self, arguments=None, path=None): + def __init__(self, arguments: Optional[list[str]] = None, path: Optional[str] = None): """ Prepare the parser. """ # Change current working directory (used for testing) if path is not None: @@ -69,7 +71,7 @@ def __init__(self, arguments=None, path=None): self.output = "" getattr(self, "command_" + self.command)() - def options_select(self): + def options_select(self) -> None: """ Select by name, filter """ group = self.parser.add_argument_group("Select") group.add_argument( @@ -92,7 +94,7 @@ def options_select(self): "--whole", dest="whole", action="store_true", help="Consider the whole tree (leaves only by default)") - def options_formatting(self): + def options_formatting(self) -> None: """ Formating options """ group = self.parser.add_argument_group("Format") group.add_argument( @@ -102,7 +104,7 @@ def options_formatting(self): "--value", dest="values", action="append", default=[], help="Values for the custom formatting string") - def options_utils(self): + def options_utils(self) -> None: """ Utilities """ group = self.parser.add_argument_group("Utils") group.add_argument( @@ -115,7 +117,7 @@ def options_utils(self): "--debug", action="store_true", help="Turn on debugging output, do not catch exceptions") - def command_ls(self): + def command_ls(self) -> None: """ List names """ self.parser = argparse.ArgumentParser( description="List names of available objects") @@ -124,13 +126,13 @@ def command_ls(self): self.options = self.parser.parse_args(self.arguments[2:]) self.show(brief=True) - def command_clean(self): + def command_clean(self) -> None: """ Clean cache """ self.parser = argparse.ArgumentParser( description="Remove cache directory and its content") self.clean() - def command_show(self): + def command_show(self) -> None: """ Show metadata """ self.parser = argparse.ArgumentParser( description="Show metadata of available objects") @@ -140,7 +142,7 @@ def command_show(self): self.options = self.parser.parse_args(self.arguments[2:]) self.show(brief=False) - def command_init(self): + def command_init(self) -> None: """ Initialize tree """ self.parser = argparse.ArgumentParser( description="Initialize a new metadata tree") @@ -151,7 +153,7 @@ def command_init(self): root = fmf.Tree.init(path) print("Metadata tree '{0}' successfully initialized.".format(root)) - def show(self, brief=False): + def show(self, brief: bool = False) -> None: """ Show metadata for each path given """ output = [] for path in self.options.paths or ["."]: @@ -190,7 +192,7 @@ def show(self, brief=False): utils.listed(len(output), "object"))) self.output = joined - def clean(self): + def clean(self) -> None: """ Remove cache directory """ try: cache = utils.get_cache_directory(create=False) @@ -205,7 +207,16 @@ def clean(self): # Main # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -def main(arguments=None, path=None): +def main(arguments: Optional[list[str]] = None, path: Optional[str] = None) -> str: """ Parse options, do what is requested """ parser = Parser(arguments, path) return parser.output + + +def cli_entry(): + try: + main() + except fmf.utils.GeneralError as error: + if "--debug" not in sys.argv: + fmf.utils.log.error(error) + raise diff --git a/fmf/context.py b/fmf/context.py index 66001e76..e84d3188 100644 --- a/fmf/context.py +++ b/fmf/context.py @@ -16,7 +16,22 @@ See https://fmf.readthedocs.io/en/latest/modules.html#fmf.Tree.adjust """ +from __future__ import annotations + import re +import sys +from collections.abc import Callable +# TODO: py3.10: typing.Optional, typing.Union -> '|' operator +from typing import Any, Optional, Union + +if sys.version_info >= (3, 10): + from typing import TypeAlias +else: + from typing_extensions import TypeAlias + +# TypeHints +ExpressionType: TypeAlias = tuple[Optional[str], Union[str, bool], Optional[list['ContextValue']]] +ExpressionType_raw: TypeAlias = tuple[Optional[str], Union[str, bool], Optional[list[str]]] class CannotDecide(Exception): @@ -34,7 +49,7 @@ class InvalidContext(Exception): class ContextValue: """ Value for dimension """ - def __init__(self, origin): + def __init__(self, origin: Union[str, tuple[str, ...]]): """ ContextValue("foo-1.2.3") ContextValue(["foo", "1", "2", "3"]) @@ -44,22 +59,26 @@ def __init__(self, origin): else: self._to_compare = self._split_to_version(origin) - def __eq__(self, other): + def __eq__(self, other: object) -> bool: if isinstance(other, self.__class__): return self._to_compare == other._to_compare else: return False - def __ne__(self, other): + def __ne__(self, other: object) -> bool: return not self.__eq__(other) - def __str__(self): + def __str__(self) -> str: return str(self._to_compare) - def __repr__(self): - return "{}({})".format(self.__class__.__name__, repr(self._to_compare)) + def __repr__(self) -> str: + return f"{self.__class__.__name__}({repr(self._to_compare)})" - def version_cmp(self, other, minor_mode=False, ordered=True): + def version_cmp( + self, + other: ContextValue, + minor_mode: bool = False, + ordered: bool = True) -> int: """ Comparing two ContextValue objects @@ -138,7 +157,7 @@ def version_cmp(self, other, minor_mode=False, ordered=True): return -1 # other is larger (more pars) @staticmethod - def compare(first, second): + def compare(first: str, second: str) -> int: """ compare two version parts """ # Ideally use `from packaging import version` but we need older # python support too so very rough @@ -146,16 +165,15 @@ def compare(first, second): # convert to int first_version = int(first) second_version = int(second) + return ( + (first_version > second_version) - + (first_version < second_version)) except ValueError: # fallback to compare as strings - first_version = first - second_version = second - return ( - (first_version > second_version) - - (first_version < second_version)) + return (first > second) - (first < second) @staticmethod - def _split_to_version(text): + def _split_to_version(text: str) -> tuple[str, ...]: """ Try to split text into name + version parts @@ -173,7 +191,6 @@ def _split_to_version(text): :param text: original value :return: tuple of name followed by version parts - :rtype: tuple """ return tuple(re.split(r":|-|\.", text)) @@ -183,119 +200,121 @@ def __hash__(self): class Context: """ Represents https://fmf.readthedocs.io/en/latest/context.html """ + # Operators' definitions - def _op_defined(self, dimension_name, values): + def _op_defined(self, dimension_name: str, values: Any) -> bool: """ 'is defined' operator """ return dimension_name in self._dimensions - def _op_not_defined(self, dimension_name, values): + def _op_not_defined(self, dimension_name: str, values: Any) -> bool: """ 'is not defined' operator """ return dimension_name not in self._dimensions - def _op_eq(self, dimension_name, values): + def _op_eq(self, dimension_name: str, values: list[ContextValue]) -> bool: """ '=' operator """ - def comparator(dimension_value, it_val): + def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: return dimension_value.version_cmp(it_val, ordered=False) == 0 return self._op_core(dimension_name, values, comparator) - def _op_not_eq(self, dimension_name, values): + def _op_not_eq(self, dimension_name: str, values: list[ContextValue]) -> bool: """ '!=' operator """ - def comparator(dimension_value, it_val): + def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: return dimension_value.version_cmp(it_val, ordered=False) != 0 return self._op_core(dimension_name, values, comparator) - def _op_minor_eq(self, dimension_name, values): + def _op_minor_eq(self, dimension_name: str, values: list[ContextValue]) -> bool: """ '~=' operator """ - def comparator(dimension_value, it_val): + def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: return dimension_value.version_cmp( it_val, minor_mode=True, ordered=False) == 0 return self._op_core(dimension_name, values, comparator) - def _op_minor_not_eq(self, dimension_name, values): + def _op_minor_not_eq(self, dimension_name: str, values: list[ContextValue]) -> bool: """ '~!=' operator """ - def comparator(dimension_value, it_val): + def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: return dimension_value.version_cmp( it_val, minor_mode=True, ordered=False) != 0 return self._op_core(dimension_name, values, comparator) - def _op_minor_less_or_eq(self, dimension_name, values): + def _op_minor_less_or_eq(self, dimension_name: str, values: list[ContextValue]) -> bool: """ '~<=' operator """ - def comparator(dimension_value, it_val): + def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: return dimension_value.version_cmp( it_val, minor_mode=True, ordered=True) <= 0 return self._op_core(dimension_name, values, comparator) - def _op_minor_less(self, dimension_name, values): + def _op_minor_less(self, dimension_name: str, values: list[ContextValue]) -> bool: """ '~<' operator """ - def comparator(dimension_value, it_val): + def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: return dimension_value.version_cmp( it_val, minor_mode=True, ordered=True) < 0 return self._op_core(dimension_name, values, comparator) - def _op_less(self, dimension_name, values): + def _op_less(self, dimension_name: str, values: list[ContextValue]) -> bool: """ '<' operator """ - def comparator(dimension_value, it_val): + def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: return dimension_value.version_cmp(it_val, ordered=True) < 0 return self._op_core(dimension_name, values, comparator) - def _op_less_or_equal(self, dimension_name, values): + def _op_less_or_equal(self, dimension_name: str, values: list[ContextValue]) -> bool: """ '<=' operator """ - def comparator(dimension_value, it_val): + def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: return dimension_value.version_cmp(it_val, ordered=True) <= 0 return self._op_core(dimension_name, values, comparator) - def _op_greater_or_equal(self, dimension_name, values): + def _op_greater_or_equal(self, dimension_name: str, values: list[ContextValue]) -> bool: """ '>=' operator """ - def comparator(dimension_value, it_val): + def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: return dimension_value.version_cmp(it_val, ordered=True) >= 0 return self._op_core(dimension_name, values, comparator) - def _op_minor_greater_or_equal(self, dimension_name, values): + def _op_minor_greater_or_equal(self, dimension_name: str, values: list[ContextValue]) -> bool: """ '~>=' operator """ - def comparator(dimension_value, it_val): + def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: return dimension_value.version_cmp( it_val, minor_mode=True, ordered=True) >= 0 return self._op_core(dimension_name, values, comparator) - def _op_greater(self, dimension_name, values): + def _op_greater(self, dimension_name: str, values: list[ContextValue]) -> bool: """ '>' operator """ - def comparator(dimension_value, it_val): + def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: return dimension_value.version_cmp(it_val, ordered=True) > 0 return self._op_core(dimension_name, values, comparator) - def _op_minor_greater(self, dimension_name, values): + def _op_minor_greater(self, dimension_name: str, values: list[ContextValue]) -> bool: """ '~>' operator """ - def comparator(dimension_value, it_val): + def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: return dimension_value.version_cmp( it_val, minor_mode=True, ordered=True) > 0 return self._op_core(dimension_name, values, comparator) - def _op_core(self, dimension_name, values, comparator): + def _op_core(self, dimension_name: str, values: list[ContextValue], + comparator: Callable[[ContextValue, ContextValue], bool]) -> bool: """ Evaluate value from dimension vs target values combination @@ -321,7 +340,7 @@ def _op_core(self, dimension_name, values, comparator): raise CannotDecide("No values could be compared.") except KeyError: raise CannotDecide( - "Dimension {0} is not defined.".format(dimension_name)) + f"Dimension {dimension_name} is not defined.") operator_map = { "is defined": _op_defined, @@ -362,6 +381,7 @@ def _op_core(self, dimension_name, values, comparator): # To split by 'or' operator re_or_split = re.compile(r'\bor\b') + _dimensions: dict[str, set[ContextValue]] def __init__(self, *args, **kwargs): """ @@ -384,17 +404,20 @@ def __init__(self, *args, **kwargs): for dim, op, values in definition[0]: if op != "==": raise InvalidContext() + assert dim is not None + assert values is not None self._dimensions[dim] = set(values) # Initialized with dimension=value(s) for dimension_name, values in kwargs.items(): if not isinstance(values, list): + assert values is not None values = [values] self._dimensions[dimension_name] = set( [self.parse_value(val) for val in values] ) @staticmethod - def parse_rule(rule): + def parse_rule(rule: Union[str, bool]) -> list[list[ExpressionType]]: """ Parses rule into expressions @@ -408,7 +431,6 @@ def parse_rule(rule): expr_6 and expr_7 is returned as [[expr_6, expr_7]] :param rule: rule to parse - :type rule: str | bool :return: nested list of expressions from the rule :raises InvalidRule: Syntax error in the rule """ @@ -437,37 +459,36 @@ def parse_rule(rule): return parsed_rule @staticmethod - def parse_value(value): + def parse_value(value: Any) -> ContextValue: """ Single place to convert to ContextValue """ return ContextValue(str(value)) @staticmethod - def split_rule_to_groups(rule): + def split_rule_to_groups(rule: str) -> list[list[str]]: """ Split rule into nested lists, no real parsing expr0 and expr1 or expr2 is split into [[expr0, expr1], [expr2]] :param rule: rule to split - :type rule: str :raises InvalidRule: Syntax error in the rule """ rule_parts = [] for or_group in Context.re_or_split.split(rule): if not or_group: - raise InvalidRule("Empty OR expression in {}.".format(rule)) + raise InvalidRule(f"Empty OR expression in {rule}.") and_group = [] for part in Context.re_and_split.split(or_group): part_stripped = part.strip() if not part_stripped: raise InvalidRule( - "Empty AND expression in {}.".format(rule)) + f"Empty AND expression in {rule}.") and_group.append(part_stripped) rule_parts.append(and_group) return rule_parts @staticmethod - def split_expression(expression): + def split_expression(expression: str) -> ExpressionType_raw: """ Split expression to dimension name, operator and values @@ -475,19 +496,17 @@ def split_expression(expression): of the list of values. :param expression: expression to split - :type expression: str :raises InvalidRule: When expression cannot be split, e.g. syntax error :return: tuple(dimension name, operator, list of values) - :rtype: tuple(str|None, str|bool, list|None) """ # true/false match = Context.re_boolean.match(expression) if match: # convert to bool and return expression tuple if match.group(1)[0].lower() == 't': - return (None, True, None) + return None, True, None else: - return (None, False, None) + return None, False, None # Triple expressions match = Context.re_expression_triple.match(expression) if match: @@ -497,10 +516,10 @@ def split_expression(expression): # Double expressions match = Context.re_expression_double.match(expression) if match: - return (match.group(1), match.group(2), None) - raise InvalidRule("Cannot parse expression '{}'.".format(expression)) + return match.group(1), match.group(2), None + raise InvalidRule(f"Cannot parse expression '{expression}'.") - def matches(self, rule): + def matches(self, rule: Union[str, bool]) -> bool: """ Does the rule match the current Context? @@ -512,8 +531,6 @@ def matches(self, rule): CannotDecide or False == False or CannotDecide == CannotDecide :param rule: Single rule to decide - :type rule: str | bool - :rtype: bool :raises CannotDecide: Impossible to decide the rule wrt current Context, e.g. dimension is missing :raises InvalidRule: Syntax error in the rule @@ -546,7 +563,7 @@ def matches(self, rule): break # Just making sure, parse_rule should have raised it already assert and_valid, ( - "Malformed expression: Missing AND part in {0}".format(rule)) + f"Malformed expression: Missing AND part in {rule}") # AND group finished as True, no need to process the rest of # OR groups if and_outcome is True: @@ -564,14 +581,15 @@ def matches(self, rule): valid = True # Just making sure, parse_rule should have raised it already assert valid, ( - "Malformed expression: Missing OR part in {0}".format(rule)) + f"Malformed expression: Missing OR part in {rule}") if final_outcome is False: return False else: raise CannotDecide() # It's up to callee how to treat this - def evaluate(self, expression): + def evaluate(self, expression: ExpressionType) -> bool: dimension_name, operator, values = expression if isinstance(operator, bool): return operator + assert dimension_name is not None return self.operator_map[operator](self, dimension_name, values) diff --git a/fmf/py.typed b/fmf/py.typed new file mode 100644 index 00000000..e69de29b diff --git a/fmf/utils.py b/fmf/utils.py index a6e53730..2ec2f6f1 100644 --- a/fmf/utils.py +++ b/fmf/utils.py @@ -1,5 +1,7 @@ """ Logging, config, constants & utilities """ +from __future__ import annotations + import copy import logging import os @@ -9,8 +11,11 @@ import sys import time import warnings +from collections.abc import Callable from io import StringIO -from typing import Any, List, NamedTuple +from logging import Logger as _Logger +# TODO: py3.10: typing.Optional, typing.Union -> '|' operator +from typing import Any, NamedTuple, Optional, Union from filelock import FileLock, Timeout from ruamel.yaml import YAML, scalarstring @@ -89,6 +94,7 @@ class ReferenceError(GeneralError): class FetchError(GeneralError): """ Fatal error in helper command while fetching """ + # Keep previously used format of the message def __str__(self): @@ -99,11 +105,23 @@ class JsonSchemaError(GeneralError): """ Invalid JSON Schema """ +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +# Type hints +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +class Logger(_Logger): + DATA: int + CACHE: int + ALL: int + cache: Callable[[str], None] + data: Callable[[str], None] + all: Callable[[str], None] + + # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Utils # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -def pluralize(singular=None): +def pluralize(singular: str) -> str: """ Naively pluralize words """ if singular.endswith("y") and not singular.endswith("ay"): plural = singular[:-1] + "ies" @@ -114,7 +132,11 @@ def pluralize(singular=None): return plural -def listed(items, singular=None, plural=None, max=None, quote="", join="and"): +def listed(items: Union[int, list[Union[int, str]]], + singular: Optional[str] = None, + plural: Optional[str] = None, + max: Optional[int] = None, + quote: str = "", join: str = "and") -> str: """ Convert an iterable into a nice, human readable list or description:: @@ -133,7 +155,7 @@ def listed(items, singular=None, plural=None, max=None, quote="", join="and"): """ # Convert items to list if necessary - items = range(items) if isinstance(items, int) else list(items) + items = list(range(items)) if isinstance(items, int) else list(items) more = " more" # Description mode expected when singular provided but no maximum set if singular is not None and max is None: @@ -143,29 +165,30 @@ def listed(items, singular=None, plural=None, max=None, quote="", join="and"): if singular is not None and plural is None: plural = pluralize(singular) # Convert to strings and optionally quote each item - items = ["{0}{1}{0}".format(quote, item) for item in items] + items_str = [f"{quote}{item}{quote}" for item in items] # Select the maximum of items and describe the rest if max provided if max is not None: # Special case when the list is empty (0 items) - if max == 0 and len(items) == 0: - return "0 {0}".format(plural) + if max == 0 and len(items_str) == 0: + return f"0 {plural}" # Cut the list if maximum exceeded - if len(items) > max: - rest = len(items[max:]) - items = items[:max] + if len(items_str) > max: + rest = len(items_str[max:]) + items_str = items_str[:max] if singular is not None: - more += " {0}".format(singular if rest == 1 else plural) - items.append("{0}{1}".format(rest, more)) + more += f" {singular if rest == 1 else plural}" + items_str.append(f"{rest}{more}") # For two and more items use 'and' instead of the last comma - if len(items) < 2: - return "".join(items) + if len(items_str) < 2: + return "".join(items_str) else: - return ", ".join(items[0:-2] + [' {} '.format(join).join(items[-2:])]) + return ", ".join(items_str[0:-2] + [f" {join} ".join(items_str[-2:])]) -def split(values, separator=re.compile("[ ,]+")): +def split(values: Union[str, list[str]], separator: re.Pattern[str] + = re.compile("[ ,]+")) -> list[str]: """ Convert space-or-comma-separated values into a single list @@ -185,7 +208,7 @@ def split(values, separator=re.compile("[ ,]+")): return sum([separator.split(value) for value in values], []) -def info(message, newline=True): +def info(message: str, newline: bool = True) -> None: """ Log provided info message to the standard error output """ sys.stderr.write(message + ("\n" if newline else "")) @@ -194,7 +217,8 @@ def info(message, newline=True): # Filtering # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -def evaluate(expression, data, _node=None): +def evaluate(expression: str, data: fmf.base.TreeData, + _node: Optional[fmf.base.Tree] = None) -> Any: """ Evaluate arbitrary Python expression against given data @@ -205,12 +229,13 @@ def evaluate(expression, data, _node=None): try: return eval(expression) except NameError as error: - raise FilterError("Key is not defined in data: {}".format(error)) + raise FilterError(f"Key is not defined in data: {error}") except KeyError as error: - raise FilterError("Internal key is not defined: {}".format(error)) + raise FilterError(f"Internal key is not defined: {error}") -def filter(filter, data, sensitive=True, regexp=False): +def filter(filter: str, data: fmf.base.TreeData, + sensitive: bool = True, regexp: bool = False) -> bool: """ Return true if provided filter matches given dictionary of values @@ -238,14 +263,14 @@ def filter(filter, data, sensitive=True, regexp=False): True, regular expressions can be used in the filter values as well. """ - def match_value(pattern, text): + def match_value(pattern: str, text: str) -> bool: """ Match value against data (simple or regexp) """ if regexp: - return re.match("^{0}$".format(pattern), text) + return bool(re.match(f"^{pattern}$", text)) else: return pattern == text - def check_value(dimension, value): + def check_value(dimension: str, value: str) -> bool: """ Check whether the value matches data """ # E.g. value = 'A, B' or value = "C" or value = "-D" # If there are multiple values, at least one must match @@ -254,7 +279,10 @@ def check_value(dimension, value): if atom.startswith("-"): atom = atom[1:] # Check each value for given dimension - for dato in data[dimension]: + dim_data = data_copy[dimension] + assert isinstance(dim_data, list) + for dato in dim_data: + assert isinstance(dato, str) if match_value(atom, dato): break # Pattern not found ---> good @@ -263,33 +291,36 @@ def check_value(dimension, value): # Handle positive values (return True upon first successful match) else: # Check each value for given dimension - for dato in data[dimension]: + dim_data = data_copy[dimension] + assert isinstance(dim_data, list) + for dato in dim_data: + assert isinstance(dato, str) if match_value(atom, dato): # Pattern found ---> good return True # No value matched the data return False - def check_dimension(dimension, values): + def check_dimension(dimension: str, values: list[str]) -> bool: """ Check whether all values for given dimension match data """ # E.g. dimension = 'tag', values = ['A, B', 'C', '-D'] # Raise exception upon unknown dimension - if dimension not in data: - raise FilterError("Invalid filter '{0}'".format(dimension)) + if dimension not in data_copy: + raise FilterError(f"Invalid filter '{dimension}'") # Every value must match at least one value for data - return all([check_value(dimension, value) for value in values]) + return all(check_value(dimension, value) for value in values) - def check_clause(clause): + def check_clause(clause: str) -> bool: """ Split into literals and check whether all match """ # E.g. clause = 'tag: A, B & tag: C & tag: -D' # Split into individual literals by dimension - literals = dict() + literals: dict[str, list[str]] = {} for literal in re.split(r"\s*&\s*", clause): # E.g. literal = 'tag: A, B' # Make sure the literal matches dimension:value format matched = re.match(r"^([^:]*)\s*:\s*(.*)$", literal) if not matched: - raise FilterError("Invalid filter '{0}'".format(literal)) + raise FilterError(f"Invalid filter '{literal}'") dimension, value = matched.groups() values = [value] # Append the literal value(s) to corresponding dimension list @@ -302,27 +333,30 @@ def check_clause(clause): if filter is None or filter == "": return True if not isinstance(data, dict): - raise FilterError("Invalid data type '{0}'".format(type(data))) + raise FilterError(f"Invalid data type '{type(data)}'") # Make sure that data dictionary contains lists of strings - data = copy.deepcopy(data) + data_copy = copy.deepcopy(data) for key in data: - if isinstance(data[key], list): - data[key] = [str(item) for item in data[key]] + data_val = data_copy[key] + if isinstance(data_val, list): + data_copy[key] = [str(item) for item in data_val] else: - data[key] = [str(data[key])] + data_copy[key] = [str(data_val)] # Turn all data into lowercase if sensitivity is off if not sensitive: filter = filter.lower() - lowered = dict() - for key, values in data.items(): - lowered[key.lower()] = [value.lower() for value in values] - data = lowered + lowered: fmf.base.TreeData = {} + for key, values in data_copy.items(): + assert isinstance(values, list) and all(isinstance(value, str) for value in values) + lowered[key.lower()] = [value.lower() for value in values] # type: ignore + data_copy = lowered # At least one clause must be true return any([check_clause(clause) for clause in re.split(r"\s*\|\s*", filter)]) + # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Logging # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -356,9 +390,9 @@ class Logging: _level = LOG_WARN # Already initialized loggers by their name - _loggers = dict() + _loggers: dict[str, Logger] = {} - def __init__(self, name='fmf'): + def __init__(self, name: str = 'fmf'): # Use existing logger if already initialized try: self.logger = Logging._loggers[name] @@ -390,11 +424,11 @@ def format(self, record): if Coloring().enabled(): level = color(" " + levelname + " ", "lightwhite", colour) else: - level = "[{0}]".format(levelname) - return u"{0} {1}".format(level, record.getMessage()) + level = f"[{levelname}]" + return f"{level} {record.getMessage()}" @staticmethod - def _create_logger(name='fmf', level=None): + def _create_logger(name: str = 'fmf', level: Optional[str] = None) -> Logger: """ Create fmf logger """ # Create logger, handler and formatter logger = logging.getLogger(name) @@ -402,18 +436,18 @@ def _create_logger(name='fmf', level=None): handler.setFormatter(Logging.ColoredFormatter()) logger.addHandler(handler) # Save log levels in the logger itself (backward compatibility) - for level in Logging.LEVELS: - setattr(logger, level, getattr(logging, level)) + for lev in Logging.LEVELS: + setattr(logger, lev, getattr(logging, lev)) # Additional logging constants and methods for cache and xmlrpc - logger.DATA = LOG_DATA - logger.CACHE = LOG_CACHE - logger.ALL = LOG_ALL - logger.cache = lambda message: logger.log(LOG_CACHE, message) # NOQA - logger.data = lambda message: logger.log(LOG_DATA, message) # NOQA - logger.all = lambda message: logger.log(LOG_ALL, message) # NOQA - return logger - - def set(self, level=None): + logger.DATA = LOG_DATA # type: ignore + logger.CACHE = LOG_CACHE # type: ignore + logger.ALL = LOG_ALL # type: ignore + logger.cache = lambda message: logger.log(LOG_CACHE, message) # type: ignore # NOQA + logger.data = lambda message: logger.log(LOG_DATA, message) # type: ignore # NOQA + logger.all = lambda message: logger.log(LOG_ALL, message) # type: ignore # NOQA + return logger # type: ignore + + def set(self, level: Optional[int] = None) -> None: """ Set the default log level @@ -438,7 +472,7 @@ def set(self, level=None): Logging._level = logging.WARN self.logger.setLevel(Logging._level) - def get(self): + def get(self) -> int: """ Get the current log level """ return self.logger.level @@ -447,7 +481,9 @@ def get(self): # Coloring # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -def color(text, color=None, background=None, light=False, enabled="auto"): +def color(text: str, color: Optional[str] = None, + background: Optional[str] = None, + light: bool = False, enabled: Union[str, bool] = "auto") -> str: """ Return text in desired color if coloring enabled @@ -465,11 +501,10 @@ def color(text, color=None, background=None, light=False, enabled="auto"): if color and color.startswith("light"): light = True color = color[5:] - color = color and ";{0}".format(colors[color]) or "" - background = background and ";{0}".format(colors[background] + 10) or "" - light = light and 1 or 0 + color = color and f";{colors[color]}" or "" + background = background and f";{colors[background] + 10}" or "" # Starting and finishing sequence - start = "\033[{0}{1}{2}m".format(light, color, background) + start = f"\033[{int(light)}{color}{background}m" finish = "\033[1;m" return "".join([start, text, finish]) @@ -478,7 +513,7 @@ class Coloring: """ Coloring configuration """ # Default color mode is auto-detected from the terminal presence - _mode = None + _mode: Optional[int] = None MODES = ["COLOR_OFF", "COLOR_ON", "COLOR_AUTO"] # We need only a single config instance _instance = None @@ -489,7 +524,7 @@ def __new__(cls, *args, **kwargs): cls._instance = super(Coloring, cls).__new__(cls, *args, **kwargs) return cls._instance - def __init__(self, mode=None): + def __init__(self, mode: Optional[int] = None): """ Initialize the coloring mode """ # Nothing to do if already initialized if self._mode is not None: @@ -497,7 +532,7 @@ def __init__(self, mode=None): # Set the mode self.set(mode) - def set(self, mode=None): + def set(self, mode: Optional[int] = None) -> None: """ Set the coloring mode @@ -523,18 +558,17 @@ def set(self, mode=None): except Exception: mode = COLOR_AUTO elif mode < 0 or mode > 2: - raise RuntimeError("Invalid color mode '{0}'".format(mode)) + raise RuntimeError(f"Invalid color mode '{mode}'") self._mode = mode log.debug( - "Coloring {0} ({1})".format( - "enabled" if self.enabled() else "disabled", - self.MODES[self._mode])) + f"Coloring {'enabled' if self.enabled() else 'disabled'} ({self.MODES[self._mode]})") - def get(self): + def get(self) -> int: """ Get the current color mode """ + assert self._mode is not None return self._mode - def enabled(self): + def enabled(self) -> bool: """ True if coloring is currently enabled """ # In auto-detection mode color enabled when terminal attached if self._mode == COLOR_AUTO: @@ -546,7 +580,7 @@ def enabled(self): # Cache directory # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -def get_cache_directory(create=True): +def get_cache_directory(create: bool = True) -> str: """ Return cache directory, created by this call if necessary @@ -569,8 +603,7 @@ def get_cache_directory(create=True): try: os.makedirs(cache, exist_ok=True) except OSError: - raise GeneralError( - "Failed to create cache directory '{0}'.".format(cache)) + raise GeneralError(f"Failed to create cache directory '{cache}'.") return cache @@ -586,7 +619,7 @@ def set_cache_expiration(seconds): CACHE_EXPIRATION = int(seconds) -def clean_cache_directory(): +def clean_cache_directory() -> None: """ Delete used cache directory if it exists """ cache = get_cache_directory(create=False) if os.path.isdir(cache): @@ -609,23 +642,24 @@ def invalidate_cache(): try: if os.path.isfile(fetch_head): lock_path = root + LOCK_SUFFIX_FETCH - log.debug("Remove '{0}'.".format(fetch_head)) + log.debug(f"Remove '{fetch_head}'.") with FileLock(lock_path, timeout=FETCH_LOCK_TIMEOUT): os.remove(fetch_head) except (IOError, Timeout) as error: # pragma: no cover issues.append( - "Couldn't remove file '{0}': {1}".format(fetch_head, error)) + f"Couldn't remove file '{fetch_head}': {error}") # Already found a .git so no need to continue inside the root del dirs[:] if issues: # pragma: no cover raise GeneralError("\n".join(issues)) + # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Fetch Tree from the Remote Repository # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -def fetch_tree(url, ref=None, path='.'): +def fetch_tree(url: str, ref: Optional[str] = None, path: str = '.') -> fmf.base.Tree: """ Get initialized Tree from a remote git repository @@ -655,8 +689,7 @@ def fetch_tree(url, ref=None, path='.'): return fmf.base.Tree(root) except Timeout: raise GeneralError( - "Failed to acquire lock for {0} within {1} seconds".format( - lock_path, NODE_LOCK_TIMEOUT)) + f"Failed to acquire lock for {lock_path} within {NODE_LOCK_TIMEOUT} seconds") # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -678,7 +711,7 @@ def fetch(url, ref=None, destination=None, env=None): # Fetch Remote Repository # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -def default_branch(repository, remote="origin"): +def default_branch(repository: str, remote: str = "origin") -> str: """ Detect default branch from given local git repository """ head = os.path.join(repository, f".git/refs/remotes/{remote}/HEAD") # Make sure the HEAD reference is available @@ -689,7 +722,10 @@ def default_branch(repository, remote="origin"): return ref.read().strip().split('/')[-1] -def fetch_repo(url, ref=None, destination=None, env=None): +def fetch_repo(url: str, + ref: Optional[str] = None, + destination: Optional[str] = None, + env: Optional[dict[str, str]] = None) -> str: """ Fetch remote git repository and return local directory @@ -713,7 +749,7 @@ def fetch_repo(url, ref=None, destination=None, env=None): # Lock for possibly shared cache directory. Add the extension # LOCK_SUFFIX_FETCH manually in the constructor. Everything under # the with statement to correctly remove lock upon exception. - log.debug("Acquire lock for '{0}'.".format(destination)) + log.debug(f"Acquire lock for '{destination}'.") try: lock_path = destination + LOCK_SUFFIX_FETCH with FileLock(lock_path, timeout=FETCH_LOCK_TIMEOUT) as lock: @@ -733,7 +769,7 @@ def fetch_repo(url, ref=None, destination=None, env=None): if not depth: # Do not retry if shallow clone was not used raise - log.debug("Clone failed with '{0}', trying without '--depth=1'.".format(error)) + log.debug(f"Clone failed with '{error}', trying without '--depth=1'.") run(['git', 'clone', url, destination], cwd=cache, env=env) # Detect the default branch if 'ref' not provided if ref is None: @@ -755,7 +791,7 @@ def fetch_repo(url, ref=None, destination=None, env=None): if os.path.isfile(os.path.join(destination, '.git', 'shallow')): # Make fetch get all remote refs (branches...) run(["git", "config", "remote.origin.fetch", - "+refs/heads/*:refs/remotes/origin/*"], cwd=destination) + "+refs/heads/*:refs/remotes/origin/*"], cwd=destination) # Fetch the whole history run(['git', 'fetch', '--unshallow'], cwd=destination) run(['git', 'checkout', '-f', ref], cwd=destination, env=env) @@ -764,14 +800,13 @@ def fetch_repo(url, ref=None, destination=None, env=None): raise error # Reset to origin to get possible changes but no exit code check # ref could be tag or commit where it is expected to fail - run(['git', 'reset', '--hard', "origin/{0}".format(ref)], + run(['git', 'reset', '--hard', f"origin/{ref}"], cwd=destination, check_exit_code=False, env=env) except Timeout: raise GeneralError( - "Failed to acquire lock for '{0}' within {1} seconds.".format( - destination, FETCH_LOCK_TIMEOUT)) + f"Failed to acquire lock for '{destination}' within {FETCH_LOCK_TIMEOUT} seconds.") except (OSError, subprocess.CalledProcessError) as error: - raise FetchError("{0}".format(error), error) + raise FetchError(f"{error}", error) return destination @@ -780,7 +815,9 @@ def fetch_repo(url, ref=None, destination=None, env=None): # Run command # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -def run(command, cwd=None, check_exit_code=True, env=None): +def run(command: Union[str, list[str]], cwd: Optional[str] = None, + check_exit_code: bool = True, + env: Optional[dict[str, str]] = None) -> tuple[str, str]: """ Run command and return a (stdout, stderr) tuple @@ -789,16 +826,15 @@ def run(command, cwd=None, check_exit_code=True, env=None): :check_exit_code raise CalledProcessError if exit code is non-zero :env dictionary of the environment variables for the command """ - log.debug("Running command: '{0}'.".format(' '.join(command))) + log.debug(f"Running command: '{' '.join(command)}'.") process = subprocess.Popen( command, cwd=cwd, env=env, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = process.communicate() - log.debug("stdout: {0}".format(stdout.strip())) - log.debug("stderr: {0}".format(stderr.strip())) - log.debug("exit_code: {0}{1}".format( - process.returncode, ('' if check_exit_code else ' (ignored)'))) + log.debug(f"stdout: {stdout.strip()}") + log.debug(f"stderr: {stderr.strip()}") + log.debug(f"exit_code: {process.returncode}{('' if check_exit_code else ' (ignored)')}") if check_exit_code and process.returncode != 0: raise subprocess.CalledProcessError( process.returncode, ' '.join(command), output=stdout + stderr) @@ -817,7 +853,9 @@ def run(command, cwd=None, check_exit_code=True, env=None): # Convert dict to yaml # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -def dict_to_yaml(data, width=None, sort=False): +def dict_to_yaml(data: fmf.base.TreeData, + width: Optional[int] = None, + sort: bool = False) -> str: """ Convert dictionary into yaml """ output = StringIO() @@ -852,4 +890,4 @@ class JsonSchemaValidationResult(NamedTuple): """ Represents JSON Schema validation result """ result: bool - errors: List[Any] + errors: list[Any] diff --git a/plans/integration.fmf b/plans/integration.fmf index 65f84aa9..4c0b18a8 100644 --- a/plans/integration.fmf +++ b/plans/integration.fmf @@ -23,10 +23,3 @@ prepare: # newer yq requires tomlkit>=0.11.7 which is python 3.7+ only. - pip3 install --user yq==3.1.1 || pip3 install yq==3.1.1 - yq --help - -adjust+: - - when: distro == centos-stream-8 and trigger == commit - environment+: - # Default PATH, spiked with ~/.local/bin for Python packages installed with --user. - # This is needed on Centos Stream 8 where the user-specific path is not in PATH. - PATH: "/root/.local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin" diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 00000000..ed74a547 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,90 @@ +[build-system] +requires = ['hatchling', 'hatch-vcs'] +build-backend = 'hatchling.build' + +[project] +name = 'fmf' +authors = [ + { name = 'Petr Splichal', email = 'psplicha@redhat.com' }, +] +maintainers = [ + { name = 'Petr Splichal', email = 'psplicha@redhat.com' }, +] +description = 'Flexible Metadata Format' +readme = 'README.rst' +license = 'GPL-2.0-or-later' +license-files = { paths = ['LICENSE'] } +requires-python = '>=3.9' +classifiers = [ + 'Natural Language :: English', + 'Programming Language :: Python :: 3.9', + 'Programming Language :: Python :: 3.10', + 'Programming Language :: Python :: 3.11', + 'Programming Language :: Python :: 3.12', + 'Topic :: Utilities', +] +keywords = [ + 'metadata', + 'testing', +] +dependencies = [ + 'ruamel.yaml', + 'filelock', + 'jsonschema', + 'typing-extensions ; python_version<"3.10"', +] +dynamic = ['version'] + +[project.urls] +Homepage = 'https://github.com/psss/fmf' + +[project.optional-dependencies] +tests = [ + 'pytest', +] +tests-cov = [ + 'fmf[tests]', + 'pytest-cov', +] +dev = [ + 'fmf[tests]', + 'pre-commit', +] +docs = [ + 'sphinx', + 'sphinx_rtd_theme', +] +all = [ + 'fmf[dev]', + 'fmf[docs]', +] + +[project.scripts] +fmf = 'fmf.cli:cli_entry' + +[tool.hatch] +version.source = 'vcs' +version.raw-options.version_scheme = 'post-release' + +[tool.hatch.build.targets.wheel] +packages = ['fmf'] + +[tool.pytest.ini_options] +markers = [ + "web: tests which need to access the web", +] +testpaths = [ + 'tests', +] + +[tool.coverage] +run.source = ["fmf"] + +[tool.mypy] +strict = true +files = ["fmf"] +python_version = "3.9" +warn_unused_configs = true +show_error_codes = true +disallow_untyped_defs = false +follow_imports = "normal" diff --git a/setup.py b/setup.py deleted file mode 100755 index fac0b39d..00000000 --- a/setup.py +++ /dev/null @@ -1,79 +0,0 @@ -#!/usr/bin/env python - -import re -from io import open - -from setuptools import setup - -# Parse version from the spec file -with open('fmf.spec', encoding='utf-8') as specfile: - lines = "\n".join(line.rstrip() for line in specfile) - version = re.search('Version: (.+)', lines).group(1).rstrip() - -# acceptable version schema: major.minor[.patch][sub] -__version__ = version -__pkg__ = 'fmf' -__pkgdir__ = {} -__pkgs__ = ['fmf'] -__provides__ = ['fmf'] -__desc__ = 'Flexible Metadata Format' -__scripts__ = ['bin/fmf'] - -# Prepare install requires and extra requires -install_requires = [ - 'ruamel.yaml', - 'filelock', - 'jsonschema', - ] -extras_require = { - 'docs': ['sphinx>=3', 'sphinx_rtd_theme'], - 'tests': ['pytest', 'python-coveralls', 'pre-commit'], - } -extras_require['all'] = [ - dependency - for extra in extras_require.values() - for dependency in extra] - -pip_src = 'https://pypi.python.org/packages/source' -__deplinks__ = [] - -# README is in the parent directory -readme = 'README.rst' -with open(readme, encoding='utf-8') as _file: - readme = _file.read() - -github = 'https://github.com/psss/fmf' -download_url = '{0}/archive/master.zip'.format(github) - -default_setup = dict( - url=github, - license='GPLv2', - author='Petr Splichal', - author_email='psplicha@redhat.com', - maintainer='Petr Splichal', - maintainer_email='psplicha@redhat.com', - download_url=download_url, - long_description=readme, - data_files=[], - classifiers=[ - 'License :: OSI Approved :: GNU General Public License v2 (GPLv2)', - 'Natural Language :: English', - 'Programming Language :: Python :: 2.7', - 'Programming Language :: Python :: 3.6', - 'Programming Language :: Python :: 3.7', - 'Topic :: Utilities', - ], - keywords=['metadata', 'testing'], - dependency_links=__deplinks__, - description=__desc__, - install_requires=install_requires, - extras_require=extras_require, - name=__pkg__, - package_dir=__pkgdir__, - packages=__pkgs__, - provides=__provides__, - scripts=__scripts__, - version=__version__, - ) - -setup(**default_setup) diff --git a/tests/unit/pytest.ini b/tests/unit/pytest.ini deleted file mode 100644 index cafe5e5d..00000000 --- a/tests/unit/pytest.ini +++ /dev/null @@ -1,3 +0,0 @@ -[pytest] -markers = - web: tests which need to access the web