From 1cb20b66f5dcac5a4689c6150a9906d8bdb5ef1a Mon Sep 17 00:00:00 2001 From: Nathan Evans Date: Tue, 2 Sep 2025 16:15:50 -0700 Subject: [PATCH] Input docs API parameter (#2034) * Add optional input_documents to index API * Semver * Add input dataframe example notebook * Format * Fix docs and notebook --- .../minor-20250826235020448734.json | 4 + docs/examples_notebooks/input_documents.ipynb | 194 ++++++++++++++++++ graphrag/api/index.py | 12 +- graphrag/index/run/run_pipeline.py | 13 ++ graphrag/index/typing/pipeline.py | 4 + 5 files changed, 221 insertions(+), 6 deletions(-) create mode 100644 .semversioner/next-release/minor-20250826235020448734.json create mode 100644 docs/examples_notebooks/input_documents.ipynb diff --git a/.semversioner/next-release/minor-20250826235020448734.json b/.semversioner/next-release/minor-20250826235020448734.json new file mode 100644 index 0000000000..eeec0aa0bb --- /dev/null +++ b/.semversioner/next-release/minor-20250826235020448734.json @@ -0,0 +1,4 @@ +{ + "type": "minor", + "description": "Add optional input documents to indexing API." +} diff --git a/docs/examples_notebooks/input_documents.ipynb b/docs/examples_notebooks/input_documents.ipynb new file mode 100644 index 0000000000..1135cb9eb0 --- /dev/null +++ b/docs/examples_notebooks/input_documents.ipynb @@ -0,0 +1,194 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Copyright (c) 2024 Microsoft Corporation.\n", + "# Licensed under the MIT License." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Example of indexing from an existing in-memory dataframe\n", + "\n", + "Newer versions of GraphRAG let you submit a dataframe directly instead of running through the input processing step. This notebook demonstrates how to do so with both regular and update runs.\n", + "\n", + "If performing an update, the assumption is that your dataframe contains only the new documents to add to the index." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from pathlib import Path\n", + "from pprint import pprint\n", + "\n", + "import pandas as pd\n", + "\n", + "import graphrag.api as api\n", + "from graphrag.config.load_config import load_config\n", + "from graphrag.index.typing.pipeline_run_result import PipelineRunResult" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "PROJECT_DIRECTORY = \"\"\n", + "UPDATE = False\n", + "FILENAME = \"new_documents.parquet\" if UPDATE else \".parquet\"\n", + "inputs = pd.read_parquet(f\"{PROJECT_DIRECTORY}/input/{FILENAME}\")\n", + "# Keep only the bare minimum input columns. These are the same fields that would be present after the load_input_documents workflow.\n", + "inputs = inputs.loc[:, [\"id\", \"title\", \"text\", \"creation_date\"]]" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Generate a `GraphRagConfig` object" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "graphrag_config = load_config(Path(PROJECT_DIRECTORY))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Indexing API\n", + "\n", + "*Indexing* is the process of ingesting raw text data and constructing a knowledge graph. GraphRAG currently supports plaintext (`.txt`) and `.csv` file formats."
+ ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Build an index" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "index_result: list[PipelineRunResult] = await api.build_index(\n", + " config=graphrag_config, input_documents=inputs, is_update_run=UPDATE\n", + ")\n", + "\n", + "# index_result is a list of results, one for each workflow in the indexing pipeline that was run\n", + "for workflow_result in index_result:\n", + " status = f\"error\\n{workflow_result.errors}\" if workflow_result.errors else \"success\"\n", + " print(f\"Workflow Name: {workflow_result.workflow}\\tStatus: {status}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Query an index\n", + "\n", + "To query an index, several index files must first be read into memory and passed to the query API. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "entities = pd.read_parquet(f\"{PROJECT_DIRECTORY}/output/entities.parquet\")\n", + "communities = pd.read_parquet(f\"{PROJECT_DIRECTORY}/output/communities.parquet\")\n", + "community_reports = pd.read_parquet(\n", + " f\"{PROJECT_DIRECTORY}/output/community_reports.parquet\"\n", + ")\n", + "\n", + "response, context = await api.global_search(\n", + " config=graphrag_config,\n", + " entities=entities,\n", + " communities=communities,\n", + " community_reports=community_reports,\n", + " community_level=2,\n", + " dynamic_community_selection=False,\n", + " response_type=\"Multiple Paragraphs\",\n", + " query=\"What are the top five themes of the dataset?\",\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The response object is the official response from GraphRAG, while the context object holds various metadata regarding the querying process used to obtain the final response." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(response)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Digging into the context a bit more provides users with extremely granular information such as which sources of data (down to the level of text chunks) were ultimately retrieved and used as part of the context sent to the LLM."
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pprint(context) # noqa: T203" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "graphrag", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.10" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/graphrag/api/index.py b/graphrag/api/index.py index ae3d0de7ee..7265e46187 100644 --- a/graphrag/api/index.py +++ b/graphrag/api/index.py @@ -11,6 +11,8 @@ import logging from typing import Any +import pandas as pd + from graphrag.callbacks.noop_workflow_callbacks import NoopWorkflowCallbacks from graphrag.callbacks.workflow_callbacks import WorkflowCallbacks from graphrag.config.enums import IndexingMethod @@ -18,7 +20,6 @@ from graphrag.index.run.run_pipeline import run_pipeline from graphrag.index.run.utils import create_callback_chain from graphrag.index.typing.pipeline_run_result import PipelineRunResult -from graphrag.index.typing.workflow import WorkflowFunction from graphrag.index.workflows.factory import PipelineFactory from graphrag.logger.standard_logging import init_loggers @@ -33,6 +34,7 @@ async def build_index( callbacks: list[WorkflowCallbacks] | None = None, additional_context: dict[str, Any] | None = None, verbose: bool = False, + input_documents: pd.DataFrame | None = None, ) -> list[PipelineRunResult]: """Run the pipeline with the given configuration. @@ -48,6 +50,8 @@ async def build_index( A list of callbacks to register. additional_context : dict[str, Any] | None default=None Additional context to pass to the pipeline run. This can be accessed in the pipeline state under the 'additional_context' key. + input_documents : pd.DataFrame | None default=None. + Override document loading and parsing and supply your own dataframe of documents to index. Returns ------- @@ -79,6 +83,7 @@ async def build_index( callbacks=workflow_callbacks, is_update_run=is_update_run, additional_context=additional_context, + input_documents=input_documents, ): outputs.append(output) if output.errors and len(output.errors) > 0: @@ -91,11 +96,6 @@ async def build_index( return outputs -def register_workflow_function(name: str, workflow: WorkflowFunction): - """Register a custom workflow function. 
You can then include the name in the settings.yaml workflows list.""" - PipelineFactory.register(name, workflow) - - def _get_method(method: IndexingMethod | str, is_update_run: bool) -> str: m = method.value if isinstance(method, IndexingMethod) else method return f"{m}-update" if is_update_run else m diff --git a/graphrag/index/run/run_pipeline.py b/graphrag/index/run/run_pipeline.py index d2047058c7..f652db7acd 100644 --- a/graphrag/index/run/run_pipeline.py +++ b/graphrag/index/run/run_pipeline.py @@ -11,6 +11,8 @@ from dataclasses import asdict from typing import Any +import pandas as pd + from graphrag.callbacks.workflow_callbacks import WorkflowCallbacks from graphrag.config.models.graph_rag_config import GraphRagConfig from graphrag.index.run.utils import create_run_context @@ -30,6 +32,7 @@ async def run_pipeline( callbacks: WorkflowCallbacks, is_update_run: bool = False, additional_context: dict[str, Any] | None = None, + input_documents: pd.DataFrame | None = None, ) -> AsyncIterable[PipelineRunResult]: """Run all workflows using a simplified pipeline.""" root_dir = config.root_dir @@ -60,6 +63,11 @@ async def run_pipeline( state["update_timestamp"] = update_timestamp + # if the user passes in a df directly, write directly to storage so we can skip finding/parsing later + if input_documents is not None: + await write_table_to_storage(input_documents, "documents", delta_storage) + pipeline.remove("load_update_documents") + context = create_run_context( input_storage=input_storage, output_storage=delta_storage, @@ -72,6 +80,11 @@ async def run_pipeline( else: logger.info("Running standard indexing.") + # if the user passes in a df directly, write directly to storage so we can skip finding/parsing later + if input_documents is not None: + await write_table_to_storage(input_documents, "documents", output_storage) + pipeline.remove("load_input_documents") + context = create_run_context( input_storage=input_storage, output_storage=output_storage, diff --git a/graphrag/index/typing/pipeline.py b/graphrag/index/typing/pipeline.py index 0d7b2b1dd5..4e755ba61d 100644 --- a/graphrag/index/typing/pipeline.py +++ b/graphrag/index/typing/pipeline.py @@ -21,3 +21,7 @@ def run(self) -> Generator[Workflow]: def names(self) -> list[str]: """Return the names of the workflows in the pipeline.""" return [name for name, _ in self.workflows] + + def remove(self, name: str) -> None: + """Remove a workflow from the pipeline by name.""" + self.workflows = [w for w in self.workflows if w[0] != name]
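Usage sketch (not part of the patch): the snippet below exercises the new input_documents parameter end to end, mirroring the notebook above. It assumes a GraphRAG project has already been configured at a hypothetical ./ragtest directory; the document id, title, text, and creation_date values are illustrative only and simply match the minimal columns the load_input_documents workflow would otherwise produce.

    import asyncio
    from pathlib import Path

    import pandas as pd

    import graphrag.api as api
    from graphrag.config.load_config import load_config


    async def main() -> None:
        # The same minimal columns that load_input_documents would produce.
        documents = pd.DataFrame([
            {
                "id": "doc-1",
                "title": "example.txt",
                "text": "Some raw text to index.",
                "creation_date": "2025-09-02",  # illustrative value
            }
        ])

        # Hypothetical project directory containing a settings.yaml.
        config = load_config(Path("./ragtest"))

        # Passing input_documents writes the dataframe straight to storage and
        # drops the load_input_documents workflow from the pipeline; pass
        # is_update_run=True to treat the dataframe as new documents for an update.
        results = await api.build_index(config=config, input_documents=documents)

        for result in results:
            status = f"error: {result.errors}" if result.errors else "success"
            print(f"{result.workflow}: {status}")


    if __name__ == "__main__":
        asyncio.run(main())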