From f37416100d9721b319be03f44885df6702c5b28c Mon Sep 17 00:00:00 2001 From: aredier Date: Tue, 14 Apr 2020 12:47:16 +0200 Subject: [PATCH 01/13] adding the pipeline link class --- chariots/op_store/_op_store.py | 16 ++++-- chariots/op_store/_op_store_client.py | 17 +++--- chariots/op_store/models/__init__.py | 4 +- chariots/op_store/models/pipeline.py | 1 - chariots/op_store/models/pipeline_link.py | 12 +++++ tests/unit/test_op_store.py | 66 ++++++++++++++++++++++- 6 files changed, 98 insertions(+), 18 deletions(-) create mode 100644 chariots/op_store/models/pipeline_link.py diff --git a/chariots/op_store/_op_store.py b/chariots/op_store/_op_store.py index b5642a9..e04726d 100644 --- a/chariots/op_store/_op_store.py +++ b/chariots/op_store/_op_store.py @@ -7,7 +7,7 @@ from flask_migrate import Migrate from sqlalchemy.orm import aliased -from .models import db +from .models import db, DBPipelineLink from .models.version import DBVersion from .models.op import DBOp from .models.validated_link import DBValidatedLink @@ -287,16 +287,22 @@ def register_new_pipeline(self): """endpoint ot register a new pipeline""" pipeline_name = request.json['pipeline_name'] - last_op_name = request.json['last_op_name'] - - last_node_id = self.get_or_register_db_op(last_op_name).id + pipeline_links = request.json['pipeline_links'] db_pipeline = DBPipeline( pipeline_name=pipeline_name, - last_op_id=last_node_id, ) self._session.add(db_pipeline) self._session.commit() + for upstream_node_name, downstream_node_name in pipeline_links: + upstream_op_id = self._get_db_op(upstream_node_name).id + downstream_op_id = self._get_db_op(downstream_node_name).id if downstream_node_name else None + self._session.add( + DBPipelineLink(pipeline_id=db_pipeline.id, upstream_op_id=upstream_op_id, + downstream_op_id=downstream_op_id) + ) + self._session.commit() + return jsonify({}) def _init_routes(self): diff --git a/chariots/op_store/_op_store_client.py b/chariots/op_store/_op_store_client.py index 298d366..4b6860c 100644 --- a/chariots/op_store/_op_store_client.py +++ b/chariots/op_store/_op_store_client.py @@ -125,15 +125,14 @@ def register_new_pipeline(self, pipeline: 'pipelines.Pipeline'): :param pipeline: the pipeline to register """ - - for upstream_node, downstream_node in pipeline.get_all_op_links(): - if downstream_node is None: - self.post('/v1/register_new_pipeline', { - 'pipeline_name': pipeline.name, - 'last_op_name': upstream_node.name - }) - return - raise ValueError('did not manage to find last node of the pipeline') + self.post('/v1/register_new_pipeline', { + 'pipeline_name': pipeline.name, + 'pipeline_links': [ + (upstream_node.name, downstream_node.name if downstream_node is not None else None) + for upstream_node, downstream_node in pipeline.get_all_op_links() + ] + }) + return class OpStoreClient(BaseOpStoreClient): diff --git a/chariots/op_store/models/__init__.py b/chariots/op_store/models/__init__.py index 22f9a56..2200016 100644 --- a/chariots/op_store/models/__init__.py +++ b/chariots/op_store/models/__init__.py @@ -7,6 +7,7 @@ from .version import DBVersion # noqa from .validated_link import DBValidatedLink # noqa from .op import DBOp # noqa +from .pipeline_link import DBPipelineLink __all__ = [ @@ -14,5 +15,6 @@ 'SQLAlchemy', 'DBValidatedLink', 'DBValidatedLink', - 'DBOp' + 'DBOp', + 'DBPipelineLink' ] diff --git a/chariots/op_store/models/pipeline.py b/chariots/op_store/models/pipeline.py index 027eb0b..6f21170 100644 --- a/chariots/op_store/models/pipeline.py +++ b/chariots/op_store/models/pipeline.py @@ -9,4 +9,3 @@ class DBPipeline(db.Model): id = Column(Integer, primary_key=True) pipeline_name = Column(String) - last_op_id = Column(Integer, ForeignKey(DBOp.id)) diff --git a/chariots/op_store/models/pipeline_link.py b/chariots/op_store/models/pipeline_link.py new file mode 100644 index 0000000..188387c --- /dev/null +++ b/chariots/op_store/models/pipeline_link.py @@ -0,0 +1,12 @@ +from sqlalchemy import Column, Integer, ForeignKey + +from . import DBPipeline, DBOp +from ..models import db + + +class DBPipelineLink(db.Model): + + id = Column(Integer, primary_key=True) + pipeline_id = Column(Integer, ForeignKey(DBPipeline.id)) + upstream_op_id = Column(Integer, ForeignKey(DBOp.id), nullable=False) + downstream_op_id = Column(Integer, ForeignKey(DBOp.id), nullable=True) diff --git a/tests/unit/test_op_store.py b/tests/unit/test_op_store.py index 186a1af..c484c7d 100644 --- a/tests/unit/test_op_store.py +++ b/tests/unit/test_op_store.py @@ -1,8 +1,9 @@ import os +from collections import Counter from typing import Type import pytest -from sqlalchemy import create_engine +from sqlalchemy import create_engine, alias from sqlalchemy.orm import sessionmaker from chariots import versioning @@ -194,6 +195,24 @@ def small_test_pipeline(): ], name='cooking_pipeline') +def get_all_links(session, pipeline_id): + links = session.query( + models.DBPipelineLink, models.DBOp + ).filter( + models.DBPipelineLink.pipeline_id == pipeline_id + ).filter( + models.DBPipelineLink.upstream_op_id == models.DBOp.id + ).all() + res = [] + for link, upstream in links: + if link.downstream_op_id is None: + res.append((link, upstream.op_name, None)) + continue + downstream = session.query(models.DBOp).filter(models.DBOp.id == link.downstream_op_id).one() + res.append((link, upstream.op_name, downstream.op_name)) + return res + + def test_register_new_pipeline(op_store_client: TestOpStoreClient, small_test_pipeline: Pipeline, session_func: sessionmaker): @@ -207,7 +226,50 @@ def test_register_new_pipeline(op_store_client: TestOpStoreClient, small_test_pi db_op = session.query(models.DBOp).one() db_pipeline = session.query(models.DBPipeline).one() assert db_pipeline.pipeline_name == 'cooking_pipeline' - assert db_pipeline.last_op_id == db_op.id + + all_ops = get_all_links(session, db_pipeline.id) + assert len(all_ops) == 1 + op_link, upstream_op_name, downstream_op_name = all_ops[0] + assert upstream_op_name == db_op.op_name + assert downstream_op_name == None + + +def test_register_new_pipeline_complex_dag(op_store_client: TestOpStoreClient, session_func: sessionmaker): + + # defining the necesasry op and pipeline + class OpwithInput(ops.BaseOp): + + def execute(self, input1='sad', input2=None): + if input2 is None: + return input1 + '_foo' + return input1 + input2 + '_bar' + + test_pipeline = Pipeline([ + nodes.Node(OpwithInput(), input_nodes=['__pipeline_input__'], output_nodes=['input_1']), + nodes.Node(OpwithInput(), output_nodes=['input_2']), + nodes.Node(OpwithInput(), input_nodes=['input_1', 'input_2'], output_nodes=['__pipeline_output__']), + ], name='complex_dag_pipeline') + + # registering the ops and pipeline + for upstream_node, downstream_node in test_pipeline.get_all_op_links(): + if downstream_node: + op_store_client.register_valid_link(downstream_node.name, upstream_node.name, upstream_node.node_version) + op_store_client.register_new_pipeline(test_pipeline) + + # test the registration + session = session_func() + db_op = session.query(models.DBOp).one() + assert db_op.op_name == 'opwithinput' + db_pipeline = session.query(models.DBPipeline).one() + assert db_pipeline.pipeline_name == 'complex_dag_pipeline' + + all_links = get_all_links(session, db_pipeline.id) + assert len(all_links) == 3 + assert Counter([(link[1], link[2]) for link in all_links]) == { + ('opwithinput', 'opwithinput'): 2, + ('opwithinput', None): 1 + } + def test_pipeline_exists(op_store_client: TestOpStoreClient, small_test_pipeline: Pipeline): From e0ac8b1c66885e5ef638bbc74d67a1e6c32d87db Mon Sep 17 00:00:00 2001 From: aredier Date: Wed, 15 Apr 2020 12:14:33 +0200 Subject: [PATCH 02/13] graphql bare bones implementation --- chariots/metadata/__init__.py | 0 chariots/metadata/metadata_server.py | 77 ++++++++++++++++++++++ chariots/op_store/_op_store.py | 12 +++- chariots/op_store/_op_store_client.py | 2 +- chariots/op_store/models/__init__.py | 9 +-- chariots/op_store/models/op.py | 13 +++- chariots/op_store/models/pipeline.py | 6 +- chariots/op_store/models/pipeline_link.py | 7 +- chariots/op_store/models/validated_link.py | 8 +-- chariots/op_store/models/version.py | 8 ++- 10 files changed, 120 insertions(+), 22 deletions(-) create mode 100644 chariots/metadata/__init__.py create mode 100644 chariots/metadata/metadata_server.py diff --git a/chariots/metadata/__init__.py b/chariots/metadata/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/chariots/metadata/metadata_server.py b/chariots/metadata/metadata_server.py new file mode 100644 index 0000000..3ea82e7 --- /dev/null +++ b/chariots/metadata/metadata_server.py @@ -0,0 +1,77 @@ +from flask import Flask + +from chariots.op_store import models + +import graphene +from graphene import relay +from flask_graphql import GraphQLView +from graphene_sqlalchemy import SQLAlchemyObjectType, SQLAlchemyConnectionField + + +class Op(SQLAlchemyObjectType): + class Meta: + model = models.DBOp + interfaces = (relay.Node, ) + + +class Version(SQLAlchemyObjectType): + class Meta: + model = models.DBVersion + interfaces = (relay.Node, ) + + +class PipelineLink(SQLAlchemyObjectType): + class Meta: + model = models.DBPipelineLink + interfaces = (relay.Node, ) + + +class ValidatedLink(SQLAlchemyObjectType): + class Meta: + model = models.DBValidatedLink + interfaces = (relay.Node,) + + +class Pipeline(SQLAlchemyObjectType): + class Meta: + model = models.DBPipeline + interfaces = (relay.Node,) + + +class Query(graphene.ObjectType): + node = relay.Node.Field() + + op = relay.Node.Field(Op) + ops = SQLAlchemyConnectionField(Op) + + version = relay.Node.Field(Version) + versions = SQLAlchemyConnectionField(Version) + + pipelineLink = relay.Node.Field(PipelineLink) + pipelineLinks = SQLAlchemyConnectionField(PipelineLink) + + + + +schema = graphene.Schema(query=Query) + + +class MetadataServer: + def __init__(self, db_url='sqlite:///:memory:'): + self.flask = Flask('OpStoreServer') + self.flask.config['SQLALCHEMY_DATABASE_URI'] = db_url + self.flask.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False + self.db = models.db # pylint: disable=invalid-name + self.db.app = self.flask + self.db.init_app(self.flask) + self._init_routes() + + def _init_routes(self): + self.flask.add_url_rule( + '/graphql', + view_func=GraphQLView.as_view( + 'graphql', + schema=schema, + graphiql=True # for having the GraphiQL interface + ) + ) diff --git a/chariots/op_store/_op_store.py b/chariots/op_store/_op_store.py index e04726d..035a37a 100644 --- a/chariots/op_store/_op_store.py +++ b/chariots/op_store/_op_store.py @@ -193,11 +193,19 @@ def register_valid_link(self): if downstream_op_name is None: return jsonify({}) downstream_op_id = self.get_or_register_db_op(downstream_op_name).id + exists_query = self._session.query(DBValidatedLink).filter( + DBValidatedLink.upstream_op_id == upstream_op_id, + DBValidatedLink.downstream_op_id == downstream_op_id, + upstream_version_id == upstream_version_id, + ) + if exists_query.one_or_none() is not None: + return jsonify({}) validated_link = DBValidatedLink( upstream_op_id=upstream_op_id, downstream_op_id=downstream_op_id, upstream_op_version_id=upstream_version_id ) + self._session.add(validated_link) self._session.commit() @@ -295,8 +303,8 @@ def register_new_pipeline(self): self._session.add(db_pipeline) self._session.commit() for upstream_node_name, downstream_node_name in pipeline_links: - upstream_op_id = self._get_db_op(upstream_node_name).id - downstream_op_id = self._get_db_op(downstream_node_name).id if downstream_node_name else None + upstream_op_id = self.get_or_register_db_op(upstream_node_name).id + downstream_op_id = self.get_or_register_db_op(downstream_node_name).id if downstream_node_name else None self._session.add( DBPipelineLink(pipeline_id=db_pipeline.id, upstream_op_id=upstream_op_id, downstream_op_id=downstream_op_id) diff --git a/chariots/op_store/_op_store_client.py b/chariots/op_store/_op_store_client.py index 4b6860c..3900daf 100644 --- a/chariots/op_store/_op_store_client.py +++ b/chariots/op_store/_op_store_client.py @@ -145,7 +145,7 @@ def __init__(self, url): def post(self, route, arguments_json): response = requests.post( - route, + self.url + route, headers={'Content-Type': 'application/json'}, data=json.dumps(arguments_json) ) diff --git a/chariots/op_store/models/__init__.py b/chariots/op_store/models/__init__.py index 2200016..6fa1c6a 100644 --- a/chariots/op_store/models/__init__.py +++ b/chariots/op_store/models/__init__.py @@ -3,11 +3,11 @@ from flask_sqlalchemy import SQLAlchemy db = SQLAlchemy() # pylint: disable=invalid-name -from .pipeline import DBPipeline # noqa -from .version import DBVersion # noqa from .validated_link import DBValidatedLink # noqa -from .op import DBOp # noqa +from .version import DBVersion # noqa from .pipeline_link import DBPipelineLink +from .pipeline import DBPipeline # noqa +from .op import DBOp # noqa __all__ = [ @@ -16,5 +16,6 @@ 'DBValidatedLink', 'DBValidatedLink', 'DBOp', - 'DBPipelineLink' + 'DBPipelineLink', + 'DBVersion' ] diff --git a/chariots/op_store/models/op.py b/chariots/op_store/models/op.py index 157d101..71cf01b 100644 --- a/chariots/op_store/models/op.py +++ b/chariots/op_store/models/op.py @@ -1,12 +1,23 @@ # pylint: disable=missing-module-docstring, missing-class-docstring, too-few-public-methods from sqlalchemy import Column, String, Integer +from sqlalchemy.orm import relationship -from ..models import db +from ..models import db, DBVersion, DBPipelineLink, DBValidatedLink class DBOp(db.Model): id = Column(Integer, primary_key=True) op_name = Column(String) + versions = relationship(DBVersion, backref='op') + upstream_links = relationship(DBPipelineLink, primaryjoin='DBOp.id==DBPipelineLink.downstream_op_id', + backref='downstream_op') + downstream_links = relationship(DBPipelineLink, primaryjoin='DBOp.id==DBPipelineLink.upstream_op_id', + backref='upstream_op') + upstream_validated_links = relationship(DBValidatedLink, primaryjoin='DBOp.id==DBValidatedLink.downstream_op_id', + backref='downstream_op') + downstream_validated_links = relationship(DBValidatedLink, primaryjoin='DBOp.id==DBValidatedLink.upstream_op_id', + backref='upstream_op') + def __repr__(self): return 'DBOp(id={}, op_name={})'.format(self.id, self.op_name) diff --git a/chariots/op_store/models/pipeline.py b/chariots/op_store/models/pipeline.py index 6f21170..98f7d59 100644 --- a/chariots/op_store/models/pipeline.py +++ b/chariots/op_store/models/pipeline.py @@ -1,11 +1,13 @@ # pylint: disable=missing-module-docstring, missing-class-docstring, too-few-public-methods from sqlalchemy import Column, Integer, ForeignKey, String +from sqlalchemy.orm import relationship -from .op import DBOp -from ..models import db +from ..models import db, DBPipelineLink class DBPipeline(db.Model): id = Column(Integer, primary_key=True) pipeline_name = Column(String) + + links = relationship(DBPipelineLink, backref='pipeline') diff --git a/chariots/op_store/models/pipeline_link.py b/chariots/op_store/models/pipeline_link.py index 188387c..5d7743a 100644 --- a/chariots/op_store/models/pipeline_link.py +++ b/chariots/op_store/models/pipeline_link.py @@ -1,12 +1,11 @@ from sqlalchemy import Column, Integer, ForeignKey -from . import DBPipeline, DBOp from ..models import db class DBPipelineLink(db.Model): id = Column(Integer, primary_key=True) - pipeline_id = Column(Integer, ForeignKey(DBPipeline.id)) - upstream_op_id = Column(Integer, ForeignKey(DBOp.id), nullable=False) - downstream_op_id = Column(Integer, ForeignKey(DBOp.id), nullable=True) + pipeline_id = Column(Integer, ForeignKey('db_pipeline.id')) + upstream_op_id = Column(Integer, ForeignKey('db_op.id'), nullable=False) + downstream_op_id = Column(Integer, ForeignKey('db_op.id'), nullable=True) diff --git a/chariots/op_store/models/validated_link.py b/chariots/op_store/models/validated_link.py index 085298b..f768f0d 100644 --- a/chariots/op_store/models/validated_link.py +++ b/chariots/op_store/models/validated_link.py @@ -1,16 +1,14 @@ # pylint: disable=missing-module-docstring, missing-class-docstring, too-few-public-methods from sqlalchemy import Column, Integer, ForeignKey -from .op import DBOp -from .version import DBVersion from ..models import db class DBValidatedLink(db.Model): id = Column(Integer, primary_key=True) - upstream_op_id = Column(Integer, ForeignKey(DBOp.id)) - downstream_op_id = Column(Integer, ForeignKey(DBOp.id)) - upstream_op_version_id = Column(Integer, ForeignKey(DBVersion.id)) + upstream_op_id = Column(Integer, ForeignKey('db_op.id')) + downstream_op_id = Column(Integer, ForeignKey('db_op.id')) + upstream_op_version_id = Column(Integer, ForeignKey('db_version.id')) def __repr__(self): return 'DBValidatedLink(id={}, upstream_op_id={}, downstream_op_id={}, upstream_op_version_id={})'.format( diff --git a/chariots/op_store/models/version.py b/chariots/op_store/models/version.py index 847e6fa..aaf64dd 100644 --- a/chariots/op_store/models/version.py +++ b/chariots/op_store/models/version.py @@ -1,15 +1,15 @@ # pylint: disable=missing-module-docstring, missing-class-docstring, too-few-public-methods from sqlalchemy import Column, String, Integer, ForeignKey, DateTime +from sqlalchemy.orm import relationship from chariots import versioning -from .op import DBOp -from ..models import db +from ..models import db, DBValidatedLink class DBVersion(db.Model): id = Column(Integer, primary_key=True) - op_id = Column(Integer, ForeignKey(DBOp.id)) + op_id = Column(Integer, ForeignKey('db_op.id')) version_time = Column(DateTime) major_hash = Column(String) @@ -21,6 +21,8 @@ class DBVersion(db.Model): patch_hash = Column(String) patch_version_number = Column(Integer) + validated_downstream_links = relationship(DBValidatedLink, backref='upstream_version') + def to_chariots_version(self): """converts DBVersion to equivalent `chariots.versioning.Version`""" return versioning.Version(self.major_hash, self.minor_hash, self.patch_hash, self.version_time) From ba44780a03af4353a7a07c72b7571f8772430c17 Mon Sep 17 00:00:00 2001 From: aredier Date: Thu, 30 Apr 2020 13:24:18 +0200 Subject: [PATCH 03/13] tests --- chariots/metadata/__init__.py | 5 + chariots/metadata/metadata_server.py | 110 +++---- chariots/op_store/_op_store.py | 4 +- .gitignore => tests/unit/.gitignore | 0 tests/unit/test_metadata_store.py | 432 +++++++++++++++++++++++++++ tests/unit/test_op_store.py | 62 ++-- 6 files changed, 540 insertions(+), 73 deletions(-) rename .gitignore => tests/unit/.gitignore (100%) create mode 100644 tests/unit/test_metadata_store.py diff --git a/chariots/metadata/__init__.py b/chariots/metadata/__init__.py index e69de29..6617463 100644 --- a/chariots/metadata/__init__.py +++ b/chariots/metadata/__init__.py @@ -0,0 +1,5 @@ +from .metadata_server import MetadataServer + +__all__ = [ + 'MetadataServer' +] diff --git a/chariots/metadata/metadata_server.py b/chariots/metadata/metadata_server.py index 3ea82e7..29d8454 100644 --- a/chariots/metadata/metadata_server.py +++ b/chariots/metadata/metadata_server.py @@ -1,69 +1,23 @@ + + from flask import Flask -from chariots.op_store import models +from chariots import op_store import graphene -from graphene import relay +from graphene import relay, String from flask_graphql import GraphQLView from graphene_sqlalchemy import SQLAlchemyObjectType, SQLAlchemyConnectionField - -class Op(SQLAlchemyObjectType): - class Meta: - model = models.DBOp - interfaces = (relay.Node, ) - - -class Version(SQLAlchemyObjectType): - class Meta: - model = models.DBVersion - interfaces = (relay.Node, ) - - -class PipelineLink(SQLAlchemyObjectType): - class Meta: - model = models.DBPipelineLink - interfaces = (relay.Node, ) - - -class ValidatedLink(SQLAlchemyObjectType): - class Meta: - model = models.DBValidatedLink - interfaces = (relay.Node,) - - -class Pipeline(SQLAlchemyObjectType): - class Meta: - model = models.DBPipeline - interfaces = (relay.Node,) - - -class Query(graphene.ObjectType): - node = relay.Node.Field() - - op = relay.Node.Field(Op) - ops = SQLAlchemyConnectionField(Op) - - version = relay.Node.Field(Version) - versions = SQLAlchemyConnectionField(Version) - - pipelineLink = relay.Node.Field(PipelineLink) - pipelineLinks = SQLAlchemyConnectionField(PipelineLink) - - - - -schema = graphene.Schema(query=Query) - - class MetadataServer: def __init__(self, db_url='sqlite:///:memory:'): - self.flask = Flask('OpStoreServer') + self.flask = Flask('metadarta') self.flask.config['SQLALCHEMY_DATABASE_URI'] = db_url self.flask.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False - self.db = models.db # pylint: disable=invalid-name + self.db = op_store.models.db # pylint: disable=invalid-name self.db.app = self.flask self.db.init_app(self.flask) + self.schema = self._build_schema() self._init_routes() def _init_routes(self): @@ -71,7 +25,55 @@ def _init_routes(self): '/graphql', view_func=GraphQLView.as_view( 'graphql', - schema=schema, + schema=self.schema, graphiql=True # for having the GraphiQL interface ) ) + + def _build_schema(self): + class Op(SQLAlchemyObjectType): + class Meta: + model = op_store.models.DBOp + filter_fields = ['op_name'] + interfaces = (relay.Node, ) + + class Version(SQLAlchemyObjectType): + class Meta: + model = op_store.models.DBVersion + interfaces = (relay.Node, ) + + class PipelineLink(SQLAlchemyObjectType): + class Meta: + model = op_store.models.DBPipelineLink + interfaces = (relay.Node, ) + + class ValidatedLink(SQLAlchemyObjectType): + class Meta: + model = op_store.models.DBValidatedLink + interfaces = (relay.Node,) + + class Pipeline(SQLAlchemyObjectType): + class Meta: + model = op_store.models.DBPipeline + interfaces = (relay.Node,) + + class Query(graphene.ObjectType): + node = relay.Node.Field() + + ops = SQLAlchemyConnectionField(Op, name=String()) + versions = SQLAlchemyConnectionField(Version) + pipelines = SQLAlchemyConnectionField(Pipeline) + + op = relay.Node.Field(Op) + pipeline = relay.Node.Field(Pipeline) + version = relay.Node.Field(Version) + pipelineLink = relay.Node.Field(PipelineLink) + validatedLink = relay.Node.Field(ValidatedLink) + + def resolve_ops(self, info, **args): + base_query = Op.get_query(info) + if 'name' in args: + base_query = base_query.filter(op_store.models.DBOp.op_name == args['name']) + return base_query.all() + + return graphene.Schema(query=Query) diff --git a/chariots/op_store/_op_store.py b/chariots/op_store/_op_store.py index 035a37a..98999ae 100644 --- a/chariots/op_store/_op_store.py +++ b/chariots/op_store/_op_store.py @@ -184,6 +184,7 @@ def register_valid_link(self): registers a link between an upstream and a downstream op. This means that in future relaods the downstream op will whitelist this version for this upstream op """ + old_db_link = self._session.query(DBValidatedLink) downstream_op_name = request.json['downstream_op_name'] upstream_op_name = request.json['upstream_op_name'] upstream_op_version = Version.parse(request.json['upstream_op_version']) @@ -196,8 +197,9 @@ def register_valid_link(self): exists_query = self._session.query(DBValidatedLink).filter( DBValidatedLink.upstream_op_id == upstream_op_id, DBValidatedLink.downstream_op_id == downstream_op_id, - upstream_version_id == upstream_version_id, + DBValidatedLink.upstream_op_version_id == upstream_version_id, ) + old_db_link = self._session.query(DBValidatedLink) if exists_query.one_or_none() is not None: return jsonify({}) validated_link = DBValidatedLink( diff --git a/.gitignore b/tests/unit/.gitignore similarity index 100% rename from .gitignore rename to tests/unit/.gitignore diff --git a/tests/unit/test_metadata_store.py b/tests/unit/test_metadata_store.py new file mode 100644 index 0000000..5eb678b --- /dev/null +++ b/tests/unit/test_metadata_store.py @@ -0,0 +1,432 @@ +import os +import datetime as dt +import random + +import pytest +from sqlalchemy.orm import sessionmaker + + +@pytest.fixture +def db_uri(tmpdir): + return 'sqlite:///{}'.format(os.path.join(str(tmpdir), 'test.db')) + + +@pytest.fixture() +def models(): + from chariots import op_store + return op_store.models + + +@pytest.fixture +def metadata_server(db_uri): + from chariots.metadata import MetadataServer + server = MetadataServer(db_uri) + server.db.create_all() + return server + + +@pytest.fixture +def metadata_schema(metadata_server): + return metadata_server.schema + + +@pytest.fixture +def session_cls(models): + return sessionmaker(bind=models.DBOp.query.session.bind) + + +@pytest.fixture +def op(session_cls, models): + session = session_cls(expire_on_commit=False) + op = models.DBOp( + op_name='test' + ) + session.add(op) + session.commit() + session.expunge_all() + return op + + +@pytest.fixture +def pipeline(session_cls, models): + session = session_cls(expire_on_commit=False) + pipeline = models.DBPipeline( + pipeline_name='test-pipeline' + ) + session.add(pipeline) + session.commit() + return pipeline + + +@pytest.fixture +def pipeline_link(session_cls, pipeline, op, models): + session = session_cls(expire_on_commit=False) + link = models.DBPipelineLink( + pipeline_id=pipeline.id, + upstream_op_id=op.id, + downstream_op_id=None, + ) + session.add(link) + session.commit() + return session + + +@pytest.fixture +def version(session_cls, op, models): + session = session_cls(expire_on_commit=False) + version = models.DBVersion( + op_id=op.id, + version_time=dt.datetime.utcnow(), + major_hash='0', + major_version_number=0, + minor_hash='1', + minor_version_number=1, + patch_hash='0', + patch_version_number='0' + ) + session.add(version) + session.commit() + session.expunge_all() + return version + + +@pytest.fixture +def complex_ops(session_cls, models): + op_1 = models.DBOp(op_name='op-1') + op_2 = models.DBOp(op_name='op-2') + op_3 = models.DBOp(op_name='op-3') + op_4 = models.DBOp(op_name='op-4') + + session = session_cls(expire_on_commit=False) + session.add(op_1) + session.add(op_2) + session.add(op_3) + session.add(op_4) + session.commit() + return { + 1: op_1, + 2: op_2, + 3: op_3, + 4: op_4, + } + + +@pytest.fixture +def complex_versions(session_cls, complex_ops, models): + session = session_cls(expire_on_commit=False) + versions = {} + for op_number, op in complex_ops.items(): + op_version = models.DBVersion( + op_id=op.id, + major_hash=str(random.randint(0, 10)), + minor_hash=str(random.randint(0, 10)), + patch_hash=str(random.randint(0, 10)), + major_version_number=random.randint(0, 10), + minor_version_number=random.randint(0, 10), + patch_version_number=random.randint(0, 10), + ) + session.add(op_version) + versions[op_number] = op_version + session.commit() + return versions + + +@pytest.fixture +def complex_links(session_cls, complex_ops, pipeline, models): + session = session_cls() + link_1 = models.DBPipelineLink(pipeline_id=pipeline.id, upstream_op_id=complex_ops[1].id, + downstream_op_id=complex_ops[2].id) + session.add(link_1) + link_2 = models.DBPipelineLink(pipeline_id=pipeline.id, upstream_op_id=complex_ops[1].id, + downstream_op_id=complex_ops[3].id) + session.add(link_2) + link_3 = models.DBPipelineLink(pipeline_id=pipeline.id, upstream_op_id=complex_ops[2].id, + downstream_op_id=complex_ops[4].id) + session.add(link_3) + link_4 = models.DBPipelineLink(pipeline_id=pipeline.id, upstream_op_id=complex_ops[3].id, + downstream_op_id=complex_ops[4].id) + session.add(link_4) + session.commit() + return { + 1: link_1, + 2: link_2, + 3: link_3, + 4: link_4, + } + + +@pytest.fixture +def complex_validated_links(session_cls, complex_versions, complex_ops, pipeline, models): + session = session_cls() + link_1 = models.DBValidatedLink(upstream_op_id=complex_ops[1].id, + downstream_op_id=complex_ops[2].id, upstream_op_version_id=complex_versions[1].id) + session.add(link_1) + link_2 = models.DBValidatedLink(upstream_op_id=complex_ops[1].id, + downstream_op_id=complex_ops[3].id, upstream_op_version_id=complex_versions[1].id) + session.add(link_2) + link_3 = models.DBValidatedLink(upstream_op_id=complex_ops[2].id, downstream_op_id=complex_ops[4].id, + upstream_op_version_id=complex_versions[2].id) + session.add(link_3) + link_4 = models.DBValidatedLink(upstream_op_id=complex_ops[3].id, downstream_op_id=complex_ops[4].id, + upstream_op_version_id=complex_versions[3].id) + session.add(link_4) + session.commit() + return { + 1: link_1, + 2: link_2, + 3: link_3, + 4: link_4, + } + + +def clean_db(session_cls, models): + session = session_cls() + session.query(models.DBValidatedLink).delete() + session.query(models.DBVersion).delete() + session.query(models.DBPipelineLink).delete() + session.query(models.DBPipeline).delete() + session.query(models.DBOp).delete() + session.commit() + + +def test_op_query(metadata_schema, op, db_uri, session_cls, models): + # clean_db(session_cls, models) + metadata_ops = metadata_schema.execute(""" +query{ + ops { + edges { + node { + id + opName + } + } + } +} +""") + assert len(metadata_ops.data['ops']['edges']) == 1 + assert metadata_ops.data['ops']['edges'][0]['node']['opName'] == op.op_name + graphql_op_id = metadata_ops.data['ops']['edges'][0]['node']['id'] + # raise + + direct_query = metadata_schema.execute(""" +query getOp ($id: ID!){ + op (id: $id){ + id + opName + } +}""", variables={'id': graphql_op_id}) + assert direct_query.data['op']['opName'] == op.op_name + clean_db(session_cls, models) + + +def test_version_query(metadata_schema, op, version, db_uri, models, session_cls): + all_versions = metadata_schema.execute(""" +query{ + versions { + edges{ + node { + majorHash + minorHash + patchHash + op{ + id + opName + } + } + } + } +} + """) + + assert len(all_versions.data['versions']['edges']) == 1 + assert all_versions.data['versions']['edges'][0]['node']['majorHash'] == version.major_hash + assert all_versions.data['versions']['edges'][0]['node']['minorHash'] == version.minor_hash + assert all_versions.data['versions']['edges'][0]['node']['patchHash'] == version.patch_hash + assert all_versions.data['versions']['edges'][0]['node']['op']['opName'] == op.op_name + op_id = all_versions.data['versions']['edges'][0]['node']['op']['id'] + + op_query = metadata_schema.execute(""" +query getOp ($id: ID!){ + op (id: $id){ + id + opName + versions{ + edges{ + node { + majorHash + minorHash + patchHash + } + } + } + } +} + """, variables={'id': op_id}) + assert op_query.data['op']['opName'] == op.op_name + assert len(op_query.data['op']['versions']['edges']) == 1 + assert op_query.data['op']['versions']['edges'][0]['node']['majorHash'] == version.major_hash + assert op_query.data['op']['versions']['edges'][0]['node']['minorHash'] == version.minor_hash + assert op_query.data['op']['versions']['edges'][0]['node']['patchHash'] == version.patch_hash + clean_db(session_cls, models) + + +pipelines_query_str = """ +query { + pipelines{ + edges{ + node{ + id + pipelineName + links{ + edges{ + node{ + upstreamOp{ + opName + } + downstreamOp{ + opName + } + } + } + } + } + } + } +} +""" + +single_pipeline_query_str = """ +query getPipeline($id: ID!){ + pipeline(id: $id){ + pipelineName + links{ + edges{ + node{ + upstreamOp{ + opName + } + downstreamOp{ + opName + } + } + } + } + } +} +""" + + +def test_pipeline_query(metadata_schema, op, pipeline, pipeline_link, models, session_cls): + # querying all the pipelines + pipeline_query = metadata_schema.execute(pipelines_query_str) + assert not pipeline_query.errors + assert len(pipeline_query.data['pipelines']['edges']) == 1 + assert pipeline_query.data['pipelines']['edges'][0]['node']['pipelineName'] == pipeline.pipeline_name + assert len(pipeline_query.data['pipelines']['edges'][0]['node']['links']['edges']) == 1 + assert pipeline_query.data['pipelines']['edges'][0]['node']['links']['edges'][0]['node']['upstreamOp'][ + 'opName'] == op.op_name + assert pipeline_query.data['pipelines']['edges'][0]['node']['links']['edges'][0]['node']['downstreamOp'] is None + pipeline_id = pipeline_query.data['pipelines']['edges'][0]['node']['id'] + + # direct query + direct_query = metadata_schema.execute(single_pipeline_query_str, variable_values={'id': pipeline_id}) + + assert not direct_query.errors + assert direct_query.data['pipeline']['pipelineName'] == pipeline.pipeline_name + assert len(direct_query.data['pipeline']['links']['edges']) == 1 + assert direct_query.data['pipeline']['links']['edges'][0]['node']['upstreamOp']['opName'] == op.op_name + assert direct_query.data['pipeline']['links']['edges'][0]['node']['downstreamOp'] is None + clean_db(session_cls, models) + + +def test_pipeline_links_multiple_links(metadata_schema, pipeline, complex_links, complex_ops, models, session_cls): + # querying all the pipelines + + pipeline_query = metadata_schema.execute(pipelines_query_str) + assert not pipeline_query.errors + assert len(pipeline_query.data['pipelines']['edges']) == 1 + assert pipeline_query.data['pipelines']['edges'][0]['node']['pipelineName'] == pipeline.pipeline_name + assert len(pipeline_query.data['pipelines']['edges'][0]['node']['links']['edges']) == 4 + pipeline_id = pipeline_query.data['pipelines']['edges'][0]['node']['id'] + + def do_link_test(links, complex_ops): + links = { + (link['node']['upstreamOp']['opName'], link['node']['downstreamOp']['opName']) + for link in links + } + assert links == { + (complex_ops[1].op_name, complex_ops[2].op_name), + (complex_ops[1].op_name, complex_ops[3].op_name), + (complex_ops[2].op_name, complex_ops[4].op_name), + (complex_ops[3].op_name, complex_ops[4].op_name), + } + + do_link_test(pipeline_query.data['pipelines']['edges'][0]['node']['links']['edges'], complex_ops) + # direct query + direct_query = metadata_schema.execute(single_pipeline_query_str, variable_values={'id': pipeline_id}) + + assert not direct_query.errors + assert direct_query.data['pipeline']['pipelineName'] == pipeline.pipeline_name + assert len(direct_query.data['pipeline']['links']['edges']) == 4 + do_link_test(direct_query.data['pipeline']['links']['edges'], complex_ops) + clean_db(session_cls, models) + + +def test_pipeline_validated_links_multiple_versions(metadata_schema, complex_ops, pipeline, complex_versions, + complex_validated_links, session_cls, models): + + filtered_op = metadata_schema.execute(""" +query getOp($name: String!){ + ops (name: $name) { + edges { + node { + id + opName + upstreamValidatedLinks { + edges { + node { + id + upstreamOp {opName} + upstreamVersion { + majorHash + minorHash + patchHash + majorVersionNumber + minorVersionNumber + patchVersionNumber + } + } + } + } + } + } + } +} + """, variable_values={'name': complex_ops[4].op_name}) + assert not filtered_op.errors + assert len(filtered_op.data['ops']['edges']) == 1 + assert filtered_op.data['ops']['edges'][0]['node']['opName'] == complex_ops[4].op_name + assert len(filtered_op.data['ops']['edges'][0]['node']['upstreamValidatedLinks']['edges']) == 2 + validated_links = { + link['node']['upstreamOp']['opName']: link['node'] + for link in filtered_op.data['ops']['edges'][0]['node']['upstreamValidatedLinks']['edges'] + } + assert set(validated_links) == {complex_ops[2].op_name, complex_ops[3].op_name} + + def do_upstream_version_test(validated_links, op_n, complex_versions): + assert validated_links[complex_ops[op_n].op_name]['upstreamVersion']['minorHash'] == \ + complex_versions[op_n].minor_hash + assert validated_links[complex_ops[op_n].op_name]['upstreamVersion']['majorHash'] == \ + complex_versions[op_n].major_hash + assert validated_links[complex_ops[op_n].op_name]['upstreamVersion']['patchHash'] == \ + complex_versions[op_n].patch_hash + assert validated_links[complex_ops[op_n].op_name]['upstreamVersion']['majorVersionNumber'] == \ + complex_versions[op_n].major_version_number + assert validated_links[complex_ops[op_n].op_name]['upstreamVersion']['minorVersionNumber'] == \ + complex_versions[op_n].minor_version_number + assert validated_links[complex_ops[op_n].op_name]['upstreamVersion']['patchVersionNumber'] == complex_versions[ + op_n].patch_version_number + + do_upstream_version_test(validated_links, 2, complex_versions) + do_upstream_version_test(validated_links, 3, complex_versions) + clean_db(session_cls, models) diff --git a/tests/unit/test_op_store.py b/tests/unit/test_op_store.py index c484c7d..12c5469 100644 --- a/tests/unit/test_op_store.py +++ b/tests/unit/test_op_store.py @@ -6,10 +6,7 @@ from sqlalchemy import create_engine, alias from sqlalchemy.orm import sessionmaker -from chariots import versioning -from chariots.pipelines import Pipeline, nodes, ops -from chariots.op_store import models -from chariots.testing import TestOpStoreClient +from chariots.pipelines import ops class FakeOp: @@ -17,12 +14,19 @@ class FakeOp: def __init__(self, name): self.name = name + class DumbOp(ops.BaseOp): def execute(self): return 'Samwise Gamgee' +@pytest.fixture +def models(): + # hack around the fact that models are defined at import + from chariots.op_store import models + return models + @pytest.fixture def root_path(tmpdir): return str(tmpdir) @@ -30,13 +34,14 @@ def root_path(tmpdir): @pytest.fixture def op_store_client(root_path: str): + from chariots.testing import TestOpStoreClient client = TestOpStoreClient(path=root_path) client.server.db.create_all() return client @pytest.fixture -def db_uri(op_store_client: TestOpStoreClient): +def db_uri(op_store_client): return 'sqlite:///{}'.format(op_store_client.db_path) @pytest.fixture @@ -49,8 +54,9 @@ def session_func(db_uri: str): engine = create_engine(db_uri) return sessionmaker(bind=engine) +def test_register_valid_link_no_ops(op_store_client, session_func: sessionmaker, models): + from chariots import versioning -def test_register_valid_link_no_ops(op_store_client: TestOpStoreClient, session_func: sessionmaker): downstream_op_name = 'downstream' upstream_op_name = 'upstream' upstream_version = versioning.Version() @@ -60,6 +66,7 @@ def test_register_valid_link_no_ops(op_store_client: TestOpStoreClient, session_ session = session_func() ops = list(session.query(models.DBOp)) + pytest.skip() assert len(ops) == 2 assert set(op.op_name for op in ops) == {downstream_op_name, upstream_op_name} upstream_op = [op for op in ops if op.op_name == upstream_op_name][0] @@ -84,8 +91,8 @@ def test_register_valid_link_no_ops(op_store_client: TestOpStoreClient, session_ assert link.upstream_op_version_id == version.id -def test_register_validated_lin_ops_present(op_store_client: TestOpStoreClient, session_func: sessionmaker): - +def test_register_validated_link_ops_present(op_store_client, session_func: sessionmaker, models): + from chariots import versioning # creating first link downstream_op_name = 'downstream' upstream_op_name = 'upstream' @@ -96,11 +103,14 @@ def test_register_validated_lin_ops_present(op_store_client: TestOpStoreClient, session = session_func() old_db_version = session.query(models.DBVersion).one() old_db_link = session.query(models.DBValidatedLink).one() + assert old_db_version.major_version_number == 1 + print('queried link', old_db_link.upstream_op_version_id) # creating new version and updating link new_version = upstream_version + versioning.Version() assert new_version > upstream_version op_store_client.register_valid_link(downstream_op_name, upstream_op_name, upstream_op_version=new_version) + print('registered') # testing new link ops = list(session.query(models.DBOp)) @@ -113,6 +123,7 @@ def test_register_validated_lin_ops_present(op_store_client: TestOpStoreClient, assert len(versions) == 2 assert set(version.op_id for version in versions) == {upstream_op.id} new_db_version = session.query(models.DBVersion).filter(models.DBVersion.id != old_db_version.id).one() + print(new_db_version.id) assert new_db_version.major_hash == new_version.major assert new_db_version.minor_hash == new_version.minor @@ -122,6 +133,7 @@ def test_register_validated_lin_ops_present(op_store_client: TestOpStoreClient, assert new_db_version.minor_version_number == 0 links = list(session.query(models.DBValidatedLink)) + print(links) assert len(links) == 2 for link in links: assert link.upstream_op_id == upstream_op.id @@ -130,7 +142,9 @@ def test_register_validated_lin_ops_present(op_store_client: TestOpStoreClient, assert new_db_link.upstream_op_version_id == new_db_version.id -def test_get_validated_links(op_store_client: TestOpStoreClient): +def test_get_validated_links(op_store_client): + + from chariots import versioning # creating some links first_version = versioning.Version() @@ -150,7 +164,9 @@ def test_get_validated_links(op_store_client: TestOpStoreClient): assert validated_links == {first_version, new_upstream_version} -def test_save_op_bytes(op_store_client: TestOpStoreClient, ops_path): +def test_save_op_bytes(op_store_client, ops_path): + + from chariots import versioning op_bytes = 'one ring to bind them all and in the darkness bring them'.encode('utf-8') version = versioning.Version() @@ -171,7 +187,9 @@ def test_save_op_bytes(op_store_client: TestOpStoreClient, ops_path): assert op_bytes_file.read() == op_bytes -def test_get_saved_op_bytes(op_store_client: TestOpStoreClient): +def test_get_saved_op_bytes(op_store_client): + + from chariots import versioning op_bytes = 'one ring to bind them all and in the darkness bring them'.encode('utf-8') version = versioning.Version() @@ -190,12 +208,14 @@ def test_get_saved_op_bytes(op_store_client: TestOpStoreClient): @pytest.fixture def small_test_pipeline(): + from chariots.pipelines import Pipeline, nodes + return Pipeline([ nodes.Node(DumbOp(), output_nodes=['__pipeline_output__']) ], name='cooking_pipeline') -def get_all_links(session, pipeline_id): +def get_all_links(session, pipeline_id, models): links = session.query( models.DBPipelineLink, models.DBOp ).filter( @@ -213,8 +233,10 @@ def get_all_links(session, pipeline_id): return res -def test_register_new_pipeline(op_store_client: TestOpStoreClient, small_test_pipeline: Pipeline, - session_func: sessionmaker): +def test_register_new_pipeline(op_store_client, small_test_pipeline, + session_func: sessionmaker, models): + + from chariots import versioning # registering the op and the pipeline version = versioning.Version() @@ -227,14 +249,16 @@ def test_register_new_pipeline(op_store_client: TestOpStoreClient, small_test_pi db_pipeline = session.query(models.DBPipeline).one() assert db_pipeline.pipeline_name == 'cooking_pipeline' - all_ops = get_all_links(session, db_pipeline.id) + all_ops = get_all_links(session, db_pipeline.id, models) assert len(all_ops) == 1 op_link, upstream_op_name, downstream_op_name = all_ops[0] assert upstream_op_name == db_op.op_name assert downstream_op_name == None -def test_register_new_pipeline_complex_dag(op_store_client: TestOpStoreClient, session_func: sessionmaker): +def test_register_new_pipeline_complex_dag(op_store_client, session_func: sessionmaker, models): + + from chariots.pipelines import Pipeline, nodes # defining the necesasry op and pipeline class OpwithInput(ops.BaseOp): @@ -263,7 +287,7 @@ def execute(self, input1='sad', input2=None): db_pipeline = session.query(models.DBPipeline).one() assert db_pipeline.pipeline_name == 'complex_dag_pipeline' - all_links = get_all_links(session, db_pipeline.id) + all_links = get_all_links(session, db_pipeline.id, models) assert len(all_links) == 3 assert Counter([(link[1], link[2]) for link in all_links]) == { ('opwithinput', 'opwithinput'): 2, @@ -272,7 +296,9 @@ def execute(self, input1='sad', input2=None): -def test_pipeline_exists(op_store_client: TestOpStoreClient, small_test_pipeline: Pipeline): +def test_pipeline_exists(op_store_client, small_test_pipeline): + + from chariots import versioning assert not op_store_client.pipeline_exists(small_test_pipeline.name) version = versioning.Version() From 9e546efc647486206fb623ad13afc53bdabc3453 Mon Sep 17 00:00:00 2001 From: aredier Date: Thu, 30 Apr 2020 14:01:24 +0200 Subject: [PATCH 04/13] lint --- chariots/metadata/__init__.py | 6 +++++ chariots/metadata/metadata_server.py | 29 ++++++++++++++++++----- chariots/op_store/_op_store.py | 2 -- chariots/op_store/_op_store_client.py | 1 - chariots/op_store/models/pipeline.py | 2 +- chariots/op_store/models/pipeline_link.py | 4 +++- 6 files changed, 33 insertions(+), 11 deletions(-) diff --git a/chariots/metadata/__init__.py b/chariots/metadata/__init__.py index 6617463..3a2a339 100644 --- a/chariots/metadata/__init__.py +++ b/chariots/metadata/__init__.py @@ -1,3 +1,9 @@ +""" +the metadata module allows you to create and read metadata that will consumed through +a graphql api +""" + + from .metadata_server import MetadataServer __all__ = [ diff --git a/chariots/metadata/metadata_server.py b/chariots/metadata/metadata_server.py index 29d8454..ec81df0 100644 --- a/chariots/metadata/metadata_server.py +++ b/chariots/metadata/metadata_server.py @@ -1,15 +1,20 @@ - +""" +module to handle the metadata server +""" +# pylint: disable=too-few-public-methods from flask import Flask - -from chariots import op_store - import graphene from graphene import relay, String from flask_graphql import GraphQLView from graphene_sqlalchemy import SQLAlchemyObjectType, SQLAlchemyConnectionField +from chariots import op_store + class MetadataServer: + """ + The Metadata server allows you to store and retrieve information about your pipelines, versions and runs + """ def __init__(self, db_url='sqlite:///:memory:'): self.flask = Flask('metadarta') self.flask.config['SQLALCHEMY_DATABASE_URI'] = db_url @@ -30,34 +35,45 @@ def _init_routes(self): ) ) - def _build_schema(self): + def _build_schema(self): # pylint: disable=no-self-use class Op(SQLAlchemyObjectType): + """graphql op""" class Meta: + """metadata of the graphql object type""" model = op_store.models.DBOp filter_fields = ['op_name'] interfaces = (relay.Node, ) class Version(SQLAlchemyObjectType): + """graphql version""" class Meta: + """metadata of the graphql object type""" model = op_store.models.DBVersion interfaces = (relay.Node, ) class PipelineLink(SQLAlchemyObjectType): + """graphql pipeline link """ class Meta: + """metadata of the graphql object type""" model = op_store.models.DBPipelineLink interfaces = (relay.Node, ) class ValidatedLink(SQLAlchemyObjectType): + """graphql validated link""" class Meta: + """metadata of the graphql object type""" model = op_store.models.DBValidatedLink interfaces = (relay.Node,) class Pipeline(SQLAlchemyObjectType): + """graphql pipeline""" class Meta: + """metadata of the graphql object type""" model = op_store.models.DBPipeline interfaces = (relay.Node,) class Query(graphene.ObjectType): + """graphql query""" node = relay.Node.Field() ops = SQLAlchemyConnectionField(Op, name=String()) @@ -70,7 +86,8 @@ class Query(graphene.ObjectType): pipelineLink = relay.Node.Field(PipelineLink) validatedLink = relay.Node.Field(ValidatedLink) - def resolve_ops(self, info, **args): + def resolve_ops(self, info, **args): # pylint: disable=no-self-use + """resolve ops using the query if present""" base_query = Op.get_query(info) if 'name' in args: base_query = base_query.filter(op_store.models.DBOp.op_name == args['name']) diff --git a/chariots/op_store/_op_store.py b/chariots/op_store/_op_store.py index 98999ae..0a6c509 100644 --- a/chariots/op_store/_op_store.py +++ b/chariots/op_store/_op_store.py @@ -184,7 +184,6 @@ def register_valid_link(self): registers a link between an upstream and a downstream op. This means that in future relaods the downstream op will whitelist this version for this upstream op """ - old_db_link = self._session.query(DBValidatedLink) downstream_op_name = request.json['downstream_op_name'] upstream_op_name = request.json['upstream_op_name'] upstream_op_version = Version.parse(request.json['upstream_op_version']) @@ -199,7 +198,6 @@ def register_valid_link(self): DBValidatedLink.downstream_op_id == downstream_op_id, DBValidatedLink.upstream_op_version_id == upstream_version_id, ) - old_db_link = self._session.query(DBValidatedLink) if exists_query.one_or_none() is not None: return jsonify({}) validated_link = DBValidatedLink( diff --git a/chariots/op_store/_op_store_client.py b/chariots/op_store/_op_store_client.py index 3900daf..3958dd3 100644 --- a/chariots/op_store/_op_store_client.py +++ b/chariots/op_store/_op_store_client.py @@ -132,7 +132,6 @@ def register_new_pipeline(self, pipeline: 'pipelines.Pipeline'): for upstream_node, downstream_node in pipeline.get_all_op_links() ] }) - return class OpStoreClient(BaseOpStoreClient): diff --git a/chariots/op_store/models/pipeline.py b/chariots/op_store/models/pipeline.py index 98f7d59..5e132b4 100644 --- a/chariots/op_store/models/pipeline.py +++ b/chariots/op_store/models/pipeline.py @@ -1,5 +1,5 @@ # pylint: disable=missing-module-docstring, missing-class-docstring, too-few-public-methods -from sqlalchemy import Column, Integer, ForeignKey, String +from sqlalchemy import Column, Integer, String from sqlalchemy.orm import relationship from ..models import db, DBPipelineLink diff --git a/chariots/op_store/models/pipeline_link.py b/chariots/op_store/models/pipeline_link.py index 5d7743a..e1bd335 100644 --- a/chariots/op_store/models/pipeline_link.py +++ b/chariots/op_store/models/pipeline_link.py @@ -1,9 +1,11 @@ +"""pipeline link""" from sqlalchemy import Column, Integer, ForeignKey from ..models import db -class DBPipelineLink(db.Model): +class DBPipelineLink(db.Model): # pylint: disable=too-few-public-methods + """a pipeline link is a link inside of a pipeline's DAG""" id = Column(Integer, primary_key=True) pipeline_id = Column(Integer, ForeignKey('db_pipeline.id')) From 340bd678fc657818ff6460aed0fbe0af59ee10c4 Mon Sep 17 00:00:00 2001 From: aredier Date: Thu, 30 Apr 2020 15:34:34 +0200 Subject: [PATCH 05/13] small fixex --- chariots/metadata/metadata_server.py | 1 + chariots/op_store/models/__init__.py | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/chariots/metadata/metadata_server.py b/chariots/metadata/metadata_server.py index ec81df0..c894deb 100644 --- a/chariots/metadata/metadata_server.py +++ b/chariots/metadata/metadata_server.py @@ -11,6 +11,7 @@ from chariots import op_store + class MetadataServer: """ The Metadata server allows you to store and retrieve information about your pipelines, versions and runs diff --git a/chariots/op_store/models/__init__.py b/chariots/op_store/models/__init__.py index 6fa1c6a..6fa9cc8 100644 --- a/chariots/op_store/models/__init__.py +++ b/chariots/op_store/models/__init__.py @@ -1,5 +1,6 @@ """the module for all the DB models of the op store""" -# pylint: disable=wrong-import-position, missing-module-docstring, missing-class-docstring, too-few-public-methods +# pylint: disable=wrong-import-position, missing-module-docstring, missing-class-docstring, too-few-public-methods; # noqa +# flake8: noqa from flask_sqlalchemy import SQLAlchemy db = SQLAlchemy() # pylint: disable=invalid-name From 3612f21b22e28f556a95892f7622e719d7ad0e30 Mon Sep 17 00:00:00 2001 From: aredier Date: Sun, 28 Jun 2020 19:13:00 +0200 Subject: [PATCH 06/13] updated requirements --- requirements.txt | 3 +++ 1 file changed, 3 insertions(+) diff --git a/requirements.txt b/requirements.txt index 22fcb3a..ae496cb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,9 +8,12 @@ dill==0.2.9 flake8==3.5.0 flaky==3.6.1 flask==1.0.3 +Flask-GraphQL>=2.0.1 flask-migrate>=2.5.3 flask-sqlalchemy>=2.4.1 google-cloud-storage==1.26.0 +graphene-sqlalchemy>=2.2.2 +graphene>=2.1.8 Keras>=2.0.0 # ml stuff From e09656eae3220b16b1f0aaec54f8f9162bed2ad2 Mon Sep 17 00:00:00 2001 From: aredier Date: Sun, 28 Jun 2020 20:03:16 +0200 Subject: [PATCH 07/13] linting problemn --- chariots/_helpers/test_helpers.py | 1 + 1 file changed, 1 insertion(+) diff --git a/chariots/_helpers/test_helpers.py b/chariots/_helpers/test_helpers.py index be244e4..ef15fcd 100644 --- a/chariots/_helpers/test_helpers.py +++ b/chariots/_helpers/test_helpers.py @@ -1,3 +1,4 @@ +# pylint: disable=no-member """ module that provides importable and picklable ops for tests that need them """ From a93e8fe00c2c526e68678033683e1ed3dc2418bc Mon Sep 17 00:00:00 2001 From: aredier Date: Sun, 28 Jun 2020 20:29:22 +0200 Subject: [PATCH 08/13] requirement problem with tensorflow --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index ae496cb..5fc77d6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -31,7 +31,7 @@ scikit-learn==0.21.2 sphinx==2.2 sphinx_rtd_theme==0.4.3 sqlalchemy>=1.3.15 -tensorflow>1.12.0 +tensorflow>=2.2 tox==3.5.2 watchdog==0.9.0 wheel==0.32.1 From 3453180bc2599b17916e913bf62a1a877f8682e5 Mon Sep 17 00:00:00 2001 From: aredier Date: Sun, 28 Jun 2020 21:58:48 +0200 Subject: [PATCH 09/13] bug fix with tensorflow --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 5fc77d6..9f32add 100644 --- a/requirements.txt +++ b/requirements.txt @@ -31,7 +31,7 @@ scikit-learn==0.21.2 sphinx==2.2 sphinx_rtd_theme==0.4.3 sqlalchemy>=1.3.15 -tensorflow>=2.2 +tensorflow==2.2 tox==3.5.2 watchdog==0.9.0 wheel==0.32.1 From 401d45669a2feeb2d7ca44d52bbdecea082f0662 Mon Sep 17 00:00:00 2001 From: aredier Date: Sun, 28 Jun 2020 22:07:43 +0200 Subject: [PATCH 10/13] bug fix with tensorflow --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 9f32add..df70474 100644 --- a/requirements.txt +++ b/requirements.txt @@ -31,7 +31,7 @@ scikit-learn==0.21.2 sphinx==2.2 sphinx_rtd_theme==0.4.3 sqlalchemy>=1.3.15 -tensorflow==2.2 +tensorflow==2.0.0 tox==3.5.2 watchdog==0.9.0 wheel==0.32.1 From abd8c0a007282547b9babb06f2732754a1ad052c Mon Sep 17 00:00:00 2001 From: aredier Date: Sun, 28 Jun 2020 22:32:32 +0200 Subject: [PATCH 11/13] getting rid of tox in deployment --- .travis.yml | 12 +++++------- tox.ini | 2 +- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/.travis.yml b/.travis.yml index acbc57e..c3fb1ad 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,19 +3,17 @@ python: - 3.7 - 3.6 - 3.5 -matrix: - include: - - python: 3.7 - dist: xenial - sudo: true services: - redis-server install: - - pip install -U tox-travis - pip install pylint - pip install flake8 + - pip install -r setup_requirements.txt + - pip install -r requirements.txt script: - - tox + - flake8 chariots + - pylint chariots + - py.test chariots tests docs --doctest-modules --doctest-glob='*.rst' deploy: provider: pypi distributions: sdist bdist_wheel diff --git a/tox.ini b/tox.ini index db59000..c8f6e14 100644 --- a/tox.ini +++ b/tox.ini @@ -22,7 +22,7 @@ deps = -r{toxinidir}/setup_requirements.txt -r{toxinidir}/requirements.txt commands = - pip install -U pip + pip3 install -U pip flake8 chariots pylint chariots py.test chariots tests docs --doctest-modules --doctest-glob='*.rst' From fb3920b3ceb75c6b46f3b526740522aff37c1469 Mon Sep 17 00:00:00 2001 From: aredier Date: Sun, 28 Jun 2020 22:39:02 +0200 Subject: [PATCH 12/13] using tf.keras --- chariots/_helpers/doc_utils.py | 2 +- chariots/_helpers/test_helpers.py | 2 +- chariots/ml/keras/_keras_op.py | 2 +- tests/unit/ml/test_keras.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/chariots/_helpers/doc_utils.py b/chariots/_helpers/doc_utils.py index 6e9893a..5591bd0 100644 --- a/chariots/_helpers/doc_utils.py +++ b/chariots/_helpers/doc_utils.py @@ -4,7 +4,7 @@ """ from collections import Counter -from keras.utils import to_categorical +from tensorflow.keras.utils import to_categorical from sklearn import datasets from sklearn.decomposition import PCA from sklearn.linear_model import LogisticRegression diff --git a/chariots/_helpers/test_helpers.py b/chariots/_helpers/test_helpers.py index ef15fcd..b1b83e6 100644 --- a/chariots/_helpers/test_helpers.py +++ b/chariots/_helpers/test_helpers.py @@ -6,7 +6,7 @@ import time import numpy as np -from keras import callbacks, models, layers, optimizers +from tensorflow.keras import callbacks, models, layers, optimizers from sklearn.decomposition import PCA from sklearn.linear_model import LinearRegression diff --git a/chariots/ml/keras/_keras_op.py b/chariots/ml/keras/_keras_op.py index 69a81b6..f5b7a9b 100644 --- a/chariots/ml/keras/_keras_op.py +++ b/chariots/ml/keras/_keras_op.py @@ -26,7 +26,7 @@ class KerasOp(BaseMLOp): >>> from chariots.pipelines.nodes import Node >>> from chariots.ml import MLMode >>> from chariots.versioning import VersionType, VersionedFieldDict - >>> from keras import models, layers + >>> from tensorflow.keras import models, layers ... ... >>> class KerasLinear(KerasOp): diff --git a/tests/unit/ml/test_keras.py b/tests/unit/ml/test_keras.py index 13ac7bc..94adf09 100644 --- a/tests/unit/ml/test_keras.py +++ b/tests/unit/ml/test_keras.py @@ -1,7 +1,7 @@ """module that tests the Keras integration""" import numpy as np from flaky import flaky -from keras import models, layers, optimizers, callbacks +from tensorflow.keras import models, layers, optimizers, callbacks from chariots.pipelines import Pipeline, PipelinesServer from chariots.pipelines.nodes import Node From 4c42be28f01d190f129ee518f5af4a6bb7e8ca93 Mon Sep 17 00:00:00 2001 From: aredier Date: Sun, 28 Jun 2020 23:04:55 +0200 Subject: [PATCH 13/13] using keras serilizer --- chariots/ml/keras/_keras_op.py | 2 ++ chariots/ml/serializers/__init__.py | 2 ++ chariots/ml/serializers/_keras_serializer.py | 24 ++++++++++++++++++++ tests/unit/ml/test_keras.py | 4 ++-- 4 files changed, 30 insertions(+), 2 deletions(-) create mode 100644 chariots/ml/serializers/_keras_serializer.py diff --git a/chariots/ml/keras/_keras_op.py b/chariots/ml/keras/_keras_op.py index f5b7a9b..d5a561f 100644 --- a/chariots/ml/keras/_keras_op.py +++ b/chariots/ml/keras/_keras_op.py @@ -5,6 +5,7 @@ from ...versioning import VersionedFieldDict from .. import MLMode, BaseMLOp +from ..serializers import KerasSerializer class KerasOp(BaseMLOp): @@ -89,6 +90,7 @@ class KerasOp(BaseMLOp): """ input_params = VersionedFieldDict() + serializer_cls = KerasSerializer def __init__(self, mode: MLMode, verbose: Optional[int] = 1): super().__init__(mode) diff --git a/chariots/ml/serializers/__init__.py b/chariots/ml/serializers/__init__.py index 7fc7f6b..4d8a03a 100644 --- a/chariots/ml/serializers/__init__.py +++ b/chariots/ml/serializers/__init__.py @@ -30,10 +30,12 @@ from ._csv_serialzer import CSVSerializer from ._dill_serializer import DillSerializer from ._json_serializer import JSONSerializer +from ._keras_serializer import KerasSerializer __all__ = [ 'BaseSerializer', 'DillSerializer', 'JSONSerializer', 'CSVSerializer', + 'KerasSerializer' ] diff --git a/chariots/ml/serializers/_keras_serializer.py b/chariots/ml/serializers/_keras_serializer.py new file mode 100644 index 0000000..27f32e0 --- /dev/null +++ b/chariots/ml/serializers/_keras_serializer.py @@ -0,0 +1,24 @@ +import os +import tempfile +from typing import Any + +import tensorflow + +from chariots.ml.serializers import BaseSerializer + + +class KerasSerializer(BaseSerializer): + + def serialize_object(self, target: Any) -> bytes: + with tempfile.TemporaryDirectory() as dir_path: + path = os.path.join(dir_path, 'model.h5') + target.save(path) + with open(path, 'rb') as bytes_file: + return bytes_file.read() + + def deserialize_object(self, serialized_object: bytes) -> Any: + with tempfile.TemporaryDirectory() as dir_path: + path = os.path.join(dir_path, 'model.h5') + with open(path, 'wb') as bytes_file: + bytes_file.write(serialized_object) + return tensorflow.keras.models.load_model(path) diff --git a/tests/unit/ml/test_keras.py b/tests/unit/ml/test_keras.py index 94adf09..cb398db 100644 --- a/tests/unit/ml/test_keras.py +++ b/tests/unit/ml/test_keras.py @@ -13,7 +13,7 @@ from chariots._helpers.test_helpers import FromArray, ToArray, LinearDataSet, KerasLogistic -@flaky(5, 1) +@flaky(2, 1) def test_train_keras_pipeline(tmpdir, opstore_func): """tests using an op in training and testing""" @@ -76,7 +76,7 @@ def _init_model(self): return model -@flaky(5, 1) +@flaky(2, 1) def test_keras_multiple_datasets(tmpdir, opstore_func): """tests keras with a multi-input model (build using the functional API)"""