Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 186 additions & 0 deletions beangulp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,22 @@
__license__ = "GNU GPLv2"


import datetime
import difflib
import os
import sys
import io
import warnings
from os import path

import click

from beancount import loader
from typing import Optional
from typing import List
from typing import TextIO
from beancount.core import data
from beancount.parser import printer

from beangulp import archive
from beangulp import cache # noqa: F401
Expand Down Expand Up @@ -249,6 +256,181 @@ def _identify(ctx: 'Ingest', src: str, failfast: bool, verbose: bool):
sys.exit(1)


@click.command("test")
@click.argument("src", nargs=-1, type=click.Path(exists=True, resolve_path=True))
@click.option(
"--expected",
"-e",
metavar="DIR",
type=click.Path(file_okay=False, resolve_path=True),
help="Directory containing the expecrted output files.",
)
@click.option("--verbose", "-v", count=True, help="Enable verbose output.")
@click.option("--quiet", "-q", count=True, help="Suppress all output.")
@click.option("--failfast", "-x", is_flag=True, help="Stop at the first test failure.")
@click.pass_obj
def _test(ctx, src: List[str], expected: str, verbose: int, quiet: int, failfast: bool):
"""Test the importer.

Run the importer on all DOCUMENTS and verify that it produces the
desired output. The desired output is stored in Beancount ledger
files with a header containing additional metadata and can be
generated with the "generate" command.

The name of the desired output files is derived appending a
".beancount" suffix to the name of the input files, and are
searched in the same directory where the input document is
located, unless a different location is specified through the
"--expected DIR" option.

DOCUMENTS can be files or directories. Directories are walked and
the importer is called on all files with names not ending in
".beancount". All and only the documents for which a desired
output file exists must be positively identify by the importer.

"""
return _run(ctx, src, expected, verbose, quiet, failfast=failfast)


@click.command("generate")
@click.argument("src", nargs=-1, type=click.Path(exists=True, resolve_path=True))
@click.option(
"--expected",
"-e",
metavar="DIR",
type=click.Path(file_okay=False, resolve_path=True),
help="Directory containing the expecrted output files.",
)
@click.option("--verbose", "-v", count=True, help="Enable verbose output.")
@click.option("--quiet", "-q", count=True, help="Suppress all output.")
@click.option("--force", "-f", is_flag=True, help="Alow to overwrite existing files.")
@click.pass_obj
def _generate(ctx, src: List[str], expected: str, verbose: int, quiet: int, force: bool):
"""Generate expected files for tests.

Run the importer on all DOCUMENTS and save the import results in
Beancount ledger files with an header containing additional
metadata that can be used to as regression tests for the importer.

The name of the desired output files is derived appending a
".beancount" suffix to the name of the input files, and are
written in the same directory where the input document is located,
unless a different location is specified through the "--expected
DIR" option.

DOCUMENTS can be files or directories. Directories are walked and
the importer is called on all files with names not ending in
".beancount".

"""
return _run(ctx, src, expected, verbose, quiet, generate=True, force=force)


def _run(ctx, src, expected, verbose, quiet, generate=False, failfast=False, force=False):
log = utils.logger(verbose)
errors = exceptions.ExceptionsTrap(log)

verbosity = verbose - quiet
failures = 0

if len(src) == 0 and "src" in ctx.defaults:
src = (ctx.defaults["src"],) if isinstance(ctx.defaults["src"], str) else ctx.defaults["src"]

for filename in _walk(src, log):
with errors:
importer = identify.identify(ctx.importers, filename)
if not importer:
log("") # Newline.
continue

account = importer.account(filename)
date = importer.date(filename)
name = importer.filename(filename)
entries = extract.extract_from_file(importer, filename, [])

expected_filename = f"{filename}.beancount"
if expected:
expected_filename = path.join(expected, path.basename(expected_filename))

if generate:
try:
write_expected_file(expected_filename, account, date, name, entries, force=force)
except FileExistsError as ex:
failures += 1
log(" ERROR", fg="red")
log(" FileExistsError: {}".format(ex.filename))
continue
log(" OK", fg="green")
continue

try:
diff = compare_expected(expected_filename, account, date, name, entries)
except FileNotFoundError:
# The importer has positively identified a document
# for which there is no expecred output file.
failures += 1
log(" ERROR", fg="red")
log(" ExpectedOutputFileNotFound")
continue
if diff:
# Test failure. Log an error.
failures += 1
log(" ERROR", fg="red")
if verbosity >= 0:
sys.stdout.writelines(diff)
sys.stdout.write(os.linesep)
continue
log(" PASSED", fg="green")

if failfast and errors:
break

if errors:
sys.exit(1)


def write_expected(
outfile: TextIO, account: data.Account, date: Optional[datetime.date], name: Optional[str], entries: data.Entries
):
"""Produce the expected output file.

Args:
outfile: The file object where to write.
account: The account name produced by the importer.
date: The date of the downloads file, produced by the importer.
name: The filename for filing, produced by the importer.
entries: The list of entries extracted by the importer.
"""
date = date.isoformat() if date else ""
name = name or ""
print(f";; Account: {account}", file=outfile)
print(f";; Date: {date}", file=outfile)
print(f";; Name: {name}", file=outfile)
printer.print_entries(entries, file=outfile)


def compare_expected(filepath: str, *data) -> List[str]:
"""Compare the expected file with extracted data."""
with io.StringIO() as buffer:
write_expected(buffer, *data)
# rewind to the beginning of the stream
buffer.seek(0)
lines_imported = buffer.readlines()

with open(filepath, "r") as infile:
lines_expected = infile.readlines()

diff = difflib.unified_diff(lines_expected, lines_imported, tofile="expected.beancount", fromfile="imported.beancount")
return list(diff)


def write_expected_file(filepath: str, *data, force: bool = False):
"""Writes out the expected file."""
mode = "w" if force else "x"
with open(filepath, mode) as expfile:
write_expected(expfile, *data)


def _importer(importer):
"""Check that the passed instance implements the Importer interface.

Expand All @@ -267,6 +449,8 @@ def _importer(importer):
raise TypeError(f'expected bengulp.Importer not {type(importer):}')




class Ingest:
def __init__(self, importers: list, hooks=None):
self.importers = [_importer(i) for i in importers]
Expand All @@ -289,6 +473,8 @@ def cli(ctx):
cli.add_command(_archive)
cli.add_command(_extract)
cli.add_command(_identify)
cli.add_command(_test)
cli.add_command(_generate)

self.cli = cli

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = 'setuptools.build_meta'

[project]
name = 'beangulp'
version = '0.3.0.dev0'
version = '0.3.0.dev1'
license = { file = 'LICENSE' }
description = 'Importers Framework for Beancount'
readme = 'README.rst'
Expand Down