Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,17 @@ python:
- 3.7
- 3.6
- 3.5
matrix:
include:
- python: 3.7
dist: xenial
sudo: true
services:
- redis-server
install:
- pip install -U tox-travis
- pip install pylint
- pip install flake8
- pip install -r setup_requirements.txt
- pip install -r requirements.txt
script:
- tox
- flake8 chariots
- pylint chariots
- py.test chariots tests docs --doctest-modules --doctest-glob='*.rst'
deploy:
provider: pypi
distributions: sdist bdist_wheel
Expand Down
2 changes: 1 addition & 1 deletion chariots/_helpers/doc_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"""
from collections import Counter

from keras.utils import to_categorical
from tensorflow.keras.utils import to_categorical
from sklearn import datasets
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
Expand Down
3 changes: 2 additions & 1 deletion chariots/_helpers/test_helpers.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
# pylint: disable=no-member
"""
module that provides importable and picklable ops for tests that need them
"""
import subprocess
import time

import numpy as np
from keras import callbacks, models, layers, optimizers
from tensorflow.keras import callbacks, models, layers, optimizers
from sklearn.decomposition import PCA
from sklearn.linear_model import LinearRegression

Expand Down
11 changes: 11 additions & 0 deletions chariots/metadata/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
"""
the metadata module allows you to create and read metadata that will be consumed through
a graphql api
"""


from .metadata_server import MetadataServer

__all__ = [
'MetadataServer'
]
97 changes: 97 additions & 0 deletions chariots/metadata/metadata_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
"""
module to handle the metadata server
"""
# pylint: disable=too-few-public-methods

from flask import Flask
import graphene
from graphene import relay, String
from flask_graphql import GraphQLView
from graphene_sqlalchemy import SQLAlchemyObjectType, SQLAlchemyConnectionField

from chariots import op_store


class MetadataServer:
    """
    Flask application that exposes the op store database models through a GraphQL endpoint.

    Constructing the server binds the shared ``op_store.models.db`` handle to a fresh Flask app,
    builds the graphene schema and registers the ``/graphql`` route.

    :param db_url: value stored under the ``SQLALCHEMY_DATABASE_URI`` config key
                   (defaults to an in-memory SQLite URL)
    """

    def __init__(self, db_url='sqlite:///:memory:'):
        # NOTE(review): 'metadarta' looks like a typo for 'metadata'; it is a runtime string
        # (the Flask import name), so it is kept verbatim here - confirm before renaming.
        app = Flask('metadarta')
        app.config.update(
            SQLALCHEMY_DATABASE_URI=db_url,
            SQLALCHEMY_TRACK_MODIFICATIONS=False,
        )
        self.flask = app

        # the db handle is the module-level one shared with the op store models
        self.db = op_store.models.db  # pylint: disable=invalid-name
        self.db.app = app
        self.db.init_app(app)

        self.schema = self._build_schema()
        self._init_routes()

    def _init_routes(self):
        """register the GraphQL view of ``self.schema`` on the flask app under ``/graphql``"""
        graphql_view = GraphQLView.as_view(
            'graphql',
            schema=self.schema,
            graphiql=True,  # for having the GraphiQL interface
        )
        self.flask.add_url_rule('/graphql', view_func=graphql_view)

    def _build_schema(self):  # pylint: disable=no-self-use
        """
        declare one graphene type per op store model plus the root query

        :return: the ``graphene.Schema`` built around the root query
        """
        db_models = op_store.models

        # NOTE(review): graphene appears to reuse the docstrings of the types below as schema
        # descriptions, so they are deliberately left untouched - confirm before rewording them.

        class Op(SQLAlchemyObjectType):
            """graphql op"""
            class Meta:
                """metadata of the graphql object type"""
                model = db_models.DBOp
                filter_fields = ['op_name']
                interfaces = (relay.Node, )

        class Version(SQLAlchemyObjectType):
            """graphql version"""
            class Meta:
                """metadata of the graphql object type"""
                model = db_models.DBVersion
                interfaces = (relay.Node, )

        class PipelineLink(SQLAlchemyObjectType):
            """graphql pipeline link """
            class Meta:
                """metadata of the graphql object type"""
                model = db_models.DBPipelineLink
                interfaces = (relay.Node, )

        class ValidatedLink(SQLAlchemyObjectType):
            """graphql validated link"""
            class Meta:
                """metadata of the graphql object type"""
                model = db_models.DBValidatedLink
                interfaces = (relay.Node,)

        class Pipeline(SQLAlchemyObjectType):
            """graphql pipeline"""
            class Meta:
                """metadata of the graphql object type"""
                model = db_models.DBPipeline
                interfaces = (relay.Node,)

        class Query(graphene.ObjectType):
            """graphql query"""
            node = relay.Node.Field()

            # list style entry points
            ops = SQLAlchemyConnectionField(Op, name=String())
            versions = SQLAlchemyConnectionField(Version)
            pipelines = SQLAlchemyConnectionField(Pipeline)

            # single node entry points
            op = relay.Node.Field(Op)
            pipeline = relay.Node.Field(Pipeline)
            version = relay.Node.Field(Version)
            pipelineLink = relay.Node.Field(PipelineLink)
            validatedLink = relay.Node.Field(ValidatedLink)

            def resolve_ops(self, info, **args):  # pylint: disable=no-self-use
                """return all the ops, restricted to those called ``name`` when it is given"""
                ops_query = Op.get_query(info)
                if 'name' not in args:
                    return ops_query.all()
                return ops_query.filter(db_models.DBOp.op_name == args['name']).all()

        return graphene.Schema(query=Query)
4 changes: 3 additions & 1 deletion chariots/ml/keras/_keras_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from ...versioning import VersionedFieldDict
from .. import MLMode, BaseMLOp
from ..serializers import KerasSerializer


class KerasOp(BaseMLOp):
Expand All @@ -26,7 +27,7 @@ class KerasOp(BaseMLOp):
>>> from chariots.pipelines.nodes import Node
>>> from chariots.ml import MLMode
>>> from chariots.versioning import VersionType, VersionedFieldDict
>>> from keras import models, layers
>>> from tensorflow.keras import models, layers
...
...
>>> class KerasLinear(KerasOp):
Expand Down Expand Up @@ -89,6 +90,7 @@ class KerasOp(BaseMLOp):
"""

input_params = VersionedFieldDict()
serializer_cls = KerasSerializer

def __init__(self, mode: MLMode, verbose: Optional[int] = 1):
super().__init__(mode)
Expand Down
2 changes: 2 additions & 0 deletions chariots/ml/serializers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@
from ._csv_serialzer import CSVSerializer
from ._dill_serializer import DillSerializer
from ._json_serializer import JSONSerializer
from ._keras_serializer import KerasSerializer

__all__ = [
'BaseSerializer',
'DillSerializer',
'JSONSerializer',
'CSVSerializer',
'KerasSerializer'
]
24 changes: 24 additions & 0 deletions chariots/ml/serializers/_keras_serializer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import os
import tempfile
from typing import Any

import tensorflow

from chariots.ml.serializers import BaseSerializer


class KerasSerializer(BaseSerializer):
    """
    serializer that round-trips a model through a temporary ``model.h5`` file: the model is
    saved with its own ``save`` method and restored with ``tensorflow.keras.models.load_model``
    """

    def serialize_object(self, target: Any) -> bytes:
        """
        save ``target`` to a temporary file and return the content of that file

        :param target: the object to serialize (it must expose a ``save(path)`` method)
        :return: the raw bytes of the saved file
        """
        with tempfile.TemporaryDirectory() as workdir:
            model_file = os.path.join(workdir, 'model.h5')
            target.save(model_file)
            with open(model_file, 'rb') as handle:
                payload = handle.read()
        return payload

    def deserialize_object(self, serialized_object: bytes) -> Any:
        """
        write the bytes to a temporary file and load the model back from it

        :param serialized_object: the bytes previously produced by ``serialize_object``
        :return: whatever ``tensorflow.keras.models.load_model`` returns for the written file
        """
        with tempfile.TemporaryDirectory() as workdir:
            model_file = os.path.join(workdir, 'model.h5')
            with open(model_file, 'wb') as handle:
                handle.write(serialized_object)
            # the file handle is closed at this point but the directory still exists
            restored = tensorflow.keras.models.load_model(model_file)
        return restored
24 changes: 19 additions & 5 deletions chariots/op_store/_op_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from flask_migrate import Migrate
from sqlalchemy.orm import aliased

from .models import db
from .models import db, DBPipelineLink
from .models.version import DBVersion
from .models.op import DBOp
from .models.validated_link import DBValidatedLink
Expand Down Expand Up @@ -193,11 +193,19 @@ def register_valid_link(self):
if downstream_op_name is None:
return jsonify({})
downstream_op_id = self.get_or_register_db_op(downstream_op_name).id
exists_query = self._session.query(DBValidatedLink).filter(
DBValidatedLink.upstream_op_id == upstream_op_id,
DBValidatedLink.downstream_op_id == downstream_op_id,
DBValidatedLink.upstream_op_version_id == upstream_version_id,
)
if exists_query.one_or_none() is not None:
return jsonify({})
validated_link = DBValidatedLink(
upstream_op_id=upstream_op_id,
downstream_op_id=downstream_op_id,
upstream_op_version_id=upstream_version_id
)

self._session.add(validated_link)
self._session.commit()

Expand Down Expand Up @@ -287,16 +295,22 @@ def register_new_pipeline(self):
"""endpoint ot register a new pipeline"""

pipeline_name = request.json['pipeline_name']
last_op_name = request.json['last_op_name']

last_node_id = self.get_or_register_db_op(last_op_name).id
pipeline_links = request.json['pipeline_links']

db_pipeline = DBPipeline(
pipeline_name=pipeline_name,
last_op_id=last_node_id,
)
self._session.add(db_pipeline)
self._session.commit()
for upstream_node_name, downstream_node_name in pipeline_links:
upstream_op_id = self.get_or_register_db_op(upstream_node_name).id
downstream_op_id = self.get_or_register_db_op(downstream_node_name).id if downstream_node_name else None
self._session.add(
DBPipelineLink(pipeline_id=db_pipeline.id, upstream_op_id=upstream_op_id,
downstream_op_id=downstream_op_id)
)
self._session.commit()

return jsonify({})

def _init_routes(self):
Expand Down
18 changes: 8 additions & 10 deletions chariots/op_store/_op_store_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,13 @@ def register_new_pipeline(self, pipeline: 'pipelines.Pipeline'):

:param pipeline: the pipeline to register
"""

for upstream_node, downstream_node in pipeline.get_all_op_links():
if downstream_node is None:
self.post('/v1/register_new_pipeline', {
'pipeline_name': pipeline.name,
'last_op_name': upstream_node.name
})
return
raise ValueError('did not manage to find last node of the pipeline')
self.post('/v1/register_new_pipeline', {
'pipeline_name': pipeline.name,
'pipeline_links': [
(upstream_node.name, downstream_node.name if downstream_node is not None else None)
for upstream_node, downstream_node in pipeline.get_all_op_links()
]
})


class OpStoreClient(BaseOpStoreClient):
Expand All @@ -146,7 +144,7 @@ def __init__(self, url):

def post(self, route, arguments_json):
response = requests.post(
route,
self.url + route,
headers={'Content-Type': 'application/json'},
data=json.dumps(arguments_json)
)
Expand Down
12 changes: 8 additions & 4 deletions chariots/op_store/models/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
"""the module for all the DB models of the op store"""
# pylint: disable=wrong-import-position, missing-module-docstring, missing-class-docstring, too-few-public-methods
# pylint: disable=wrong-import-position, missing-module-docstring, missing-class-docstring, too-few-public-methods; # noqa
# flake8: noqa
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy() # pylint: disable=invalid-name

from .pipeline import DBPipeline # noqa
from .version import DBVersion # noqa
from .validated_link import DBValidatedLink # noqa
from .version import DBVersion # noqa
from .pipeline_link import DBPipelineLink
from .pipeline import DBPipeline # noqa
from .op import DBOp # noqa


Expand All @@ -14,5 +16,7 @@
'SQLAlchemy',
'DBValidatedLink',
'DBValidatedLink',
'DBOp'
'DBOp',
'DBPipelineLink',
'DBVersion'
]
13 changes: 12 additions & 1 deletion chariots/op_store/models/op.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,23 @@
# pylint: disable=missing-module-docstring, missing-class-docstring, too-few-public-methods
from sqlalchemy import Column, String, Integer
from sqlalchemy.orm import relationship

from ..models import db
from ..models import db, DBVersion, DBPipelineLink, DBValidatedLink


class DBOp(db.Model):
    # Database model of an op: an integer primary key and a name, plus relationships to the
    # versions and links that reference the op.
    id = Column(Integer, primary_key=True)
    op_name = Column(String)

    # versions attached to this op (each DBVersion gets an `op` back reference)
    versions = relationship(DBVersion, backref='op')

    # pipeline links in which this op is the downstream end
    upstream_links = relationship(
        DBPipelineLink,
        primaryjoin='DBOp.id==DBPipelineLink.downstream_op_id',
        backref='downstream_op',
    )
    # pipeline links in which this op is the upstream end
    downstream_links = relationship(
        DBPipelineLink,
        primaryjoin='DBOp.id==DBPipelineLink.upstream_op_id',
        backref='upstream_op',
    )

    # validated links in which this op is the downstream end
    upstream_validated_links = relationship(
        DBValidatedLink,
        primaryjoin='DBOp.id==DBValidatedLink.downstream_op_id',
        backref='downstream_op',
    )
    # validated links in which this op is the upstream end
    downstream_validated_links = relationship(
        DBValidatedLink,
        primaryjoin='DBOp.id==DBValidatedLink.upstream_op_id',
        backref='upstream_op',
    )

    def __repr__(self):
        # debugging representation showing the primary key and the name of the op
        template = 'DBOp(id={}, op_name={})'
        return template.format(self.id, self.op_name)
9 changes: 5 additions & 4 deletions chariots/op_store/models/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
# pylint: disable=missing-module-docstring, missing-class-docstring, too-few-public-methods
from sqlalchemy import Column, Integer, ForeignKey, String
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import relationship

from .op import DBOp
from ..models import db
from ..models import db, DBPipelineLink


class DBPipeline(db.Model):
    # Database model of a registered pipeline: an integer primary key, the pipeline name and
    # the links attached to it.

    id = Column(Integer, primary_key=True)
    pipeline_name = Column(String)
    # NOTE(review): this listing comes from a diff view in which ForeignKey and DBOp also
    # disappear from the imports, so this column is presumably the removed side of the change -
    # confirm against the real file before relying on it.
    last_op_id = Column(Integer, ForeignKey(DBOp.id))

    # DBPipelineLink rows of this pipeline (each link gets a `pipeline` back reference)
    links = relationship(DBPipelineLink, backref='pipeline')
13 changes: 13 additions & 0 deletions chariots/op_store/models/pipeline_link.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
"""pipeline link"""
from sqlalchemy import Column, Integer, ForeignKey

from ..models import db


class DBPipelineLink(db.Model):  # pylint: disable=too-few-public-methods
    """a pipeline link is a link inside of a pipeline's DAG"""

    # integer primary key of the link
    id = Column(Integer, primary_key=True)
    # pipeline the link belongs to (foreign key on `db_pipeline.id`)
    pipeline_id = Column(Integer, ForeignKey('db_pipeline.id'))
    # op at the upstream end of the link (foreign key on `db_op.id`), mandatory
    upstream_op_id = Column(Integer, ForeignKey('db_op.id'), nullable=False)
    # op at the downstream end of the link (foreign key on `db_op.id`), may be NULL
    downstream_op_id = Column(Integer, ForeignKey('db_op.id'), nullable=True)
Loading