From cab97c20025025d127328748e8672f680fd61326 Mon Sep 17 00:00:00 2001 From: Alexander Belikov Date: Tue, 17 Feb 2026 02:06:18 +0100 Subject: [PATCH 1/2] added sparql resource, upgrade --- CHANGELOG.md | 17 +- README.md | 107 ++-- docker/cleanup-all.sh | 2 +- docker/fuseki/.env | 7 + docker/fuseki/docker-compose.yml | 16 + docker/start-all.sh | 2 +- docker/stop-all.sh | 2 +- docs/concepts/index.md | 62 ++- docs/examples/example-6.md | 482 ++++++++++++------ docs/examples/example-7.md | 232 +++++++++ docs/examples/index.md | 3 +- docs/getting_started/installation.md | 19 +- docs/index.md | 89 ++-- docs/reference/data_source/rdf.md | 3 + docs/reference/hq/rdf_inferencer.md | 3 + examples/6-ingest-rdf/data/data.ttl | 88 ++++ examples/6-ingest-rdf/data/ontology.ttl | 60 +++ examples/6-ingest-rdf/ingest.py | 149 ++++++ graflo/data_source/__init__.py | 20 + graflo/data_source/base.py | 2 + graflo/data_source/rdf.py | 362 +++++++++++++ graflo/db/connection/config_mapping.py | 2 + graflo/db/connection/onto.py | 105 ++++ graflo/hq/graph_engine.py | 80 ++- graflo/hq/rdf_inferencer.py | 266 ++++++++++ graflo/hq/registry_builder.py | 101 +++- graflo/onto.py | 3 +- graflo/util/onto.py | 151 +++++- mkdocs.yml | 3 +- pyproject.toml | 7 +- test/data_source/sparql/__init__.py | 0 test/data_source/sparql/conftest.py | 45 ++ test/data_source/sparql/data/sample_data.ttl | 45 ++ .../sparql/data/sample_ontology.ttl | 41 ++ .../sparql/test_rdf_file_data_source.py | 112 ++++ test/data_source/sparql/test_rdf_inference.py | 98 ++++ .../sparql/test_sparql_data_source.py | 174 +++++++ test/db/falkordbs/test_performance.py | 10 +- test/db/memgraphs/test_performance.py | 4 + test/test_patterns.py | 3 +- uv.lock | 41 +- 41 files changed, 2722 insertions(+), 296 deletions(-) create mode 100644 docker/fuseki/.env create mode 100644 docker/fuseki/docker-compose.yml create mode 100644 docs/examples/example-7.md create mode 100644 docs/reference/data_source/rdf.md create mode 100644 docs/reference/hq/rdf_inferencer.md create mode 100644 examples/6-ingest-rdf/data/data.ttl create mode 100644 examples/6-ingest-rdf/data/ontology.ttl create mode 100644 examples/6-ingest-rdf/ingest.py create mode 100644 graflo/data_source/rdf.py create mode 100644 graflo/hq/rdf_inferencer.py create mode 100644 test/data_source/sparql/__init__.py create mode 100644 test/data_source/sparql/conftest.py create mode 100644 test/data_source/sparql/data/sample_data.ttl create mode 100644 test/data_source/sparql/data/sample_ontology.ttl create mode 100644 test/data_source/sparql/test_rdf_file_data_source.py create mode 100644 test/data_source/sparql/test_rdf_inference.py create mode 100644 test/data_source/sparql/test_sparql_data_source.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 7ef02a62..421d3506 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,21 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [Unreleased] +## [1.6.0] - 2026-02-17 + +### Added +- **SPARQL / RDF resource support**: Ingest data from SPARQL endpoints (e.g. 
Apache Fuseki) and local RDF files (`.ttl`, `.rdf`, `.n3`, `.jsonld`) into property graphs + - New `SparqlPattern` for mapping `rdf:Class` instances to resources, alongside existing `FilePattern` and `TablePattern` + - New `RdfDataSource` abstract parent with shared RDF-to-dict conversion logic; concrete subclasses `RdfFileDataSource` (local files via rdflib) and `SparqlEndpointDataSource` (remote endpoints via SPARQLWrapper) + - New `SparqlEndpointConfig` (extends `DBConfig`) with `from_docker_env()` for Fuseki containers + - New `RdfInferenceManager` auto-infers graflo `Schema` from OWL/RDFS ontologies: `owl:Class` to vertices, `owl:DatatypeProperty` to fields, `owl:ObjectProperty` to edges + - `GraphEngine.infer_schema_from_rdf()` and `GraphEngine.create_patterns_from_rdf()` for the RDF inference workflow + - `Patterns` class extended with `sparql_patterns` and `sparql_configs` dicts + - `RegistryBuilder` handles `ResourceType.SPARQL` to create the appropriate data sources + - `ResourceType.SPARQL`, `DataSourceType.SPARQL`, `DBType.SPARQL` enum values + - `rdflib` and `SPARQLWrapper` available as the `sparql` optional extra (`pip install graflo[sparql]`) + - Docker scripts (`start-all.sh`, `stop-all.sh`, `cleanup-all.sh`) updated to include Fuseki + - Test suite with 22 tests: RDF file parsing, ontology inference, and live Fuseki integration ### Changed - **Top-level imports optimized**: Key classes are now importable directly from `graflo`: @@ -17,6 +31,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **`graflo.filter` package exports**: `FilterExpression`, `ComparisonOperator`, and `LogicalOperator` are now re-exported from `graflo.filter.__init__` (previously only available via `graflo.filter.onto`) ### Documentation +- Added data-flow diagram (Pattern -> DataSource -> Resource -> GraphContainer -> Target DB) to Concepts page - Added **Mermaid class diagrams** to Concepts page showing: - `GraphEngine` orchestration: how `GraphEngine` delegates to `InferenceManager`, `ResourceMapper`, `Caster`, and `ConnectionManager` - `Schema` architecture: the full hierarchy from `Schema` through `VertexConfig`/`EdgeConfig`, `Resource`, `Actor` subtypes, `Field`, and `FilterExpression` diff --git a/README.md b/README.md index dd59d263..68c07c3f 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,8 @@ # GraFlo graflo logo -A framework for transforming **tabular** (CSV, SQL) and **hierarchical** data (JSON, XML) into property graphs and ingesting them into graph databases (ArangoDB, Neo4j, **TigerGraph**, **FalkorDB**, **Memgraph**). +A framework for transforming **tabular** (CSV, SQL), **hierarchical** (JSON, XML), and **RDF/SPARQL** data into property graphs and ingesting them into graph databases (ArangoDB, Neo4j, TigerGraph, FalkorDB, Memgraph). -> **⚠️ Package Renamed**: This package was formerly known as `graphcast`. +> **Package Renamed**: This package was formerly known as `graphcast`. 
![Python](https://img.shields.io/badge/python-3.11%2B-blue.svg) [![PyPI version](https://badge.fury.io/py/graflo.svg)](https://badge.fury.io/py/graflo) @@ -11,56 +11,35 @@ A framework for transforming **tabular** (CSV, SQL) and **hierarchical** data (J [![pre-commit](https://github.com/growgraph/graflo/actions/workflows/pre-commit.yml/badge.svg)](https://github.com/growgraph/graflo/actions/workflows/pre-commit.yml) [![DOI](https://zenodo.org/badge/DOI/10.5281/zenodo.15446131.svg)]( https://doi.org/10.5281/zenodo.15446131) -## Core Concepts +## Overview -### Property Graphs -graflo works with property graphs, which consist of: +graflo reads data from multiple source types, transforms it according to a declarative schema, and writes property-graph vertices and edges to a target graph database. The pipeline is: -- **Vertices**: Nodes with properties and optional unique identifiers -- **Edges**: Relationships between vertices with their own properties -- **Properties**: Both vertices and edges may have properties +**Pattern** (where data lives) --> **DataSource** (how to read it) --> **Resource** (what to extract) --> **GraphContainer** --> **Target DB** -### Schema -The Schema defines how your data should be transformed into a graph and contains: +### Supported sources -- **Vertex Definitions**: Specify vertex types, their properties, and unique identifiers - - Fields can be specified as strings (backward compatible) or typed `Field` objects with types (INT, FLOAT, STRING, DATETIME, BOOL) - - Type information enables better validation and database-specific optimizations -- **Edge Definitions**: Define relationships between vertices and their properties - - Weight fields support typed definitions for better type safety -- **Resource Mapping**: describe how data sources map to vertices and edges -- **Transforms**: Modify data during the casting process -- **Automatic Schema Inference**: Generate schemas automatically from PostgreSQL 3NF databases +| Source type | Pattern | DataSource | Schema inference | +|---|---|---|---| +| CSV / JSON / JSONL / Parquet files | `FilePattern` | `FileDataSource` | manual | +| PostgreSQL tables | `TablePattern` | `SQLDataSource` | automatic (3NF with PK/FK) | +| RDF files (`.ttl`, `.rdf`, `.n3`) | `SparqlPattern` | `RdfFileDataSource` | automatic (OWL/RDFS ontology) | +| SPARQL endpoints (Fuseki, ...) | `SparqlPattern` | `SparqlEndpointDataSource` | automatic (OWL/RDFS ontology) | +| REST APIs | -- | `APIDataSource` | manual | +| In-memory (list / DataFrame) | -- | `InMemoryDataSource` | manual | -### Resources -Resources are your data sources that can be: +### Supported targets -- **Table-like**: CSV files, database tables -- **JSON-like**: JSON files, nested data structures +ArangoDB, Neo4j, TigerGraph, FalkorDB, Memgraph -- same API for all. 
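+
+A small illustration of the "same API" claim (a sketch only; `ArangoConfig` and `Neo4jConfig` are the config classes used in the examples below, and `from_docker_env()` reads the `.env` files under `docker/`):
+
+```python
+from graflo.db.connection.onto import ArangoConfig, Neo4jConfig
+
+# Swapping targets only changes which config object is passed as
+# target_db_config; schema definition and ingestion calls stay the same.
+target = ArangoConfig.from_docker_env()
+# target = Neo4jConfig.from_docker_env()
+```
+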
## Features -- **Graph Transformation Meta-language**: A powerful declarative language to describe how your data becomes a property graph: - - Define vertex and edge structures with typed fields - - Set compound indexes for vertices and edges - - Use blank vertices for complex relationships - - Specify edge constraints and properties with typed weight fields - - Apply advanced filtering and transformations -- **Typed Schema Definitions**: Enhanced type support throughout the schema system - - Vertex fields support types (INT, FLOAT, STRING, DATETIME, BOOL) for better validation - - Edge weight fields can specify types for improved type safety - - Backward compatible: fields without types default to None (suitable for databases like ArangoDB) -- **πŸš€ PostgreSQL Schema Inference**: **Automatically generate schemas from PostgreSQL 3NF databases** - No manual schema definition needed! - - Introspect PostgreSQL schemas to identify vertex-like and edge-like tables - - Automatically map PostgreSQL data types to graflo Field types (INT, FLOAT, STRING, DATETIME, BOOL) - - Infer vertex configurations from table structures with proper indexes - - Infer edge configurations from foreign key relationships - - Create Resource mappings from PostgreSQL tables automatically - - Direct database access - ingest data without exporting to files first -- **Async ingestion**: Efficient async/await-based ingestion pipeline for better performance -- **Parallel processing**: Use as many cores as you have -- **Database support**: Ingest into ArangoDB, Neo4j, **TigerGraph**, **FalkorDB**, and **Memgraph** using the same API (database agnostic). Source data from PostgreSQL and other SQL databases. -- **Server-side filtering**: Efficient querying with server-side filtering support (TigerGraph REST++ API) +- **Declarative graph transformation**: Define vertex/edge structures, indexes, weights, and transforms in YAML or Python dicts. Resources describe how each data source maps to vertices and edges. +- **Schema inference**: Automatically generate schemas from PostgreSQL 3NF databases (PK/FK heuristics) or from OWL/RDFS ontologies (class/property introspection). +- **RDF / SPARQL ingestion**: Read `.ttl` files via rdflib or query SPARQL endpoints (e.g. Apache Fuseki). `owl:Class` maps to vertices, `owl:ObjectProperty` to edges, `owl:DatatypeProperty` to vertex fields. +- **Typed fields**: Vertex fields and edge weights support types (`INT`, `FLOAT`, `STRING`, `DATETIME`, `BOOL`) for validation and database-specific optimisation. +- **Parallel batch processing**: Configurable batch sizes and multi-core execution. +- **Database-agnostic**: Single API targeting ArangoDB, Neo4j, TigerGraph, FalkorDB, and Memgraph. Source data from PostgreSQL, SPARQL endpoints, files, APIs, or in-memory objects. ## Documentation Full documentation is available at: [growgraph.github.io/graflo](https://growgraph.github.io/graflo) @@ -69,6 +48,9 @@ Full documentation is available at: [growgraph.github.io/graflo](https://growgra ```bash pip install graflo + +# With RDF / SPARQL support (adds rdflib + SPARQLWrapper) +pip install graflo[sparql] ``` ## Usage Examples @@ -187,6 +169,34 @@ caster = Caster(schema) # ... 
continue with ingestion ``` +### RDF / SPARQL Ingestion + +```python +from pathlib import Path +from graflo.hq import GraphEngine +from graflo.db.connection.onto import ArangoConfig + +engine = GraphEngine() + +# Infer schema from an OWL/RDFS ontology file +ontology = Path("ontology.ttl") +schema = engine.infer_schema_from_rdf(source=ontology) + +# Create data-source patterns (reads a local .ttl file per rdf:Class) +patterns = engine.create_patterns_from_rdf(source=ontology) + +# Or point at a SPARQL endpoint instead: +# from graflo.db.connection.onto import SparqlEndpointConfig +# sparql_cfg = SparqlEndpointConfig(uri="http://localhost:3030", dataset="mydata") +# patterns = engine.create_patterns_from_rdf( +# source=ontology, +# endpoint_url=sparql_cfg.query_endpoint, +# ) + +target = ArangoConfig.from_docker_env() +engine.define_and_ingest(schema=schema, target_db_config=target, patterns=patterns) +``` + ## Development To install requirements @@ -235,25 +245,32 @@ FalkorDB from [falkordb docker folder](./docker/falkordb) by docker-compose --env-file .env up falkordb ``` -and Memgraph from [memgraph docker folder](./docker/memgraph) by +Memgraph from [memgraph docker folder](./docker/memgraph) by ```shell docker-compose --env-file .env up memgraph ``` +and Apache Fuseki from [fuseki docker folder](./docker/fuseki) by + +```shell +docker-compose --env-file .env up fuseki +``` + To run unit tests ```shell pytest test ``` -> **Note**: Tests require external database containers (ArangoDB, Neo4j, TigerGraph, FalkorDB, Memgraph) to be running. CI builds intentionally skip test execution. Tests must be run locally with the required database images started (see [Test databases](#test-databases) section above). +> **Note**: Tests require external database containers (ArangoDB, Neo4j, TigerGraph, FalkorDB, Memgraph, Fuseki) to be running. CI builds intentionally skip test execution. Tests must be run locally with the required database images started (see [Test databases](#test-databases) section above). 
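+
+For instance, to exercise only the new RDF / SPARQL suite once the Fuseki container is up (the path below is the test directory added in this change):
+
+```shell
+# runs only the RDF / SPARQL tests
+pytest test/data_source/sparql
+```
+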
## Requirements - Python 3.11+ (Python 3.11 and 3.12 are officially supported) - python-arango - sqlalchemy>=2.0.0 (for PostgreSQL and SQL data sources) +- rdflib>=7.0.0 + SPARQLWrapper>=2.0.0 (optional, install with `pip install graflo[sparql]`) ## Contributing diff --git a/docker/cleanup-all.sh b/docker/cleanup-all.sh index 61c8c308..32bec197 100755 --- a/docker/cleanup-all.sh +++ b/docker/cleanup-all.sh @@ -9,7 +9,7 @@ SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" cd "$SCRIPT_DIR" # Database directories -DATABASES=("arango" "neo4j" "postgres" "falkordb" "memgraph" "nebula" "tigergraph") +DATABASES=("arango" "neo4j" "postgres" "falkordb" "memgraph" "nebula" "tigergraph" "fuseki") # Colors for output GREEN='\033[0;32m' diff --git a/docker/fuseki/.env b/docker/fuseki/.env new file mode 100644 index 00000000..f2464b93 --- /dev/null +++ b/docker/fuseki/.env @@ -0,0 +1,7 @@ +IMAGE_VERSION=secoresearch/fuseki:5.1.0 +SPEC=graflo +CONTAINER_NAME="${SPEC}.fuseki" +TS_PORT=3032 +TS_PASSWORD="abc123-qwe" +TS_USERNAME="admin" +TS_DATASET="test" diff --git a/docker/fuseki/docker-compose.yml b/docker/fuseki/docker-compose.yml new file mode 100644 index 00000000..a2fbecdd --- /dev/null +++ b/docker/fuseki/docker-compose.yml @@ -0,0 +1,16 @@ +services: + fuseki: + image: ${IMAGE_VERSION} + user: "${UID}:${GID}" + restart: "no" + profiles: ["${CONTAINER_NAME}"] + ports: + - "${TS_PORT}:3030" + container_name: ${CONTAINER_NAME} + volumes: + - fuseki_data:/fuseki + environment: + - ADMIN_PASSWORD=${TS_PASSWORD} +volumes: + fuseki_data: + driver: local diff --git a/docker/start-all.sh b/docker/start-all.sh index 30ba9159..c2342922 100755 --- a/docker/start-all.sh +++ b/docker/start-all.sh @@ -8,7 +8,7 @@ SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" cd "$SCRIPT_DIR" # Database directories -DATABASES=("arango" "neo4j" "postgres" "falkordb" "memgraph" "nebula" "tigergraph") +DATABASES=("arango" "neo4j" "postgres" "falkordb" "memgraph" "nebula" "tigergraph" "fuseki") # Colors for output GREEN='\033[0;32m' diff --git a/docker/stop-all.sh b/docker/stop-all.sh index 02b62663..e0328ac7 100755 --- a/docker/stop-all.sh +++ b/docker/stop-all.sh @@ -8,7 +8,7 @@ SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" cd "$SCRIPT_DIR" # Database directories -DATABASES=("arango" "neo4j" "postgres" "falkordb" "memgraph" "nebula" "tigergraph") +DATABASES=("arango" "neo4j" "postgres" "falkordb" "memgraph" "nebula" "tigergraph" "fuseki") # Colors for output GREEN='\033[0;32m' diff --git a/docs/concepts/index.md b/docs/concepts/index.md index 3f5f7447..4673b945 100644 --- a/docs/concepts/index.md +++ b/docs/concepts/index.md @@ -10,6 +10,51 @@ graflo transforms data sources into property graphs through a pipeline of compon Each component plays a specific role in this transformation process. +### Data flow: Pattern β†’ DataSource β†’ Resource β†’ GraphContainer β†’ Target DB + +The diagram below shows how different data sources (files, SQL tables, RDF/SPARQL) +flow through the unified ingestion pipeline. 
+ +```mermaid +flowchart LR + subgraph sources [Data Sources] + TTL["*.ttl / *.rdf files"] + Fuseki["SPARQL Endpoint\n(Fuseki)"] + Files["CSV / JSON files"] + PG["PostgreSQL"] + end + subgraph patterns [Patterns] + FP[FilePattern] + TP[TablePattern] + SP[SparqlPattern] + end + subgraph datasources [DataSource Layer] + subgraph rdfFamily ["RdfDataSource (abstract)"] + RdfDS[RdfFileDataSource] + SparqlDS[SparqlEndpointDataSource] + end + FileDS[FileDataSource] + SQLDS[SQLDataSource] + end + subgraph pipeline [Shared Pipeline] + Res[Resource Pipeline] + GC[GraphContainer] + DBW[DBWriter] + end + + TTL --> SP --> RdfDS --> Res + Fuseki --> SP --> SparqlDS --> Res + Files --> FP --> FileDS --> Res + PG --> TP --> SQLDS --> Res + Res --> GC --> DBW +``` + +- **Patterns** describe *where* data comes from (file paths, SQL tables, SPARQL endpoints). +- **DataSources** handle *how* to read data in batches from each source type. +- **Resources** define *what* to extract from each document (vertices, edges, transforms). +- **GraphContainer** collects the resulting vertices and edges. +- **DBWriter** pushes the graph data into the target database (ArangoDB, Neo4j, TigerGraph, etc.). + ## Class Diagrams ### GraphEngine orchestration @@ -28,6 +73,8 @@ classDiagram +introspect(postgres_config) SchemaIntrospectionResult +infer_schema(postgres_config) Schema +create_patterns(postgres_config) Patterns + +infer_schema_from_rdf(source) Schema + +create_patterns_from_rdf(source) Patterns +define_schema(schema, target_db_config) +define_and_ingest(schema, target_db_config, ...) +ingest(schema, target_db_config, ...) @@ -63,6 +110,7 @@ classDiagram class Patterns { +file_patterns: list~FilePattern~ +table_patterns: list~TablePattern~ + +sparql_patterns: list~SparqlPattern~ } class DBConfig { @@ -293,7 +341,7 @@ The `Schema` is the central configuration that defines how data sources are tran - Resource mappings - Data transformations - Index configurations -- Automatic schema inference from normalized PostgreSQL databases (3NF) with proper primary keys (PK) and foreign keys (FK) using intelligent heuristics +- Automatic schema inference from normalized PostgreSQL databases (3NF with PK/FK) or from OWL/RDFS ontologies ### Vertex A `Vertex` describes vertices and their database indexes. It supports: @@ -386,11 +434,13 @@ Edges in graflo support a rich set of attributes that enable flexible relationsh A `DataSource` defines where data comes from and how it's retrieved. graflo supports multiple data source types: - **File Data Sources**: JSON, JSONL, CSV/TSV files +- **RDF File Data Sources**: Turtle (`.ttl`), RDF/XML (`.rdf`), N3 (`.n3`), JSON-LD files -- parsed via `rdflib`, triples grouped by subject into flat dictionaries +- **SPARQL Data Sources**: Remote SPARQL endpoints (e.g. Apache Fuseki) queried via `SPARQLWrapper` with pagination - **API Data Sources**: REST API endpoints with pagination, authentication, and retry logic - **SQL Data Sources**: SQL databases via SQLAlchemy with parameterized queries - **In-Memory Data Sources**: Python objects (lists, DataFrames) already in memory -Data sources are separate from Resources - they handle data retrieval, while Resources handle data transformation. Many data sources can map to the same Resource, allowing data to be ingested from multiple sources. +Data sources are separate from Resources -- they handle data retrieval, while Resources handle data transformation. Many data sources can map to the same Resource, allowing data to be ingested from multiple sources. 
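+
+A minimal sketch of that many-to-one mapping, reusing the registry calls shown in the REST API example (the resource name, URL, and file path are illustrative placeholders):
+
+```python
+from graflo import DataSourceRegistry
+from graflo.data_source import APIConfig, DataSourceFactory
+
+registry = DataSourceRegistry()
+
+# Two physically different sources feed the same logical "users" resource
+# (the URL and file name below are placeholders).
+api_source = DataSourceFactory.create_api_data_source(
+    APIConfig(url="https://api.example.com/users")
+)
+file_source = DataSourceFactory.create_file_data_source(path="users_backup.json")
+
+registry.register(api_source, resource_name="users")
+registry.register(file_source, resource_name="users")
+```
+
+At ingestion time both sources are read in batches and their documents pass through the same Resource mapping.
+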
### Resource A `Resource` is a set of mappings and transformations that define how data becomes a graph, defined as a hierarchical structure of `Actors`. Resources are part of the Schema and define: @@ -431,7 +481,8 @@ A `Transform` defines data transforms, from renaming and type-casting to arbitra - **Edge Constraints**: Ensure edge uniqueness based on source, target, and weight - **Reusable Transforms**: Define and reference transformations by name - **Vertex Filtering**: Filter vertices based on custom conditions -- **PostgreSQL Schema Inference**: Automatically infer schemas from normalized PostgreSQL databases (3NF) with proper primary keys (PK) and foreign keys (FK) decorated, using intelligent heuristics to detect vertices and edges +- **PostgreSQL Schema Inference**: Infer schemas from normalized PostgreSQL databases (3NF) with PK/FK constraints +- **RDF / OWL Schema Inference**: Infer schemas from OWL/RDFS ontologies -- `owl:Class` becomes vertices, `owl:ObjectProperty` becomes edges, `owl:DatatypeProperty` becomes vertex fields ### Performance Optimization - **Batch Processing**: Process large datasets in configurable batches (`batch_size` parameter of `Caster`) @@ -453,6 +504,7 @@ A `Transform` defines data transforms, from renaming and type-casting to arbitra - Specify types for weight fields when using databases that require type information (e.g., TigerGraph) - Use typed `Field` objects or dicts with `type` key for better validation 8. Leverage key matching (`match_source`, `match_target`) for complex matching scenarios -9. Use PostgreSQL schema inference for automatic schema generation from normalized databases (3NF) with proper PK/FK constraints - the heuristics work best when primary keys and foreign keys are properly decorated -10. Specify field types for better validation and database-specific optimizations, especially when targeting TigerGraph +9. Use PostgreSQL schema inference for automatic schema generation from normalized databases (3NF) with proper PK/FK constraints +10. Use RDF/OWL schema inference (`infer_schema_from_rdf`) when ingesting data from SPARQL endpoints or `.ttl` files with a well-defined ontology +11. Specify field types for better validation and database-specific optimizations, especially when targeting TigerGraph diff --git a/docs/examples/example-6.md b/docs/examples/example-6.md index 32538656..08209081 100644 --- a/docs/examples/example-6.md +++ b/docs/examples/example-6.md @@ -1,232 +1,384 @@ -# Example 6: REST API Data Source [coming] +# Example 6: RDF / Turtle Ingestion with Explicit Resource Mapping -This example demonstrates how to ingest data from a REST API endpoint into a graph database. +This example demonstrates how to ingest data from RDF (Turtle) files into a graph database, using OWL ontology inference for the schema and explicit `SparqlPattern` resource mapping for the data. -## Scenario +## Overview -Suppose you have a REST API that provides user data with pagination. You want to ingest this data into a graph database. 
+Instead of manually writing a YAML schema, this example shows how to: -## API Response Format +- **Infer the graph schema** automatically from an OWL ontology (TBox) +- **Build patterns explicitly** using `SparqlPattern` β€” one per `rdf:Class` β€” pointing at a local Turtle data file +- **Ingest RDF instance data** (ABox) into a graph database (ArangoDB, Neo4j, TigerGraph, FalkorDB) -The API returns data in the following format: +## Requirements -```json -{ - "data": [ - {"id": 1, "name": "Alice", "department": "Engineering"}, - {"id": 2, "name": "Bob", "department": "Sales"} - ], - "has_more": true, - "offset": 0, - "limit": 100, - "total": 250 -} +- A target graph database running (ArangoDB, Neo4j, TigerGraph, or FalkorDB) +- The `sparql` extra installed: + +```bash +pip install graflo[sparql] ``` -## Schema Definition +This pulls in `rdflib` (for local RDF file parsing) and `SPARQLWrapper` (for remote SPARQL endpoints). -Define your schema as usual: +## Dataset: Academic Knowledge Graph -```yaml -general: - name: users_api - -vertices: - - name: person - fields: - - id - - name - - department - indexes: - - fields: - - id +The example models a small academic knowledge graph with three entity types and three relationship types. -resources: - - resource_name: users - apply: - - vertex: person -``` +### Ontology (TBox) β€” `data/ontology.ttl` -## Using API Data Source +The ontology declares classes, datatype properties (vertex fields), and object properties (edges) using standard OWL vocabulary: -### Python Code +```turtle +@prefix owl: . +@prefix rdfs: . +@prefix xsd: . +@prefix ex: . -```python -from suthing import FileHandle -from graflo import Caster, DataSourceRegistry, Schema -from graflo.data_source import DataSourceFactory, APIConfig, PaginationConfig -from graflo.db.connection.onto import DBConfig - -# Load schema -schema = Schema.from_dict(FileHandle.load("schema.yaml")) - -# Create API configuration -api_config = APIConfig( - url="https://api.example.com/users", - method="GET", - headers={"Authorization": "Bearer your-token"}, - pagination=PaginationConfig( - strategy="offset", - offset_param="offset", - limit_param="limit", - page_size=100, - has_more_path="has_more", - data_path="data", - ), -) +# Classes +ex:Researcher a owl:Class . +ex:Publication a owl:Class . +ex:Institution a owl:Class . -# Create API data source -api_source = DataSourceFactory.create_api_data_source(api_config) +# Datatype properties (become vertex fields) +ex:fullName a owl:DatatypeProperty ; rdfs:domain ex:Researcher ; rdfs:range xsd:string . +ex:orcid a owl:DatatypeProperty ; rdfs:domain ex:Researcher ; rdfs:range xsd:string . +ex:title a owl:DatatypeProperty ; rdfs:domain ex:Publication ; rdfs:range xsd:string . +ex:year a owl:DatatypeProperty ; rdfs:domain ex:Publication ; rdfs:range xsd:integer . +ex:doi a owl:DatatypeProperty ; rdfs:domain ex:Publication ; rdfs:range xsd:string . +ex:instName a owl:DatatypeProperty ; rdfs:domain ex:Institution ; rdfs:range xsd:string . +ex:country a owl:DatatypeProperty ; rdfs:domain ex:Institution ; rdfs:range xsd:string . -# Register with resource -registry = DataSourceRegistry() -registry.register(api_source, resource_name="users") +# Object properties (become edges) +ex:authorOf a owl:ObjectProperty ; rdfs:domain ex:Researcher ; rdfs:range ex:Publication . +ex:affiliatedWith a owl:ObjectProperty ; rdfs:domain ex:Researcher ; rdfs:range ex:Institution . +ex:cites a owl:ObjectProperty ; rdfs:domain ex:Publication ; rdfs:range ex:Publication . 
+``` -# Create caster and ingest -from graflo.hq.caster import IngestionParams +### How the Ontology Maps to a Graph Schema -caster = Caster(schema) -# Load config from file -config_data = FileHandle.load("db.yaml") -conn_conf = DBConfig.from_dict(config_data) +| OWL Construct | graflo Artefact | Example | +|---|---|---| +| `owl:Class` | **Vertex** | `Researcher`, `Publication`, `Institution` | +| `owl:DatatypeProperty` | **Vertex field** | `fullName`, `year`, `country` | +| `owl:ObjectProperty` | **Edge** | `authorOf`, `affiliatedWith`, `cites` | +| `rdfs:domain` / `rdfs:range` | Edge **source** / **target** | `Researcher β†’ Publication` | -ingestion_params = IngestionParams( - clear_data=True, - batch_size=1000, # Process 1000 items per batch -) +### Graph Structure -caster.ingest_data_sources( - data_source_registry=registry, - conn_conf=conn_conf, - ingestion_params=ingestion_params, -) +The inferred graph structure has three vertex types and three edge types: + +```mermaid +graph LR + Researcher -->|authorOf| Publication + Researcher -->|affiliatedWith| Institution + Publication -->|cites| Publication ``` -### Using Configuration File +### Instance Data (ABox) β€” `data/data.ttl` -Create a data source configuration file (`data_sources.yaml`): +The data file contains both the TBox (so it is self-contained) and the instance data: -```yaml -data_sources: - - source_type: api - resource_name: users - config: - url: https://api.example.com/users - method: GET - headers: - Authorization: "Bearer your-token" - pagination: - strategy: offset - offset_param: offset - limit_param: limit - page_size: 100 - has_more_path: has_more - data_path: data -``` +- **3 Institutions**: MIT, ETH ZΓΌrich, University of Oxford +- **4 Researchers**: Alice, Bob, Carol, Dave β€” each affiliated with an institution and authoring one or more publications +- **4 Publications**: With citation links between them -Then use the CLI: +```turtle +ex:alice a ex:Researcher ; + ex:fullName "Alice Smith" ; + ex:orcid "0000-0001-0001-0001" ; + ex:affiliatedWith ex:mit ; + ex:authorOf ex:paper1 , ex:paper3 . -```bash -uv run ingest \ - --db-config-path config/db.yaml \ - --schema-path config/schema.yaml \ - --data-source-config-path data_sources.yaml +ex:paper2 a ex:Publication ; + ex:title "Knowledge Graphs in Practice" ; + ex:year 2022 ; + ex:doi "10.1234/paper2" ; + ex:cites ex:paper1 . 
``` -## Pagination Strategies +## Step-by-Step Guide + +### Step 1: Connect to Target Graph Database -### Offset-based Pagination +Choose your target graph database and load connection config from the corresponding Docker `.env` file: ```python -pagination = PaginationConfig( - strategy="offset", - offset_param="offset", - limit_param="limit", - page_size=100, - has_more_path="has_more", - data_path="data", -) +from graflo.db.connection.onto import ArangoConfig + +conn_conf = ArangoConfig.from_docker_env() + +# Alternative targets: +# from graflo.db.connection.onto import Neo4jConfig, TigergraphConfig, FalkordbConfig +# conn_conf = Neo4jConfig.from_docker_env() +# conn_conf = TigergraphConfig.from_docker_env() +# conn_conf = FalkordbConfig.from_docker_env() + +db_type = conn_conf.connection_type ``` -### Cursor-based Pagination +### Step 2: Infer Schema from the OWL Ontology + +`GraphEngine.infer_schema_from_rdf()` reads the ontology file and automatically produces a complete graflo Schema: ```python -pagination = PaginationConfig( - strategy="cursor", - cursor_param="next_cursor", - cursor_path="next_cursor", - page_size=100, - data_path="items", +from graflo.hq import GraphEngine + +engine = GraphEngine(target_db_flavor=db_type) + +schema = engine.infer_schema_from_rdf( + source="data/ontology.ttl", + schema_name="academic_kg", ) ``` -### Page-based Pagination +**What happens during inference:** -```python -pagination = PaginationConfig( - strategy="page", - page_param="page", - per_page_param="per_page", - page_size=50, - data_path="results", -) +1. **Class discovery** β€” `owl:Class` declarations become **vertices** (`Researcher`, `Publication`, `Institution`) +2. **Field discovery** β€” `owl:DatatypeProperty` declarations with `rdfs:domain` become **fields** on the corresponding vertex, plus automatic `_key` and `_uri` fields +3. **Edge discovery** β€” `owl:ObjectProperty` declarations with `rdfs:domain` / `rdfs:range` become **edges** (`authorOf`, `affiliatedWith`, `cites`) +4. **Resource creation** β€” One resource per class is created, wiring the vertex and its outgoing edges + +### Inferred Schema Structure + +The inferred schema is equivalent to this YAML: + +```yaml +general: + name: academic_kg +vertex_config: + vertices: + - name: Researcher + fields: [_key, _uri, fullName, orcid] + - name: Publication + fields: [_key, _uri, title, year, doi] + - name: Institution + fields: [_key, _uri, instName, country] +edge_config: + edges: + - source: Researcher + target: Publication + relation: authorOf + - source: Researcher + target: Institution + relation: affiliatedWith + - source: Publication + target: Publication + relation: cites +resources: +- resource_name: Researcher + apply: + - vertex: Researcher + - source: Researcher + target: Publication + relation: authorOf + - source: Researcher + target: Institution + relation: affiliatedWith +- resource_name: Publication + apply: + - vertex: Publication + - source: Publication + target: Publication + relation: cites +- resource_name: Institution + apply: + - vertex: Institution ``` -## Authentication +### Step 3: Build Patterns with Explicit Resource Mapping -### Basic Authentication +Instead of calling `engine.create_patterns_from_rdf()` (which does this automatically), we construct each `SparqlPattern` by hand. 
This gives full control over which `rdf:Class` URI maps to which resource and which file (or endpoint) provides the data: ```python -api_config = APIConfig( - url="https://api.example.com/users", - auth={"type": "basic", "username": "user", "password": "pass"}, +from graflo.util.onto import Patterns, SparqlPattern +from pathlib import Path + +DATA_FILE = Path("data/data.ttl") + +patterns = Patterns() + +patterns.add_sparql_pattern( + "Researcher", + SparqlPattern( + rdf_class="http://example.org/Researcher", + rdf_file=DATA_FILE, + resource_name="Researcher", + ), +) + +patterns.add_sparql_pattern( + "Publication", + SparqlPattern( + rdf_class="http://example.org/Publication", + rdf_file=DATA_FILE, + resource_name="Publication", + ), +) + +patterns.add_sparql_pattern( + "Institution", + SparqlPattern( + rdf_class="http://example.org/Institution", + rdf_file=DATA_FILE, + resource_name="Institution", + ), ) ``` -### Bearer Token +Each `SparqlPattern` contains: + +| Field | Purpose | +|---|---| +| `rdf_class` | Full URI of the `rdf:Class` whose instances this pattern fetches | +| `rdf_file` | Path to the local RDF file containing the instance data | +| `resource_name` | Name of the graflo resource this pattern maps to | + +**Alternative: Remote SPARQL Endpoint** + +To read data from a SPARQL endpoint (e.g. Apache Fuseki) instead of a local file, replace `rdf_file` with `endpoint_url`: ```python -api_config = APIConfig( - url="https://api.example.com/users", - auth={"type": "bearer", "token": "your-token"}, +patterns.add_sparql_pattern( + "Researcher", + SparqlPattern( + rdf_class="http://example.org/Researcher", + endpoint_url="http://localhost:3030/dataset/sparql", + resource_name="Researcher", + ), ) ``` -### Custom Headers +### Step 4: Define Schema and Ingest + +Finally, define the graph schema in the target database and ingest the data in one operation: ```python -api_config = APIConfig( - url="https://api.example.com/users", - headers={"X-API-Key": "your-api-key"}, +from graflo.hq import IngestionParams + +engine.define_and_ingest( + schema=schema, + target_db_config=conn_conf, + patterns=patterns, + ingestion_params=IngestionParams(clear_data=True), + recreate_schema=True, ) ``` -## Combining Multiple Data Sources +**What happens during ingestion:** + +1. **Pattern resolution** β€” Each `SparqlPattern` is resolved: the local RDF file is parsed with `rdflib`, filtering triples by the specified `rdf:Class` +2. **Flat dict conversion** β€” RDF triples are grouped by subject URI and converted to flat dictionaries (`{_key, _uri, field1, field2, ...}`) +3. **Vertex creation** β€” For each resource, the corresponding flat dicts become vertices +4. **Edge creation** β€” Object property values (URIs) are matched to target vertices, creating edges +5. 
**Database write** β€” Data is written to the target graph database -You can combine multiple data sources for the same resource: +## Complete Example ```python -registry = DataSourceRegistry() +import logging +from pathlib import Path + +from graflo.db.connection.onto import ArangoConfig +from graflo.hq import GraphEngine, IngestionParams +from graflo.util.onto import Patterns, SparqlPattern + +logging.basicConfig(level=logging.WARNING, handlers=[logging.StreamHandler()]) +logging.getLogger("graflo").setLevel(logging.DEBUG) + +DATA_DIR = Path(__file__).parent / "data" +ONTOLOGY_FILE = DATA_DIR / "ontology.ttl" +DATA_FILE = DATA_DIR / "data.ttl" + +# Step 1: Target database +conn_conf = ArangoConfig.from_docker_env() +db_type = conn_conf.connection_type + +# Step 2: Infer schema from ontology +engine = GraphEngine(target_db_flavor=db_type) +schema = engine.infer_schema_from_rdf(source=ONTOLOGY_FILE, schema_name="academic_kg") + +# Step 3: Explicit resource mapping +patterns = Patterns() +for cls_name in ("Researcher", "Publication", "Institution"): + patterns.add_sparql_pattern( + cls_name, + SparqlPattern( + rdf_class=f"http://example.org/{cls_name}", + rdf_file=DATA_FILE, + resource_name=cls_name, + ), + ) + +# Step 4: Define schema and ingest +engine.define_and_ingest( + schema=schema, + target_db_config=conn_conf, + patterns=patterns, + ingestion_params=IngestionParams(clear_data=True), + recreate_schema=True, +) -# API source -api_source = DataSourceFactory.create_api_data_source(api_config) -registry.register(api_source, resource_name="users") +print(f"Schema: {schema.general.name}") +print(f"Vertices: {len(schema.vertex_config.vertices)}") +print(f"Edges: {len(list(schema.edge_config.edges_list()))}") +print(f"Resources: {len(schema.resources)}") +``` -# File source -file_source = DataSourceFactory.create_file_data_source(path="users_backup.json") -registry.register(file_source, resource_name="users") +## How It Works -# Both will be processed and combined -from graflo.hq.caster import IngestionParams +### RDF-to-Graph Mapping -ingestion_params = IngestionParams( - clear_data=True, -) +The ingestion pipeline converts RDF triples into graph elements: -caster.ingest_data_sources( - data_source_registry=registry, - conn_conf=conn_conf, - ingestion_params=ingestion_params, -) +| RDF Concept | Graph Element | Example | +|---|---|---| +| Subject URI | Vertex `_key` (local name) | `ex:alice` β†’ `_key: "alice"` | +| `rdf:type` filter | Vertex type | `a ex:Researcher` β†’ `Researcher` vertex | +| Datatype property value | Vertex field | `ex:fullName "Alice"` β†’ `fullName: "Alice"` | +| Object property value | Edge | `ex:worksFor ex:mit` β†’ edge to `Institution` vertex | + +### Data Flow + +```mermaid +flowchart LR + A["ontology.ttl
(TBox)"] -->|infer_schema_from_rdf| B["Schema
(vertices, edges, resources)"] + C["data.ttl
(ABox)"] -->|SparqlPattern + rdflib| D["Flat dicts per class"] + B --> E["define_and_ingest"] + D --> E + E --> F["Graph DB
(ArangoDB / Neo4j / ...)"] ``` +## Key Concepts + +### Explicit vs Automatic Pattern Creation + +| Approach | Method | Best for | +|---|---|---| +| **Explicit** (this example) | Build `SparqlPattern` objects by hand | Fine-grained control over class URIs, files, endpoints | +| **Automatic** | `engine.create_patterns_from_rdf()` | Quick setup when ontology and data are co-located | + +### SparqlPattern Modes + +Each `SparqlPattern` can operate in one of two modes: + +- **File mode** (`rdf_file` set) β€” Parses a local RDF file with `rdflib`, filtering by `rdf_class` +- **Endpoint mode** (`endpoint_url` set) β€” Queries a remote SPARQL endpoint via `SPARQLWrapper` + +These modes are mutually exclusive. Use file mode for small-to-medium datasets shipped alongside the code, and endpoint mode when data lives in a triplestore (Fuseki, Blazegraph, GraphDB, etc.). + +## Key Takeaways + +1. **OWL ontology inference** eliminates manual schema definition β€” `owl:Class` becomes vertices, `owl:DatatypeProperty` becomes fields, `owl:ObjectProperty` becomes edges +2. **Explicit `SparqlPattern` mapping** gives full control over which class URI maps to which resource and data source +3. **Local file and remote endpoint** modes are both supported via the same `SparqlPattern` abstraction +4. **No intermediate formats** β€” RDF triples are converted directly to flat dicts and ingested into the graph database +5. **Reusable ontology** β€” The same ontology file can drive schema inference for different data files or endpoints + +## Next Steps + +- Learn about [RDF data sources](../reference/data_source/rdf.md) for the full API +- Explore [RDF schema inference](../reference/hq/rdf_inferencer.md) for advanced options +- See the [full example code](https://github.com/growgraph/graflo/tree/main/examples/6-ingest-rdf) for the complete implementation + +For more examples and detailed explanations, refer to the [API Reference](../reference/index.md). diff --git a/docs/examples/example-7.md b/docs/examples/example-7.md new file mode 100644 index 00000000..32538656 --- /dev/null +++ b/docs/examples/example-7.md @@ -0,0 +1,232 @@ +# Example 6: REST API Data Source [coming] + +This example demonstrates how to ingest data from a REST API endpoint into a graph database. + +## Scenario + +Suppose you have a REST API that provides user data with pagination. You want to ingest this data into a graph database. 
+ +## API Response Format + +The API returns data in the following format: + +```json +{ + "data": [ + {"id": 1, "name": "Alice", "department": "Engineering"}, + {"id": 2, "name": "Bob", "department": "Sales"} + ], + "has_more": true, + "offset": 0, + "limit": 100, + "total": 250 +} +``` + +## Schema Definition + +Define your schema as usual: + +```yaml +general: + name: users_api + +vertices: + - name: person + fields: + - id + - name + - department + indexes: + - fields: + - id + +resources: + - resource_name: users + apply: + - vertex: person +``` + +## Using API Data Source + +### Python Code + +```python +from suthing import FileHandle +from graflo import Caster, DataSourceRegistry, Schema +from graflo.data_source import DataSourceFactory, APIConfig, PaginationConfig +from graflo.db.connection.onto import DBConfig + +# Load schema +schema = Schema.from_dict(FileHandle.load("schema.yaml")) + +# Create API configuration +api_config = APIConfig( + url="https://api.example.com/users", + method="GET", + headers={"Authorization": "Bearer your-token"}, + pagination=PaginationConfig( + strategy="offset", + offset_param="offset", + limit_param="limit", + page_size=100, + has_more_path="has_more", + data_path="data", + ), +) + +# Create API data source +api_source = DataSourceFactory.create_api_data_source(api_config) + +# Register with resource +registry = DataSourceRegistry() +registry.register(api_source, resource_name="users") + +# Create caster and ingest +from graflo.hq.caster import IngestionParams + +caster = Caster(schema) +# Load config from file +config_data = FileHandle.load("db.yaml") +conn_conf = DBConfig.from_dict(config_data) + +ingestion_params = IngestionParams( + clear_data=True, + batch_size=1000, # Process 1000 items per batch +) + +caster.ingest_data_sources( + data_source_registry=registry, + conn_conf=conn_conf, + ingestion_params=ingestion_params, +) +``` + +### Using Configuration File + +Create a data source configuration file (`data_sources.yaml`): + +```yaml +data_sources: + - source_type: api + resource_name: users + config: + url: https://api.example.com/users + method: GET + headers: + Authorization: "Bearer your-token" + pagination: + strategy: offset + offset_param: offset + limit_param: limit + page_size: 100 + has_more_path: has_more + data_path: data +``` + +Then use the CLI: + +```bash +uv run ingest \ + --db-config-path config/db.yaml \ + --schema-path config/schema.yaml \ + --data-source-config-path data_sources.yaml +``` + +## Pagination Strategies + +### Offset-based Pagination + +```python +pagination = PaginationConfig( + strategy="offset", + offset_param="offset", + limit_param="limit", + page_size=100, + has_more_path="has_more", + data_path="data", +) +``` + +### Cursor-based Pagination + +```python +pagination = PaginationConfig( + strategy="cursor", + cursor_param="next_cursor", + cursor_path="next_cursor", + page_size=100, + data_path="items", +) +``` + +### Page-based Pagination + +```python +pagination = PaginationConfig( + strategy="page", + page_param="page", + per_page_param="per_page", + page_size=50, + data_path="results", +) +``` + +## Authentication + +### Basic Authentication + +```python +api_config = APIConfig( + url="https://api.example.com/users", + auth={"type": "basic", "username": "user", "password": "pass"}, +) +``` + +### Bearer Token + +```python +api_config = APIConfig( + url="https://api.example.com/users", + auth={"type": "bearer", "token": "your-token"}, +) +``` + +### Custom Headers + +```python +api_config = APIConfig( 
+ url="https://api.example.com/users", + headers={"X-API-Key": "your-api-key"}, +) +``` + +## Combining Multiple Data Sources + +You can combine multiple data sources for the same resource: + +```python +registry = DataSourceRegistry() + +# API source +api_source = DataSourceFactory.create_api_data_source(api_config) +registry.register(api_source, resource_name="users") + +# File source +file_source = DataSourceFactory.create_file_data_source(path="users_backup.json") +registry.register(file_source, resource_name="users") + +# Both will be processed and combined +from graflo.hq.caster import IngestionParams + +ingestion_params = IngestionParams( + clear_data=True, +) + +caster.ingest_data_sources( + data_source_registry=registry, + conn_conf=conn_conf, + ingestion_params=ingestion_params, +) +``` + diff --git a/docs/examples/index.md b/docs/examples/index.md index 685d1961..8ab5d27e 100644 --- a/docs/examples/index.md +++ b/docs/examples/index.md @@ -5,4 +5,5 @@ 3. [CSV with Edge Weights and Multiple Relations](example-3.md) 4. [Neo4j Ingestion with Dynamic Relations from Keys](example-4.md) 5. **[πŸš€ PostgreSQL Schema Inference and Ingestion](example-5.md)** - **Automatically infer graph schemas from normalized PostgreSQL databases (3NF)** with proper primary keys (PK) and foreign keys (FK). Uses intelligent heuristics to detect vertices and edges - no manual schema definition needed! Perfect for migrating relational data to graph databases. -6. [REST API Data Source](example-6.md) \ No newline at end of file +6. **[πŸ”— RDF / Turtle Ingestion with Explicit Resource Mapping](example-6.md)** - **Infer graph schemas from OWL ontologies and ingest RDF data** using explicit `SparqlPattern` resource mapping. Supports local Turtle files and remote SPARQL endpoints. Perfect for knowledge graph pipelines built on semantic web standards. +7. [REST API Data Source](example-7.md) \ No newline at end of file diff --git a/docs/getting_started/installation.md b/docs/getting_started/installation.md index daf314a3..adf429d1 100644 --- a/docs/getting_started/installation.md +++ b/docs/getting_started/installation.md @@ -3,7 +3,7 @@ ## Prerequisites - Python 3.11+ -- A graph database (Neo4j, ArangoDB, or TigerGraph) if you plan to use database features +- A graph database (ArangoDB, Neo4j, TigerGraph, FalkorDB, or Memgraph) if you plan to use database features ## Installation Methods @@ -34,8 +34,19 @@ uv sync --group dev ## Optional Dependencies -graflo has some optional dependencies that can be installed based on your needs. -In order to be able to generate schema visualizations, add graphviz deps (you will need `graphviz` package installed on your computer, e.g. `apt install graphviz-dev`) +graflo has optional dependency groups that can be installed based on your needs. + +### RDF / SPARQL support + +Adds `rdflib` and `SPARQLWrapper` for reading `.ttl`/`.rdf`/`.n3` files and querying SPARQL endpoints: + +```bash +pip install graflo[sparql] +``` + +### Schema visualisation + +For generating schema visualisations with graphviz (requires the system `graphviz` package, e.g. 
`apt install graphviz-dev`): ```bash pip install graflo[graphviz] @@ -53,7 +64,7 @@ print(graflo.__version__) ## Spinning up databases -Instructions on how to spin up `ArangoDB`, `Neo4j`, and `TigerGraph` as docker images using `docker compose` are provided here [github.com/growgraph/graflo/docker](https://github.com/growgraph/graflo/tree/main/docker) +Instructions on how to spin up ArangoDB, Neo4j, TigerGraph, FalkorDB, Memgraph, and Apache Fuseki as Docker containers using `docker compose` are provided here: [github.com/growgraph/graflo/docker](https://github.com/growgraph/graflo/tree/main/docker) ## Configuration diff --git a/docs/index.md b/docs/index.md index 00dec79e..1e3163b2 100644 --- a/docs/index.md +++ b/docs/index.md @@ -1,6 +1,6 @@ # GraFlo graflo logo -graflo is a framework for transforming **tabular** data (CSV, SQL) and **hierarchical** data (JSON, XML) into property graphs and ingesting them into graph databases (ArangoDB, Neo4j, TigerGraph, FalkorDB, Memgraph). **Automatically infer schemas from normalized PostgreSQL databases (3NF)** with proper primary keys (PK) and foreign keys (FK) - uses intelligent heuristics to detect vertices and edges! +graflo is a framework for transforming **tabular** (CSV, SQL), **hierarchical** (JSON, XML), and **RDF/SPARQL** data into property graphs and ingesting them into graph databases (ArangoDB, Neo4j, TigerGraph, FalkorDB, Memgraph). ![Python](https://img.shields.io/badge/python-3.11%2B-blue.svg) [![PyPI version](https://badge.fury.io/py/graflo.svg)](https://badge.fury.io/py/graflo) @@ -9,8 +9,6 @@ graflo is a framework for transforming **tabular** data (CSV, SQL) and **hierarc [![pre-commit](https://github.com/growgraph/graflo/actions/workflows/pre-commit.yml/badge.svg)](https://github.com/growgraph/graflo/actions/workflows/pre-commit.yml) [![DOI](https://zenodo.org/badge/DOI/10.5281/zenodo.15446131.svg)]( https://doi.org/10.5281/zenodo.15446131) - - @@ -24,65 +22,49 @@ graflo works with property graphs, which consist of: - **Properties**: Both vertices and edges may have properties ### Schema -The Schema defines how your data should be transformed into a graph and contains: +The Schema defines how your data should be transformed into a graph: -- **Vertex Definitions**: Specify vertex types, their properties, and unique identifiers - - Fields can be specified as strings (backward compatible) or typed `Field` objects with types (INT, FLOAT, STRING, DATETIME, BOOL) - - Type information enables better validation and database-specific optimizations -- **Edge Definitions**: Define relationships between vertices and their properties - - Weight fields support typed definitions for better type safety -- **Resource Mapping**: describe how data sources map to vertices and edges -- **Transforms**: Modify data during the casting process -- **Automatic Schema Inference**: Generate schemas automatically from PostgreSQL 3NF databases +- **Vertex Definitions**: Vertex types, their properties, and unique identifiers. Fields may carry optional types (`INT`, `FLOAT`, `STRING`, `DATETIME`, `BOOL`). +- **Edge Definitions**: Relationships between vertices, with optional weight fields. +- **Resource Mapping**: How data sources map to vertices and edges. +- **Transforms**: Modify data during the casting process. +- **Automatic Schema Inference**: Generate schemas from PostgreSQL 3NF databases (PK/FK heuristics) or from OWL/RDFS ontologies. 
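+
+As a sketch of the two inference entry points listed above (the ontology file name is a placeholder; see Example 5 for the PostgreSQL workflow and Example 6 for the RDF workflow):
+
+```python
+from graflo.hq import GraphEngine
+
+engine = GraphEngine()
+
+# OWL/RDFS ontology ("ontology.ttl" is a placeholder path):
+# owl:Class -> vertices, owl:DatatypeProperty -> fields, owl:ObjectProperty -> edges
+schema = engine.infer_schema_from_rdf(source="ontology.ttl")
+
+# The PostgreSQL variant works analogously on a 3NF database with PK/FK:
+# schema = engine.infer_schema(postgres_config)
+```
+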
### Data Sources Data Sources define where data comes from: -- **File Sources**: JSON, JSONL, CSV/TSV files +- **File Sources**: CSV, JSON, JSONL, Parquet files +- **SQL Sources**: PostgreSQL and other SQL databases via SQLAlchemy +- **RDF Sources**: Local Turtle/RDF/N3/JSON-LD files via rdflib +- **SPARQL Sources**: Remote SPARQL endpoints (e.g. Apache Fuseki) via SPARQLWrapper - **API Sources**: REST API endpoints with pagination and authentication -- **SQL Sources**: SQL databases via SQLAlchemy - **In-Memory Sources**: Python objects (lists, DataFrames) ### Resources Resources define how data is transformed into a graph (semantic mapping). They work with data from any DataSource type: - **Table-like processing**: CSV files, SQL tables, API responses -- **JSON-like processing**: JSON files, nested data structures, hierarchical API responses +- **JSON-like processing**: JSON files, nested data structures +- **RDF processing**: Triples grouped by subject into flat documents ### GraphEngine -The `GraphEngine` orchestrates graph database operations, providing a unified interface for: -- Schema inference from PostgreSQL databases -- Schema definition in target graph databases (moved from Caster) +`GraphEngine` orchestrates graph database operations: + +- Schema inference from PostgreSQL databases or RDF/OWL ontologies +- Schema definition in target graph databases - Pattern creation from data sources - Data ingestion with async support ## Key Features -- **πŸš€ PostgreSQL Schema Inference**: **Automatically generate schemas from normalized PostgreSQL databases (3NF)** - No manual schema definition needed! - - **Requirements**: Works with normalized databases (3NF) that have proper primary keys (PK) and foreign keys (FK) decorated - - Uses intelligent heuristics to classify tables as vertices or edges based on structure - - Introspect PostgreSQL schemas to identify vertex-like and edge-like tables automatically - - Automatically map PostgreSQL data types to graflo Field types (INT, FLOAT, STRING, DATETIME, BOOL) - - Infer vertex configurations from table structures with proper indexes (primary keys become vertex indexes) - - Infer edge configurations from foreign key relationships (foreign keys become edge source/target mappings) - - Create Resource mappings from PostgreSQL tables automatically - - Direct database access - ingest data without exporting to files first - - See [Example 5: PostgreSQL Schema Inference](examples/example-5.md) for a complete walkthrough -- **Graph Transformation Meta-language**: A powerful declarative language to describe how your data becomes a property graph: - - Define vertex and edge structures with typed fields - - Set compound indexes for vertices and edges - - Use blank vertices for complex relationships - - Specify edge constraints and properties with typed weight fields - - Apply advanced filtering and transformations -- **Typed Schema Definitions**: Enhanced type support throughout the schema system - - Vertex fields support types (INT, FLOAT, STRING, DATETIME, BOOL) for better validation - - Edge weight fields can specify types for improved type safety - - Backward compatible: fields without types default to None (suitable for databases like ArangoDB) -- **Async Ingestion**: Efficient async/await-based ingestion pipeline for better performance -- **Parallel Processing**: Efficient processing with multi-threading -- **Database Integration**: Seamless integration with Neo4j, ArangoDB, TigerGraph, FalkorDB, Memgraph, and PostgreSQL (as source) -- **Advanced 
Filtering**: Powerful filtering capabilities for data transformation with server-side filtering support -- **Blank Node Support**: Create intermediate vertices for complex relationships +- **Declarative graph transformation**: Define vertex/edge structures, indexes, weights, and transforms in YAML. Resources describe how each data source maps to vertices and edges. +- **PostgreSQL schema inference**: Automatically generate schemas from normalized PostgreSQL databases (3NF) with proper PK/FK constraints. See [Example 5](examples/example-5.md). +- **RDF / SPARQL ingestion**: Read `.ttl`/`.rdf`/`.n3` files or query SPARQL endpoints. Auto-infer schemas from OWL/RDFS ontologies: `owl:Class` maps to vertices, `owl:ObjectProperty` to edges, `owl:DatatypeProperty` to vertex fields. Install with `pip install graflo[sparql]`. +- **Typed fields**: Vertex fields and edge weights support types (`INT`, `FLOAT`, `STRING`, `DATETIME`, `BOOL`) for validation and database-specific optimisation. +- **Parallel batch processing**: Configurable batch sizes and multi-core execution. +- **Database-agnostic target**: Single API for ArangoDB, Neo4j, TigerGraph, FalkorDB, and Memgraph. +- **Advanced filtering**: Server-side filtering (e.g. TigerGraph REST++ API) and client-side filter expressions. +- **Blank vertices**: Create intermediate nodes for complex relationship modelling. ## Quick Links @@ -93,20 +75,19 @@ The `GraphEngine` orchestrates graph database operations, providing a unified in ## Use Cases -- **Data Migration**: Transform relational data into graph structures - - **PostgreSQL to Graph**: Automatically infer schemas from normalized PostgreSQL databases (3NF) with proper PK/FK constraints and migrate data directly - - Uses intelligent heuristics to detect vertices and edges - no manual schema definition required - - Perfect for migrating existing relational databases that follow normalization best practices -- **Knowledge Graphs**: Build complex knowledge representations -- **Data Integration**: Combine multiple data sources into a unified graph -- **Graph Views**: Create graph views of existing PostgreSQL databases without schema changes +- **Data Migration**: Transform relational data into graph structures. Infer schemas from PostgreSQL 3NF databases (PK/FK heuristics) and migrate data directly. +- **RDF-to-Property-Graph**: Read RDF triples from files or SPARQL endpoints, auto-infer schemas from OWL ontologies, and ingest into ArangoDB, Neo4j, etc. +- **Knowledge Graphs**: Build knowledge representations from heterogeneous sources (SQL, files, APIs, RDF). +- **Data Integration**: Combine multiple data sources into a unified property graph. +- **Graph Views**: Create graph views of existing PostgreSQL databases without schema changes. 
## Requirements
 
-- Python 3.11 or higher (Python 3.11 and 3.12 are officially supported)
-- Graph database (Neo4j, ArangoDB, TigerGraph, FalkorDB, or Memgraph) for storage
-- Optional: PostgreSQL or other SQL databases for data sources (with automatic schema inference support)
-- Dependencies as specified in pyproject.toml
+- Python 3.11 or higher (3.11 and 3.12 officially supported)
+- A graph database (ArangoDB, Neo4j, TigerGraph, FalkorDB, or Memgraph) as target
+- Optional: PostgreSQL for SQL data sources and schema inference
+- Optional: `rdflib` + `SPARQLWrapper` for RDF/SPARQL support (`pip install graflo[sparql]`)
+- Full dependency list in `pyproject.toml`
 
 ## Contributing
 
diff --git a/docs/reference/data_source/rdf.md b/docs/reference/data_source/rdf.md
new file mode 100644
index 00000000..019cb921
--- /dev/null
+++ b/docs/reference/data_source/rdf.md
@@ -0,0 +1,3 @@
+# `graflo.data_source.rdf`
+
+::: graflo.data_source.rdf
diff --git a/docs/reference/hq/rdf_inferencer.md b/docs/reference/hq/rdf_inferencer.md
new file mode 100644
index 00000000..5c99debc
--- /dev/null
+++ b/docs/reference/hq/rdf_inferencer.md
@@ -0,0 +1,3 @@
+# `graflo.hq.rdf_inferencer`
+
+::: graflo.hq.rdf_inferencer
diff --git a/examples/6-ingest-rdf/data/data.ttl b/examples/6-ingest-rdf/data/data.ttl
new file mode 100644
index 00000000..0f655330
--- /dev/null
+++ b/examples/6-ingest-rdf/data/data.ttl
@@ -0,0 +1,88 @@
+@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
+@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
+@prefix owl: <http://www.w3.org/2002/07/owl#> .
+@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
+@prefix ex: <http://example.org/> .
+
+# --- TBox (included so the file is self-contained) -------------------------
+
+ex:Researcher a owl:Class .
+ex:Publication a owl:Class .
+ex:Institution a owl:Class .
+
+ex:fullName a owl:DatatypeProperty ; rdfs:domain ex:Researcher ; rdfs:range xsd:string .
+ex:orcid a owl:DatatypeProperty ; rdfs:domain ex:Researcher ; rdfs:range xsd:string .
+ex:title a owl:DatatypeProperty ; rdfs:domain ex:Publication ; rdfs:range xsd:string .
+ex:year a owl:DatatypeProperty ; rdfs:domain ex:Publication ; rdfs:range xsd:integer .
+ex:doi a owl:DatatypeProperty ; rdfs:domain ex:Publication ; rdfs:range xsd:string .
+ex:instName a owl:DatatypeProperty ; rdfs:domain ex:Institution ; rdfs:range xsd:string .
+ex:country a owl:DatatypeProperty ; rdfs:domain ex:Institution ; rdfs:range xsd:string .
+
+ex:authorOf a owl:ObjectProperty ; rdfs:domain ex:Researcher ; rdfs:range ex:Publication .
+ex:affiliatedWith a owl:ObjectProperty ; rdfs:domain ex:Researcher ; rdfs:range ex:Institution .
+ex:cites a owl:ObjectProperty ; rdfs:domain ex:Publication ; rdfs:range ex:Publication .
+
+# --- Institutions ----------------------------------------------------------
+
+ex:mit a ex:Institution ;
+    ex:instName "MIT" ;
+    ex:country "USA" .
+
+ex:eth a ex:Institution ;
+    ex:instName "ETH ZΓΌrich" ;
+    ex:country "Switzerland" .
+
+ex:oxford a ex:Institution ;
+    ex:instName "University of Oxford" ;
+    ex:country "UK" .
+
+# --- Researchers -----------------------------------------------------------
+
+ex:alice a ex:Researcher ;
+    ex:fullName "Alice Smith" ;
+    ex:orcid "0000-0001-0001-0001" ;
+    ex:affiliatedWith ex:mit ;
+    ex:authorOf ex:paper1 , ex:paper3 .
+
+ex:bob a ex:Researcher ;
+    ex:fullName "Bob Jones" ;
+    ex:orcid "0000-0001-0002-0002" ;
+    ex:affiliatedWith ex:eth ;
+    ex:authorOf ex:paper1 , ex:paper2 .
+
+ex:carol a ex:Researcher ;
+    ex:fullName "Carol Wu" ;
+    ex:orcid "0000-0001-0003-0003" ;
+    ex:affiliatedWith ex:oxford ;
+    ex:authorOf ex:paper2 , ex:paper3 .
+
+ex:dave a ex:Researcher ;
+    ex:fullName "Dave MΓΌller" ;
+    ex:orcid "0000-0001-0004-0004" ;
+    ex:affiliatedWith ex:eth ;
+    ex:authorOf ex:paper4 .
+
+# --- Publications ----------------------------------------------------------
+
+ex:paper1 a ex:Publication ;
+    ex:title "Graph Databases: A Survey" ;
+    ex:year 2021 ;
+    ex:doi "10.1234/paper1" .
+
+ex:paper2 a ex:Publication ;
+    ex:title "Knowledge Graphs in Practice" ;
+    ex:year 2022 ;
+    ex:doi "10.1234/paper2" ;
+    ex:cites ex:paper1 .
+
+ex:paper3 a ex:Publication ;
+    ex:title "Ontology-Driven Data Integration" ;
+    ex:year 2023 ;
+    ex:doi "10.1234/paper3" ;
+    ex:cites ex:paper1 , ex:paper2 .
+
+ex:paper4 a ex:Publication ;
+    ex:title "SPARQL Query Optimization Techniques" ;
+    ex:year 2024 ;
+    ex:doi "10.1234/paper4" ;
+    ex:cites ex:paper2 , ex:paper3 .
diff --git a/examples/6-ingest-rdf/data/ontology.ttl b/examples/6-ingest-rdf/data/ontology.ttl
new file mode 100644
index 00000000..07e0675e
--- /dev/null
+++ b/examples/6-ingest-rdf/data/ontology.ttl
@@ -0,0 +1,60 @@
+@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
+@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
+@prefix owl: <http://www.w3.org/2002/07/owl#> .
+@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
+@prefix ex: <http://example.org/> .
+
+# --- Classes ---------------------------------------------------------------
+
+ex:Researcher a owl:Class ;
+    rdfs:label "Researcher" .
+
+ex:Publication a owl:Class ;
+    rdfs:label "Publication" .
+
+ex:Institution a owl:Class ;
+    rdfs:label "Institution" .
+
+# --- Datatype properties (vertex fields) -----------------------------------
+
+ex:fullName a owl:DatatypeProperty ;
+    rdfs:domain ex:Researcher ;
+    rdfs:range xsd:string .
+
+ex:orcid a owl:DatatypeProperty ;
+    rdfs:domain ex:Researcher ;
+    rdfs:range xsd:string .
+
+ex:title a owl:DatatypeProperty ;
+    rdfs:domain ex:Publication ;
+    rdfs:range xsd:string .
+
+ex:year a owl:DatatypeProperty ;
+    rdfs:domain ex:Publication ;
+    rdfs:range xsd:integer .
+
+ex:doi a owl:DatatypeProperty ;
+    rdfs:domain ex:Publication ;
+    rdfs:range xsd:string .
+
+ex:instName a owl:DatatypeProperty ;
+    rdfs:domain ex:Institution ;
+    rdfs:range xsd:string .
+
+ex:country a owl:DatatypeProperty ;
+    rdfs:domain ex:Institution ;
+    rdfs:range xsd:string .
+
+# --- Object properties (edges) --------------------------------------------
+
+ex:authorOf a owl:ObjectProperty ;
+    rdfs:domain ex:Researcher ;
+    rdfs:range ex:Publication .
+
+ex:affiliatedWith a owl:ObjectProperty ;
+    rdfs:domain ex:Researcher ;
+    rdfs:range ex:Institution .
+
+ex:cites a owl:ObjectProperty ;
+    rdfs:domain ex:Publication ;
+    rdfs:range ex:Publication .
diff --git a/examples/6-ingest-rdf/ingest.py b/examples/6-ingest-rdf/ingest.py
new file mode 100644
index 00000000..212145d2
--- /dev/null
+++ b/examples/6-ingest-rdf/ingest.py
@@ -0,0 +1,149 @@
+"""Example 6: Ingest data from RDF / Turtle files into a graph database.
+
+This example demonstrates:
+- Inferring a Schema from an OWL ontology (TBox)
+- Creating Patterns with explicit SparqlPattern resource mapping
+- Ingesting RDF instance data (ABox) into a graph database (ArangoDB/Neo4j)
+
+The dataset models a small academic knowledge graph with Researchers,
+Publications, and Institutions connected by authorOf, affiliatedWith,
+and cites relationships.
+
+Prerequisites:
+- Target graph database (ArangoDB or Neo4j) running
+- Environment variables or .env files configured for the target database
+- graflo[sparql] extra installed: pip install graflo[sparql]
+"""
+
+import logging
+from pathlib import Path
+
+from graflo.db.connection.onto import ArangoConfig
+from graflo.hq import GraphEngine, IngestionParams
+from graflo.util.onto import Patterns, SparqlPattern
+
+logger = logging.getLogger(__name__)
+
+# Configure logging: DEBUG level for the graflo module, WARNING for others
+logging.basicConfig(level=logging.WARNING, handlers=[logging.StreamHandler()])
+logging.getLogger("graflo").setLevel(logging.DEBUG)
+
+# ---------------------------------------------------------------------------
+# Paths to RDF files (relative to this script)
+# ---------------------------------------------------------------------------
+DATA_DIR = Path(__file__).parent / "data"
+ONTOLOGY_FILE = DATA_DIR / "ontology.ttl"
+DATA_FILE = DATA_DIR / "data.ttl"
+
+# ---------------------------------------------------------------------------
+# Step 1: Connect to target graph database
+# ---------------------------------------------------------------------------
+# Load config from docker/arango/.env (recommended)
+# This automatically reads ARANGO_URI, ARANGO_USERNAME, ARANGO_PASSWORD, etc.
+conn_conf = ArangoConfig.from_docker_env()
+
+# Alternative targets – uncomment the one you need:
+# from graflo.db.connection.onto import Neo4jConfig, TigergraphConfig, FalkordbConfig
+# conn_conf = Neo4jConfig.from_docker_env()
+# conn_conf = TigergraphConfig.from_docker_env()
+# conn_conf = FalkordbConfig.from_docker_env()
+
+# Or specify directly:
+# conn_conf = ArangoConfig(
+#     uri="http://localhost:8535",
+#     username="root",
+#     password="123",
+#     database="_system",
+# )
+
+# Determine DB type from connection config
+db_type = conn_conf.connection_type
+
+# ---------------------------------------------------------------------------
+# Step 2: Infer Schema from the OWL ontology
+# ---------------------------------------------------------------------------
+# GraphEngine reads the TBox (owl:Class, owl:DatatypeProperty,
+# owl:ObjectProperty) and builds vertices, fields, and edges automatically.
+engine = GraphEngine(target_db_flavor=db_type)
+
+schema = engine.infer_schema_from_rdf(
+    source=ONTOLOGY_FILE,
+    schema_name="academic_kg",
+)
+
+logger.info(
+    "Inferred schema: %d vertices, %d edges",
+    len(schema.vertex_config.vertices),
+    len(list(schema.edge_config.edges_list())),
+)
+
+# ---------------------------------------------------------------------------
+# Step 3: Build Patterns with EXPLICIT resource mapping
+# ---------------------------------------------------------------------------
+# Instead of engine.create_patterns_from_rdf() we construct each
+# SparqlPattern by hand, pointing at the local data file and specifying
+# the rdf:Class URI that each resource should fetch.
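+#
+# For reference, the fully automatic route would derive one SparqlPattern per
+# owl:Class straight from the RDF (a sketch, not executed in this example;
+# data.ttl is self-contained, so it can serve as both TBox and ABox source):
+#
+#     patterns = engine.create_patterns_from_rdf(DATA_FILE)
+#
+# Building the patterns by hand below makes the class-to-resource mapping
+# explicit and easier to adapt.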
+patterns = Patterns() + +patterns.add_sparql_pattern( + "Researcher", + SparqlPattern( + rdf_class="http://example.org/Researcher", + rdf_file=DATA_FILE, + resource_name="Researcher", + ), +) + +patterns.add_sparql_pattern( + "Publication", + SparqlPattern( + rdf_class="http://example.org/Publication", + rdf_file=DATA_FILE, + resource_name="Publication", + ), +) + +patterns.add_sparql_pattern( + "Institution", + SparqlPattern( + rdf_class="http://example.org/Institution", + rdf_file=DATA_FILE, + resource_name="Institution", + ), +) + +# Alternative: point patterns at a remote SPARQL endpoint instead of a file +# patterns.add_sparql_pattern( +# "Researcher", +# SparqlPattern( +# rdf_class="http://example.org/Researcher", +# endpoint_url="http://localhost:3030/dataset/sparql", +# resource_name="Researcher", +# ), +# ) + +# --------------------------------------------------------------------------- +# Step 4: Define schema and ingest in one operation +# --------------------------------------------------------------------------- +engine.define_and_ingest( + schema=schema, + target_db_config=conn_conf, + patterns=patterns, + ingestion_params=IngestionParams(clear_data=True), + recreate_schema=True, +) + +print("\n" + "=" * 80) +print("Ingestion complete!") +print("=" * 80) +print(f"\nSchema: {schema.general.name}") +print(f"Vertices: {len(schema.vertex_config.vertices)}") +print(f"Edges: {len(list(schema.edge_config.edges_list()))}") +print(f"Resources: {len(schema.resources)}") +print("=" * 80) + +# View the ingested data in your graph database's web interface: +# - ArangoDB: http://localhost:8535 (check ARANGO_PORT in docker/arango/.env) +# - Neo4j: http://localhost:7475 (check NEO4J_PORT in docker/neo4j/.env) +# - TigerGraph: http://localhost:14241 (check TG_WEB in docker/tigergraph/.env) +# - FalkorDB: http://localhost:3001 (check FALKORDB_BROWSER_PORT in docker/falkordb/.env) diff --git a/graflo/data_source/__init__.py b/graflo/data_source/__init__.py index 7f8f1a8e..761cae4b 100644 --- a/graflo/data_source/__init__.py +++ b/graflo/data_source/__init__.py @@ -46,3 +46,23 @@ "SQLDataSource", "TableFileDataSource", ] + +# RDF / SPARQL data sources are optional (require graflo[sparql]) +try: + from .rdf import ( # noqa: F401 + RdfDataSource, + RdfFileDataSource, + SparqlDataSource, + SparqlEndpointDataSource, + SparqlSourceConfig, + ) + + __all__ += [ + "RdfDataSource", + "RdfFileDataSource", + "SparqlDataSource", + "SparqlEndpointDataSource", + "SparqlSourceConfig", + ] +except ImportError: + pass diff --git a/graflo/data_source/base.py b/graflo/data_source/base.py index 30dd4072..e21c2f46 100644 --- a/graflo/data_source/base.py +++ b/graflo/data_source/base.py @@ -21,12 +21,14 @@ class DataSourceType(BaseEnum): API: REST API data sources SQL: SQL database data sources IN_MEMORY: In-memory data sources (lists, DataFrames) + SPARQL: RDF data sources (local files via rdflib, remote endpoints via SPARQLWrapper) """ FILE = "file" API = "api" SQL = "sql" IN_MEMORY = "in_memory" + SPARQL = "sparql" class AbstractDataSource(ConfigBaseModel, abc.ABC): diff --git a/graflo/data_source/rdf.py b/graflo/data_source/rdf.py new file mode 100644 index 00000000..99f1959d --- /dev/null +++ b/graflo/data_source/rdf.py @@ -0,0 +1,362 @@ +"""RDF data source hierarchy. + +Provides two concrete data sources that share a common abstract parent: + +* :class:`RdfFileDataSource` – reads local RDF files (Turtle, RDF/XML, N3, + JSON-LD, …) via *rdflib*. 
+* :class:`SparqlEndpointDataSource` – queries a remote SPARQL endpoint + (e.g. Apache Fuseki) via *SPARQLWrapper*. + +Both convert RDF triples into flat dictionaries grouped by subject URI, one +dict per ``rdf:Class`` instance. + +Requires the ``sparql`` extra:: + + pip install graflo[sparql] +""" + +from __future__ import annotations + +import abc +import logging +from pathlib import Path +from typing import TYPE_CHECKING, Any, Iterator + +from pydantic import Field + +from graflo.architecture.base import ConfigBaseModel +from graflo.data_source.base import AbstractDataSource, DataSourceType + +if TYPE_CHECKING: + from rdflib import Graph + +logger = logging.getLogger(__name__) + +# ------------------------------------------------------------------ # +# Shared helpers # +# ------------------------------------------------------------------ # + +# rdflib extension -> format mapping +_EXT_FORMAT: dict[str, str] = { + ".ttl": "turtle", + ".turtle": "turtle", + ".rdf": "xml", + ".xml": "xml", + ".n3": "n3", + ".nt": "nt", + ".nq": "nquads", + ".jsonld": "json-ld", + ".json": "json-ld", + ".trig": "trig", +} + + +def _local_name(uri: str) -> str: + """Extract the local name (fragment or last path segment) from a URI.""" + if "#" in uri: + return uri.rsplit("#", 1)[-1] + return uri.rsplit("/", 1)[-1] + + +def _triples_to_docs( + graph: Graph, + rdf_class: str | None = None, +) -> list[dict]: + """Convert triples from *graph* into flat dictionaries grouped by subject. + + When *rdf_class* is given only subjects that are ``a `` are + returned. Otherwise all subjects are included. + + Each dict has an ``_uri`` key with the full subject URI plus one key per + predicate local-name. + """ + from rdflib import RDF, URIRef + from rdflib.term import BNode, Literal + + if rdf_class: + subjects = { + s + for s in graph.subjects(RDF.type, URIRef(rdf_class)) + if isinstance(s, (URIRef, BNode)) + } + else: + subjects = {s for s in graph.subjects() if isinstance(s, (URIRef, BNode))} + + docs: list[dict] = [] + for subj in subjects: + doc: dict = {"_uri": str(subj), "_key": _local_name(str(subj))} + for pred, obj in graph.predicate_objects(subj): + pred_name = _local_name(str(pred)) + if pred_name == "type": + continue + value = obj.toPython() if isinstance(obj, Literal) else str(obj) + if pred_name in doc: + existing = doc[pred_name] + if isinstance(existing, list): + existing.append(value) + else: + doc[pred_name] = [existing, value] + else: + doc[pred_name] = value + docs.append(doc) + return docs + + +def _sparql_results_to_docs(results: dict[str, Any]) -> list[dict]: + """Convert a SPARQL ``SELECT ?s ?p ?o`` result set to flat dicts. + + Groups bindings by subject and converts predicate/object pairs to + ``{predicate_local_name: value}`` dictionaries. 
+ """ + subjects: dict[str, dict] = {} + for binding in results.get("results", {}).get("bindings", []): + s_val = binding["s"]["value"] + p_val = binding["p"]["value"] + o_binding = binding["o"] + + p_name = _local_name(p_val) + if p_name == "type": + continue + + value: Any + if o_binding["type"] == "literal": + raw = o_binding["value"] + datatype = o_binding.get("datatype", "") + if "integer" in datatype: + value = int(raw) + elif "float" in datatype or "double" in datatype or "decimal" in datatype: + value = float(raw) + elif "boolean" in datatype: + value = raw.lower() in ("true", "1") + else: + value = raw + else: + value = o_binding["value"] + + if s_val not in subjects: + subjects[s_val] = {"_uri": s_val, "_key": _local_name(s_val)} + + doc = subjects[s_val] + if p_name in doc: + existing = doc[p_name] + if isinstance(existing, list): + existing.append(value) + else: + doc[p_name] = [existing, value] + else: + doc[p_name] = value + + return list(subjects.values()) + + +# ------------------------------------------------------------------ # +# Abstract parent # +# ------------------------------------------------------------------ # + + +class RdfDataSource(AbstractDataSource, abc.ABC): + """Abstract base for RDF data sources (file and endpoint). + + Captures the fields and batch-yielding logic shared by both + :class:`RdfFileDataSource` and :class:`SparqlEndpointDataSource`. + + Attributes: + rdf_class: Optional URI of the ``rdf:Class`` to filter subjects by. + """ + + source_type: DataSourceType = DataSourceType.SPARQL + rdf_class: str | None = Field( + default=None, description="URI of the rdf:Class to filter by" + ) + + @staticmethod + def _yield_batches( + docs: list[dict], batch_size: int, limit: int | None + ) -> Iterator[list[dict]]: + """Apply *limit*, then yield *docs* in chunks of *batch_size*.""" + if limit is not None: + docs = docs[:limit] + for i in range(0, max(len(docs), 1), batch_size): + batch = docs[i : i + batch_size] + if batch: + yield batch + + +# ------------------------------------------------------------------ # +# File transport # +# ------------------------------------------------------------------ # + + +class RdfFileDataSource(RdfDataSource): + """Data source for local RDF files. + + Parses RDF files using *rdflib* and yields flat dictionaries grouped by + subject URI. Optionally filters by ``rdf_class`` so that only instances + of a specific class are returned. + + Attributes: + path: Path to the RDF file. + rdf_format: Explicit rdflib format string (e.g. ``"turtle"``). + When ``None`` the format is guessed from the file extension. + """ + + path: Path + rdf_format: str | None = Field( + default=None, description="rdflib serialization format" + ) + + def _resolve_format(self) -> str: + """Return the rdflib format string, guessing from extension if needed.""" + if self.rdf_format: + return self.rdf_format + ext = self.path.suffix.lower() + fmt = _EXT_FORMAT.get(ext) + if fmt is None: + raise ValueError( + f"Cannot determine RDF format for extension '{ext}'. " + f"Set rdf_format explicitly. Known: {list(_EXT_FORMAT.keys())}" + ) + return fmt + + def iter_batches( + self, batch_size: int = 1000, limit: int | None = None + ) -> Iterator[list[dict]]: + """Parse the RDF file and yield batches of flat dictionaries.""" + try: + from rdflib import Graph + except ImportError as exc: + raise ImportError( + "rdflib is required for RDF data sources. 
" + "Install it with: pip install graflo[sparql]" + ) from exc + + g = Graph() + g.parse(str(self.path), format=self._resolve_format()) + logger.info( + "Parsed %d triples from %s (format=%s)", + len(g), + self.path, + self._resolve_format(), + ) + + docs = _triples_to_docs(g, rdf_class=self.rdf_class) + yield from self._yield_batches(docs, batch_size, limit) + + +# ------------------------------------------------------------------ # +# Endpoint transport # +# ------------------------------------------------------------------ # + + +class SparqlSourceConfig(ConfigBaseModel): + """Configuration for a SPARQL endpoint data source. + + Attributes: + endpoint_url: Full SPARQL query endpoint URL + (e.g. ``http://localhost:3030/dataset/sparql``) + rdf_class: URI of the rdf:Class whose instances to fetch + graph_uri: Named graph to restrict the query to (optional) + sparql_query: Custom SPARQL query override (optional) + username: HTTP basic-auth username (optional) + password: HTTP basic-auth password (optional) + page_size: Number of results per SPARQL LIMIT/OFFSET page + """ + + endpoint_url: str + rdf_class: str | None = None + graph_uri: str | None = None + sparql_query: str | None = None + username: str | None = None + password: str | None = None + page_size: int = Field(default=10_000, description="SPARQL pagination page size") + + def build_query(self, offset: int = 0, limit: int | None = None) -> str: + """Build a SPARQL SELECT query. + + If *sparql_query* is set it is returned with LIMIT/OFFSET appended. + Otherwise generates:: + + SELECT ?s ?p ?o WHERE { ?s a . ?s ?p ?o . } + """ + if self.sparql_query: + base = self.sparql_query.rstrip().rstrip(";") + else: + graph_open = f"GRAPH <{self.graph_uri}> {{" if self.graph_uri else "" + graph_close = "}" if self.graph_uri else "" + class_filter = f"?s a <{self.rdf_class}> . " if self.rdf_class else "" + base = ( + f"SELECT ?s ?p ?o WHERE {{ " + f"{graph_open} " + f"{class_filter}" + f"?s ?p ?o . " + f"{graph_close} " + f"}}" + ) + + effective_limit = limit if limit is not None else self.page_size + return f"{base} LIMIT {effective_limit} OFFSET {offset}" + + +class SparqlEndpointDataSource(RdfDataSource): + """Data source that reads from a SPARQL endpoint. + + Uses ``SPARQLWrapper`` to query an endpoint and returns flat dictionaries + grouped by subject. + + Attributes: + config: SPARQL source configuration. + """ + + config: SparqlSourceConfig + + def _create_wrapper(self) -> Any: + """Create a configured ``SPARQLWrapper`` instance.""" + try: + from SPARQLWrapper import JSON, SPARQLWrapper + except ImportError as exc: + raise ImportError( + "SPARQLWrapper is required for SPARQL endpoint data sources. " + "Install it with: pip install graflo[sparql]" + ) from exc + + sparql = SPARQLWrapper(self.config.endpoint_url) + sparql.setReturnFormat(JSON) + if self.config.username and self.config.password: + sparql.setCredentials(self.config.username, self.config.password) + return sparql + + def iter_batches( + self, batch_size: int = 1000, limit: int | None = None + ) -> Iterator[list[dict]]: + """Query the SPARQL endpoint and yield batches of flat dictionaries. + + Paginates using SPARQL LIMIT/OFFSET. 
+ """ + wrapper = self._create_wrapper() + offset = 0 + all_docs: list[dict] = [] + + while True: + query = self.config.build_query(offset=offset, limit=self.config.page_size) + wrapper.setQuery(query) + + logger.debug("SPARQL query (offset=%d): %s", offset, query) + results = wrapper.queryAndConvert() + + bindings = results.get("results", {}).get("bindings", []) + if not bindings: + break + + page_docs = _sparql_results_to_docs(results) + all_docs.extend(page_docs) + + if len(bindings) < self.config.page_size: + break + + offset += self.config.page_size + + yield from self._yield_batches(all_docs, batch_size, limit) + + +# Backward-compatible alias +SparqlDataSource = SparqlEndpointDataSource diff --git a/graflo/db/connection/config_mapping.py b/graflo/db/connection/config_mapping.py index fbef6ace..6addfb54 100644 --- a/graflo/db/connection/config_mapping.py +++ b/graflo/db/connection/config_mapping.py @@ -8,6 +8,7 @@ NebulaConfig, Neo4jConfig, PostgresConfig, + SparqlEndpointConfig, TigergraphConfig, ) from ... import DBType @@ -21,6 +22,7 @@ DBType.MEMGRAPH: MemgraphConfig, DBType.NEBULA: NebulaConfig, DBType.POSTGRES: PostgresConfig, + DBType.SPARQL: SparqlEndpointConfig, } diff --git a/graflo/db/connection/onto.py b/graflo/db/connection/onto.py index 7c9ab049..13777cfb 100644 --- a/graflo/db/connection/onto.py +++ b/graflo/db/connection/onto.py @@ -29,6 +29,7 @@ DBType.MYSQL, DBType.MONGODB, DBType.SQLITE, + DBType.SPARQL, # RDF / SPARQL endpoints } # Databases that can be used as targets (OUTPUT) @@ -1221,3 +1222,107 @@ def from_docker_env(cls, docker_dir: str | Path | None = None) -> "PostgresConfi ) return cls(**config_data) + + +class SparqlEndpointConfig(DBConfig): + """Configuration for SPARQL endpoint connections (e.g. Apache Fuseki). + + SPARQL endpoints are used as **source-only** data stores. The endpoint + exposes one or more datasets, each reachable at + ``//sparql`` for queries and ``//data`` for + graph-store operations. + + Attributes: + dataset: Dataset (repository) name on the SPARQL server. + """ + + model_config = SettingsConfigDict( + env_prefix="SPARQL_", + case_sensitive=False, + ) + + dataset: str | None = Field( + default=None, description="Dataset / repository name on the SPARQL server" + ) + + def _get_default_port(self) -> int: + """Default Fuseki HTTP port.""" + return 3030 + + def _get_effective_database(self) -> str | None: + """SPARQL endpoints have no database level.""" + return None + + def _get_effective_schema(self) -> str | None: + """Dataset name acts as the schema equivalent.""" + return self.dataset + + @property + def query_endpoint(self) -> str: + """Full SPARQL query endpoint URL. + + Returns: + URL like ``http://localhost:3030/dataset/sparql`` + """ + base = (self.uri or "").rstrip("/") + ds = self.dataset or "" + return f"{base}/{ds}/sparql" + + @property + def graph_store_endpoint(self) -> str: + """Full SPARQL Graph Store endpoint URL. + + Returns: + URL like ``http://localhost:3030/dataset/data`` + """ + base = (self.uri or "").rstrip("/") + ds = self.dataset or "" + return f"{base}/{ds}/data" + + @classmethod + def from_docker_env( + cls, docker_dir: str | Path | None = None + ) -> "SparqlEndpointConfig": + """Load SPARQL / Fuseki config from ``docker/fuseki/.env``. 
+ + Expected env vars (see ``docker/fuseki/.env``): + + - ``TS_PORT`` – mapped host port (default 3030) + - ``TS_USERNAME`` – admin user (default ``admin``) + - ``TS_PASSWORD`` – admin password + - ``TS_DATASET`` – dataset name (optional) + """ + if docker_dir is None: + docker_dir = ( + Path(__file__).parent.parent.parent.parent / "docker" / "fuseki" + ) + else: + docker_dir = Path(docker_dir) + + env_file = docker_dir / ".env" + if not env_file.exists(): + raise FileNotFoundError(f"Environment file not found: {env_file}") + + env_vars: Dict[str, str] = {} + with open(env_file, "r") as f: + for line in f: + line = line.strip() + if line and not line.startswith("#") and "=" in line: + key, value = line.split("=", 1) + env_vars[key.strip()] = value.strip().strip('"').strip("'") + + config_data: Dict[str, Any] = {} + + port = env_vars.get("TS_PORT", "3030") + hostname = env_vars.get("TS_HOSTNAME", "localhost") + protocol = env_vars.get("TS_PROTOCOL", "http") + config_data["uri"] = f"{protocol}://{hostname}:{port}" + + config_data["username"] = env_vars.get("TS_USERNAME", "admin") + if "TS_PASSWORD" in env_vars: + config_data["password"] = env_vars["TS_PASSWORD"] + + if "TS_DATASET" in env_vars: + config_data["dataset"] = env_vars["TS_DATASET"] + + return cls(**config_data) diff --git a/graflo/hq/graph_engine.py b/graflo/hq/graph_engine.py index 88ac1565..4f51499d 100644 --- a/graflo/hq/graph_engine.py +++ b/graflo/hq/graph_engine.py @@ -11,12 +11,14 @@ from graflo.onto import DBType from graflo.architecture.onto_sql import SchemaIntrospectionResult from graflo.db import ConnectionManager, PostgresConnection -from graflo.db.connection.onto import DBConfig, PostgresConfig +from graflo.db.connection.onto import DBConfig, PostgresConfig, SparqlEndpointConfig from graflo.hq.caster import Caster, IngestionParams from graflo.hq.inferencer import InferenceManager from graflo.hq.resource_mapper import ResourceMapper from graflo.util.onto import Patterns +from pathlib import Path + logger = logging.getLogger(__name__) @@ -253,3 +255,79 @@ def ingest( patterns=patterns or Patterns(), ingestion_params=ingestion_params, ) + + # ------------------------------------------------------------------ + # RDF / SPARQL inference + # ------------------------------------------------------------------ + + def infer_schema_from_rdf( + self, + source: str | Path, + *, + endpoint_url: str | None = None, + graph_uri: str | None = None, + schema_name: str | None = None, + ) -> Schema: + """Infer a graflo Schema from an RDF / OWL ontology. + + Reads the TBox (class and property declarations) and produces + vertices (from ``owl:Class``), fields (from ``owl:DatatypeProperty``), + and edges (from ``owl:ObjectProperty`` with domain/range). + + Args: + source: Path to an RDF file (e.g. ``ontology.ttl``) or a base + URL when using *endpoint_url*. + endpoint_url: Optional SPARQL endpoint to CONSTRUCT the + ontology from. + graph_uri: Named graph containing the ontology. + schema_name: Name for the resulting schema. + + Returns: + A fully initialised :class:`Schema`. 
+ """ + from graflo.hq.rdf_inferencer import RdfInferenceManager + + mgr = RdfInferenceManager(target_db_flavor=self.target_db_flavor) + return mgr.infer_schema( + source, + endpoint_url=endpoint_url, + graph_uri=graph_uri, + schema_name=schema_name, + ) + + def create_patterns_from_rdf( + self, + source: str | Path, + *, + endpoint_url: str | None = None, + graph_uri: str | None = None, + sparql_config: SparqlEndpointConfig | None = None, + ) -> Patterns: + """Create :class:`Patterns` from an RDF ontology. + + One :class:`SparqlPattern` is created per ``owl:Class`` found in the + ontology. + + Args: + source: Path to an RDF file or base URL. + endpoint_url: SPARQL endpoint for the *data* (ABox). + graph_uri: Named graph containing the data. + sparql_config: Optional :class:`SparqlEndpointConfig` to attach + to the resulting patterns for authentication. + + Returns: + Patterns with SPARQL patterns for each class. + """ + from graflo.hq.rdf_inferencer import RdfInferenceManager + + mgr = RdfInferenceManager(target_db_flavor=self.target_db_flavor) + patterns = mgr.create_patterns( + source, + endpoint_url=endpoint_url, + graph_uri=graph_uri, + ) + + if sparql_config: + patterns.sparql_configs["default"] = sparql_config + + return patterns diff --git a/graflo/hq/rdf_inferencer.py b/graflo/hq/rdf_inferencer.py new file mode 100644 index 00000000..899e1297 --- /dev/null +++ b/graflo/hq/rdf_inferencer.py @@ -0,0 +1,266 @@ +"""RDF / OWL ontology inference manager. + +Reads the TBox (class & property definitions) from an RDF source and +produces a graflo :class:`Schema` with vertices, edges, resources, and +:class:`Patterns`. + +The mapping follows these conventions: + +- ``owl:Class`` / ``rdfs:Class`` -> **Vertex** +- ``owl:DatatypeProperty`` (``rdfs:domain``) -> **Field** on the domain vertex +- ``owl:ObjectProperty`` (``rdfs:domain``, ``rdfs:range``) -> **Edge** + (source = domain class, target = range class) +- Subject URI local name -> ``_key`` + +Requires the ``sparql`` extra:: + + pip install graflo[sparql] +""" + +from __future__ import annotations + +import logging +from pathlib import Path +from typing import Any + +from graflo.architecture.edge import Edge, EdgeConfig +from graflo.architecture.resource import Resource +from graflo.architecture.schema import Schema, SchemaMetadata +from graflo.architecture.vertex import Field as VertexField, Vertex, VertexConfig +from graflo.onto import DBType +from graflo.util.onto import Patterns, SparqlPattern + +logger = logging.getLogger(__name__) + + +def _local_name(uri: str) -> str: + """Extract the local name (fragment or last path segment) from a URI.""" + if "#" in uri: + return uri.rsplit("#", 1)[-1] + return uri.rsplit("/", 1)[-1] + + +def _load_graph( + source: str | Path, + *, + endpoint_url: str | None = None, + graph_uri: str | None = None, +) -> Any: + """Load an rdflib Graph from a file or SPARQL endpoint. + + Args: + source: Path to an RDF file **or** a SPARQL endpoint URL. + endpoint_url: If provided, used as SPARQL endpoint (overrides *source*). + graph_uri: Named graph to query from the endpoint. + + Returns: + An ``rdflib.Graph`` instance. 
+ """ + from rdflib import Graph + + g = Graph() + + if endpoint_url: + from SPARQLWrapper import N3, SPARQLWrapper + + sparql = SPARQLWrapper(endpoint_url) + sparql.setReturnFormat(N3) + + query = "CONSTRUCT { ?s ?p ?o } WHERE { " + if graph_uri: + query += f"GRAPH <{graph_uri}> {{ ?s ?p ?o }} " + else: + query += "?s ?p ?o " + query += "}" + + sparql.setQuery(query) + raw: bytes = sparql.query().convert() # type: ignore[assignment] + g.parse(data=raw, format="n3") + else: + g.parse(str(source)) + + logger.info("Loaded %d triples from %s", len(g), source or endpoint_url) + return g + + +class RdfInferenceManager: + """Infer a graflo :class:`Schema` from an RDF / OWL ontology. + + The manager reads the TBox (class and property declarations) from an + rdflib ``Graph`` and constructs the corresponding graflo artefacts. + + Attributes: + target_db_flavor: Target graph-database flavour for downstream + schema sanitisation. + """ + + def __init__(self, target_db_flavor: DBType = DBType.ARANGO): + self.target_db_flavor = target_db_flavor + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + def infer_schema( + self, + source: str | Path, + *, + endpoint_url: str | None = None, + graph_uri: str | None = None, + schema_name: str | None = None, + ) -> Schema: + """Infer a complete graflo Schema from an RDF/OWL ontology. + + Args: + source: Path to an RDF file or a base URL (when using endpoint). + endpoint_url: SPARQL endpoint to CONSTRUCT the ontology from. + graph_uri: Named graph containing the ontology. + schema_name: Name for the resulting schema. + + Returns: + A fully initialised :class:`Schema`. + """ + from rdflib import OWL, RDF, RDFS + + g = _load_graph(source, endpoint_url=endpoint_url, graph_uri=graph_uri) + + # -- Discover classes ------------------------------------------------- + classes: dict[str, str] = {} # local_name -> full URI + for cls_uri in set(g.subjects(RDF.type, OWL.Class)) | set( + g.subjects(RDF.type, RDFS.Class) + ): + uri_str = str(cls_uri) + name = _local_name(uri_str) + if ( + name + and not uri_str.startswith(str(OWL)) + and not uri_str.startswith(str(RDFS)) + ): + classes[name] = uri_str + + logger.info("Discovered %d classes: %s", len(classes), list(classes.keys())) + + # -- Discover datatype properties -> vertex fields -------------------- + fields_by_class: dict[str, list[str]] = {c: ["_key", "_uri"] for c in classes} + + for dp in g.subjects(RDF.type, OWL.DatatypeProperty): + dp_name = _local_name(str(dp)) + for domain in g.objects(dp, RDFS.domain): + domain_name = _local_name(str(domain)) + if domain_name in fields_by_class: + fields_by_class[domain_name].append(dp_name) + + # -- Discover object properties -> edges ------------------------------ + edges: list[dict[str, str]] = [] + for op in g.subjects(RDF.type, OWL.ObjectProperty): + op_name = _local_name(str(op)) + domains = [_local_name(str(d)) for d in g.objects(op, RDFS.domain)] + ranges = [_local_name(str(r)) for r in g.objects(op, RDFS.range)] + + for src in domains: + for tgt in ranges: + if src in classes and tgt in classes: + edges.append( + {"source": src, "target": tgt, "relation": op_name} + ) + + logger.info("Discovered %d edges", len(edges)) + + # -- Build Schema artefacts ------------------------------------------- + vertices = [] + for cls_name, fields in fields_by_class.items(): + vertex_fields = [VertexField(name=f) for f in fields] + vertices.append(Vertex(name=cls_name, 
fields=vertex_fields)) + + vertex_config = VertexConfig(vertices=vertices, db_flavor=self.target_db_flavor) + + edge_objects = [ + Edge( + source=e["source"], + target=e["target"], + relation=e.get("relation"), + ) + for e in edges + ] + edge_config = EdgeConfig(edges=edge_objects) + + # -- Build Resources (one per class) ---------------------------------- + resources: list[Resource] = [] + for cls_name in classes: + pipeline: list[dict[str, Any]] = [{"vertex": cls_name}] + for edge_def in edges: + if edge_def["source"] == cls_name: + pipeline.append( + { + "source": edge_def["source"], + "target": edge_def["target"], + "relation": edge_def.get("relation"), + } + ) + resources.append(Resource(resource_name=cls_name, pipeline=pipeline)) + + effective_name = schema_name or "rdf_schema" + schema = Schema( + general=SchemaMetadata(name=effective_name), + vertex_config=vertex_config, + edge_config=edge_config, + resources=resources, + ) + schema.finish_init() + return schema + + def create_patterns( + self, + source: str | Path, + *, + endpoint_url: str | None = None, + graph_uri: str | None = None, + ) -> Patterns: + """Create :class:`Patterns` from an RDF ontology. + + One :class:`SparqlPattern` is created per ``owl:Class`` / ``rdfs:Class``. + The ontology is always loaded from *source* (a local file). The + *endpoint_url* is attached to each pattern for runtime data queries + but is **not** used to load the ontology itself. + + Args: + source: Path to an RDF file containing the ontology. + endpoint_url: SPARQL endpoint for the data (ABox) at runtime. + graph_uri: Named graph containing the data. + + Returns: + Patterns with one SparqlPattern per class. + """ + from rdflib import OWL, RDF, RDFS + + # Always load the ontology from the local file, not from the endpoint. 
+ g = _load_graph(source) + + classes: dict[str, str] = {} + for cls_uri in set(g.subjects(RDF.type, OWL.Class)) | set( + g.subjects(RDF.type, RDFS.Class) + ): + uri_str = str(cls_uri) + name = _local_name(uri_str) + if ( + name + and not uri_str.startswith(str(OWL)) + and not uri_str.startswith(str(RDFS)) + ): + classes[name] = uri_str + + patterns = Patterns() + for cls_name, cls_uri in classes.items(): + sp = SparqlPattern( + rdf_class=cls_uri, + endpoint_url=endpoint_url, + graph_uri=graph_uri, + rdf_file=Path(source) if not endpoint_url else None, + resource_name=cls_name, + ) + patterns.add_sparql_pattern(cls_name, sp) + + logger.info( + "Created %d SPARQL patterns from ontology", len(patterns.sparql_patterns) + ) + return patterns diff --git a/graflo/hq/registry_builder.py b/graflo/hq/registry_builder.py index 484e502a..2d2e8938 100644 --- a/graflo/hq/registry_builder.py +++ b/graflo/hq/registry_builder.py @@ -15,7 +15,7 @@ from graflo.data_source import DataSourceFactory, DataSourceRegistry from graflo.data_source.sql import SQLConfig, SQLDataSource from graflo.filter.sql import datetime_range_where_sql -from graflo.util.onto import FilePattern, ResourceType, TablePattern +from graflo.util.onto import FilePattern, ResourceType, SparqlPattern, TablePattern if TYPE_CHECKING: from graflo.hq.caster import IngestionParams @@ -87,6 +87,16 @@ def build( registry, resource_name, pattern, patterns, ingestion_params ) + elif resource_type == ResourceType.SPARQL: + if not isinstance(pattern, SparqlPattern): + logger.warning( + f"Pattern for resource '{resource_name}' is not a SparqlPattern, skipping" + ) + continue + self._register_sparql_sources( + registry, resource_name, pattern, patterns, ingestion_params + ) + else: logger.warning( f"Unsupported resource type '{resource_type}' for resource '{resource_name}', skipping" @@ -254,3 +264,92 @@ def _register_sql_table_sources( f"Failed to create data source for PostgreSQL table '{resource_name}': {e}", exc_info=True, ) + + # ------------------------------------------------------------------ + # SPARQL / RDF sources + # ------------------------------------------------------------------ + + def _register_sparql_sources( + self, + registry: DataSourceRegistry, + resource_name: str, + pattern: SparqlPattern, + patterns: "Patterns", + ingestion_params: "IngestionParams", + ) -> None: + """Register SPARQL data sources for a resource. + + Handles two modes: + + * **Endpoint mode** (``pattern.endpoint_url`` is set): creates a + :class:`SparqlEndpointDataSource` that queries the remote SPARQL + endpoint. + * **File mode** (``pattern.rdf_file`` is set): creates an + :class:`RdfFileDataSource` that parses a local RDF file. 
+ """ + try: + if pattern.endpoint_url: + from graflo.data_source.rdf import ( + SparqlEndpointDataSource, + SparqlSourceConfig, + ) + + sparql_config = patterns.get_sparql_config(resource_name) + username = ( + getattr(sparql_config, "username", None) if sparql_config else None + ) + password = ( + getattr(sparql_config, "password", None) if sparql_config else None + ) + + source_config = SparqlSourceConfig( + endpoint_url=pattern.endpoint_url, + rdf_class=pattern.rdf_class, + graph_uri=pattern.graph_uri, + sparql_query=pattern.sparql_query, + username=username, + password=password, + page_size=ingestion_params.batch_size, + ) + sparql_source = SparqlEndpointDataSource(config=source_config) + registry.register(sparql_source, resource_name=resource_name) + + logger.info( + "Created SPARQL endpoint data source for class <%s> at '%s' " + "mapped to resource '%s'", + pattern.rdf_class, + pattern.endpoint_url, + resource_name, + ) + + elif pattern.rdf_file: + from graflo.data_source.rdf import RdfFileDataSource + + rdf_source = RdfFileDataSource( + path=pattern.rdf_file, + rdf_class=pattern.rdf_class, + ) + registry.register(rdf_source, resource_name=resource_name) + + logger.info( + "Created RDF file data source for class <%s> from '%s' " + "mapped to resource '%s'", + pattern.rdf_class, + pattern.rdf_file, + resource_name, + ) + + else: + logger.warning( + "SparqlPattern for resource '%s' has neither endpoint_url nor " + "rdf_file set, skipping", + resource_name, + ) + + except Exception as e: + logger.error( + "Failed to create data source for SPARQL resource '%s': %s", + resource_name, + e, + exc_info=True, + ) diff --git a/graflo/onto.py b/graflo/onto.py index 8c4d0687..da5207f7 100644 --- a/graflo/onto.py +++ b/graflo/onto.py @@ -146,11 +146,12 @@ class DBType(StrEnum, metaclass=MetaEnum): MEMGRAPH = "memgraph" NEBULA = "nebula" - # Source databases (SQL, NoSQL) + # Source databases (SQL, NoSQL, RDF) POSTGRES = "postgres" MYSQL = "mysql" MONGODB = "mongodb" SQLITE = "sqlite" + SPARQL = "sparql" # Mapping from graph DB type to expression flavor for filter rendering. diff --git a/graflo/util/onto.py b/graflo/util/onto.py index e7556403..bc0146bc 100644 --- a/graflo/util/onto.py +++ b/graflo/util/onto.py @@ -42,10 +42,12 @@ class ResourceType(BaseEnum): Attributes: FILE: File-based data source (any format: CSV, JSON, JSONL, Parquet, etc.) SQL_TABLE: SQL database table (e.g., PostgreSQL table) + SPARQL: SPARQL / RDF data source (endpoint or .ttl/.rdf files via rdflib) """ FILE = "file" SQL_TABLE = "sql_table" + SPARQL = "sparql" class ResourcePattern(ConfigBaseModel, abc.ABC): @@ -374,6 +376,87 @@ def build_query(self, effective_schema: str | None = None) -> str: return query +class SparqlPattern(ResourcePattern): + """Pattern for matching SPARQL / RDF data sources. + + Each ``SparqlPattern`` targets instances of a single ``rdf:Class``. + It can be backed either by a remote SPARQL endpoint (Fuseki, Blazegraph, ...) + or by a local RDF file parsed with *rdflib*. + + Attributes: + rdf_class: Full URI of the ``rdf:Class`` whose instances this pattern + fetches (e.g. ``"http://example.org/Person"``). + endpoint_url: SPARQL query endpoint URL. When set, instances are + fetched via HTTP. When ``None`` the pattern is for local file mode. + graph_uri: Named-graph URI to restrict the query to (optional). + sparql_query: Custom SPARQL ``SELECT`` query override. When provided + the auto-generated per-class query is skipped. 
+ rdf_file: Path to a local RDF file (``.ttl``, ``.rdf``, ``.n3``, + ``.jsonld``). Mutually exclusive with *endpoint_url*. + """ + + rdf_class: str = Field( + ..., description="URI of the rdf:Class to fetch instances of" + ) + endpoint_url: str | None = Field( + default=None, description="SPARQL query endpoint URL" + ) + graph_uri: str | None = Field( + default=None, description="Named graph URI (optional)" + ) + sparql_query: str | None = Field( + default=None, description="Custom SPARQL query override" + ) + rdf_file: pathlib.Path | None = Field( + default=None, description="Path to a local RDF file" + ) + + def matches(self, resource_identifier: str) -> bool: + """Match by the local name (fragment) of the rdf:Class URI. + + Args: + resource_identifier: Identifier to match against + + Returns: + True when *resource_identifier* equals the class local name + """ + local_name = self.rdf_class.rsplit("#", 1)[-1].rsplit("/", 1)[-1] + return resource_identifier == local_name + + def get_resource_type(self) -> ResourceType: + """Return ``ResourceType.SPARQL``.""" + return ResourceType.SPARQL + + def build_select_query(self) -> str: + """Build a SPARQL SELECT query for instances of ``rdf_class``. + + If *sparql_query* is set it is returned as-is. Otherwise a simple + per-class query is generated:: + + SELECT ?s ?p ?o WHERE { + ?s a . + ?s ?p ?o . + } + + Returns: + SPARQL query string + """ + if self.sparql_query: + return self.sparql_query + + graph_open = f"GRAPH <{self.graph_uri}> {{" if self.graph_uri else "" + graph_close = "}" if self.graph_uri else "" + + return ( + "SELECT ?s ?p ?o WHERE { " + f"{graph_open} " + f"?s a <{self.rdf_class}> . " + f"?s ?p ?o . " + f"{graph_close} " + "}" + ) + + class Patterns(ConfigBaseModel): """Collection of named resource patterns with connection management. @@ -390,19 +473,23 @@ class Patterns(ConfigBaseModel): Attributes: file_patterns: Dictionary mapping resource names to FilePattern instances table_patterns: Dictionary mapping resource names to TablePattern instances - patterns: Property that merges file_patterns and table_patterns (for backward compatibility) + sparql_patterns: Dictionary mapping resource names to SparqlPattern instances + patterns: Property that merges all pattern dicts (for backward compatibility) postgres_configs: Dictionary mapping (config_key, schema_name) to PostgresConfig postgres_table_configs: Dictionary mapping resource_name to (config_key, schema_name, table_name) + sparql_configs: Dictionary mapping config_key to SparqlEndpointConfig """ file_patterns: dict[str, FilePattern] = Field(default_factory=dict) table_patterns: dict[str, TablePattern] = Field(default_factory=dict) + sparql_patterns: dict[str, SparqlPattern] = Field(default_factory=dict) postgres_configs: dict[tuple[str, str | None], Any] = Field( default_factory=dict, exclude=True ) postgres_table_configs: dict[str, tuple[str, str | None, str]] = Field( default_factory=dict, exclude=True ) + sparql_configs: dict[str, Any] = Field(default_factory=dict, exclude=True) # Initialization parameters (not stored in serialization); accept both _name and name resource_mapping: dict[str, str | tuple[str, str]] | None = Field( default=None, @@ -421,15 +508,16 @@ class Patterns(ConfigBaseModel): ) @property - def patterns(self) -> dict[str, TablePattern | FilePattern]: - """Merged dictionary of all patterns (file and table) for backward compatibility. 
+ def patterns(self) -> dict[str, TablePattern | FilePattern | SparqlPattern]: + """Merged dictionary of all patterns (file, table, and SPARQL). Returns: Dictionary mapping resource names to ResourcePattern instances """ - result: dict[str, TablePattern | FilePattern] = {} + result: dict[str, TablePattern | FilePattern | SparqlPattern] = {} result.update(self.file_patterns) result.update(self.table_patterns) + result.update(self.sparql_patterns) return result @model_validator(mode="after") @@ -508,10 +596,14 @@ def from_dict(cls, data: dict[str, Any] | list[Any]) -> Self: """ if isinstance(data, list): return cls.model_validate(data) - if "file_patterns" in data or "table_patterns" in data: + if ( + "file_patterns" in data + or "table_patterns" in data + or "sparql_patterns" in data + ): # Strip __tag__ from nested pattern dicts so extra="forbid" does not fail data = copy.deepcopy(data) - for key in ("file_patterns", "table_patterns"): + for key in ("file_patterns", "table_patterns", "sparql_patterns"): if key in data and isinstance(data[key], dict): for name, val in data[key].items(): if isinstance(val, dict) and "__tag__" in val: @@ -537,6 +629,10 @@ def from_dict(cls, data: dict[str, Any] | list[Any]) -> Self: instance.table_patterns[pattern_name] = TablePattern.model_validate( pattern_dict ) + elif tag_val == "sparql": + instance.sparql_patterns[pattern_name] = SparqlPattern.model_validate( + pattern_dict + ) else: if "table_name" in pattern_dict: instance.table_patterns[pattern_name] = TablePattern.model_validate( @@ -546,12 +642,17 @@ def from_dict(cls, data: dict[str, Any] | list[Any]) -> Self: instance.file_patterns[pattern_name] = FilePattern.model_validate( pattern_dict ) + elif "rdf_class" in pattern_dict: + instance.sparql_patterns[pattern_name] = ( + SparqlPattern.model_validate(pattern_dict) + ) else: raise ValueError( f"Unable to determine pattern type for '{pattern_name}'. " - "Expected either '__tag__: file' or '__tag__: table', " + "Expected '__tag__: file|table|sparql', " "or pattern fields (table_name for TablePattern, " - "regex/sub_path for FilePattern)" + "regex/sub_path for FilePattern, " + "rdf_class for SparqlPattern)" ) return instance @@ -573,6 +674,38 @@ def add_table_pattern(self, name: str, table_pattern: TablePattern): """ self.table_patterns[name] = table_pattern + def add_sparql_pattern(self, name: str, sparql_pattern: SparqlPattern): + """Add a SPARQL pattern to the collection. + + Args: + name: Name of the pattern (typically the rdf:Class local name) + sparql_pattern: SparqlPattern instance + """ + self.sparql_patterns[name] = sparql_pattern + + def get_sparql_config(self, resource_name: str) -> Any: + """Get SPARQL endpoint config for a resource. + + Args: + resource_name: Name of the resource + + Returns: + SparqlEndpointConfig if resource is a SPARQL pattern, None otherwise + """ + if resource_name in self.sparql_patterns: + pattern = self.sparql_patterns[resource_name] + if pattern.endpoint_url: + for cfg in self.sparql_configs.values(): + if ( + hasattr(cfg, "query_endpoint") + and cfg.query_endpoint == pattern.endpoint_url + ): + return cfg + # Return the first config if only one is registered + if self.sparql_configs: + return next(iter(self.sparql_configs.values())) + return None + def get_postgres_config(self, resource_name: str) -> Any: """Get PostgreSQL connection config for a resource. 
@@ -600,6 +733,8 @@ def get_resource_type(self, resource_name: str) -> ResourceType | None: return self.file_patterns[resource_name].get_resource_type() if resource_name in self.table_patterns: return self.table_patterns[resource_name].get_resource_type() + if resource_name in self.sparql_patterns: + return self.sparql_patterns[resource_name].get_resource_type() return None def get_table_info(self, resource_name: str) -> tuple[str, str | None] | None: diff --git a/mkdocs.yml b/mkdocs.yml index 6b3409b0..2a88a542 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -62,7 +62,8 @@ nav: - 3 - CSV with Edge Weights and Multiple Relations (Neo4j): examples/example-3.md - 4 - Dynamic Relations from Keys (Neo4j): examples/example-4.md - 5 - Ingesting Data from PostgreSQL Tables: examples/example-5.md -# - 6 - REST API Data Source: examples/example-6.md + - 6 - RDF / Turtle Ingestion with Explicit Resource Mapping: examples/example-6.md +# - 7 - REST API Data Source: examples/example-7.md - API Reference: reference/ - Contributing: contributing.md markdown_extensions: diff --git a/pyproject.toml b/pyproject.toml index 83443eab..8ee586fa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -59,6 +59,10 @@ version = "1.6.0" plot = [ "pygraphviz>=1.14" ] +sparql = [ + "SPARQLWrapper>=2.0.0", + "rdflib>=7.0.0" +] [project.scripts] ingest = "graflo.cli.ingest:ingest" @@ -76,6 +80,7 @@ include = ["graflo"] include = ["graflo"] [tool.pytest.ini_options] +addopts = "-m 'not slow'" markers = [ - "slow: marks tests as slow (deselect with '-m \"not slow\"')" + "slow: marks tests as slow (run with '-m slow' or '--run-slow')" ] diff --git a/test/data_source/sparql/__init__.py b/test/data_source/sparql/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/test/data_source/sparql/conftest.py b/test/data_source/sparql/conftest.py new file mode 100644 index 00000000..f9aef5c7 --- /dev/null +++ b/test/data_source/sparql/conftest.py @@ -0,0 +1,45 @@ +"""Fixtures for SPARQL / RDF data source tests.""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + + +DATA_DIR = Path(__file__).parent / "data" +SAMPLE_ONTOLOGY = DATA_DIR / "sample_ontology.ttl" +SAMPLE_DATA = DATA_DIR / "sample_data.ttl" + + +@pytest.fixture() +def sample_ontology_path() -> Path: + """Path to the sample TBox-only ontology file.""" + return SAMPLE_ONTOLOGY + + +@pytest.fixture() +def sample_data_path() -> Path: + """Path to the sample combined TBox + ABox data file.""" + return SAMPLE_DATA + + +@pytest.fixture(scope="module") +def fuseki_config(): + """Load Fuseki / SPARQL endpoint config from docker/fuseki/.env. + + Skips if the .env file is missing (CI without Fuseki). + """ + from graflo.db.connection.onto import SparqlEndpointConfig + + try: + config = SparqlEndpointConfig.from_docker_env() + except FileNotFoundError: + pytest.skip("docker/fuseki/.env not found – Fuseki not configured") + return config + + +@pytest.fixture(scope="module") +def fuseki_query_endpoint(fuseki_config) -> str: + """Full SPARQL query endpoint URL for the Fuseki test dataset.""" + return fuseki_config.query_endpoint diff --git a/test/data_source/sparql/data/sample_data.ttl b/test/data_source/sparql/data/sample_data.ttl new file mode 100644 index 00000000..07c355c3 --- /dev/null +++ b/test/data_source/sparql/data/sample_data.ttl @@ -0,0 +1,45 @@ +@prefix rdf: . +@prefix rdfs: . +@prefix owl: . +@prefix xsd: . +@prefix ex: . + +# --- Ontology (TBox) ------------------------------------------------------- + +ex:Person a owl:Class . 
+ex:Organization a owl:Class . + +ex:name a owl:DatatypeProperty ; rdfs:domain ex:Person ; rdfs:range xsd:string . +ex:age a owl:DatatypeProperty ; rdfs:domain ex:Person ; rdfs:range xsd:integer . +ex:orgName a owl:DatatypeProperty ; rdfs:domain ex:Organization ; rdfs:range xsd:string . +ex:founded a owl:DatatypeProperty ; rdfs:domain ex:Organization ; rdfs:range xsd:integer . +ex:worksFor a owl:ObjectProperty ; rdfs:domain ex:Person ; rdfs:range ex:Organization . +ex:knows a owl:ObjectProperty ; rdfs:domain ex:Person ; rdfs:range ex:Person . + +# --- Instance data (ABox) -------------------------------------------------- + +ex:alice a ex:Person ; + ex:name "Alice" ; + ex:age 30 ; + ex:worksFor ex:acme ; + ex:knows ex:bob . + +ex:bob a ex:Person ; + ex:name "Bob" ; + ex:age 25 ; + ex:worksFor ex:acme ; + ex:knows ex:alice . + +ex:carol a ex:Person ; + ex:name "Carol" ; + ex:age 35 ; + ex:worksFor ex:globex ; + ex:knows ex:alice . + +ex:acme a ex:Organization ; + ex:orgName "Acme Corp" ; + ex:founded 1990 . + +ex:globex a ex:Organization ; + ex:orgName "Globex Inc" ; + ex:founded 2005 . diff --git a/test/data_source/sparql/data/sample_ontology.ttl b/test/data_source/sparql/data/sample_ontology.ttl new file mode 100644 index 00000000..74d8fd70 --- /dev/null +++ b/test/data_source/sparql/data/sample_ontology.ttl @@ -0,0 +1,41 @@ +@prefix rdf: . +@prefix rdfs: . +@prefix owl: . +@prefix xsd: . +@prefix ex: . + +# --- Classes --------------------------------------------------------------- + +ex:Person a owl:Class ; + rdfs:label "Person" . + +ex:Organization a owl:Class ; + rdfs:label "Organization" . + +# --- Datatype properties (fields) ------------------------------------------ + +ex:name a owl:DatatypeProperty ; + rdfs:domain ex:Person ; + rdfs:range xsd:string . + +ex:age a owl:DatatypeProperty ; + rdfs:domain ex:Person ; + rdfs:range xsd:integer . + +ex:orgName a owl:DatatypeProperty ; + rdfs:domain ex:Organization ; + rdfs:range xsd:string . + +ex:founded a owl:DatatypeProperty ; + rdfs:domain ex:Organization ; + rdfs:range xsd:integer . + +# --- Object properties (edges) --------------------------------------------- + +ex:worksFor a owl:ObjectProperty ; + rdfs:domain ex:Person ; + rdfs:range ex:Organization . + +ex:knows a owl:ObjectProperty ; + rdfs:domain ex:Person ; + rdfs:range ex:Person . 
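+
+# --- Expected inference (for reference) -------------------------------------
+# Under the mapping used by RdfInferenceManager, this ontology is expected to
+# yield Person and Organization vertices (with name/age and orgName/founded
+# fields plus _key/_uri), and worksFor (Person -> Organization) and
+# knows (Person -> Person) edges. See test_rdf_inference.py below.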
diff --git a/test/data_source/sparql/test_rdf_file_data_source.py b/test/data_source/sparql/test_rdf_file_data_source.py new file mode 100644 index 00000000..cff40667 --- /dev/null +++ b/test/data_source/sparql/test_rdf_file_data_source.py @@ -0,0 +1,112 @@ +"""Tests for :class:`RdfFileDataSource`.""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +rdflib = pytest.importorskip( + "rdflib", reason="rdflib not installed (need graflo[sparql])" +) + +from graflo.data_source.rdf import RdfFileDataSource, _triples_to_docs # noqa: E402 + + +class TestRdfFileDataSource: + """Unit tests for RDF file parsing.""" + + def test_parse_all_subjects(self, sample_data_path: Path): + """Parse a .ttl file and get all subjects as flat dicts.""" + ds = RdfFileDataSource(path=sample_data_path) + batches = list(ds.iter_batches(batch_size=100)) + + assert len(batches) >= 1 + all_docs = [doc for batch in batches for doc in batch] + + # 3 persons + 2 organisations + class/property declarations (URIRefs) + # We should get at least the 5 instance subjects + uris = {doc["_uri"] for doc in all_docs} + assert "http://example.org/alice" in uris + assert "http://example.org/acme" in uris + + def test_filter_by_rdf_class(self, sample_data_path: Path): + """Only subjects of a specific rdf:Class should be returned.""" + ds = RdfFileDataSource( + path=sample_data_path, + rdf_class="http://example.org/Person", + ) + batches = list(ds.iter_batches()) + all_docs = [doc for batch in batches for doc in batch] + + assert len(all_docs) == 3 + names = {doc.get("name") for doc in all_docs} + assert names == {"Alice", "Bob", "Carol"} + + def test_filter_organization_class(self, sample_data_path: Path): + """Filter for Organization class.""" + ds = RdfFileDataSource( + path=sample_data_path, + rdf_class="http://example.org/Organization", + ) + batches = list(ds.iter_batches()) + all_docs = [doc for batch in batches for doc in batch] + + assert len(all_docs) == 2 + names = {doc.get("orgName") for doc in all_docs} + assert names == {"Acme Corp", "Globex Inc"} + + def test_limit(self, sample_data_path: Path): + """Limit should cap the number of returned items.""" + ds = RdfFileDataSource( + path=sample_data_path, + rdf_class="http://example.org/Person", + ) + batches = list(ds.iter_batches(limit=2)) + all_docs = [doc for batch in batches for doc in batch] + assert len(all_docs) == 2 + + def test_batch_size(self, sample_data_path: Path): + """Batch size should control the batch partitioning.""" + ds = RdfFileDataSource( + path=sample_data_path, + rdf_class="http://example.org/Person", + ) + batches = list(ds.iter_batches(batch_size=1)) + assert len(batches) == 3 + for batch in batches: + assert len(batch) == 1 + + def test_key_extraction(self, sample_data_path: Path): + """Each doc should have a _key extracted from the URI local name.""" + ds = RdfFileDataSource( + path=sample_data_path, + rdf_class="http://example.org/Person", + ) + all_docs = [doc for batch in ds.iter_batches() for doc in batch] + keys = {doc["_key"] for doc in all_docs} + assert keys == {"alice", "bob", "carol"} + + def test_object_property_as_uri(self, sample_data_path: Path): + """Object properties should appear as URI strings.""" + ds = RdfFileDataSource( + path=sample_data_path, + rdf_class="http://example.org/Person", + ) + all_docs = {doc["_key"]: doc for batch in ds.iter_batches() for doc in batch} + alice = all_docs["alice"] + assert alice["worksFor"] == "http://example.org/acme" + + def 
test_triples_to_docs_no_class_filter(self, sample_data_path: Path): + """_triples_to_docs without class filter returns all subjects.""" + from rdflib import Graph + + g = Graph() + g.parse(str(sample_data_path), format="turtle") + + docs = _triples_to_docs(g, rdf_class=None) + assert len(docs) > 0 + # Should contain at least the 5 instance URIs + uris = {d["_uri"] for d in docs} + for expected in ("alice", "bob", "carol", "acme", "globex"): + assert any(expected in u for u in uris) diff --git a/test/data_source/sparql/test_rdf_inference.py b/test/data_source/sparql/test_rdf_inference.py new file mode 100644 index 00000000..e1249f28 --- /dev/null +++ b/test/data_source/sparql/test_rdf_inference.py @@ -0,0 +1,98 @@ +"""Tests for :class:`RdfInferenceManager`.""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +rdflib = pytest.importorskip( + "rdflib", reason="rdflib not installed (need graflo[sparql])" +) + +from graflo.hq.rdf_inferencer import RdfInferenceManager # noqa: E402 + + +class TestRdfInferenceManager: + """Unit tests for ontology-based schema inference.""" + + def test_infer_schema_vertices(self, sample_ontology_path: Path): + """Vertices should be inferred from owl:Class declarations.""" + mgr = RdfInferenceManager() + schema = mgr.infer_schema(sample_ontology_path, schema_name="test_rdf") + + vertex_names = {v.name for v in schema.vertex_config.vertices} + assert "Person" in vertex_names + assert "Organization" in vertex_names + + def test_infer_schema_fields(self, sample_ontology_path: Path): + """Datatype properties should become vertex fields.""" + mgr = RdfInferenceManager() + schema = mgr.infer_schema(sample_ontology_path, schema_name="test_rdf") + + person_fields = schema.vertex_config.fields_names("Person") + assert "name" in person_fields + assert "age" in person_fields + assert "_key" in person_fields + assert "_uri" in person_fields + + org_fields = schema.vertex_config.fields_names("Organization") + assert "orgName" in org_fields + assert "founded" in org_fields + + def test_infer_schema_edges(self, sample_ontology_path: Path): + """Object properties should become edges with correct source/target.""" + mgr = RdfInferenceManager() + schema = mgr.infer_schema(sample_ontology_path, schema_name="test_rdf") + + edges = schema.edge_config.edges + edge_tuples = {(e.source, e.target, e.relation) for e in edges} + + assert ("Person", "Organization", "worksFor") in edge_tuples + assert ("Person", "Person", "knows") in edge_tuples + + def test_infer_schema_resources(self, sample_ontology_path: Path): + """One Resource per class should be created.""" + mgr = RdfInferenceManager() + schema = mgr.infer_schema(sample_ontology_path, schema_name="test_rdf") + + resource_names = {r.name for r in schema.resources} + assert "Person" in resource_names + assert "Organization" in resource_names + + def test_infer_schema_name(self, sample_ontology_path: Path): + """Schema should use the provided name.""" + mgr = RdfInferenceManager() + schema = mgr.infer_schema(sample_ontology_path, schema_name="my_ontology") + assert schema.general.name == "my_ontology" + + def test_create_patterns(self, sample_ontology_path: Path): + """Patterns should contain one SparqlPattern per class.""" + mgr = RdfInferenceManager() + patterns = mgr.create_patterns(sample_ontology_path) + + assert "Person" in patterns.sparql_patterns + assert "Organization" in patterns.sparql_patterns + + person_pat = patterns.sparql_patterns["Person"] + assert person_pat.rdf_class == 
"http://example.org/Person" + assert person_pat.rdf_file is not None + + def test_create_patterns_with_endpoint(self, sample_ontology_path: Path): + """When endpoint_url is given, patterns should reference it.""" + mgr = RdfInferenceManager() + endpoint = "http://localhost:3030/test/sparql" + patterns = mgr.create_patterns(sample_ontology_path, endpoint_url=endpoint) + + for pat in patterns.sparql_patterns.values(): + assert pat.endpoint_url == endpoint + assert pat.rdf_file is None + + def test_infer_from_combined_file(self, sample_data_path: Path): + """Inference should work on a file containing both TBox and ABox.""" + mgr = RdfInferenceManager() + schema = mgr.infer_schema(sample_data_path, schema_name="combined") + + vertex_names = {v.name for v in schema.vertex_config.vertices} + assert "Person" in vertex_names + assert "Organization" in vertex_names diff --git a/test/data_source/sparql/test_sparql_data_source.py b/test/data_source/sparql/test_sparql_data_source.py new file mode 100644 index 00000000..fbd78719 --- /dev/null +++ b/test/data_source/sparql/test_sparql_data_source.py @@ -0,0 +1,174 @@ +"""Integration tests for :class:`SparqlEndpointDataSource` against a running Fuseki instance. + +These tests require a running Fuseki container (see ``docker/fuseki/``). +They are skipped automatically when Fuseki is not reachable. +""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +rdflib = pytest.importorskip( + "rdflib", reason="rdflib not installed (need graflo[sparql])" +) +SPARQLWrapper = pytest.importorskip( + "SPARQLWrapper", reason="SPARQLWrapper not installed (need graflo[sparql])" +) + +import requests # noqa: E402 + +from graflo.data_source.rdf import SparqlEndpointDataSource, SparqlSourceConfig # noqa: E402 + +DATA_DIR = Path(__file__).parent / "data" +SAMPLE_DATA = DATA_DIR / "sample_data.ttl" + + +def _fuseki_is_reachable(endpoint: str) -> bool: + """Return True if Fuseki responds on the given base URL.""" + try: + base = endpoint.rsplit("/", 2)[0] # strip /dataset/sparql + r = requests.get(f"{base}/$/ping", timeout=3) + return r.status_code == 200 + except Exception: + return False + + +def _ensure_dataset(config) -> None: + """Create the test dataset in Fuseki if it does not exist.""" + base = (config.uri or "").rstrip("/") + dataset = config.dataset or "test" + url = f"{base}/$/datasets" + + r = requests.get(url, auth=(config.username, config.password), timeout=5) + existing = {ds.get("ds.name", "").strip("/") for ds in r.json().get("datasets", [])} + + if dataset not in existing: + requests.post( + url, + data={"dbName": dataset, "dbType": "tdb2"}, + auth=(config.username, config.password), + timeout=10, + ) + + +def _upload_data(config) -> None: + """Upload sample_data.ttl to the Fuseki test dataset.""" + endpoint = config.graph_store_endpoint + with open(SAMPLE_DATA, "rb") as f: + requests.put( + endpoint, + data=f.read(), + headers={"Content-Type": "text/turtle"}, + params={"default": ""}, + auth=(config.username, config.password), + timeout=10, + ) + + +@pytest.fixture(scope="module") +def fuseki_ready(fuseki_config): + """Ensure Fuseki is running, dataset exists, and data is loaded.""" + if not _fuseki_is_reachable(fuseki_config.query_endpoint): + pytest.skip("Fuseki is not reachable") + + _ensure_dataset(fuseki_config) + _upload_data(fuseki_config) + return fuseki_config + + +class TestSparqlEndpointDataSource: + """Integration tests against a live Fuseki endpoint.""" + + def test_query_all(self, fuseki_ready): + """Query all 
triples from the endpoint."""
+        config = SparqlSourceConfig(
+            endpoint_url=fuseki_ready.query_endpoint,
+            username=fuseki_ready.username,
+            password=fuseki_ready.password,
+        )
+        ds = SparqlEndpointDataSource(config=config)
+        batches = list(ds.iter_batches(batch_size=100))
+
+        all_docs = [doc for batch in batches for doc in batch]
+        assert len(all_docs) > 0
+
+    def test_query_person_class(self, fuseki_ready):
+        """Query only Person instances."""
+        config = SparqlSourceConfig(
+            endpoint_url=fuseki_ready.query_endpoint,
+            rdf_class="http://example.org/Person",
+            username=fuseki_ready.username,
+            password=fuseki_ready.password,
+        )
+        ds = SparqlEndpointDataSource(config=config)
+        all_docs = [doc for batch in ds.iter_batches() for doc in batch]
+
+        assert len(all_docs) == 3
+        names = {doc.get("name") for doc in all_docs}
+        assert names == {"Alice", "Bob", "Carol"}
+
+    def test_query_organization_class(self, fuseki_ready):
+        """Query only Organization instances."""
+        config = SparqlSourceConfig(
+            endpoint_url=fuseki_ready.query_endpoint,
+            rdf_class="http://example.org/Organization",
+            username=fuseki_ready.username,
+            password=fuseki_ready.password,
+        )
+        ds = SparqlEndpointDataSource(config=config)
+        all_docs = [doc for batch in ds.iter_batches() for doc in batch]
+
+        assert len(all_docs) == 2
+        names = {doc.get("orgName") for doc in all_docs}
+        assert names == {"Acme Corp", "Globex Inc"}
+
+    def test_custom_sparql_query(self, fuseki_ready):
+        """Use a custom SPARQL query."""
+        custom_query = (
+            "SELECT ?s ?p ?o WHERE { "
+            "?s a <http://example.org/Person> . "
+            "?s <http://example.org/name> ?name . "
+            "FILTER(?name = 'Alice') "
+            "?s ?p ?o . "
+            "}"
+        )
+        config = SparqlSourceConfig(
+            endpoint_url=fuseki_ready.query_endpoint,
+            sparql_query=custom_query,
+            username=fuseki_ready.username,
+            password=fuseki_ready.password,
+        )
+        ds = SparqlEndpointDataSource(config=config)
+        all_docs = [doc for batch in ds.iter_batches() for doc in batch]
+
+        assert len(all_docs) == 1
+        assert all_docs[0]["name"] == "Alice"
+
+    def test_limit(self, fuseki_ready):
+        """Limit should cap the total results."""
+        config = SparqlSourceConfig(
+            endpoint_url=fuseki_ready.query_endpoint,
+            rdf_class="http://example.org/Person",
+            username=fuseki_ready.username,
+            password=fuseki_ready.password,
+        )
+        ds = SparqlEndpointDataSource(config=config)
+        all_docs = [doc for batch in ds.iter_batches(limit=1) for doc in batch]
+        assert len(all_docs) == 1
+
+    def test_key_extraction(self, fuseki_ready):
+        """Documents should have _key from URI local name."""
+        config = SparqlSourceConfig(
+            endpoint_url=fuseki_ready.query_endpoint,
+            rdf_class="http://example.org/Person",
+            username=fuseki_ready.username,
+            password=fuseki_ready.password,
+        )
+        ds = SparqlEndpointDataSource(config=config)
+        all_docs = {doc["_key"]: doc for batch in ds.iter_batches() for doc in batch}
+
+        assert "alice" in all_docs
+        assert "bob" in all_docs
+        assert "carol" in all_docs
diff --git a/test/db/falkordbs/test_performance.py b/test/db/falkordbs/test_performance.py
index 1a41feb3..6ac14b56 100644
--- a/test/db/falkordbs/test_performance.py
+++ b/test/db/falkordbs/test_performance.py
@@ -155,6 +155,7 @@ def benchmark(operation_name: str, ops: int, func: Callable) -> BenchmarkResult:
 # =============================================================================
+@pytest.mark.slow
 class TestThroughput:
     """Throughput tests - operations per second measurements."""
@@ -268,6 +269,7 @@ def run_query():
 # =============================================================================
+@pytest.mark.slow
 class 
TestScalability: """Scalability tests - behavior with increasing data volumes.""" @@ -362,6 +364,7 @@ def test_query_scaling_with_data_volume(self, conn_conf, test_graph_name, clean_ # ============================================================================= +@pytest.mark.slow class TestConcurrency: """Concurrency tests - parallel access patterns.""" @@ -465,7 +468,6 @@ def reader(thread_id): assert len(errors) == 0 - @pytest.mark.slow def test_mixed_read_write_load(self, conn_conf, test_graph_name, clean_db): """Mixed concurrent read/write workload.""" _ = clean_db @@ -543,6 +545,7 @@ def reader(): # ============================================================================= +@pytest.mark.slow class TestBatchSizing: """Tests to find optimal batch size.""" @@ -627,10 +630,10 @@ def test_batch_size_memory_impact(self, conn_conf, test_graph_name, clean_db): # ============================================================================= +@pytest.mark.slow class TestSustainedLoad: """Sustained load tests - stability over time.""" - @pytest.mark.slow def test_sustained_write_load(self, conn_conf, test_graph_name, clean_db): """Sustained write load over multiple seconds.""" _ = clean_db @@ -689,7 +692,6 @@ def test_sustained_write_load(self, conn_conf, test_graph_name, clean_db): if degradation > 5.0: print(" WARNING: Unstable latency under sustained load") - @pytest.mark.slow def test_sustained_mixed_load(self, conn_conf, test_graph_name, clean_db): """Sustained mixed load with reads and writes.""" _ = clean_db @@ -768,6 +770,7 @@ def reader(): # ============================================================================= +@pytest.mark.slow class TestLimits: """System limits tests.""" @@ -885,6 +888,7 @@ def connection_test(conn_id): # ============================================================================= +@pytest.mark.slow class TestGraphOperationsLoad: """Graph-specific operation load tests.""" diff --git a/test/db/memgraphs/test_performance.py b/test/db/memgraphs/test_performance.py index 6fe09e0e..5874ed9a 100644 --- a/test/db/memgraphs/test_performance.py +++ b/test/db/memgraphs/test_performance.py @@ -551,6 +551,7 @@ def reader(): class TestBatchSizing: """Tests to find optimal batch size.""" + @pytest.mark.slow def test_optimal_batch_size(self, conn_conf, test_graph_name, clean_db): """Compare different batch sizes for optimal throughput.""" _ = clean_db @@ -595,6 +596,7 @@ def test_optimal_batch_size(self, conn_conf, test_graph_name, clean_db): print(f"\n Optimal batch size: {best_size}") + @pytest.mark.slow def test_batch_size_memory_impact(self, conn_conf, test_graph_name, clean_db): """Measure memory impact of different batch sizes.""" _ = clean_db @@ -805,6 +807,7 @@ def test_max_property_count(self, conn_conf, test_graph_name, clean_db): for count, dur, status in results: print(f" {count:>5} props: {dur:>8.1f} ms - {status}") + @pytest.mark.slow def test_max_batch_size(self, conn_conf, test_graph_name, clean_db): """Find practical maximum batch size.""" _ = clean_db @@ -894,6 +897,7 @@ def connection_test(conn_id): class TestGraphOperationsLoad: """Graph-specific operation load tests.""" + @pytest.mark.slow def test_edge_creation_throughput(self, conn_conf, test_graph_name, clean_db): """Measure edge creation throughput.""" _ = clean_db diff --git a/test/test_patterns.py b/test/test_patterns.py index 2d988255..06af0272 100644 --- a/test/test_patterns.py +++ b/test/test_patterns.py @@ -233,11 +233,12 @@ def test_patterns_with_filtering(): ) 
patterns.add_table_pattern("events", table_pattern) - # Verify patterns are stored correctly (narrow to FilePattern for .regex) + # Verify patterns are stored correctly (narrow with isinstance checks) users_pattern = patterns.patterns["users"] events_pattern = patterns.patterns["events"] assert isinstance(users_pattern, FilePattern) assert users_pattern.regex == r".*\.csv$" + assert isinstance(events_pattern, TablePattern) assert events_pattern.date_field == "created_at" assert events_pattern.date_filter == "> '2020-10-10'" diff --git a/uv.lock b/uv.lock index de4ee54e..798171e9 100644 --- a/uv.lock +++ b/uv.lock @@ -375,6 +375,10 @@ dependencies = [ plot = [ { name = "pygraphviz" }, ] +sparql = [ + { name = "rdflib" }, + { name = "sparqlwrapper" }, +] [package.dev-dependencies] dev = [ @@ -410,14 +414,16 @@ requires-dist = [ { name = "pygraphviz", marker = "extra == 'plot'", specifier = ">=1.14" }, { name = "pymgclient", specifier = ">=1.3.1" }, { name = "python-arango", specifier = ">=8.1.2,<9" }, + { name = "rdflib", marker = "extra == 'sparql'", specifier = ">=7.0.0" }, { name = "redis", specifier = ">=5.0.0" }, { name = "requests", specifier = ">=2.31.0" }, + { name = "sparqlwrapper", marker = "extra == 'sparql'", specifier = ">=2.0.0" }, { name = "sqlalchemy", specifier = ">=2.0.0" }, { name = "suthing", specifier = ">=0.5.1" }, { name = "urllib3", specifier = ">=2.0.0" }, { name = "xmltodict", specifier = ">=0.14.2,<0.15" }, ] -provides-extras = ["plot"] +provides-extras = ["plot", "sparql"] [package.metadata.requires-dev] dev = [ @@ -1351,6 +1357,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d1/81/ef2b1dfd1862567d573a4fdbc9f969067621764fbb74338496840a1d2977/pyopenssl-25.3.0-py3-none-any.whl", hash = "sha256:1fda6fc034d5e3d179d39e59c1895c9faeaf40a79de5fc4cbbfbe0d36f4a77b6", size = 57268, upload-time = "2025-09-17T00:32:19.474Z" }, ] +[[package]] +name = "pyparsing" +version = "3.3.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/f3/91/9c6ee907786a473bf81c5f53cf703ba0957b23ab84c264080fb5a450416f/pyparsing-3.3.2.tar.gz", hash = "sha256:c777f4d763f140633dcb6d8a3eda953bf7a214dc4eff598413c070bcdc117cbc", size = 6851574, upload-time = "2026-01-21T03:57:59.36Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/10/bd/c038d7cc38edc1aa5bf91ab8068b63d4308c66c4c8bb3cbba7dfbc049f9c/pyparsing-3.3.2-py3-none-any.whl", hash = "sha256:850ba148bd908d7e2411587e247a1e4f0327839c40e2e5e6d05a007ecc69911d", size = 122781, upload-time = "2026-01-21T03:57:55.912Z" }, +] + [[package]] name = "pytest" version = "8.4.2" @@ -1482,6 +1497,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/04/11/432f32f8097b03e3cd5fe57e88efb685d964e2e5178a48ed61e841f7fdce/pyyaml_env_tag-1.1-py3-none-any.whl", hash = "sha256:17109e1a528561e32f026364712fee1264bc2ea6715120891174ed1b980d2e04", size = 4722, upload-time = "2025-05-13T15:23:59.629Z" }, ] +[[package]] +name = "rdflib" +version = "7.6.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pyparsing" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/98/f5/18bb77b7af9526add0c727a3b2048959847dc5fb030913e2918bf384fec3/rdflib-7.6.0.tar.gz", hash = "sha256:6c831288d5e4a5a7ece85d0ccde9877d512a3d0f02d7c06455d00d6d0ea379df", size = 4943826, upload-time = "2026-02-13T07:15:55.938Z" } +wheels = [ + { url = 
"https://files.pythonhosted.org/packages/10/c2/6604a71269e0c1bd75656d5a001432d16f2cc5b8c057140ec797155c295e/rdflib-7.6.0-py3-none-any.whl", hash = "sha256:30c0a3ebf4c0e09215f066be7246794b6492e054e782d7ac2a34c9f70a15e0dd", size = 615416, upload-time = "2026-02-13T07:15:46.487Z" }, +] + [[package]] name = "redis" version = "7.1.0" @@ -1592,6 +1619,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b7/ce/149a00dd41f10bc29e5921b496af8b574d8413afcd5e30dfa0ed46c2cc5e/six-1.17.0-py2.py3-none-any.whl", hash = "sha256:4721f391ed90541fddacab5acf947aa0d3dc7d27b2e1e8eda2be8970586c3274", size = 11050, upload-time = "2024-12-04T17:35:26.475Z" }, ] +[[package]] +name = "sparqlwrapper" +version = "2.0.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "rdflib" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/4e/cc/453752fffa759ef41a3ceadb3f167e13dae1a74c1db057d9f6a7affa9240/SPARQLWrapper-2.0.0.tar.gz", hash = "sha256:3fed3ebcc77617a4a74d2644b86fd88e0f32e7f7003ac7b2b334c026201731f1", size = 98429, upload-time = "2022-03-13T23:14:00.671Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/31/89/176e3db96e31e795d7dfd91dd67749d3d1f0316bb30c6931a6140e1a0477/SPARQLWrapper-2.0.0-py3-none-any.whl", hash = "sha256:c99a7204fff676ee28e6acef327dc1ff8451c6f7217dcd8d49e8872f324a8a20", size = 28620, upload-time = "2022-03-13T23:13:58.969Z" }, +] + [[package]] name = "sqlalchemy" version = "2.0.45" From 763149d4bde750727f580707bb694586313a496c Mon Sep 17 00:00:00 2001 From: Alexander Belikov Date: Tue, 17 Feb 2026 02:08:26 +0100 Subject: [PATCH 2/2] update gh action --- .github/workflows/pre-commit.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/pre-commit.yml b/.github/workflows/pre-commit.yml index 4dba3386..79e7091f 100644 --- a/.github/workflows/pre-commit.yml +++ b/.github/workflows/pre-commit.yml @@ -15,6 +15,6 @@ jobs: with: version: "0.9.28" - name: Install dependencies (including dev) - run: uv sync --group dev + run: uv sync --group dev --extra sparql - name: Run pre-commit run: uv run pre-commit run --all-files