diff --git a/beangulp/__init__.py b/beangulp/__init__.py index 17fbd30..8d8b8a2 100644 --- a/beangulp/__init__.py +++ b/beangulp/__init__.py @@ -10,15 +10,22 @@ __license__ = "GNU GPLv2" +import datetime +import difflib import os import sys import io import warnings +from os import path + import click from beancount import loader from typing import Optional from typing import List +from typing import TextIO +from beancount.core import data +from beancount.parser import printer from beangulp import archive from beangulp import cache # noqa: F401 @@ -249,6 +256,181 @@ def _identify(ctx: 'Ingest', src: str, failfast: bool, verbose: bool): sys.exit(1) +@click.command("test") +@click.argument("src", nargs=-1, type=click.Path(exists=True, resolve_path=True)) +@click.option( + "--expected", + "-e", + metavar="DIR", + type=click.Path(file_okay=False, resolve_path=True), + help="Directory containing the expecrted output files.", +) +@click.option("--verbose", "-v", count=True, help="Enable verbose output.") +@click.option("--quiet", "-q", count=True, help="Suppress all output.") +@click.option("--failfast", "-x", is_flag=True, help="Stop at the first test failure.") +@click.pass_obj +def _test(ctx, src: List[str], expected: str, verbose: int, quiet: int, failfast: bool): + """Test the importer. + + Run the importer on all DOCUMENTS and verify that it produces the + desired output. The desired output is stored in Beancount ledger + files with a header containing additional metadata and can be + generated with the "generate" command. + + The name of the desired output files is derived appending a + ".beancount" suffix to the name of the input files, and are + searched in the same directory where the input document is + located, unless a different location is specified through the + "--expected DIR" option. + + DOCUMENTS can be files or directories. Directories are walked and + the importer is called on all files with names not ending in + ".beancount". All and only the documents for which a desired + output file exists must be positively identify by the importer. + + """ + return _run(ctx, src, expected, verbose, quiet, failfast=failfast) + + +@click.command("generate") +@click.argument("src", nargs=-1, type=click.Path(exists=True, resolve_path=True)) +@click.option( + "--expected", + "-e", + metavar="DIR", + type=click.Path(file_okay=False, resolve_path=True), + help="Directory containing the expecrted output files.", +) +@click.option("--verbose", "-v", count=True, help="Enable verbose output.") +@click.option("--quiet", "-q", count=True, help="Suppress all output.") +@click.option("--force", "-f", is_flag=True, help="Alow to overwrite existing files.") +@click.pass_obj +def _generate(ctx, src: List[str], expected: str, verbose: int, quiet: int, force: bool): + """Generate expected files for tests. + + Run the importer on all DOCUMENTS and save the import results in + Beancount ledger files with an header containing additional + metadata that can be used to as regression tests for the importer. + + The name of the desired output files is derived appending a + ".beancount" suffix to the name of the input files, and are + written in the same directory where the input document is located, + unless a different location is specified through the "--expected + DIR" option. + + DOCUMENTS can be files or directories. Directories are walked and + the importer is called on all files with names not ending in + ".beancount". + + """ + return _run(ctx, src, expected, verbose, quiet, generate=True, force=force) + + +def _run(ctx, src, expected, verbose, quiet, generate=False, failfast=False, force=False): + log = utils.logger(verbose) + errors = exceptions.ExceptionsTrap(log) + + verbosity = verbose - quiet + failures = 0 + + if len(src) == 0 and "src" in ctx.defaults: + src = (ctx.defaults["src"],) if isinstance(ctx.defaults["src"], str) else ctx.defaults["src"] + + for filename in _walk(src, log): + with errors: + importer = identify.identify(ctx.importers, filename) + if not importer: + log("") # Newline. + continue + + account = importer.account(filename) + date = importer.date(filename) + name = importer.filename(filename) + entries = extract.extract_from_file(importer, filename, []) + + expected_filename = f"{filename}.beancount" + if expected: + expected_filename = path.join(expected, path.basename(expected_filename)) + + if generate: + try: + write_expected_file(expected_filename, account, date, name, entries, force=force) + except FileExistsError as ex: + failures += 1 + log(" ERROR", fg="red") + log(" FileExistsError: {}".format(ex.filename)) + continue + log(" OK", fg="green") + continue + + try: + diff = compare_expected(expected_filename, account, date, name, entries) + except FileNotFoundError: + # The importer has positively identified a document + # for which there is no expecred output file. + failures += 1 + log(" ERROR", fg="red") + log(" ExpectedOutputFileNotFound") + continue + if diff: + # Test failure. Log an error. + failures += 1 + log(" ERROR", fg="red") + if verbosity >= 0: + sys.stdout.writelines(diff) + sys.stdout.write(os.linesep) + continue + log(" PASSED", fg="green") + + if failfast and errors: + break + + if errors: + sys.exit(1) + + +def write_expected( + outfile: TextIO, account: data.Account, date: Optional[datetime.date], name: Optional[str], entries: data.Entries +): + """Produce the expected output file. + + Args: + outfile: The file object where to write. + account: The account name produced by the importer. + date: The date of the downloads file, produced by the importer. + name: The filename for filing, produced by the importer. + entries: The list of entries extracted by the importer. + """ + date = date.isoformat() if date else "" + name = name or "" + print(f";; Account: {account}", file=outfile) + print(f";; Date: {date}", file=outfile) + print(f";; Name: {name}", file=outfile) + printer.print_entries(entries, file=outfile) + + +def compare_expected(filepath: str, *data) -> List[str]: + """Compare the expected file with extracted data.""" + with io.StringIO() as buffer: + write_expected(buffer, *data) + # rewind to the beginning of the stream + buffer.seek(0) + lines_imported = buffer.readlines() + + with open(filepath, "r") as infile: + lines_expected = infile.readlines() + + diff = difflib.unified_diff(lines_expected, lines_imported, tofile="expected.beancount", fromfile="imported.beancount") + return list(diff) + + +def write_expected_file(filepath: str, *data, force: bool = False): + """Writes out the expected file.""" + mode = "w" if force else "x" + with open(filepath, mode) as expfile: + write_expected(expfile, *data) + + def _importer(importer): """Check that the passed instance implements the Importer interface. @@ -267,6 +449,8 @@ def _importer(importer): raise TypeError(f'expected bengulp.Importer not {type(importer):}') + + class Ingest: def __init__(self, importers: list, hooks=None): self.importers = [_importer(i) for i in importers] @@ -289,6 +473,8 @@ def cli(ctx): cli.add_command(_archive) cli.add_command(_extract) cli.add_command(_identify) + cli.add_command(_test) + cli.add_command(_generate) self.cli = cli diff --git a/pyproject.toml b/pyproject.toml index 4675487..72dc33a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = 'setuptools.build_meta' [project] name = 'beangulp' -version = '0.3.0.dev0' +version = '0.3.0.dev1' license = { file = 'LICENSE' } description = 'Importers Framework for Beancount' readme = 'README.rst'