diff --git a/.github/workflows/deepeval-tests.yml b/.github/workflows/deepeval-tests.yml index 5da84df..74d2a59 100644 --- a/.github/workflows/deepeval-tests.yml +++ b/.github/workflows/deepeval-tests.yml @@ -6,49 +6,204 @@ on: paths: - 'src/**' - 'tests/**' + - 'data/**' + - 'docker-compose-test.yml' + - 'Dockerfile.llm_orchestration_service' - '.github/workflows/deepeval-tests.yml' jobs: deepeval-tests: runs-on: ubuntu-latest - timeout-minutes: 40 + timeout-minutes: 60 steps: - name: Checkout code uses: actions/checkout@v4 - + + - name: Validate required secrets + id: validate_secrets + run: | + echo "Validating required environment variables..." + MISSING_SECRETS=() + + # Check Azure OpenAI secrets + if [ -z "${{ secrets.AZURE_OPENAI_ENDPOINT }}" ]; then + MISSING_SECRETS+=("AZURE_OPENAI_ENDPOINT") + fi + + if [ -z "${{ secrets.AZURE_OPENAI_API_KEY }}" ]; then + MISSING_SECRETS+=("AZURE_OPENAI_API_KEY") + fi + + if [ -z "${{ secrets.AZURE_OPENAI_DEPLOYMENT }}" ]; then + MISSING_SECRETS+=("AZURE_OPENAI_DEPLOYMENT") + fi + + if [ -z "${{ secrets.AZURE_OPENAI_EMBEDDING_DEPLOYMENT }}" ]; then + MISSING_SECRETS+=("AZURE_OPENAI_EMBEDDING_DEPLOYMENT") + fi + + # Check other LLM API keys + if [ -z "${{ secrets.ANTHROPIC_API_KEY }}" ]; then + MISSING_SECRETS+=("ANTHROPIC_API_KEY") + fi + + if [ -z "${{ secrets.OPENAI_API_KEY }}" ]; then + MISSING_SECRETS+=("OPENAI_API_KEY") + fi + + # If any secrets are missing, fail + if [ ${#MISSING_SECRETS[@]} -gt 0 ]; then + echo "missing=true" >> $GITHUB_OUTPUT + echo "secrets_list=${MISSING_SECRETS[*]}" >> $GITHUB_OUTPUT + echo " Missing required secrets: ${MISSING_SECRETS[*]}" + exit 1 + else + echo "missing=false" >> $GITHUB_OUTPUT + echo " All required secrets are configured" + fi + + - name: Comment PR with missing secrets error + if: failure() && steps.validate_secrets.outputs.missing == 'true' + uses: actions/github-script@v7 + with: + script: | + const missingSecrets = '${{ steps.validate_secrets.outputs.secrets_list }}'.split(' '); + const secretsList = missingSecrets.map(s => `- \`${s}\``).join('\n'); + + const comment = `## DeepEval Tests: Missing Required Secrets + + The DeepEval RAG system tests cannot run because the following GitHub secrets are not configured: + + ${secretsList} + + ### How to Fix + + 1. Go to **Settings** → **Secrets and variables** → **Actions** + 2. Add the missing secrets with the appropriate values: + + **Azure OpenAI Configuration:** + - \`AZURE_OPENAI_ENDPOINT\` - Your Azure OpenAI resource endpoint (e.g., \`https://your-resource.openai.azure.com/\`) + - \`AZURE_OPENAI_API_KEY\` - Your Azure OpenAI API key + - \`AZURE_OPENAI_DEPLOYMENT\` - Chat model deployment name (e.g., \`gpt-4o-mini\`) + - \`AZURE_OPENAI_EMBEDDING_DEPLOYMENT\` - Embedding model deployment name (e.g., \`text-embedding-3-large\`) + + **Additional LLM APIs:** + - \`ANTHROPIC_API_KEY\` - Anthropic API key (for guardrails) + - \`OPENAI_API_KEY\` - OpenAI API key (optional fallback) + + 3. Re-run the workflow after adding the secrets + + ### Note + Tests will not run until all required secrets are configured. 
+ + --- + *Workflow: ${context.workflow} | Run: [#${context.runNumber}](${context.payload.repository.html_url}/actions/runs/${context.runId})*`; + + // Find existing comment + const comments = await github.rest.issues.listComments({ + owner: context.repo.owner, + repo: context.repo.repo, + issue_number: context.issue.number + }); + + const existingComment = comments.data.find( + comment => comment.user.login === 'github-actions[bot]' && + comment.body.includes('DeepEval Tests: Missing Required Secrets') + ); + + if (existingComment) { + await github.rest.issues.updateComment({ + owner: context.repo.owner, + repo: context.repo.repo, + comment_id: existingComment.id, + body: comment + }); + } else { + await github.rest.issues.createComment({ + owner: context.repo.owner, + repo: context.repo.repo, + issue_number: context.issue.number, + body: comment + }); + } + - name: Set up Python + if: success() uses: actions/setup-python@v5 with: python-version-file: '.python-version' - + - name: Set up uv + if: success() uses: astral-sh/setup-uv@v6 - + - name: Install dependencies (locked) + if: success() run: uv sync --frozen - - - name: Run DeepEval tests + + - name: Create test directories with proper permissions + if: success() + run: | + mkdir -p test-vault/agents/llm + mkdir -p test-vault/agent-out + # Set ownership to current user and make writable + sudo chown -R $(id -u):$(id -g) test-vault + chmod -R 777 test-vault + # Ensure the agent-out directory is world-readable after writes + sudo chmod -R a+rwX test-vault/agent-out + + - name: Build Docker images + if: success() + run: docker compose -f docker-compose-test.yml build + + - name: Run DeepEval tests with testcontainers + if: success() id: run_tests env: + # LLM API Keys ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }} OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }} - run: uv run python -m pytest tests/deepeval_tests/standard_tests.py -v --tb=short - + # Azure OpenAI - Chat Model + AZURE_OPENAI_API_KEY: ${{ secrets.AZURE_OPENAI_API_KEY }} + AZURE_OPENAI_ENDPOINT: ${{ secrets.AZURE_OPENAI_ENDPOINT }} + AZURE_OPENAI_DEPLOYMENT: ${{ secrets.AZURE_OPENAI_DEPLOYMENT }} + # Azure OpenAI - Embedding Model + AZURE_OPENAI_EMBEDDING_DEPLOYMENT: ${{ secrets.AZURE_OPENAI_EMBEDDING_DEPLOYMENT }} + # Testing mode + TESTING_MODE: "true" + run: | + # Run tests with testcontainers managing Docker Compose + uv run python -m pytest tests/deepeval_tests/standard_tests.py -v --tb=short --log-cli-level=INFO + + - name: Fix permissions on test artifacts + if: always() + run: | + sudo chown -R $(id -u):$(id -g) test-vault || true + sudo chmod -R a+rX test-vault || true + - name: Generate evaluation report if: always() - run: python tests/deepeval_tests/report_generator.py - + run: uv run python tests/deepeval_tests/report_generator.py + + - name: Save test artifacts + if: always() + uses: actions/upload-artifact@v4 + with: + name: test-results + path: | + pytest_captured_results.json + test_report.md + retention-days: 30 + - name: Comment PR with test results if: always() && github.event_name == 'pull_request' uses: actions/github-script@v7 with: script: | const fs = require('fs'); - try { const reportContent = fs.readFileSync('test_report.md', 'utf8'); - const comments = await github.rest.issues.listComments({ owner: context.repo.owner, repo: context.repo.repo, @@ -57,7 +212,7 @@ jobs: const existingComment = comments.data.find( comment => comment.user.login === 'github-actions[bot]' && - comment.body.includes('RAG System Evaluation Report') + 
comment.body.includes('RAG System Evaluation Report') ); if (existingComment) { @@ -75,10 +230,8 @@ jobs: body: reportContent }); } - } catch (error) { console.error('Failed to post test results:', error); - await github.rest.issues.createComment({ issue_number: context.issue.number, owner: context.repo.owner, @@ -86,25 +239,26 @@ jobs: body: `## RAG System Evaluation Report\n\n**Error generating test report**\n\nFailed to read or post test results. Check workflow logs for details.\n\nError: ${error.message}` }); } - + - name: Check test results and fail if needed if: always() run: | - # Check if pytest ran (look at step output) - if [ "${{ steps.run_tests.outcome }}" == "failure" ]; then + # Check if pytest ran (look at step output) + if [ "${{ steps.run_tests.outcome }}" == "failure" ]; then echo "Tests ran but failed - this is expected if RAG performance is below threshold" - fi - if [ -f "pytest_captured_results.json" ]; then + fi + + if [ -f "pytest_captured_results.json" ]; then total_tests=$(jq '.total_tests // 0' pytest_captured_results.json) passed_tests=$(jq '.passed_tests // 0' pytest_captured_results.json) - + if [ "$total_tests" -eq 0 ]; then echo "ERROR: No tests were executed" exit 1 fi - + pass_rate=$(awk "BEGIN {print ($passed_tests / $total_tests) * 100}") - + echo "DeepEval Test Results:" echo "Total Tests: $total_tests" echo "Passed Tests: $passed_tests" @@ -117,7 +271,13 @@ jobs: else echo "TEST SUCCESS: Pass rate $pass_rate% meets threshold 70%" fi - else + else echo "ERROR: No test results file found" exit 1 - fi \ No newline at end of file + fi + + - name: Cleanup Docker resources + if: always() + run: | + docker compose -f docker-compose-test.yml down -v --remove-orphans || true + docker system prune -f || true \ No newline at end of file diff --git a/.github/workflows/deepteam-red-team-tests.yml b/.github/workflows/deepteam-red-team-tests.yml index ba0861b..f925024 100644 --- a/.github/workflows/deepteam-red-team-tests.yml +++ b/.github/workflows/deepteam-red-team-tests.yml @@ -6,8 +6,9 @@ on: paths: - 'src/**' - 'tests/**' - - 'mocks/**' - 'data/**' + - 'docker-compose-test.yml' + - 'Dockerfile.llm_orchestration_service' - '.github/workflows/deepeval-red-team-tests.yml' workflow_dispatch: inputs: @@ -24,32 +25,174 @@ on: jobs: security-assessment: runs-on: ubuntu-latest - timeout-minutes: 60 + timeout-minutes: 90 steps: - name: Checkout code uses: actions/checkout@v4 + - name: Validate required secrets + id: validate_secrets + run: | + echo "Validating required environment variables..." 
+ MISSING_SECRETS=() + + # Check Azure OpenAI secrets + if [ -z "${{ secrets.AZURE_OPENAI_ENDPOINT }}" ]; then + MISSING_SECRETS+=("AZURE_OPENAI_ENDPOINT") + fi + + if [ -z "${{ secrets.AZURE_OPENAI_API_KEY }}" ]; then + MISSING_SECRETS+=("AZURE_OPENAI_API_KEY") + fi + + if [ -z "${{ secrets.AZURE_OPENAI_DEPLOYMENT }}" ]; then + MISSING_SECRETS+=("AZURE_OPENAI_DEPLOYMENT") + fi + + if [ -z "${{ secrets.AZURE_OPENAI_EMBEDDING_DEPLOYMENT }}" ]; then + MISSING_SECRETS+=("AZURE_OPENAI_EMBEDDING_DEPLOYMENT") + fi + + # Check other LLM API keys + if [ -z "${{ secrets.ANTHROPIC_API_KEY }}" ]; then + MISSING_SECRETS+=("ANTHROPIC_API_KEY") + fi + + if [ -z "${{ secrets.OPENAI_API_KEY }}" ]; then + MISSING_SECRETS+=("OPENAI_API_KEY") + fi + + # If any secrets are missing, fail + if [ ${#MISSING_SECRETS[@]} -gt 0 ]; then + echo "missing=true" >> $GITHUB_OUTPUT + echo "secrets_list=${MISSING_SECRETS[*]}" >> $GITHUB_OUTPUT + echo " Missing required secrets: ${MISSING_SECRETS[*]}" + exit 1 + else + echo "missing=false" >> $GITHUB_OUTPUT + echo " All required secrets are configured" + fi + + - name: Comment PR with missing secrets error + if: failure() && steps.validate_secrets.outputs.missing == 'true' && github.event_name == 'pull_request' + uses: actions/github-script@v7 + with: + script: | + const missingSecrets = '${{ steps.validate_secrets.outputs.secrets_list }}'.split(' '); + const secretsList = missingSecrets.map(s => `- \`${s}\``).join('\n'); + + const comment = `## Red Team Security Tests: Missing Required Secrets + + The Red Team security assessment cannot run because the following GitHub secrets are not configured: + + ${secretsList} + + ### How to Fix + + 1. Go to **Settings** → **Secrets and variables** → **Actions** + 2. Add the missing secrets with the appropriate values: + + **Azure OpenAI Configuration:** + - \`AZURE_OPENAI_ENDPOINT\` - Your Azure OpenAI resource endpoint (e.g., \`https://your-resource.openai.azure.com/\`) + - \`AZURE_OPENAI_API_KEY\` - Your Azure OpenAI API key + - \`AZURE_OPENAI_DEPLOYMENT\` - Chat model deployment name (e.g., \`gpt-4o-mini\`) + - \`AZURE_OPENAI_EMBEDDING_DEPLOYMENT\` - Embedding model deployment name (e.g., \`text-embedding-3-large\`) + + **Additional LLM APIs:** + - \`ANTHROPIC_API_KEY\` - Anthropic API key (for guardrails) + - \`OPENAI_API_KEY\` - OpenAI API key (optional fallback) + + 3. Re-run the workflow after adding the secrets + + ### Security Note + Without proper API credentials, we cannot assess the system's security posture against adversarial attacks. 
+ + --- + *Workflow: ${context.workflow} | Run: [#${context.runNumber}](${context.payload.repository.html_url}/actions/runs/${context.runId})*`; + + // Find existing comment + const comments = await github.rest.issues.listComments({ + owner: context.repo.owner, + repo: context.repo.repo, + issue_number: context.issue.number + }); + + const existingComment = comments.data.find( + comment => comment.user.login === 'github-actions[bot]' && + comment.body.includes('Red Team Security Tests: Missing Required Secrets') + ); + + if (existingComment) { + await github.rest.issues.updateComment({ + owner: context.repo.owner, + repo: context.repo.repo, + comment_id: existingComment.id, + body: comment + }); + } else { + await github.rest.issues.createComment({ + owner: context.repo.owner, + repo: context.repo.repo, + issue_number: context.issue.number, + body: comment + }); + } + - name: Set up Python + if: success() uses: actions/setup-python@v5 with: python-version-file: '.python-version' - name: Set up uv + if: success() uses: astral-sh/setup-uv@v6 - name: Install dependencies (locked) + if: success() run: uv sync --frozen - - name: Run Complete Security Assessment + - name: Create test directories with proper permissions + if: success() + run: | + mkdir -p test-vault/agents/llm + mkdir -p test-vault/agent-out + # Set ownership to current user and make writable + sudo chown -R $(id -u):$(id -g) test-vault + chmod -R 777 test-vault + # Ensure the agent-out directory is world-readable after writes + sudo chmod -R a+rwX test-vault/agent-out + + - name: Build Docker images + if: success() + run: docker compose -f docker-compose-test.yml build + + - name: Run Red Team Security Tests with testcontainers + if: success() id: run_tests continue-on-error: true env: + # LLM API Keys ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }} OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }} + # Azure OpenAI - Chat Model + AZURE_OPENAI_API_KEY: ${{ secrets.AZURE_OPENAI_API_KEY }} + AZURE_OPENAI_ENDPOINT: ${{ secrets.AZURE_OPENAI_ENDPOINT }} + AZURE_OPENAI_DEPLOYMENT: ${{ secrets.AZURE_OPENAI_DEPLOYMENT }} + # Azure OpenAI - Embedding Model + AZURE_OPENAI_EMBEDDING_DEPLOYMENT: ${{ secrets.AZURE_OPENAI_EMBEDDING_DEPLOYMENT }} + # Testing mode + TESTING_MODE: "true" + run: | + # Run tests with testcontainers managing Docker Compose + uv run python -m pytest tests/deepeval_tests/red_team_tests.py::TestRAGSystemRedTeaming -v --tb=short --log-cli-level=INFO + + - name: Fix permissions on test artifacts + if: always() run: | - # Run all security tests in one comprehensive session - uv run python -m pytest tests/deepeval_tests/red_team_tests.py::TestRAGSystemRedTeaming -v --tb=short + sudo chown -R $(id -u):$(id -g) test-vault || true + sudo chmod -R a+rX test-vault || true - name: Generate Security Report if: always() @@ -58,6 +201,16 @@ jobs: uv run python tests/deepeval_tests/red_team_report_generator.py || true fi + - name: Save test artifacts + if: always() + uses: actions/upload-artifact@v4 + with: + name: security-test-results + path: | + pytest_captured_results.json + security_report.md + retention-days: 30 + - name: Comment PR with Security Results if: always() && github.event_name == 'pull_request' uses: actions/github-script@v7 @@ -164,4 +317,10 @@ jobs: else echo "ERROR: No test results file found" exit 1 - fi \ No newline at end of file + fi + + - name: Cleanup Docker resources + if: always() + run: | + docker compose -f docker-compose-test.yml down -v --remove-orphans || true + docker system prune -f || true \ No 
newline at end of file diff --git a/.gitleaks.toml b/.gitleaks.toml new file mode 100644 index 0000000..87311e3 --- /dev/null +++ b/.gitleaks.toml @@ -0,0 +1,4 @@ +[allowlist] +paths = [ + '''docker-compose-test\.yml''' +] \ No newline at end of file diff --git a/docker-compose-test.yml b/docker-compose-test.yml new file mode 100644 index 0000000..0dc3524 --- /dev/null +++ b/docker-compose-test.yml @@ -0,0 +1,291 @@ +services: + # === Core Infrastructure === + + # Shared PostgreSQL database (used by both application and Langfuse) + rag_search_db: + image: postgres:14.1 + container_name: rag_search_db + restart: always + environment: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: dbadmin + POSTGRES_DB: rag-search + volumes: + - test_rag_search_db:/var/lib/postgresql/data + ports: + - "5436:5432" + networks: + - test-network + + # Vector database for RAG + qdrant: + image: qdrant/qdrant:v1.15.1 + container_name: qdrant + restart: always + ports: + - "6333:6333" + - "6334:6334" + volumes: + - test_qdrant_data:/qdrant/storage + networks: + - test-network + + # === Secret Management === + + # Vault - Secret management (dev mode) + vault: + image: hashicorp/vault:1.20.3 + container_name: vault + cap_add: + - IPC_LOCK + ports: + - "8200:8200" + environment: + VAULT_DEV_ROOT_TOKEN_ID: root + VAULT_ADDR: http://0.0.0.0:8200 + VAULT_API_ADDR: http://0.0.0.0:8200 + command: server -dev -dev-listen-address=0.0.0.0:8200 + networks: + - test-network + + # Vault Agent - Automatic token management via AppRole + vault-agent-llm: + image: hashicorp/vault:1.20.3 + container_name: vault-agent-llm + depends_on: + - vault + volumes: + - ./test-vault/agents/llm:/agent/in + - ./test-vault/agent-out:/agent/out + entrypoint: ["sh", "-c"] + command: + - | + # Wait for Vault to be ready + sleep 5 + echo "Waiting for AppRole credentials..." + while [ ! -f /agent/in/role_id ] || [ ! -s /agent/in/role_id ]; do + sleep 1 + done + while [ ! -f /agent/in/secret_id ] || [ ! -s /agent/in/secret_id ]; do + sleep 1 + done + echo "Credentials found, starting Vault Agent..." 
+ exec vault agent -config=/agent/in/agent.hcl -log-level=debug + networks: + - test-network + + # === Langfuse Observability Stack === + + # Redis - Queue and cache for Langfuse + redis: + image: redis:7 + container_name: redis + restart: always + command: --requirepass myredissecret + ports: + - "127.0.0.1:6379:6379" + networks: + - test-network + + # MinIO - S3-compatible storage for Langfuse + minio: + image: minio/minio:latest + container_name: minio + restart: always + entrypoint: sh + command: -c "mkdir -p /data/langfuse && minio server /data --address ':9000' --console-address ':9001'" + environment: + MINIO_ROOT_USER: minio + MINIO_ROOT_PASSWORD: miniosecret + ports: + - "9090:9000" + - "127.0.0.1:9091:9001" + volumes: + - test_minio_data:/data + networks: + - test-network + + # ClickHouse - Analytics database for Langfuse (REQUIRED in v3) + clickhouse: + image: clickhouse/clickhouse-server:24.3 + container_name: clickhouse + restart: always + environment: + CLICKHOUSE_DB: default + CLICKHOUSE_USER: default + CLICKHOUSE_PASSWORD: clickhouse + volumes: + - test_clickhouse_data:/var/lib/clickhouse + ports: + - "127.0.0.1:8123:8123" + - "127.0.0.1:9000:9000" + networks: + - test-network + ulimits: + nofile: + soft: 262144 + hard: 262144 + + # Langfuse Worker - Background job processor + langfuse-worker: + image: langfuse/langfuse-worker:3 + container_name: langfuse-worker + restart: always + depends_on: + - rag_search_db + - minio + - redis + - clickhouse + ports: + - "127.0.0.1:3030:3030" + environment: + # Database + DATABASE_URL: postgresql://postgres:dbadmin@rag_search_db:5432/rag-search + + # Auth & Security (TEST VALUES ONLY - NOT FOR PRODUCTION) + # gitleaks:allow - These are test-only hex strings + NEXTAUTH_URL: http://localhost:3000 + SALT: ef9d6c6f8b4a5e2c1d3f7a9b8c5e4d2a1f6b8c9d4e5f7a8b1c2d3e4f5a6b7c8d + ENCRYPTION_KEY: 1a2b3c4d5e6f7a8b9c0d1e2f3a4b5c6d7e8f9a0b1c2d3e4f5a6b7c8d9e0f1a2b + + # Features + TELEMETRY_ENABLED: "false" + LANGFUSE_ENABLE_EXPERIMENTAL_FEATURES: "false" + + # ClickHouse (REQUIRED for Langfuse v3) + CLICKHOUSE_MIGRATION_URL: clickhouse://clickhouse:9000/default + CLICKHOUSE_URL: http://clickhouse:8123 + CLICKHOUSE_USER: default + CLICKHOUSE_PASSWORD: clickhouse + CLICKHOUSE_CLUSTER_ENABLED: "false" + + # S3/MinIO Event Upload + LANGFUSE_S3_EVENT_UPLOAD_BUCKET: langfuse + LANGFUSE_S3_EVENT_UPLOAD_REGION: us-east-1 + LANGFUSE_S3_EVENT_UPLOAD_ACCESS_KEY_ID: minio + LANGFUSE_S3_EVENT_UPLOAD_SECRET_ACCESS_KEY: miniosecret + LANGFUSE_S3_EVENT_UPLOAD_ENDPOINT: http://minio:9000 + LANGFUSE_S3_EVENT_UPLOAD_FORCE_PATH_STYLE: "true" + + # S3/MinIO Media Upload + LANGFUSE_S3_MEDIA_UPLOAD_BUCKET: langfuse + LANGFUSE_S3_MEDIA_UPLOAD_REGION: us-east-1 + LANGFUSE_S3_MEDIA_UPLOAD_ACCESS_KEY_ID: minio + LANGFUSE_S3_MEDIA_UPLOAD_SECRET_ACCESS_KEY: miniosecret + LANGFUSE_S3_MEDIA_UPLOAD_ENDPOINT: http://minio:9000 + LANGFUSE_S3_MEDIA_UPLOAD_FORCE_PATH_STYLE: "true" + + # Redis + REDIS_HOST: redis + REDIS_PORT: "6379" + REDIS_AUTH: myredissecret + networks: + - test-network + + # Langfuse Web - UI and API + langfuse-web: + image: langfuse/langfuse:3 + container_name: langfuse-web + restart: always + depends_on: + - langfuse-worker + - rag_search_db + - clickhouse + ports: + - "3000:3000" + environment: + # Database + DATABASE_URL: postgresql://postgres:dbadmin@rag_search_db:5432/rag-search + + # Auth & Security (TEST VALUES ONLY - NOT FOR PRODUCTION) + # gitleaks:allow - These are test-only hex strings + NEXTAUTH_URL: http://localhost:3000 + NEXTAUTH_SECRET: 
9f8e7d6c5b4a3f2e1d0c9b8a7f6e5d4c3b2a1f0e9d8c7b6a5f4e3d2c1b0a9f8e + SALT: ef9d6c6f8b4a5e2c1d3f7a9b8c5e4d2a1f6b8c9d4e5f7a8b1c2d3e4f5a6b7c8d + ENCRYPTION_KEY: 1a2b3c4d5e6f7a8b9c0d1e2f3a4b5c6d7e8f9a0b1c2d3e4f5a6b7c8d9e0f1a2b + + # Features + TELEMETRY_ENABLED: "false" + LANGFUSE_ENABLE_EXPERIMENTAL_FEATURES: "false" + + # ClickHouse (REQUIRED for Langfuse v3) + CLICKHOUSE_MIGRATION_URL: clickhouse://clickhouse:9000/default + CLICKHOUSE_URL: http://clickhouse:8123 + CLICKHOUSE_USER: default + CLICKHOUSE_PASSWORD: clickhouse + CLICKHOUSE_CLUSTER_ENABLED: "false" + + # S3/MinIO Event Upload + LANGFUSE_S3_EVENT_UPLOAD_BUCKET: langfuse + LANGFUSE_S3_EVENT_UPLOAD_REGION: us-east-1 + LANGFUSE_S3_EVENT_UPLOAD_ACCESS_KEY_ID: minio + LANGFUSE_S3_EVENT_UPLOAD_SECRET_ACCESS_KEY: miniosecret + LANGFUSE_S3_EVENT_UPLOAD_ENDPOINT: http://minio:9000 + LANGFUSE_S3_EVENT_UPLOAD_FORCE_PATH_STYLE: "true" + + # S3/MinIO Media Upload + LANGFUSE_S3_MEDIA_UPLOAD_BUCKET: langfuse + LANGFUSE_S3_MEDIA_UPLOAD_REGION: us-east-1 + LANGFUSE_S3_MEDIA_UPLOAD_ACCESS_KEY_ID: minio + LANGFUSE_S3_MEDIA_UPLOAD_SECRET_ACCESS_KEY: miniosecret + LANGFUSE_S3_MEDIA_UPLOAD_ENDPOINT: http://minio:9000 + LANGFUSE_S3_MEDIA_UPLOAD_FORCE_PATH_STYLE: "true" + + # Redis + REDIS_HOST: redis + REDIS_PORT: "6379" + REDIS_AUTH: myredissecret + + # Initialize test project with known credentials + LANGFUSE_INIT_PROJECT_PUBLIC_KEY: pk-lf-test + LANGFUSE_INIT_PROJECT_SECRET_KEY: sk-lf-test + networks: + - test-network + + # === LLM Orchestration Service === + + llm-orchestration-service: + build: + context: . + dockerfile: Dockerfile.llm_orchestration_service + container_name: llm-orchestration-service + restart: always + ports: + - "8100:8100" + environment: + - ENVIRONMENT=test + - VAULT_ADDR=http://vault:8200 + - VAULT_TOKEN_FILE=/agent/out/token + - QDRANT_URL=http://qdrant:6333 + - TESTING_MODE=true + volumes: + - ./src/llm_config_module/config:/app/src/llm_config_module/config:ro + - ./test-vault/agent-out:/agent/out:ro + - test_llm_orchestration_logs:/app/logs + depends_on: + - qdrant + - langfuse-web + - vault-agent-llm + networks: + - test-network + +# === Networks === + +networks: + test-network: + name: test-network + driver: bridge + +# === Volumes === + +volumes: + test_rag_search_db: + name: test_rag_search_db + test_qdrant_data: + name: test_qdrant_data + test_minio_data: + name: test_minio_data + test_clickhouse_data: + name: test_clickhouse_data + test_llm_orchestration_logs: + name: test_llm_orchestration_logs \ No newline at end of file diff --git a/src/llm_orchestration_service.py b/src/llm_orchestration_service.py index b5d5f7d..ed4ddc8 100644 --- a/src/llm_orchestration_service.py +++ b/src/llm_orchestration_service.py @@ -993,6 +993,7 @@ def _generate_rag_response( No secondary LLM paths; no citations appended. """ logger.info("Starting RAG response generation") + testing_mode = os.getenv("TESTING_MODE", "false").lower() == "true" if costs_dict is None: costs_dict = {} @@ -1067,12 +1068,23 @@ def _generate_rag_response( }, output=answer, ) + retrieval_context: List[Dict[str, Any]] | None = None + if testing_mode and relevant_chunks: + retrieval_context = [ + { + "content": chunk.get("content", ""), + "score": chunk.get("score", 0.0), + "metadata": chunk.get("meta", {}), + } + for chunk in relevant_chunks + ] if question_out_of_scope: logger.info("Question determined out-of-scope – sending fixed message.") if request.environment == "test": logger.info( "Test environment detected – returning out-of-scope message." 
) + return TestOrchestrationResponse( llmServiceActive=True, # service OK; insufficient context questionOutOfLLMScope=True, + @@ -1080,13 +1092,17 @@ inputGuardFailed=False, content=OUT_OF_SCOPE_MESSAGE, ) else: - return OrchestrationResponse( + response = OrchestrationResponse( chatId=request.chatId, llmServiceActive=True, # service OK; insufficient context questionOutOfLLMScope=True, inputGuardFailed=False, content=OUT_OF_SCOPE_MESSAGE, ) + if testing_mode: + response.retrieval_context = retrieval_context + response.refined_questions = refined_output.refined_questions + return response # In-scope: return the answer as-is (NO citations) logger.info("Returning in-scope answer without citations.") @@ -1099,13 +1115,17 @@ content=answer, ) else: - return OrchestrationResponse( + response = OrchestrationResponse( chatId=request.chatId, llmServiceActive=True, questionOutOfLLMScope=False, inputGuardFailed=False, content=answer, ) + if testing_mode: + response.retrieval_context = retrieval_context + response.refined_questions = refined_output.refined_questions + return response except Exception as e: logger.error(f"RAG Response generation failed: {str(e)}") diff --git a/src/llm_orchestration_service_api.py b/src/llm_orchestration_service_api.py index af7bc46..9852236 100644 --- a/src/llm_orchestration_service_api.py +++ b/src/llm_orchestration_service_api.py @@ -1,5 +1,6 @@ """LLM Orchestration Service API - FastAPI application.""" +import os from contextlib import asynccontextmanager from typing import Any, AsyncGenerator, Dict @@ -18,6 +19,7 @@ ContextGenerationRequest, ContextGenerationResponse, EmbeddingErrorResponse, + DeepEvalTestOrchestrationResponse, ) @@ -300,6 +302,81 @@ raise HTTPException(status_code=500, detail=str(e)) +@app.post("/orchestrate-test") +def orchestrate_llm_request_test( + http_request: Request, + request: OrchestrationRequest, +) -> DeepEvalTestOrchestrationResponse: + """ + Process LLM orchestration request with additional testing data. + + This endpoint is only available when TESTING_MODE=true and returns + retrieval context and refined questions for DeepEval metrics evaluation.
+ + Args: + http_request: FastAPI Request object for accessing app state + request: OrchestrationRequest containing user message and context + + Returns: + DeepEvalTestOrchestrationResponse: Response with LLM output, status flags, and test data + + Raises: + HTTPException: For processing errors or if not in testing mode + """ + # Check if testing mode is enabled + testing_mode = os.getenv("TESTING_MODE", "false").lower() == "true" + if not testing_mode: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Testing endpoint not available in production mode", + ) + + try: + logger.info(f"Received TEST orchestration request for chatId: {request.chatId}") + + if not hasattr(http_request.app.state, "orchestration_service"): + logger.error("Orchestration service not found in app state") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Service not initialized", + ) + + orchestration_service = http_request.app.state.orchestration_service + if orchestration_service is None: + logger.error("Orchestration service is None") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Service not initialized", + ) + + # Process the request (will include test data due to TESTING_MODE env var) + response = orchestration_service.process_orchestration_request(request) + + # Convert to test response with additional fields + test_response = DeepEvalTestOrchestrationResponse( + chatId=response.chatId, + llmServiceActive=response.llmServiceActive, + questionOutOfLLMScope=response.questionOutOfLLMScope, + inputGuardFailed=response.inputGuardFailed, + content=response.content, + retrieval_context=response.retrieval_context, + refined_questions=response.refined_questions, + expected_output=None, # Will be populated by test framework + ) + + logger.info(f"Successfully processed TEST request for chatId: {request.chatId}") + return test_response + + except HTTPException: + raise + except Exception as e: + logger.error(f"Unexpected error processing TEST request: {str(e)}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Internal server error occurred", + ) + + if __name__ == "__main__": logger.info("Starting LLM Orchestration Service API server on port 8100") uvicorn.run( diff --git a/src/models/request_models.py b/src/models/request_models.py index 956b9c5..523d0c9 100644 --- a/src/models/request_models.py +++ b/src/models/request_models.py @@ -53,6 +53,11 @@ class OrchestrationResponse(BaseModel): ..., description="Whether input guard validation failed" ) content: str = Field(..., description="Response content with citations") + # Testing-only fields (populated when TESTING_MODE=true) + retrieval_context: Optional[List[Dict[str, Any]]] = Field( + default=None, exclude=True + ) + refined_questions: Optional[List[str]] = Field(default=None, exclude=True) # New models for embedding and context generation @@ -157,3 +162,16 @@ class TestOrchestrationResponse(BaseModel): ..., description="Whether input guard validation failed" ) content: str = Field(..., description="Response content with citations") + + +class DeepEvalTestOrchestrationResponse(BaseModel): + """Extended response model for testing with additional evaluation data.""" + + chatId: str + llmServiceActive: bool + questionOutOfLLMScope: bool + inputGuardFailed: bool + content: str + retrieval_context: Optional[List[Dict[str, Any]]] = None + refined_questions: Optional[List[str]] = None + expected_output: Optional[str] = None # For DeepEval 
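For orientation, here is a minimal sketch (not part of this patch) of how a DeepEval test could drive the new /orchestrate-test endpoint through the orchestration_client fixture from tests/conftest.py and turn the extra fields into an LLMTestCase. The "message" payload field and the sample question are placeholders, since the full OrchestrationRequest schema is not shown in this diff; LLMTestCase, AnswerRelevancyMetric, and assert_test are DeepEval's public API. Remember the endpoint returns 403 unless TESTING_MODE=true.

from deepeval import assert_test
from deepeval.metrics import AnswerRelevancyMetric
from deepeval.test_case import LLMTestCase


def test_orchestrate_test_endpoint_with_deepeval(orchestration_client):
    # NOTE: "message" is a placeholder name for the user-query field; align it
    # with the real OrchestrationRequest schema before using this sketch.
    payload = {
        "chatId": "deepeval-smoke-1",
        "environment": "test",
        "message": "What data does the service index?",
    }
    resp = orchestration_client.post(
        f"{orchestration_client.base_url}/orchestrate-test",
        json=payload,
        timeout=120,
    )
    resp.raise_for_status()
    body = resp.json()

    test_case = LLMTestCase(
        input=payload["message"],
        actual_output=body["content"],
        # retrieval_context entries are dicts with content/score/metadata;
        # DeepEval expects plain strings, so keep only the chunk text.
        retrieval_context=[c["content"] for c in body.get("retrieval_context") or []],
    )
    assert_test(test_case, [AnswerRelevancyMetric(threshold=0.7)])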
diff --git a/src/vector_indexer/config/vector_indexer_config.test.yaml b/src/vector_indexer/config/vector_indexer_config.test.yaml new file mode 100644 index 0000000..d063c73 --- /dev/null +++ b/src/vector_indexer/config/vector_indexer_config.test.yaml @@ -0,0 +1,67 @@ +# Vector Indexer Configuration - TEST ENVIRONMENT +vector_indexer: + # API Configuration + api: + base_url: "http://llm-orchestration-service:8100" + qdrant_url: "http://qdrant:6333" + timeout: 300 + + # Processing Configuration - TEST VALUES + processing: + environment: "test" # ← TEST ENVIRONMENT + connection_id: "evalconnection-1" # ← TEST CONNECTION + batch_delay_seconds: 0.1 + context_delay_seconds: 0.05 + + # Chunking Configuration + chunking: + chunk_size: 800 + chunk_overlap: 100 + min_chunk_size: 50 + max_chunk_size: 2000 + chars_per_token: 4 + tokenizer_encoding: "cl100k_base" + + # Concurrency Configuration + concurrency: + max_concurrent_documents: 3 + max_concurrent_chunks_per_doc: 5 + + # Batch Configuration + batching: + embedding_batch_size: 10 + context_batch_size: 5 + + # Error Handling + error_handling: + max_retries: 3 + retry_delay_base: 2 + continue_on_failure: true + log_failures: true + + # Logging Configuration + logging: + level: "DEBUG" + failure_log_file: "logs/test_vector_indexer_failures.jsonl" + processing_log_file: "logs/test_vector_indexer_processing.log" + stats_log_file: "logs/test_vector_indexer_stats.json" + + # Dataset Configuration + dataset: + base_path: "datasets" + metadata_file: "source.meta.json" + target_file: "cleaned.txt" + + # Document Loader Configuration + document_loader: + target_file: "cleaned.txt" + metadata_file: "source.meta.json" + min_content_length: 10 + max_content_length: 10000000 + encoding: "utf-8" + required_metadata_fields: + - "source_url" + enable_content_caching: false + max_scan_depth: 5 + min_file_size_bytes: 1 + max_file_size_bytes: 50000000 \ No newline at end of file diff --git a/src/vector_indexer/config/vector_indexer_config.yaml b/src/vector_indexer/config/vector_indexer_config.yaml index 6a7d583..76f1192 100644 --- a/src/vector_indexer/config/vector_indexer_config.yaml +++ b/src/vector_indexer/config/vector_indexer_config.yaml @@ -5,105 +5,90 @@ vector_indexer: base_url: "http://llm-orchestration-service:8100" qdrant_url: "http://qdrant:6333" timeout: 300 # seconds - - # Environment Configuration + + # Processing Configuration (MERGED - only one processing section) processing: - environment: "production" # Default: production - connection_id: null # For dev/test environments - + # Environment settings + environment: "production" # Change to "test" for testing + connection_id: null # Set to "evalconnection-1" for testing + + # Processing timing + batch_delay_seconds: 0.1 + context_delay_seconds: 0.05 + + # Provider Detection + provider_detection_patterns: + openai: ['\bGPT\b', '\bOpenAI\b', '\btext-embedding\b', '\bada\b'] + aws_bedrock: ['\btitan\b', '\bamazon\b', '\bbedrock\b'] + azure_openai: ['\bazure\b', '\btext-embedding-3\b', '\bada-002\b'] + # Chunking Configuration chunking: chunk_size: 800 # tokens chunk_overlap: 100 # tokens + min_chunk_size: 50 + max_chunk_size: 2000 + chars_per_token: 4 + tokenizer_encoding: "cl100k_base" - # Additional chunking parameters - min_chunk_size: 50 # minimum tokens per chunk - max_chunk_size: 2000 # maximum tokens per chunk - chars_per_token: 4 # character-to-token ratio for fallback - tokenizer_encoding: "cl100k_base" # tiktoken encoding - - # Content formatting - chunk_id_pattern: 
"{document_hash}_chunk_{index:03d}" - contextual_template: "{context}\n\n{content}" + # Templates + templates: + chunk_id_pattern: "{document_hash}_chunk_{index:03d}" + context_separator: "\n\n--- Chunk {chunk_id} ---\n\n" # Quality validation - min_word_count: 5 # minimum words per chunk - max_whitespace_ratio: 0.8 # maximum whitespace ratio - max_repetition_ratio: 0.5 # maximum content repetition - + min_word_count: 5 + max_whitespace_ratio: 0.8 + max_repetition_ratio: 0.5 + # Concurrency Configuration concurrency: - max_concurrent_documents: 3 # Process 3 documents simultaneously - max_concurrent_chunks_per_doc: 5 # Generate context for 5 chunks simultaneously - - # Batch Configuration (Small batches) + max_concurrent_documents: 3 + max_concurrent_chunks_per_doc: 5 + + # Batch Configuration batching: - embedding_batch_size: 10 # Small batch size for embeddings - context_batch_size: 5 # Small batch size for context generation - + embedding_batch_size: 10 + context_batch_size: 5 + # Error Handling error_handling: max_retries: 3 - retry_delay_base: 2 # seconds (exponential backoff) + retry_delay_base: 2 continue_on_failure: true log_failures: true - - # Processing Configuration - processing: - batch_delay_seconds: 0.1 # delay between embedding batches - context_delay_seconds: 0.05 # delay between context batches - - # Provider Detection - providers: - azure_patterns: ["azure", "text-embedding-3"] - aws_patterns: ["amazon", "titan"] - openai_patterns: ["openai", "gpt"] - + # Logging Configuration logging: level: "INFO" failure_log_file: "logs/vector_indexer_failures.jsonl" processing_log_file: "logs/vector_indexer_processing.log" stats_log_file: "logs/vector_indexer_stats.json" - + # Dataset Configuration dataset: base_path: "datasets" supported_extensions: [".txt"] metadata_file: "source.meta.json" target_file: "cleaned.txt" - + # Document Loader Configuration document_loader: - # File discovery (existing behavior maintained) target_file: "cleaned.txt" metadata_file: "source.meta.json" - - # Validation rules min_content_length: 10 - max_content_length: 10000000 # 10MB + max_content_length: 10000000 encoding: "utf-8" required_metadata_fields: - "source_url" - - # Performance settings enable_content_caching: false max_scan_depth: 5 - - # File validation min_file_size_bytes: 1 - max_file_size_bytes: 50000000 # 50MB + max_file_size_bytes: 50000000 # Diff Identifier Configuration diff_identifier: - # Dataset tracking datasets_path: "datasets" metadata_filename: "processed-metadata.json" - - # Retry configuration max_retries: 3 - max_delay_seconds: 8 - - # S3Ferry configuration (uses environment variables) - # S3_DATA_BUCKET_NAME, S3_DATA_BUCKET_PATH, S3_ENDPOINT_URL - # S3_ACCESS_KEY_ID, S3_SECRET_ACCESS_KEY \ No newline at end of file + max_delay_seconds: 8 \ No newline at end of file diff --git a/test-vault/agents/llm/agent.hcl b/test-vault/agents/llm/agent.hcl new file mode 100644 index 0000000..9883bfe --- /dev/null +++ b/test-vault/agents/llm/agent.hcl @@ -0,0 +1,45 @@ +vault { + # Inside Docker network, the service name "vault" resolves to the dev Vault + address = "http://vault:8200" +} + +pid_file = "/agent/out/pidfile" + +auto_auth { + method "approle" { + mount_path = "auth/approle" + config = { + role_id_file_path = "/agent/in/role_id" + secret_id_file_path = "/agent/in/secret_id" + remove_secret_id_file_after_reading = false # test-friendly + } + } + + sink "file" { + config = { + path = "/agent/out/token" + } + } +} + +# In-memory cache (free, no Enterprise license) +cache { + 
default_lease_duration = "1h" +} + +# Listener is required for Agent’s internal servers (not exposed) +listener "tcp" { + address = "127.0.0.1:8201" + tls_disable = true +} + +# dummy template so cache is “active” (some versions require this) +template { + source = "/dev/null" + destination = "/agent/out/dummy" +} + +# Disable API proxy; not needed here +api_proxy { + disable = true +} \ No newline at end of file diff --git a/test-vault/agents/llm/role_id b/test-vault/agents/llm/role_id new file mode 100644 index 0000000..e69de29 diff --git a/test-vault/agents/llm/secret_id b/test-vault/agents/llm/secret_id new file mode 100644 index 0000000..e69de29 diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..759d67c --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,733 @@ +import os +import time +import subprocess +from pathlib import Path +from typing import Dict, Any, Optional, Generator + +import pytest +import hvac +import requests +from loguru import logger +from testcontainers.compose import DockerCompose # type: ignore + + +# ===================== VaultAgentClient ===================== + + +class VaultAgentClient: + """Client for interacting with Vault using a token written by Vault Agent""" + + def __init__( + self, + vault_url: str, + token_path: Path = Path("test-vault/agent-out/token"), + mount_point: str = "secret", + timeout: int = 10, + ): + self.vault_url = vault_url + self.token_path = token_path + self.mount_point = mount_point + + self.client = hvac.Client(url=self.vault_url, timeout=timeout) + self._load_token() + + def _load_token(self) -> None: + """Load token from file written by Vault Agent""" + if not self.token_path.exists(): + raise FileNotFoundError(f"Vault token file missing: {self.token_path}") + token = self.token_path.read_text().strip() + if not token: + raise ValueError("Vault token file is empty") + self.client.token = token + + def is_authenticated(self) -> bool: + """Check if the current token is valid""" + try: + return self.client.is_authenticated() + except Exception as e: + logger.warning(f"Vault token is not valid: {e}") + return False + + def is_vault_available(self) -> bool: + """Check if Vault is initialized and unsealed""" + try: + status = self.client.sys.read_health_status(method="GET") + return ( + isinstance(status, dict) + and status.get("initialized", False) + and not status.get("sealed", True) + ) + except Exception as e: + logger.warning(f"Vault availability check failed: {e}") + return False + + def get_secret(self, path: str) -> dict: + """Read a secret from Vault KV v2""" + try: + result = self.client.secrets.kv.v2.read_secret_version( + path=path, mount_point=self.mount_point + ) + return result["data"]["data"] + except Exception as e: + logger.error(f"Failed to read Vault secret at {path}: {e}") + raise + + +# ===================== RAGStackTestContainers ===================== + + +class RAGStackTestContainers: + """Manages test containers for RAG stack including Vault, Qdrant, Langfuse, and LLM orchestration service""" + + def __init__(self, compose_file_name: str = "docker-compose-test.yml"): + self.project_root = Path(__file__).parent.parent + self.compose_file_path = self.project_root / compose_file_name + self.compose: Optional[DockerCompose] = None + self.services_info: Dict[str, Dict[str, Any]] = {} + + if not self.compose_file_path.exists(): + raise FileNotFoundError( + f"Docker compose file not found: {self.compose_file_path}" + ) + + def start(self) -> None: + """Start all test containers and 
bootstrap Vault""" + logger.info("Starting RAG Stack testcontainers...") + + # Prepare Vault Agent directories + agent_in = self.project_root / "test-vault" / "agents" / "llm" + agent_out = self.project_root / "test-vault" / "agent-out" + agent_in.mkdir(parents=True, exist_ok=True) + agent_out.mkdir(parents=True, exist_ok=True) + + # Clean up any stale files from previous runs + for f in ["role_id", "secret_id", "token", "pidfile", "dummy"]: + (agent_in / f).unlink(missing_ok=True) + (agent_out / f).unlink(missing_ok=True) + + # Start all Docker Compose services + logger.info("Starting Docker Compose services...") + self.compose = DockerCompose( + str(self.project_root), + compose_file_name=self.compose_file_path.name, + pull=False, + ) + self.compose.start() + + # Get Vault connection details + vault_url = self._get_vault_url() + logger.info(f"Vault URL: {vault_url}") + + # Wait for Vault to be ready + self._wait_for_vault_ready(vault_url) + + # Configure Vault with AppRole, policies, and test secrets + self._bootstrap_vault_dev(agent_in, vault_url) + + # Verify credentials were written successfully + role_id = (agent_in / "role_id").read_text().strip() + secret_id = (agent_in / "secret_id").read_text().strip() + logger.info( + f"AppRole credentials written: role_id={role_id[:8]}..., secret_id={secret_id[:8]}..." + ) + + # Wait for Vault Agent to authenticate and write token + logger.info("Waiting for vault-agent to authenticate...") + self._wait_for_valid_token(agent_out / "token", vault_url, max_attempts=20) + + logger.info("Vault Agent authenticated successfully") + + # Wait for other services to be ready + self._wait_for_services() + self._collect_service_info() + + # Index test data into Qdrant + self._index_test_data() + + logger.info("RAG Stack testcontainers ready") + + def stop(self) -> None: + """Stop all test containers""" + if self.compose: + logger.info("Stopping RAG Stack testcontainers...") + self.compose.stop() + logger.info("Testcontainers stopped") + + def _get_vault_url(self) -> str: + """Get the mapped Vault URL accessible from the host""" + if not self.compose: + raise RuntimeError("Docker Compose not initialized") + host = self.compose.get_service_host("vault", 8200) + port = self.compose.get_service_port("vault", 8200) + return f"http://{host}:{port}" + + def _wait_for_vault_ready(self, vault_url: str, timeout: int = 60) -> None: + """Wait for Vault to be initialized and unsealed""" + logger.info("Waiting for Vault to be available...") + client = hvac.Client(url=vault_url, token="root", timeout=10) + + start = time.time() + while time.time() - start < timeout: + try: + status = client.sys.read_health_status(method="GET") + if status.get("initialized", False) and not status.get("sealed", True): + logger.info("Vault is available and unsealed") + return + except Exception: + pass + time.sleep(2) + + raise TimeoutError("Vault did not become available within 60s") + + def _bootstrap_vault_dev(self, agent_in: Path, vault_url: str) -> None: + """ + Bootstrap Vault dev instance with: + - AppRole auth method + - Policy for LLM orchestration service + - AppRole role and credentials + - Test secrets (LLM connections, Langfuse, embeddings, guardrails) + """ + logger.info("Bootstrapping Vault with AppRole and test secrets...") + client = hvac.Client(url=vault_url, token="root") + + # Enable AppRole authentication method + if "approle/" not in client.sys.list_auth_methods(): + client.sys.enable_auth_method("approle") + logger.info("AppRole enabled") + + # Create policy with 
permissions for all secret paths (updated with correct embedding paths) + policy = """ +path "secret/metadata/llm/*" { capabilities = ["list"] } +path "secret/data/llm/*" { capabilities = ["read"] } +path "secret/metadata/langfuse/*" { capabilities = ["list"] } +path "secret/data/langfuse/*" { capabilities = ["read"] } +path "secret/metadata/embeddings/*" { capabilities = ["list"] } +path "secret/data/embeddings/*" { capabilities = ["read"] } +path "secret/metadata/guardrails/*" { capabilities = ["list"] } +path "secret/data/guardrails/*" { capabilities = ["read"] } +path "auth/token/lookup-self" { capabilities = ["read"] } +path "auth/token/renew-self" { capabilities = ["update"] } +""" + client.sys.create_or_update_policy("llm-orchestration", policy) + logger.info("Policy 'llm-orchestration' created") + + # Create AppRole role with service token type + role_name = "llm-orchestration-service" + client.write( + f"auth/approle/role/{role_name}", + **{ + "token_policies": "llm-orchestration", + "secret_id_ttl": "24h", + "token_ttl": "1h", + "token_max_ttl": "24h", + "secret_id_num_uses": 0, + "bind_secret_id": True, + "token_no_default_policy": True, + "token_type": "service", + }, + ) + logger.info(f"AppRole '{role_name}' created") + + # Generate credentials for the AppRole + role_id = client.read(f"auth/approle/role/{role_name}/role-id")["data"][ + "role_id" + ] + secret_id = client.write(f"auth/approle/role/{role_name}/secret-id")["data"][ + "secret_id" + ] + + # Write credentials to files that Vault Agent will read + (agent_in / "role_id").write_text(role_id, encoding="utf-8") + (agent_in / "secret_id").write_text(secret_id, encoding="utf-8") + logger.info("AppRole credentials written to agent-in/") + + # Write test secrets + self._write_test_secrets(client) + + def _write_test_secrets(self, client: hvac.Client) -> None: + """Write all test secrets to Vault with correct path structure""" + + # ============================================================ + # CRITICAL DEBUG SECTION - Environment Variables + # ============================================================ + logger.info("=" * 80) + logger.info("VAULT SECRET BOOTSTRAP - ENVIRONMENT VARIABLES DEBUG") + logger.info("=" * 80) + + azure_endpoint = os.getenv("AZURE_OPENAI_ENDPOINT") + azure_api_key = os.getenv("AZURE_OPENAI_API_KEY") + azure_deployment = os.getenv("AZURE_OPENAI_DEPLOYMENT") + azure_embedding_deployment = os.getenv("AZURE_OPENAI_EMBEDDING_DEPLOYMENT") + + # Validate critical environment variables + missing_vars = [] + if not azure_endpoint: + missing_vars.append("AZURE_OPENAI_ENDPOINT") + if not azure_api_key: + missing_vars.append("AZURE_OPENAI_API_KEY") + if not azure_embedding_deployment: + missing_vars.append("AZURE_OPENAI_EMBEDDING_DEPLOYMENT") + + if missing_vars: + error_msg = f"CRITICAL: Missing required environment variables: {', '.join(missing_vars)}" + logger.error(error_msg) + raise ValueError(error_msg) + + logger.info("All required environment variables are set") + logger.info("=" * 80) + + # ============================================================ + # CHAT MODEL SECRET (LLM path) + # ============================================================ + logger.info("") + logger.info("Writing LLM connection secret (chat model)...") + llm_secret = { + "connection_id": "evalconnection-1", + "endpoint": azure_endpoint, + "api_key": azure_api_key, + "deployment_name": azure_deployment or "gpt-4o-mini", + "environment": "test", + "model": "gpt-4o-mini", + "model_type": "chat", + "api_version": 
"2024-02-15-preview", + "tags": "azure,test,chat", + } + + logger.info(f" → chat deployment: {llm_secret['deployment_name']}") + logger.info(f" → endpoint: {llm_secret['endpoint']}") + logger.info(f" → connection_id: {llm_secret['connection_id']}") + + client.secrets.kv.v2.create_or_update_secret( + mount_point="secret", + path="llm/connections/azure_openai/test/evalconnection-1", + secret=llm_secret, + ) + logger.info( + "LLM connection secret written to llm/connections/azure_openai/test/evalconnection-1" + ) + + # ============================================================ + # EMBEDDING MODEL SECRET (Embeddings path) + # KEY INSIGHT: Use the SAME connection_id for both! + # ============================================================ + logger.info("") + logger.info("Writing embedding model secret...") + embedding_secret = { + "connection_id": "evalconnection-1", + "endpoint": azure_endpoint, + "api_key": azure_api_key, + "deployment_name": azure_embedding_deployment, # This is the embedding deployment + "environment": "test", + "model": "text-embedding-3-large", + "model_type": "embedding", + "api_version": "2024-02-15-preview", + "max_tokens": 2048, + "vector_size": 3072, + "tags": "azure,embedding,test", + } + + logger.info(f" → model: {embedding_secret['model']}") + logger.info(f" → connection_id: {embedding_secret['connection_id']}") + logger.info( + " → Vault path: embeddings/connections/azure_openai/test/evalconnection-1" + ) + + # Write to embeddings path with connection_id in the path + client.secrets.kv.v2.create_or_update_secret( + mount_point="secret", + path="embeddings/connections/azure_openai/test/evalconnection-1", + secret=embedding_secret, + ) + logger.info( + "Embedding secret written to embeddings/connections/azure_openai/test/evalconnection-1" + ) + + # ============================================================ + # VERIFY SECRETS WERE WRITTEN CORRECTLY + # ============================================================ + logger.info("") + logger.info("Verifying secrets in Vault...") + try: + # Verify LLM path + verify_llm = client.secrets.kv.v2.read_secret_version( + path="llm/connections/azure_openai/test/evalconnection-1", + mount_point="secret", + ) + llm_data = verify_llm["data"]["data"] + logger.info("LLM path verified:") + logger.info(f" • connection_id: {llm_data.get('connection_id')}") + + # Verify embeddings path + verify_embedding = client.secrets.kv.v2.read_secret_version( + path="embeddings/connections/azure_openai/test/evalconnection-1", + mount_point="secret", + ) + embedding_data = verify_embedding["data"]["data"] + logger.info("Embeddings path verified:") + logger.info(f" • model: {embedding_data.get('model')}") + logger.info(f" • connection_id: {embedding_data.get('connection_id')}") + + # Critical validation + if embedding_data.get("deployment_name") != azure_embedding_deployment: + error_msg = ( + "VAULT SECRET MISMATCH! " + "Expected deployment_name='{azure_embedding_deployment}' " + f"but Vault has '{embedding_data.get('deployment_name')}'" + ) + logger.error(error_msg) + raise ValueError(error_msg) + + if embedding_data.get("connection_id") != "evalconnection-1": + error_msg = ( + "VAULT SECRET MISMATCH! 
" + "Expected connection_id='evalconnection-1' " + f"but Vault has '{embedding_data.get('connection_id')}'" + ) + logger.error(error_msg) + raise ValueError(error_msg) + + logger.info("Secret verification PASSED") + + except Exception as e: + logger.error(f"Failed to verify secrets: {e}") + raise + + # ============================================================ + # LANGFUSE CONFIGURATION + # ============================================================ + logger.info("") + logger.info("Writing Langfuse configuration secret...") + langfuse_secret = { + "public_key": "pk-lf-test", + "secret_key": "sk-lf-test", + "host": "http://langfuse-web:3000", + } + client.secrets.kv.v2.create_or_update_secret( + mount_point="secret", path="langfuse/config", secret=langfuse_secret + ) + logger.info("Langfuse configuration secret written") + + # ============================================================ + # GUARDRAILS CONFIGURATION + # ============================================================ + logger.info("") + logger.info("Writing Guardrails configuration secret...") + guardrails_secret = { + "connection_id": "guardrails-test-1", + "api_key": os.getenv("ANTHROPIC_API_KEY", "TEST_ANTHROPIC_KEY"), + "model": "claude-3-5-sonnet-20241022", + "environment": "test", + "tags": "anthropic,guardrails,test", + } + client.secrets.kv.v2.create_or_update_secret( + mount_point="secret", + path="guardrails/anthropic/test/claude-3-5-sonnet", + secret=guardrails_secret, + ) + logger.info("Guardrails configuration secret written") + + logger.info("=" * 80) + logger.info("ALL SECRETS WRITTEN SUCCESSFULLY") + logger.info("=" * 80) + + def _capture_service_logs(self) -> None: + """Capture logs from all services before cleanup.""" + services = ["llm-orchestration-service", "vault", "qdrant", "langfuse-web"] + + for service in services: + try: + logger.info(f"\n{'=' * 60}") + logger.info(f"LOGS: {service}") + logger.info("=" * 60) + + result = subprocess.run( + [ + "docker", + "compose", + "-f", + str(self.compose_file_path), + "logs", + "--tail", + "200", + service, + ], + capture_output=True, + text=True, + timeout=10, + cwd=str(self.project_root), + ) + + if result.stdout: + logger.info(result.stdout) + if result.stderr: + logger.error(result.stderr) + + except Exception as e: + logger.error(f"Failed to capture logs for {service}: {e}") + + def _wait_for_valid_token( + self, token_path: Path, vault_url: str, max_attempts: int = 20 + ) -> None: + """Wait for Vault Agent to write a valid token and verify it works""" + for attempt in range(max_attempts): + if token_path.exists() and token_path.stat().st_size > 0: + try: + # Fix permissions before reading + self._fix_token_file_permissions(token_path) + + token = token_path.read_text().strip() + + client = hvac.Client(url=vault_url, token=token) + try: + client.lookup_token() + + if client.is_authenticated(): + logger.info(f"Valid token obtained (attempt {attempt + 1})") + self._verify_token_permissions(client) + return + except Exception as e: + if attempt < max_attempts - 1: + logger.debug( + f"Token validation error (attempt {attempt + 1}): {type(e).__name__}" + ) + except PermissionError as e: + logger.warning( + f"Permission error reading token file (attempt {attempt + 1}): {e}" + ) + # Try to fix permissions again + self._fix_token_file_permissions(token_path, force=True) + + time.sleep(2) + + logger.error("Failed to obtain valid Vault token") + self._check_agent_logs() + raise TimeoutError( + f"Failed to obtain valid Vault token after {max_attempts} attempts" + ) + 
+ def _fix_token_file_permissions( + self, token_path: Path, force: bool = False + ) -> None: + """Fix permissions on token file to make it readable by host user""" + try: + # Try to change permissions using subprocess (requires Docker to be accessible) + if force: + logger.info( + "Attempting to fix token file permissions using docker exec..." + ) + result = subprocess.run( + [ + "docker", + "exec", + "vault-agent-llm", + "chmod", + "644", + "/agent/out/token", + ], + capture_output=True, + text=True, + timeout=5, + ) + if result.returncode == 0: + logger.info( + "Successfully fixed token file permissions via docker exec" + ) + else: + logger.warning( + f"Failed to fix permissions via docker exec: {result.stderr}" + ) + + # Also try direct chmod (may not work in all environments) + try: + os.chmod(token_path, 0o644) + except Exception as chmod_error: + logger.debug( + f"Direct chmod failed (expected in some environments): {chmod_error}" + ) + + except Exception as e: + logger.debug(f"Could not fix token file permissions: {e}") + + def _verify_token_permissions(self, client: hvac.Client) -> None: + """Verify the token has correct permissions to read secrets""" + try: + client.secrets.kv.v2.read_secret_version( + path="llm/connections/azure_openai/test/evalconnection-1", + mount_point="secret", + ) + logger.info("Token has correct permissions to read secrets") + except Exception as e: + logger.error(f"Token cannot read secrets: {e}") + raise + + def _check_agent_logs(self) -> None: + """Check vault-agent logs for debugging authentication issues""" + result = subprocess.run( + ["docker", "logs", "--tail", "50", "vault-agent-llm"], + capture_output=True, + text=True, + ) + logger.error(f"Vault Agent Logs:\n{result.stdout}\n{result.stderr}") + + def _wait_for_services(self, total_timeout: int = 300) -> None: + """Wait for all services to be healthy""" + services = [ + ("qdrant", 6333, self._check_qdrant, 60), + ("langfuse-web", 3000, self._check_langfuse, 120), + ("llm-orchestration-service", 8100, self._check_orchestration, 180), + ] + start = time.time() + for name, port, check, timeout in services: + self._wait_single(name, port, check, timeout, start, total_timeout) + + def _wait_single( + self, + name: str, + port: int, + check: Any, + timeout: int, + global_start: float, + total_timeout: int, + ) -> None: + """Wait for a single service to be ready""" + if self.compose is None: + return + + logger.info(f"Waiting for {name}...") + start = time.time() + while time.time() - start < timeout: + try: + host = self.compose.get_service_host(name, port) + mapped_port = self.compose.get_service_port(name, port) + if check(host, mapped_port): + logger.info(f"{name} ready at {host}:{mapped_port}") + self.services_info[name] = { + "host": host, + "port": mapped_port, + "url": f"http://{host}:{mapped_port}", + } + return + except Exception: + pass + time.sleep(3) + raise TimeoutError(f"Timeout waiting for {name}") + + def _check_qdrant(self, host: str, port: int) -> bool: + """Check if Qdrant is ready""" + try: + r = requests.get(f"http://{host}:{port}/collections", timeout=5) + return r.status_code == 200 + except Exception: + return False + + def _check_langfuse(self, host: str, port: int) -> bool: + """Check if Langfuse is ready""" + try: + r = requests.get(f"http://{host}:{port}/api/public/health", timeout=5) + return r.status_code == 200 + except Exception: + return False + + def _check_orchestration(self, host: str, port: int) -> bool: + """Check if LLM orchestration service is healthy""" + try: 
+            r = requests.get(f"http://{host}:{port}/health", timeout=5)
+            return r.status_code == 200 and r.json().get("status") == "healthy"
+        except Exception:
+            return False
+
+    def _collect_service_info(self) -> None:
+        """Collect service connection information"""
+        if self.compose:
+            self.services_info["vault"] = {
+                "host": self.compose.get_service_host("vault", 8200),
+                "port": self.compose.get_service_port("vault", 8200),
+                "url": self._get_vault_url(),
+            }
+
+    def get_orchestration_service_url(self) -> str:
+        """Get the URL for the LLM orchestration service"""
+        return self.services_info["llm-orchestration-service"]["url"]
+
+    def get_qdrant_url(self) -> str:
+        """Get the URL for Qdrant"""
+        return self.services_info["qdrant"]["url"]
+
+    def get_vault_url(self) -> str:
+        """Get the URL for Vault"""
+        return self.services_info["vault"]["url"]
+
+    def get_langfuse_url(self) -> str:
+        """Get the URL for Langfuse"""
+        return self.services_info.get("langfuse-web", {}).get(
+            "url", "http://localhost:3000"
+        )
+
+    def is_service_available(self, service_name: str) -> bool:
+        """Check if a service is available"""
+        return service_name in self.services_info
+
+    def _index_test_data(self) -> None:
+        """Index test documents into Qdrant for retrieval testing."""
+        logger.info("Indexing test data into Qdrant contextual collections...")
+
+        try:
+            from tests.helpers.test_data_loader import load_test_data_into_qdrant
+
+            load_test_data_into_qdrant(
+                orchestration_url=self.get_orchestration_service_url(),
+                qdrant_url=self.get_qdrant_url(),
+            )
+
+            logger.info("Test data indexing complete")
+
+        except Exception as e:
+            logger.error(f"Failed to index test data: {e}")
+            raise
+
+
+# ===================== Pytest Fixtures =====================
+
+
+@pytest.fixture(scope="session")
+def rag_stack() -> Generator[RAGStackTestContainers, None, None]:
+    """
+    Session-scoped fixture that starts all test containers once per test session.
+    Containers are automatically stopped after all tests complete.
+    """
+    stack = RAGStackTestContainers()
+    try:
+        stack.start()
+        yield stack
+    except Exception as e:
+        # If startup fails, capture logs before cleanup
+        logger.error(f"RAG stack startup failed: {e}")
+        try:
+            stack._capture_service_logs()
+        except Exception as log_error:
+            logger.error(f"Could not capture logs after startup failure: {log_error}")
+        raise
+    finally:
+        logger.info("=" * 80)
+        logger.info("CAPTURING SERVICE LOGS BEFORE CLEANUP")
+        logger.info("=" * 80)
+        try:
+            stack._capture_service_logs()
+        except Exception as e:
+            logger.error(f"Could not capture logs: {e}")
+        stack.stop()
+
+
+@pytest.fixture(scope="function")
+def orchestration_client(rag_stack: RAGStackTestContainers) -> Any:
+    """
+    Function-scoped fixture that provides a configured requests session
+    for testing the LLM orchestration service API.
+ """ + session = requests.Session() + session.headers.update( + {"Content-Type": "application/json", "Accept": "application/json"} + ) + setattr(session, "base_url", rag_stack.get_orchestration_service_url()) + return session diff --git a/tests/deepeval_tests/red_team_tests.py b/tests/deepeval_tests/red_team_tests.py index 0413913..6ada621 100644 --- a/tests/deepeval_tests/red_team_tests.py +++ b/tests/deepeval_tests/red_team_tests.py @@ -4,6 +4,7 @@ import sys import datetime import pytest +import requests from deepteam import red_team from deepteam.attacks.single_turn import ( @@ -35,7 +36,6 @@ ) sys.path.insert(0, str(Path(__file__).parent.parent)) -from mocks.dummy_llm_orchestrator import process_query class ComprehensiveResultCollector: @@ -192,11 +192,9 @@ def setup_class(cls): ] cls.multi_turn_attacks = [ - LinearJailbreaking(turns=3, weight=1), # Reduced turns for faster execution + LinearJailbreaking(turns=3, weight=1), SequentialJailbreak(type="dialogue", persona="student", weight=1), - CrescendoJailbreaking( - max_rounds=5, max_backtracks=5, weight=1 - ), # Reduced complexity + CrescendoJailbreaking(max_rounds=5, max_backtracks=5, weight=1), ] # All vulnerabilities organized by category @@ -230,17 +228,46 @@ def setup_class(cls): f"Loaded {len(cls.all_vulnerabilities)} vulnerabilities for comprehensive testing" ) + # Store orchestration URL for use in tests + cls.orchestration_url = None + + def setup_method(self, method): + """Setup method called before each test - captures orchestration client.""" + # This will be set by the orchestration_client fixture + pass + async def model_callback(self, input_text: str, language: str = "en") -> str: - """Model callback function for DeepTeam red teaming.""" + """Model callback function for DeepTeam red teaming - uses orchestration API.""" try: if not input_text or len(input_text.strip()) == 0: return "I cannot process empty queries." 
- result = process_query( - question=input_text, provider="anthropic", include_contexts=True + # Use the orchestration service endpoint + api_request = { + "chatId": f"redteam-{datetime.datetime.now().timestamp()}", + "message": input_text, + "authorId": "redteam-tester", + "conversationHistory": [], + "url": "https://test.example.com", + "environment": "test", + "connection_id": "evalconnection-1", + } + + response = requests.post( + f"{self.orchestration_url}/orchestrate-test", + json=api_request, + timeout=60, ) - return result["response"] + response.raise_for_status() + result = response.json() + + return result.get("content", "No response generated") + + except requests.exceptions.RequestException as e: + print(f"API request failed: {e}") + return f"Error processing query: {str(e)}" except Exception as e: + print(f"Model callback error: {e}") return f"Error processing query: {str(e)}" def _evaluate_results(self, red_team_results: Any) -> bool: @@ -265,10 +292,14 @@ def _evaluate_results(self, red_team_results: Any) -> bool: print(f"Evaluation error: {e}") return False - def test_comprehensive_security_assessment(self): + def test_comprehensive_security_assessment(self, orchestration_client): """Run comprehensive security assessment covering all attack vectors and vulnerabilities.""" + # Get orchestration URL from fixture + self.orchestration_url = orchestration_client.base_url + print("\n" + "=" * 80) print("STARTING COMPREHENSIVE RAG SYSTEM SECURITY ASSESSMENT") + print(f"Orchestration URL: {self.orchestration_url}") print("=" * 80) start_time = datetime.datetime.now() diff --git a/tests/deepeval_tests/standard_tests.py b/tests/deepeval_tests/standard_tests.py index a30e284..5041f32 100644 --- a/tests/deepeval_tests/standard_tests.py +++ b/tests/deepeval_tests/standard_tests.py @@ -4,6 +4,7 @@ from pathlib import Path import sys import datetime +import requests from deepeval.test_case import LLMTestCase from deepeval.metrics.answer_relevancy.answer_relevancy import AnswerRelevancyMetric from deepeval.metrics import ( @@ -14,7 +15,6 @@ ) sys.path.insert(0, str(Path(__file__).parent.parent)) -from mocks.dummy_llm_orchestrator import process_query class StandardResultCollector: @@ -108,7 +108,7 @@ def save_results_fixture(): class TestRAGSystem: - """Test suite for RAG system evaluation using DeepEval metrics.""" + """Test suite for RAG system evaluation using DeepEval metrics via API.""" @classmethod def setup_class(cls): @@ -130,21 +130,67 @@ def setup_class(cls): print(f"Loaded {len(cls.test_data)} test cases") def create_test_case( - self, data_item: Dict[str, Any], provider: str = "anthropic" + self, data_item: Dict[str, Any], orchestration_url: str ) -> LLMTestCase: - """Create a DeepEval test case from data item.""" - # Generate actual output using the dummy orchestrator - result = process_query( - question=data_item["input"], provider=provider, include_contexts=True - ) + """Create a DeepEval test case by calling the orchestration API.""" + + # Prepare API request + api_request = { + "chatId": f"test-{data_item.get('id', 'unknown')}", + "message": data_item["input"], + "authorId": "deepeval-tester", + "conversationHistory": [], + "url": "https://test.example.com", + "environment": "test", + "connection_id": "evalconnection-1", + } - llm_test_case = LLMTestCase( - input=data_item["input"], - actual_output=result["response"], - expected_output=data_item["expected_output"], - retrieval_context=result["retrieval_context"], - ) - return llm_test_case + # Call the testing endpoint + 
try: + response = requests.post( + f"{orchestration_url}/orchestrate-test", json=api_request, timeout=60 + ) + response.raise_for_status() + result = response.json() + print("=" * 80) + print("API RESPONSE DEBUG") + print("=" * 80) + print(f"Response keys: {list(result.keys())}") + print(f"Full response: {json.dumps(result, indent=2)}") + print("=" * 80) + # Extract data from API response + actual_output = result.get("content", "") + retrieval_context = result.get("retrieval_context", []) + if retrieval_context is None: + retrieval_context = [] + + # Convert retrieval context to strings for DeepEval + retrieval_context_strings = [] + if retrieval_context and isinstance(retrieval_context, list): + retrieval_context_strings = [ + chunk.get("content", "") if isinstance(chunk, dict) else str(chunk) + for chunk in retrieval_context + ] + + # Create DeepEval test case + llm_test_case = LLMTestCase( + input=data_item["input"], + actual_output=actual_output, + expected_output=data_item["expected_output"], + retrieval_context=retrieval_context_strings, + ) + + return llm_test_case + + except requests.exceptions.RequestException as e: + print(f"API request failed: {e}") + # Return a test case with error message + return LLMTestCase( + input=data_item["input"], + actual_output=f"API Error: {str(e)}", + expected_output=data_item["expected_output"], + retrieval_context=[], + ) @pytest.mark.parametrize( "test_item", @@ -159,9 +205,14 @@ def create_test_case( ) ], ) - def test_all_metrics(self, test_item: Dict[str, Any]): + def test_all_metrics(self, test_item: Dict[str, Any], orchestration_client): """Test all metrics for each test case and collect results.""" - test_case = self.create_test_case(test_item) + + # Get orchestration service URL from fixture + orchestration_url = orchestration_client.base_url + + # Create test case by calling API + test_case = self.create_test_case(test_item, orchestration_url) # Get test case index for consistent numbering test_case_num = self.test_data.index(test_item) + 1 diff --git a/tests/helpers/test_data_loader.py b/tests/helpers/test_data_loader.py new file mode 100644 index 0000000..0ba0da8 --- /dev/null +++ b/tests/helpers/test_data_loader.py @@ -0,0 +1,413 @@ +"""Helper module to load test data into Qdrant before running tests.""" + +import requests +import uuid +from typing import List, Dict, Any +from loguru import logger +from datetime import datetime + + +def load_test_data_into_qdrant( + orchestration_url: str, + qdrant_url: str, +) -> None: + """Load test documents into Qdrant contextual collections for retrieval testing.""" + logger.info("Loading test data into Qdrant contextual collections...") + + test_documents = get_test_documents() + + try: + # Create embeddings via orchestration service + texts = [doc["contextual_content"] for doc in test_documents] + + logger.info(f"Creating embeddings for {len(texts)} documents...") + + # CRITICAL: Use correct test environment values + embedding_response = requests.post( + f"{orchestration_url}/embeddings", + json={ + "texts": texts, + "environment": "test", # ← MUST be "test" + "connection_id": "evalconnection-1", # ← MUST match Vault + "batch_size": 50, + }, + timeout=120, + ) + + # Debug logging + logger.info(f"Embedding API response status: {embedding_response.status_code}") + if embedding_response.status_code != 200: + logger.error(f"Embedding API error: {embedding_response.text}") + raise RuntimeError(f"Embedding creation failed: {embedding_response.text}") + + embeddings_data = embedding_response.json() + + 
# Debug: Log the actual response structure + logger.info("=" * 60) + logger.info("EMBEDDING API RESPONSE DEBUG") + logger.info("=" * 60) + logger.info(f"Response keys: {list(embeddings_data.keys())}") + for key, value in embeddings_data.items(): + if key == "embeddings": + logger.info(f" {key}: list of {len(value)} embeddings") + if value: + logger.info(f" First embedding length: {len(value[0])}") + else: + logger.info(f" {key}: {value}") + logger.info("=" * 60) + + # Extract embeddings and metadata with proper fallbacks + embeddings = embeddings_data.get("embeddings", []) + if not embeddings: + raise RuntimeError("No embeddings returned from API") + + # Get vector size from first embedding (most reliable method) + vector_size = len(embeddings[0]) + + # Try to get model name from various possible fields + model_used = ( + embeddings_data.get("model_used") + or embeddings_data.get("model") + or embeddings_data.get("embedding_model") + or "text-embedding-3-large" # Fallback + ) + + logger.info(f"Created {len(embeddings)} embeddings") + logger.info(f" Vector size: {vector_size}") + logger.info(f" Model: {model_used}") + + # Step 2: Determine which collection to use based on model + collection_name = _determine_collection_from_model(model_used) + logger.info(f"Using collection: {collection_name}") + + # Step 3: Ensure collection exists with proper configuration + import httpx + + async_client = httpx.Client(timeout=30.0) + + try: + # Check if collection exists + response = async_client.get(f"{qdrant_url}/collections/{collection_name}") + + if response.status_code == 404: + # Create collection + logger.info(f"Creating collection '{collection_name}'...") + create_payload = { + "vectors": { + "size": vector_size, + "distance": "Cosine", + }, + "optimizers_config": {"default_segment_number": 2}, + "replication_factor": 1, + } + + response = async_client.put( + f"{qdrant_url}/collections/{collection_name}", json=create_payload + ) + + if response.status_code not in [200, 201]: + raise RuntimeError(f"Failed to create collection: {response.text}") + + logger.info(f"Created collection '{collection_name}'") + else: + logger.info(f"Collection '{collection_name}' already exists") + + except Exception as e: + logger.error(f"Collection setup failed: {e}") + raise + + # Step 4: Index documents in Qdrant using the contextual format + points = [] + for _, (doc, embedding) in enumerate(zip(test_documents, embeddings)): + # Generate UUID for point ID (Qdrant requirement) + point_id = str(uuid.uuid5(uuid.NAMESPACE_DNS, doc["chunk_id"])) + + # Create payload matching ContextualChunk structure + payload = { + # Core identifiers + "chunk_id": doc["chunk_id"], + "document_hash": doc["document_hash"], + "chunk_index": doc["chunk_index"], + "total_chunks": 1, + # Content (matching contextual retrieval format) + "original_content": doc["original_content"], + "contextual_content": doc["contextual_content"], + "context_only": doc["context"], + # Embedding info + "embedding_model": model_used, + "vector_dimensions": vector_size, + # Document metadata + "document_url": doc["metadata"].get("source", "test_document"), + "dataset_collection": "test_collection", + # Processing metadata + "processing_timestamp": datetime.now().isoformat(), + "tokens_count": len(doc["contextual_content"]) // 4, # Rough estimate + # Additional metadata + **doc["metadata"], + } + + points.append({"id": point_id, "vector": embedding, "payload": payload}) + + # Step 5: Upsert points in batches + batch_size = 100 + for i in range(0, len(points), 
batch_size): + batch = points[i : i + batch_size] + + upsert_payload = {"points": batch} + + response = async_client.put( + f"{qdrant_url}/collections/{collection_name}/points", + json=upsert_payload, + ) + + if response.status_code not in [200, 201]: + raise RuntimeError(f"Failed to upsert points: {response.text}") + + logger.info(f"Indexed batch {i // batch_size + 1} ({len(batch)} points)") + + async_client.close() + + # Step 6: Verify indexing + response = requests.get(f"{qdrant_url}/collections/{collection_name}") + if response.status_code == 200: + collection_info = response.json() + points_count = collection_info.get("result", {}).get("points_count", 0) + logger.info(f"Collection verification - Points count: {points_count}") + + logger.info(f"Successfully indexed {len(points)} documents into Qdrant") + + except Exception as e: + logger.error(f"Failed to load test data: {e}") + raise + + +def _determine_collection_from_model(model_name: str) -> str: + """Determine which Qdrant collection to use based on embedding model.""" + model_lower = model_name.lower() + + # Azure OpenAI models -> contextual_chunks_azure + if any( + keyword in model_lower for keyword in ["azure", "text-embedding", "ada-002"] + ): + return "contextual_chunks_azure" + + # AWS Bedrock models -> contextual_chunks_aws + elif any( + keyword in model_lower for keyword in ["titan", "amazon", "aws", "bedrock"] + ): + return "contextual_chunks_aws" + + # Default to Azure collection + else: + logger.warning( + f"Unknown model {model_name}, defaulting to contextual_chunks_azure" + ) + return "contextual_chunks_azure" + + +def get_test_documents() -> List[Dict[str, Any]]: + """ + Get test documents in contextual retrieval format. + + Each document includes: + - original_content: The raw chunk text + - context: Brief contextual description (simulating Anthropic methodology) + - contextual_content: context + original_content (what gets embedded) + """ + return [ + { + "chunk_id": "test_doc_001_chunk_000", + "document_hash": "test_doc_001", + "chunk_index": 0, + "original_content": "In 2021, the pension will become more flexible. People will be able to choose the most suitable time for their retirement, partially withdraw their pension or stop payment of their pension if they wish, in effect creating their own personal pension plan.", + "context": "This chunk discusses pension flexibility reforms in Estonia.", + "contextual_content": "This chunk discusses pension flexibility reforms in Estonia.\n\nIn 2021, the pension will become more flexible. People will be able to choose the most suitable time for their retirement, partially withdraw their pension or stop payment of their pension if they wish, in effect creating their own personal pension plan.", + "metadata": { + "category": "pension_information", + "language": "en", + "source": "gov_policy_2021", + }, + }, + { + "chunk_id": "test_doc_002_chunk_000", + "document_hash": "test_doc_002", + "chunk_index": 0, + "original_content": "Starting in 2027, retirement age calculations will be based on the life expectancy of 65-year-olds. The pension system will thus be in line with demographic developments.", + "context": "This chunk explains future pension age calculation changes.", + "contextual_content": "This chunk explains future pension age calculation changes.\n\nStarting in 2027, retirement age calculations will be based on the life expectancy of 65-year-olds. 
The pension system will thus be in line with demographic developments.", + "metadata": { + "category": "pension_information", + "language": "en", + "source": "pension_reform_2027", + }, + }, + { + "chunk_id": "test_doc_003_chunk_000", + "document_hash": "test_doc_003", + "chunk_index": 0, + "original_content": "From 2021, the formula for the state old-age pension will be upgraded - starting in 2021, we will start collecting the so-called joint part.", + "context": "This chunk describes pension formula updates.", + "contextual_content": "This chunk describes pension formula updates.\n\nFrom 2021, the formula for the state old-age pension will be upgraded - starting in 2021, we will start collecting the so-called joint part.", + "metadata": { + "category": "pension_information", + "language": "en", + "source": "pension_formula_update", + }, + }, + { + "chunk_id": "test_doc_004_chunk_000", + "document_hash": "test_doc_004", + "chunk_index": 0, + "original_content": "In 2021, a total of approximately 653 million euros in benefits were paid to families. Approximately 310 million euros for family benefits; Approximately 280 million euros for parental benefit.", + "context": "This chunk provides family benefit payment statistics.", + "contextual_content": "This chunk provides family benefit payment statistics.\n\nIn 2021, a total of approximately 653 million euros in benefits were paid to families. Approximately 310 million euros for family benefits; Approximately 280 million euros for parental benefit.", + "metadata": { + "category": "family_benefits", + "language": "en", + "source": "benefits_report_2021", + }, + }, + { + "chunk_id": "test_doc_005_chunk_000", + "document_hash": "test_doc_005", + "chunk_index": 0, + "original_content": "The Estonian parental benefit system is one of the most generous in the world, both in terms of the length of the period covered by the benefit and the amount of the benefit.", + "context": "This chunk describes Estonia's parental benefit system.", + "contextual_content": "This chunk describes Estonia's parental benefit system.\n\nThe Estonian parental benefit system is one of the most generous in the world, both in terms of the length of the period covered by the benefit and the amount of the benefit.", + "metadata": { + "category": "family_benefits", + "language": "en", + "source": "parental_benefits_overview", + }, + }, + { + "chunk_id": "test_doc_006_chunk_000", + "document_hash": "test_doc_006", + "chunk_index": 0, + "original_content": "23,687 families and 78,296 children receive support for families with many children, including 117 families with seven or more children.", + "context": "This chunk provides statistics on multi-child family support.", + "contextual_content": "This chunk provides statistics on multi-child family support.\n\n23,687 families and 78,296 children receive support for families with many children, including 117 families with seven or more children.", + "metadata": { + "category": "family_benefits", + "language": "en", + "source": "family_support_stats", + }, + }, + { + "chunk_id": "test_doc_007_chunk_000", + "document_hash": "test_doc_007", + "chunk_index": 0, + "original_content": "8,804 parents and 10,222 children receive single parent support.", + "context": "This chunk provides single parent support statistics.", + "contextual_content": "This chunk provides single parent support statistics.\n\n8,804 parents and 10,222 children receive single parent support.", + "metadata": { + "category": "single_parent_support", + "language": 
"en", + "source": "single_parent_stats", + }, + }, + { + "chunk_id": "test_doc_008_chunk_000", + "document_hash": "test_doc_008", + "chunk_index": 0, + "original_content": "Single-parent (mostly mother) families are at the highest risk of poverty, of whom 5.3% live in absolute poverty and 27.3% in relative poverty.", + "context": "This chunk discusses poverty risks for single-parent families.", + "contextual_content": "This chunk discusses poverty risks for single-parent families.\n\nSingle-parent (mostly mother) families are at the highest risk of poverty, of whom 5.3% live in absolute poverty and 27.3% in relative poverty.", + "metadata": { + "category": "single_parent_support", + "language": "en", + "source": "poverty_statistics", + }, + }, + { + "chunk_id": "test_doc_009_chunk_000", + "document_hash": "test_doc_009", + "chunk_index": 0, + "original_content": "Since January 2022, the Ministry of Social Affairs has been looking for solutions to support single-parent families.", + "context": "This chunk describes ministry initiatives for single parents.", + "contextual_content": "This chunk describes ministry initiatives for single parents.\n\nSince January 2022, the Ministry of Social Affairs has been looking for solutions to support single-parent families.", + "metadata": { + "category": "single_parent_support", + "language": "en", + "source": "ministry_initiatives_2022", + }, + }, + { + "chunk_id": "test_doc_010_chunk_000", + "document_hash": "test_doc_010", + "chunk_index": 0, + "original_content": "Ticket refund is only possible if at least 60 minutes remain until the departure of the trip.", + "context": "This chunk explains train ticket refund timing policy.", + "contextual_content": "This chunk explains train ticket refund timing policy.\n\nTicket refund is only possible if at least 60 minutes remain until the departure of the trip.", + "metadata": { + "category": "train_services", + "language": "en", + "source": "elron_refund_policy", + }, + }, + { + "chunk_id": "test_doc_011_chunk_000", + "document_hash": "test_doc_011", + "chunk_index": 0, + "original_content": "The ticket cost is refunded to the Elron travel card without service charge only if the refund request is submitted through the Elron homepage refund form.", + "context": "This chunk describes fee-free refund process.", + "contextual_content": "This chunk describes fee-free refund process.\n\nThe ticket cost is refunded to the Elron travel card without service charge only if the refund request is submitted through the Elron homepage refund form.", + "metadata": { + "category": "train_services", + "language": "en", + "source": "elron_refund_process", + }, + }, + { + "chunk_id": "test_doc_012_chunk_000", + "document_hash": "test_doc_012", + "chunk_index": 0, + "original_content": "If ticket refund is requested to a bank account, a service fee of 1 euro is deducted from the refundable amount.", + "context": "This chunk explains bank refund fees.", + "contextual_content": "This chunk explains bank refund fees.\n\nIf ticket refund is requested to a bank account, a service fee of 1 euro is deducted from the refundable amount.", + "metadata": { + "category": "train_services", + "language": "en", + "source": "elron_bank_refund", + }, + }, + { + "chunk_id": "test_doc_013_chunk_000", + "document_hash": "test_doc_013", + "chunk_index": 0, + "original_content": "Europe must act more jointly and in a more coordinated way to stop the spread of health-related misinformation, said Estonia's Minister of Social Affairs, Karmen Joller.", 
+ "context": "This chunk contains a minister's statement on health misinformation.", + "contextual_content": "This chunk contains a minister's statement on health misinformation.\n\nEurope must act more jointly and in a more coordinated way to stop the spread of health-related misinformation, said Estonia's Minister of Social Affairs, Karmen Joller.", + "metadata": { + "category": "health_cooperation", + "language": "en", + "source": "minister_statement_eu", + }, + }, + { + "chunk_id": "test_doc_014_chunk_000", + "document_hash": "test_doc_014", + "chunk_index": 0, + "original_content": "Estonian Minister of Social Affairs Karmen Joller and Ukrainian Minister of Health Viktor Liashko today signed the next stage of a health cooperation agreement.", + "context": "This chunk announces a health cooperation agreement signing.", + "contextual_content": "This chunk announces a health cooperation agreement signing.\n\nEstonian Minister of Social Affairs Karmen Joller and Ukrainian Minister of Health Viktor Liashko today signed the next stage of a health cooperation agreement.", + "metadata": { + "category": "health_cooperation", + "language": "en", + "source": "ukraine_agreement", + }, + }, + { + "chunk_id": "test_doc_015_chunk_000", + "document_hash": "test_doc_015", + "chunk_index": 0, + "original_content": "The aim of the agreement is to reinforce health collaboration, support Ukraine's healthcare system recovery.", + "context": "This chunk describes health agreement objectives.", + "contextual_content": "This chunk describes health agreement objectives.\n\nThe aim of the agreement is to reinforce health collaboration, support Ukraine's healthcare system recovery.", + "metadata": { + "category": "health_cooperation", + "language": "en", + "source": "agreement_objectives", + }, + }, + ]
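
A minimal local usage sketch for the new loader, assuming the stack from docker-compose-test.yml is already running and that qdrant (6333) and llm-orchestration-service (8100) are published on localhost; the host ports are assumptions, so substitute the mapped ports reported by testcontainers if they differ. Run from the repository root so the tests package is importable.

# Sketch: seed the Qdrant contextual collection outside of pytest.
# The localhost URLs below are assumed host mappings, not part of this change.
from tests.helpers.test_data_loader import load_test_data_into_qdrant

load_test_data_into_qdrant(
    orchestration_url="http://localhost:8100",  # assumed mapping for llm-orchestration-service
    qdrant_url="http://localhost:6333",  # assumed mapping for qdrant
)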