diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..e8c5927 --- /dev/null +++ b/.env.example @@ -0,0 +1,9 @@ +# GPTKit Configuration +# Copy this file to .env and set your values +# DO NOT commit .env to version control! + +# Bearer token for API authentication (required in production) +GPTKIT_BEARER_TOKEN=your-secret-token-here + +# Disable authentication in development (set to 1, true, or yes) +# GPTKIT_DISABLE_AUTH=1 diff --git a/README.md b/README.md index 240b9f2..a3dda31 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,46 @@ GPTKit is a unified backend designed to provide tools via HTTP Actions for Custom GPTs. +## Authentication + +All endpoints require Bearer token authentication. The `GPTKIT_BEARER_TOKEN` environment variable **must** be set for the API to function (unless disabled in development mode). + +### Usage + +When calling the API, include the Bearer token in the `Authorization` header: + +```bash +curl -H "Authorization: Bearer your-token-here" \ + "https://gptkit.guillaumeduveau.com/domain/whois?domain=example.com" +``` + +### Configuration + +#### Production (Docker) + +Use a `.env` file with Docker Compose (see [Deployment](#deployment) section): + +```bash +# .env +GPTKIT_BEARER_TOKEN=your-secret-token-here +``` + +#### Local Development + +For local development, you can disable authentication: + +```bash +export GPTKIT_DISABLE_AUTH=1 +uvicorn app.main:app --reload +``` + +Or set the token normally: + +```bash +export GPTKIT_BEARER_TOKEN="your-secret-token-here" +uvicorn app.main:app --reload +``` + ## Tools ### WHOIS (`/domain/whois`) @@ -11,7 +51,7 @@ Allows checking domain name availability and retrieving WHOIS information. - **Endpoint**: `GET /domain/whois` - **Parameters**: - `domain` (required): The domain name to check (e.g., `google.com`). - - `force` (optional): `1` to force a fresh WHOIS lookup (ignores cache). + - `refresh` (optional): `1` to force a fresh WHOIS lookup (ignores cache). 
- **Features**: - Persistent cache (SQLite). - Rate limiting (global and per domain). @@ -32,6 +72,8 @@ services: restart: unless-stopped ports: - "8000:8000" + environment: + - GPTKIT_BEARER_TOKEN=${GPTKIT_BEARER_TOKEN} volumes: # Data persistence (WHOIS cache stored in /app/data/whois_cache.db) - gptkit_data:/app/data @@ -40,6 +82,17 @@ volumes: gptkit_data: ``` +Create a `.env` file in the same directory as `docker-compose.yml` (see `.env.example` for reference): + +```bash +# .env (do not commit this file!) +GPTKIT_BEARER_TOKEN=your-secret-token-here +``` + +Docker Compose will automatically load variables from the `.env` file or from the host environment. + +> **Security**: Never commit the `.env` file to version control. It's already in `.gitignore`. Copy `.env.example` to `.env` and set your values. + ## Development 1. **Installation**: @@ -55,7 +108,12 @@ volumes: - Quick API smoke test (curl): ```bash + # Without authentication (only works when GPTKIT_DISABLE_AUTH=1 is set) curl "http://localhost:8000/domain/whois?domain=example.com" + + # With authentication + curl -H "Authorization: Bearer your-token-here" \ + "http://localhost:8000/domain/whois?domain=example.com" ``` - Run the unit test suite with pytest (from the project root): diff --git a/app/auth.py b/app/auth.py new file mode 100644 index 0000000..6dfc6ab --- /dev/null +++ b/app/auth.py @@ -0,0 +1,57 @@ +from fastapi import Security, HTTPException, status +from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials +from typing import Optional +import os +import logging + +logger = logging.getLogger(__name__) + +# HTTPBearer scheme for extracting Bearer token +security = HTTPBearer(auto_error=False) + +def get_bearer_token() -> Optional[str]: + """Get the expected Bearer token from environment variable.""" + # Allow disabling auth in local/dev mode + if os.getenv("GPTKIT_DISABLE_AUTH", "").lower() in ("1", "true", "yes"): + logger.warning("Authentication is DISABLED (GPTKIT_DISABLE_AUTH is set). 
Not recommended for production!") + return None + + token = os.getenv("GPTKIT_BEARER_TOKEN") + if not token: + raise ValueError( + "GPTKIT_BEARER_TOKEN environment variable must be set. " + "Authentication is required for all endpoints. " + "Set GPTKIT_DISABLE_AUTH=1 to disable auth in development." + ) + return token + +def verify_token(credentials: Optional[HTTPAuthorizationCredentials] = Security(security)) -> Optional[str]: + """ + Verify the Bearer token from the Authorization header. + + Raises HTTPException if token is invalid or missing (unless auth is disabled). + Returns the token if valid, or None if authentication is disabled. + """ + expected_token = get_bearer_token() + + # If auth is disabled, allow access + if expected_token is None: + return None + + if not credentials: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Missing Authorization header", + headers={"WWW-Authenticate": "Bearer"}, + ) + + if credentials.credentials != expected_token: + logger.warning(f"Invalid token attempt from client") + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid or expired token", + headers={"WWW-Authenticate": "Bearer"}, + ) + + return credentials.credentials + diff --git a/app/main.py b/app/main.py index cc5e285..9d46073 100644 --- a/app/main.py +++ b/app/main.py @@ -1,5 +1,6 @@ -from fastapi import FastAPI +from fastapi import FastAPI, Depends from app.routers import domain +from app.auth import verify_token import logging import logging.handlers import os @@ -25,8 +26,40 @@ version="1.0.0" ) +# Add security scheme to OpenAPI +from fastapi.openapi.utils import get_openapi + +def custom_openapi(): + if app.openapi_schema: + return app.openapi_schema + openapi_schema = get_openapi( + title=app.title, + version=app.version, + description=app.description, + routes=app.routes, + ) + # Add security scheme + openapi_schema["components"]["securitySchemes"] = { + "BearerAuth": { + "type": "http", + "scheme": 
"bearer", + "bearerFormat": "JWT", + "description": "Bearer token authentication. Set GPTKIT_BEARER_TOKEN environment variable." + } + } + # Apply security to all endpoints + for path in openapi_schema["paths"].values(): + for method in path.values(): + if isinstance(method, dict) and "security" not in method: + method["security"] = [{"BearerAuth": []}] + + app.openapi_schema = openapi_schema + return app.openapi_schema + +app.openapi = custom_openapi + app.include_router(domain.router) -@app.get("/") +@app.get("/", dependencies=[Depends(verify_token)]) async def root(): return {"message": "GPTKit is running"} diff --git a/app/routers/domain.py b/app/routers/domain.py index 84148ad..3a8166f 100644 --- a/app/routers/domain.py +++ b/app/routers/domain.py @@ -1,9 +1,10 @@ -from fastapi import APIRouter, HTTPException, Query +from fastapi import APIRouter, HTTPException, Query, Depends from pydantic import BaseModel -from typing import Optional +from typing import Optional, Union from app.services.cache import WhoisCache from app.services.whois import WhoisService, parse_whois from app.services.rate_limiter import RateLimiter +from app.auth import verify_token import logging logger = logging.getLogger(__name__) @@ -16,24 +17,33 @@ whois_service = WhoisService() rate_limiter = RateLimiter() -class WhoisResponse(BaseModel): +class WhoisResponseMinimal(BaseModel): + """Format minimaliste par défaut - seulement les champs essentiels.""" + domain: str + available: bool + created_at: Optional[str] = None + +class WhoisResponseDetailed(BaseModel): + """Format détaillé avec toutes les clés développées incluant raw.""" domain: str checked_at: str tld: str available: bool - pendingDelete: bool = False - redemptionPeriod: bool = False + pending_delete: bool = False + redemption_period: bool = False statut: Optional[str] = None - creation_date: Optional[str] = None + created_at: Optional[str] = None registrar: Optional[str] = None - # raw is intentionally omitted from the public 
response + raw: str # raw WHOIS data (inclus seulement avec details=1) -@router.get("/whois", response_model=WhoisResponse) +@router.get("/whois", response_model=Union[WhoisResponseMinimal, WhoisResponseDetailed]) async def get_whois( domain: str = Query(..., description="Domain name to check"), - force: int = Query(0, description="Force fresh lookup (1 to force)") + refresh: int = Query(0, description="Force fresh lookup (1 to refresh)"), + details: int = Query(0, description="Return detailed format with all keys including raw (1 for details)"), + token: str = Depends(verify_token) ): - logger.info(f"get_whois called for domain={domain}, force={force}") + logger.info(f"get_whois called for domain={domain}, refresh={refresh}, details={details}") # 1. Validation if "." not in domain: raise HTTPException( @@ -48,16 +58,17 @@ async def get_whois( # 2. Cache # parser is provided by app.services.whois.parse_whois - if force != 1: + if refresh != 1: cached_data = cache.get(domain) if cached_data: # Prefer parsed fields persisted in DB. Only fallback to parsing raw if fields are missing. 
+ # Support both old and new column names for migration compatibility parsed = { "statut": cached_data.get("statut"), - "creation_date": cached_data.get("creation_date"), + "created_at": cached_data.get("created_at") or cached_data.get("creation_date"), "registrar": cached_data.get("registrar"), - "pendingDelete": cached_data.get("pendingDelete"), - "redemptionPeriod": cached_data.get("redemptionPeriod"), + "pending_delete": cached_data.get("pending_delete") or cached_data.get("pendingDelete"), + "redemption_period": cached_data.get("redemption_period") or cached_data.get("redemptionPeriod"), } # If any key is missing/None, parse raw as fallback if not any(v is not None for v in parsed.values()): @@ -65,23 +76,43 @@ async def get_whois( else: # ensure booleans normalized (could be stored as 0/1) try: - parsed["pendingDelete"] = bool(int(parsed["pendingDelete"])) if parsed["pendingDelete"] is not None else False + parsed["pending_delete"] = bool(int(parsed["pending_delete"])) if parsed["pending_delete"] is not None else False except Exception: - parsed["pendingDelete"] = bool(parsed.get("pendingDelete")) + parsed["pending_delete"] = bool(parsed.get("pending_delete")) try: - parsed["redemptionPeriod"] = bool(int(parsed["redemptionPeriod"])) if parsed["redemptionPeriod"] is not None else False + parsed["redemption_period"] = bool(int(parsed["redemption_period"])) if parsed["redemption_period"] is not None else False except Exception: - parsed["redemptionPeriod"] = bool(parsed.get("redemptionPeriod")) - # do not expose raw in responses - cached_data.pop("raw", None) + parsed["redemption_period"] = bool(parsed.get("redemption_period")) # inject parsed fields so response_model includes them cached_data.update(parsed) - # ensure coherence: if pendingDelete or redemptionPeriod, available must be False - if cached_data.get("pendingDelete") or cached_data.get("redemptionPeriod"): + # ensure coherence: if pending_delete or redemption_period, available must be False + if 
cached_data.get("pending_delete") or cached_data.get("redemption_period"): cached_data["available"] = False - return cached_data + + # Projection dynamique selon details + if details != 1: + # Format minimaliste - seulement domain, available, created_at + return WhoisResponseMinimal( + domain=cached_data["domain"], + available=cached_data["available"], + created_at=cached_data.get("created_at") + ) + else: + # Format détaillé avec raw + return WhoisResponseDetailed( + domain=cached_data["domain"], + checked_at=cached_data["checked_at"], + tld=cached_data["tld"], + available=cached_data["available"], + pending_delete=cached_data.get("pending_delete", False), + redemption_period=cached_data.get("redemption_period", False), + statut=cached_data.get("statut"), + created_at=cached_data.get("created_at"), + registrar=cached_data.get("registrar"), + raw=cached_data.get("raw", "") + ) - logger.debug(f"Cache miss or force=1, performing lookup for {domain}") + logger.debug(f"Cache miss or refresh=1, performing lookup for {domain}") # 3. Rate Limiting if not rate_limiter.check(domain): @@ -112,29 +143,49 @@ async def get_whois( if not cached_data: raise HTTPException(status_code=500, detail="Failed to retrieve data from cache after save") # Prefer parsed fields persisted in DB. Only fallback to parsing raw if fields are missing. 
+ # Support both old and new column names for migration compatibility parsed = { "statut": cached_data.get("statut"), - "creation_date": cached_data.get("creation_date"), + "created_at": cached_data.get("created_at") or cached_data.get("creation_date"), "registrar": cached_data.get("registrar"), - "pendingDelete": cached_data.get("pendingDelete"), - "redemptionPeriod": cached_data.get("redemptionPeriod"), + "pending_delete": cached_data.get("pending_delete") or cached_data.get("pendingDelete"), + "redemption_period": cached_data.get("redemption_period") or cached_data.get("redemptionPeriod"), } if not any(v is not None for v in parsed.values()): parsed = parse_whois(cached_data.get("raw"), tld) else: try: - parsed["pendingDelete"] = bool(int(parsed["pendingDelete"])) if parsed["pendingDelete"] is not None else False + parsed["pending_delete"] = bool(int(parsed["pending_delete"])) if parsed["pending_delete"] is not None else False except Exception: - parsed["pendingDelete"] = bool(parsed.get("pendingDelete")) + parsed["pending_delete"] = bool(parsed.get("pending_delete")) try: - parsed["redemptionPeriod"] = bool(int(parsed["redemptionPeriod"])) if parsed["redemptionPeriod"] is not None else False + parsed["redemption_period"] = bool(int(parsed["redemption_period"])) if parsed["redemption_period"] is not None else False except Exception: - parsed["redemptionPeriod"] = bool(parsed.get("redemptionPeriod")) - cached_data.pop("raw", None) + parsed["redemption_period"] = bool(parsed.get("redemption_period")) cached_data.update(parsed) - # ensure coherence: if pendingDelete or redemptionPeriod, available must be False - if cached_data.get("pendingDelete") or cached_data.get("redemptionPeriod"): + # ensure coherence: if pending_delete or redemption_period, available must be False + if cached_data.get("pending_delete") or cached_data.get("redemption_period"): cached_data["available"] = False - return cached_data - cached_data.update(parsed) - return cached_data + + # 
Projection dynamique selon details + if details != 1: + # Format minimaliste - seulement domain, available, created_at + return WhoisResponseMinimal( + domain=cached_data["domain"], + available=cached_data["available"], + created_at=cached_data.get("created_at") + ) + else: + # Format détaillé avec raw + return WhoisResponseDetailed( + domain=cached_data["domain"], + checked_at=cached_data["checked_at"], + tld=cached_data["tld"], + available=cached_data["available"], + pending_delete=cached_data.get("pending_delete", False), + redemption_period=cached_data.get("redemption_period", False), + statut=cached_data.get("statut"), + created_at=cached_data.get("created_at"), + registrar=cached_data.get("registrar"), + raw=cached_data.get("raw", "") + ) diff --git a/app/services/cache.py b/app/services/cache.py index ed6e4cd..45c90bf 100644 --- a/app/services/cache.py +++ b/app/services/cache.py @@ -29,10 +29,10 @@ def _init_db(self): checked_at TEXT, raw TEXT, statut TEXT, - creation_date TEXT, + created_at TEXT, registrar TEXT, - pendingDelete BOOLEAN, - redemptionPeriod BOOLEAN + pending_delete BOOLEAN, + redemption_period BOOLEAN ) """) @@ -67,13 +67,13 @@ def set(self, domain: str, tld: str, available: bool, raw: str): from app.services.whois import parse_whois parsed = parse_whois(raw, tld) except Exception: - parsed = {"statut": None, "creation_date": None, "registrar": None, "pendingDelete": False, "redemptionPeriod": False} + parsed = {"statut": None, "created_at": None, "registrar": None, "pending_delete": False, "redemption_period": False} try: with sqlite3.connect(self.db_path) as conn: conn.execute(""" INSERT OR REPLACE INTO whois_cache - (domain, tld, available, checked_at, raw, statut, creation_date, registrar, pendingDelete, redemptionPeriod) + (domain, tld, available, checked_at, raw, statut, created_at, registrar, pending_delete, redemption_period) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
""", ( @@ -83,28 +83,30 @@ def set(self, domain: str, tld: str, available: bool, raw: str): checked_at, raw, parsed.get("statut"), - parsed.get("creation_date"), + parsed.get("created_at"), parsed.get("registrar"), - int(bool(parsed.get("pendingDelete"))), - int(bool(parsed.get("redemptionPeriod"))) + int(bool(parsed.get("pending_delete"))), + int(bool(parsed.get("redemption_period"))) )) logger.debug(f"Cache SET for domain: {domain} (checked_at: {checked_at})") except sqlite3.Error as e: logger.error(f"Cache error on set({domain}): {e}") def _migrate_if_needed(self): - """Detect missing expected columns, add them, and backfill parsed fields from raw.""" + """Detect missing expected columns, add them, migrate old column names to new ones, and backfill parsed fields from raw.""" EXPECTED = { "statut": "TEXT", - "creation_date": "TEXT", + "created_at": "TEXT", "registrar": "TEXT", - "pendingDelete": "BOOLEAN", - "redemptionPeriod": "BOOLEAN", + "pending_delete": "BOOLEAN", + "redemption_period": "BOOLEAN", } try: with sqlite3.connect(self.db_path) as conn: cur = conn.execute("PRAGMA table_info('whois_cache')") existing = {row[1] for row in cur.fetchall()} # column names + + # Add new columns if they don't exist to_add = [(n, t) for n, t in EXPECTED.items() if n not in existing] if to_add: logger.info(f"Cache migration: adding columns: {[n for n, _ in to_add]}") @@ -115,8 +117,30 @@ def _migrate_if_needed(self): logger.exception(f"Failed to add column {name}; continuing") conn.commit() + # Migrate old column names to new ones if old columns exist + old_to_new = { + "creation_date": "created_at", + "pendingDelete": "pending_delete", + "redemptionPeriod": "redemption_period", + } + + for old_col, new_col in old_to_new.items(): + if old_col in existing and new_col in existing: + # Copy data from old column to new column where new column is NULL + try: + result = conn.execute(f""" + UPDATE whois_cache + SET {new_col} = {old_col} + WHERE {new_col} IS NULL AND {old_col} IS 
NOT NULL + """) + if result.rowcount > 0: + logger.info(f"Cache migration: migrated {result.rowcount} rows from {old_col} to {new_col}") + conn.commit() + except sqlite3.Error: + logger.exception(f"Failed to migrate {old_col} to {new_col}; continuing") + # Backfill parsed fields for rows where raw is present and parsed columns are NULL/empty - sel = "SELECT domain, raw, tld FROM whois_cache WHERE raw IS NOT NULL AND (statut IS NULL OR creation_date IS NULL OR registrar IS NULL OR pendingDelete IS NULL OR redemptionPeriod IS NULL)" + sel = "SELECT domain, raw, tld FROM whois_cache WHERE raw IS NOT NULL AND (statut IS NULL OR created_at IS NULL OR registrar IS NULL OR pending_delete IS NULL OR redemption_period IS NULL)" rows = conn.execute(sel).fetchall() if rows: logger.info(f"Cache migration: backfilling parsed fields for {len(rows)} rows") @@ -127,17 +151,17 @@ def _migrate_if_needed(self): logger.exception("Could not import parse_whois for migration; skipping backfill") return - upd = "UPDATE whois_cache SET statut = ?, creation_date = ?, registrar = ?, pendingDelete = ?, redemptionPeriod = ? WHERE domain = ?" + upd = "UPDATE whois_cache SET statut = ?, created_at = ?, registrar = ?, pending_delete = ?, redemption_period = ? WHERE domain = ?" 
updated = 0 for domain, raw, tld in rows: try: parsed = parse_whois(raw, tld) conn.execute(upd, ( parsed.get("statut"), - parsed.get("creation_date"), + parsed.get("created_at"), parsed.get("registrar"), - int(bool(parsed.get("pendingDelete"))), - int(bool(parsed.get("redemptionPeriod"))), + int(bool(parsed.get("pending_delete"))), + int(bool(parsed.get("redemption_period"))), domain, )) updated += 1 diff --git a/app/services/whois.py b/app/services/whois.py index 280a0cd..73cfcec 100644 --- a/app/services/whois.py +++ b/app/services/whois.py @@ -9,9 +9,19 @@ def __init__(self, timeout: int = 5): def lookup(self, domain: str) -> str: try: - # Using -H to suppress legal disclaimers if possible, but standard whois usually just works + # Extract TLD to determine the appropriate WHOIS server + parts = domain.split(".") + tld = parts[-1].lower() if parts else "" + + # Use appropriate WHOIS server based on TLD + if tld == "fr": + whois_server = "whois.afnic.fr" + else: + # Default to Verisign for .com, .net, and other common TLDs + whois_server = "whois.verisign-grs.com" + result = subprocess.run( - ["whois", "-h", "whois.verisign-grs.com", domain], + ["whois", "-h", whois_server, domain], capture_output=True, text=True, timeout=self.timeout @@ -40,6 +50,7 @@ def is_available(self, raw_output: str, tld: str) -> bool: not_found_patterns = [ "no match", "not found", + "%% not found", # AFNIC format for .fr domains "no entries found", "status: free", "nothing found", @@ -56,27 +67,27 @@ def is_available(self, raw_output: str, tld: str) -> bool: def parse_whois(raw: str, tld: str): - """Extract statut, creation_date, registrar, pendingDelete, redemptionPeriod for all TLDs. + """Extract statut, created_at, registrar, pending_delete, redemption_period for all TLDs. Heuristic parser reused across the app and migration scripts. 
""" if not raw: return { "statut": None, - "creation_date": None, + "created_at": None, "registrar": None, - "pendingDelete": False, - "redemptionPeriod": False, + "pending_delete": False, + "redemption_period": False, } raw_lines = [l.strip() for l in raw.splitlines() if l.strip()] lower = raw.lower() statut = None - creation_date = None + created_at = None registrar = None - pendingDelete = False - redemptionPeriod = False + pending_delete = False + redemption_period = False import re @@ -90,10 +101,10 @@ def parse_whois(raw: str, tld: str): registrar = parts[1].strip() continue # Creation date - if creation_date is None and ("creation date" in l or "created on" in l or "created:" in l or "creation:" in l or "registered on" in l): + if created_at is None and ("creation date" in l or "created on" in l or "created:" in l or "creation:" in l or "registered on" in l): parts = line.split(":", 1) if len(parts) == 2: - creation_date = parts[1].strip() + created_at = parts[1].strip() continue # Status lines (can have multiple) if "status:" in l or l.startswith("domain status"): @@ -101,11 +112,11 @@ def parse_whois(raw: str, tld: str): parts = line.split(":", 1) if len(parts) == 2: statut = parts[1].strip() - # Check for pendingDelete and redemptionPeriod in any status line + # Check for pending_delete and redemption_period in any status line if "pendingdelete" in l: - pendingDelete = True + pending_delete = True if "redemptionperiod" in l: - redemptionPeriod = True + redemption_period = True continue # Fallback regex for Registrar lines like 'Registrar Name' without colon @@ -116,8 +127,8 @@ def parse_whois(raw: str, tld: str): return { "statut": statut, - "creation_date": creation_date, + "created_at": created_at, "registrar": registrar, - "pendingDelete": pendingDelete, - "redemptionPeriod": redemptionPeriod, + "pending_delete": pending_delete, + "redemption_period": redemption_period, } diff --git a/gptkit-whois-openapi.json b/gptkit-whois-openapi.json index 
b277c3e..9fc87ad 100644 --- a/gptkit-whois-openapi.json +++ b/gptkit-whois-openapi.json @@ -9,12 +9,28 @@ "url": "https://gptkit.guillaumeduveau.com" } ], + "components": { + "schemas": {}, + "securitySchemes": { + "BearerAuth": { + "type": "http", + "scheme": "bearer", + "bearerFormat": "JWT", + "description": "Bearer token authentication. Set GPTKIT_BEARER_TOKEN environment variable." + } + } + }, "paths": { "/domain/whois": { "get": { "operationId": "whoisLookup", "summary": "WHOIS lookup for a domain", "description": "Check WHOIS information and availability status for a single domain.", + "security": [ + { + "BearerAuth": [] + } + ], "parameters": [ { "name": "domain", @@ -26,7 +42,7 @@ } }, { - "name": "force", + "name": "refresh", "in": "query", "required": false, "description": "If 1, bypass cache and force a fresh WHOIS lookup. Default is 0.", @@ -35,27 +51,70 @@ "enum": [0, 1], "default": 0 } + }, + { + "name": "details", + "in": "query", + "required": false, + "description": "If 1, return detailed format with all keys including raw. Default is 0 (minimalist format).", + "schema": { + "type": "integer", + "enum": [0, 1], + "default": 0 + } } ], "responses": { "200": { - "description": "Successful WHOIS lookup", + "description": "Successful WHOIS lookup. 
Format depends on the 'details' parameter: minimalist format (default) or detailed format with all keys including raw.", + "content": { + "application/json": { + "schema": { + "oneOf": [ + { + "type": "object", + "title": "MinimalistFormat", + "description": "Minimalist format returned by default (when details=0 or not provided)", + "properties": { + "domain": { "type": "string", "description": "Domain name" }, + "available": { "type": "boolean", "description": "Availability status" }, + "created_at": { "type": "string", "nullable": true, "description": "Domain creation date, null if not available" } + }, + "required": ["domain", "available"] + }, + { + "type": "object", + "title": "DetailedFormat", + "description": "Detailed format returned when details=1, includes all keys and raw WHOIS data", + "properties": { + "domain": { "type": "string" }, + "checked_at": { "type": "string", "format": "date-time" }, + "tld": { "type": "string" }, + "available": { "type": "boolean" }, + "pending_delete": { "type": "boolean" }, + "redemption_period": { "type": "boolean" }, + "statut": { "type": "string", "nullable": true }, + "created_at": { "type": "string", "nullable": true }, + "registrar": { "type": "string", "nullable": true }, + "raw": { "type": "string", "description": "Raw WHOIS data" } + }, + "required": ["domain", "checked_at", "tld", "available", "pending_delete", "redemption_period", "raw"] + } + ] + } + } + } + }, + "401": { + "description": "Unauthorized - Missing or invalid Bearer token", "content": { "application/json": { "schema": { "type": "object", "properties": { - "domain": { "type": "string" }, - "checked_at": { "type": "string", "format": "date-time" }, - "tld": { "type": "string" }, - "available": { "type": "boolean" }, - "pendingDelete": { "type": "boolean" }, - "redemptionPeriod": { "type": "boolean" }, - "statut": { "type": "string", "nullable": true }, - "creation_date": { "type": "string", "nullable": true }, - "registrar": { "type": "string", 
"nullable": true } + "detail": { "type": "string", "example": "Invalid or expired token" } }, - "required": ["domain", "checked_at", "tld", "available", "pendingDelete", "redemptionPeriod"] + "required": ["detail"] } } } diff --git a/pyproject.toml b/pyproject.toml index 10499fd..11a6e33 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,6 +17,7 @@ dependencies = [ [project.optional-dependencies] dev = [ "pytest", + "httpx", ] [tool.setuptools] diff --git a/tests/data/whois-argent.fr b/tests/data/whois-argent.fr new file mode 100644 index 0000000..1eefb5e --- /dev/null +++ b/tests/data/whois-argent.fr @@ -0,0 +1,84 @@ +%% +%% This is the AFNIC Whois server. +%% +%% complete date format: YYYY-MM-DDThh:mm:ssZ +%% +%% Rights restricted by copyright. +%% See https://www.afnic.fr/en/domain-names-and-support/everything-there-is-to-know-about-domain-names/find-a-domain-name-or-a-holder-using-whois/ +%% +%% + +domain: argent.fr +status: ACTIVE +eppstatus: active +hold: NO +holder-c: CTC3123230-FRNIC +admin-c: OVH5-FRNIC +tech-c: OVH5-FRNIC +registrar: OVH +Expiry Date: 2027-09-24T10:33:48Z +created: 2000-07-10T22:00:00Z +last-update: 2025-09-27T17:53:37.502878Z +source: FRNIC + +nserver: dns100.ovh.net +nserver: ns100.ovh.net +source: FRNIC + +key1-tag: 2189 +key1-algo: 8 [RSASHA256] +key1-dgst-t: 2 [SHA256] +key1-dgst: 13B113064C8242C79E970578BAEED313EED350963FE5EBBE1E34EFAB18DC9DD8 +source: FRNIC + +registrar: OVH +address: 2 Rue Kellermann +address: 59100 ROUBAIX +country: FR +phone: +33.899701761 +fax-no: +33.320200958 +e-mail: support@ovh.net +website: http://www.ovh.com +anonymous: No +registered: 1999-10-18T00:00:00Z +source: FRNIC + +nic-hdl: CTC3123230-FRNIC +type: ORGANIZATION +contact: LIBRAIRIE GALERIE LES CHEVAU LEGERS +address: LIBRAIRIE GALERIE LES CHEVAU LEGERS +address: 36 rue Vivienne +address: 75002 PARIS +address: IDF +country: FR +phone: +33.140264297 +e-mail: b83a0e65-72d0-4214-861f-ac5d72c7994a@s.o-w-o.info +registrar: OVH +anonymous: NO 
+obsoleted: NO +eppstatus: associated +eppstatus: active +eligstatus: not identified +reachstatus: not identified +source: FRNIC + +nic-hdl: OVH5-FRNIC +type: ORGANIZATION +contact: OVH NET +address: OVH +address: 140, quai du Sartel +address: 59100 Roubaix +country: FR +phone: +33.899701761 +e-mail: tech@ovh.net +registrar: OVH +changed: 2025-12-03T15:04:31.901686Z +anonymous: NO +obsoleted: NO +eppstatus: associated +eppstatus: active +eligstatus: not identified +reachstatus: not identified +source: FRNIC + +>>> Last update of WHOIS database: 2025-12-03T15:11:45.202241Z <<< \ No newline at end of file diff --git a/tests/data/whois-nodomain.fr b/tests/data/whois-nodomain.fr new file mode 100644 index 0000000..311965b --- /dev/null +++ b/tests/data/whois-nodomain.fr @@ -0,0 +1,13 @@ +%% +%% This is the AFNIC Whois server. +%% +%% complete date format: YYYY-MM-DDThh:mm:ssZ +%% +%% Rights restricted by copyright. +%% See https://www.afnic.fr/en/domain-names-and-support/everything-there-is-to-know-about-domain-names/find-a-domain-name-or-a-holder-using-whois/ +%% +%% + +%% NOT FOUND + +>>> Last update of WHOIS database: 2025-12-05T14:44:54.540787Z <<< \ No newline at end of file diff --git a/tests/test_api.py b/tests/test_api.py new file mode 100644 index 0000000..c6d156a --- /dev/null +++ b/tests/test_api.py @@ -0,0 +1,239 @@ +import os +import pytest +from fastapi.testclient import TestClient +from app.main import app +from app.services.cache import WhoisCache +import tempfile +import shutil + +# Mock the cache to use a temporary database for tests +@pytest.fixture +def temp_cache(): + """Create a temporary cache database for testing.""" + temp_dir = tempfile.mkdtemp() + db_path = os.path.join(temp_dir, "test_whois_cache.db") + cache = WhoisCache(db_path=db_path) + yield cache + shutil.rmtree(temp_dir) + +@pytest.fixture +def client(temp_cache, monkeypatch): + """Create a test client with mocked cache and disabled auth.""" + # Disable authentication for tests + 
monkeypatch.setenv("GPTKIT_DISABLE_AUTH", "1") + + # Mock the cache instance + from app.routers import domain + original_cache = domain.cache + domain.cache = temp_cache + + client = TestClient(app) + yield client + + # Restore original cache + domain.cache = original_cache + +@pytest.fixture +def client_with_auth(temp_cache, monkeypatch): + """Create a test client with authentication enabled.""" + monkeypatch.setenv("GPTKIT_BEARER_TOKEN", "test-token-123") + monkeypatch.delenv("GPTKIT_DISABLE_AUTH", raising=False) + + # Mock the cache instance + from app.routers import domain + original_cache = domain.cache + domain.cache = temp_cache + + client = TestClient(app) + yield client + + # Restore original cache + domain.cache = original_cache + +def test_root_endpoint(client): + """Test the root endpoint.""" + response = client.get("/") + assert response.status_code == 200 + assert response.json() == {"message": "GPTKit is running"} + +def test_whois_invalid_domain(client): + """Test WHOIS endpoint with invalid domain.""" + response = client.get("/domain/whois?domain=invalid") + assert response.status_code == 400 + assert "error" in response.json()["detail"] + assert response.json()["detail"]["error"] == "invalid_domain" + +def test_whois_authentication_required(client_with_auth): + """Test that authentication is required when token is set.""" + # Request without token + response = client_with_auth.get("/domain/whois?domain=example.com") + assert response.status_code == 401 + assert "detail" in response.json() + + # Request with invalid token + response = client_with_auth.get( + "/domain/whois?domain=example.com", + headers={"Authorization": "Bearer wrong-token"} + ) + assert response.status_code == 401 + + # Request with valid token + response = client_with_auth.get( + "/domain/whois?domain=example.com", + headers={"Authorization": "Bearer test-token-123"} + ) + # Should not be 401 (might be 500 if whois lookup fails, but auth should pass) + assert 
response.status_code != 401 + +def test_whois_minimal_format(client, monkeypatch): + """Test WHOIS endpoint returns minimal format by default.""" + # Mock whois lookup to avoid actual network calls + from app.services.whois import WhoisService + from app.routers import domain + + class MockWhoisService: + def lookup(self, domain): + return "Domain Name: example.com\nCreation Date: 2020-01-01T00:00:00Z\nRegistrar: Test Registrar" + + def is_available(self, raw, tld): + return False + + original_service = domain.whois_service + domain.whois_service = MockWhoisService() + + try: + response = client.get("/domain/whois?domain=example.com") + assert response.status_code == 200 + data = response.json() + + # Check minimal format structure + assert "domain" in data + assert "available" in data + assert "created_at" in data + # Should NOT have detailed fields + assert "tld" not in data + assert "checked_at" not in data + assert "raw" not in data + assert "registrar" not in data + finally: + domain.whois_service = original_service + +def test_whois_detailed_format(client, monkeypatch): + """Test WHOIS endpoint returns detailed format with details=1.""" + # Mock whois lookup + from app.services.whois import WhoisService + from app.routers import domain + + class MockWhoisService: + def lookup(self, domain): + return "Domain Name: example.com\nCreation Date: 2020-01-01T00:00:00Z\nRegistrar: Test Registrar" + + def is_available(self, raw, tld): + return False + + original_service = domain.whois_service + domain.whois_service = MockWhoisService() + + try: + response = client.get("/domain/whois?domain=example.com&details=1") + assert response.status_code == 200 + data = response.json() + + # Check detailed format structure + assert "domain" in data + assert "available" in data + assert "created_at" in data + assert "tld" in data + assert "checked_at" in data + assert "raw" in data + assert "registrar" in data + assert "pending_delete" in data + assert "redemption_period" in data 
+ finally: + domain.whois_service = original_service + +def test_whois_refresh_parameter(client, monkeypatch): + """Test that refresh=1 forces a fresh lookup.""" + from app.services.whois import WhoisService + from app.routers import domain + + lookup_called = [] + + class MockWhoisService: + def lookup(self, domain): + lookup_called.append(domain) + return "Domain Name: example.com\nCreation Date: 2020-01-01T00:00:00Z" + + def is_available(self, raw, tld): + return False + + original_service = domain.whois_service + domain.whois_service = MockWhoisService() + + try: + # First request - should cache + response1 = client.get("/domain/whois?domain=example.com") + assert response1.status_code == 200 + assert len(lookup_called) == 1 + + # Second request - should use cache + response2 = client.get("/domain/whois?domain=example.com") + assert response2.status_code == 200 + assert len(lookup_called) == 1 # Still 1, cache used + + # Third request with refresh=1 - should force lookup + response3 = client.get("/domain/whois?domain=example.com&refresh=1") + assert response3.status_code == 200 + assert len(lookup_called) == 2 # Now 2, fresh lookup + finally: + domain.whois_service = original_service + +def test_whois_cache_hit_format(client, monkeypatch): + """Test that cached data returns correct format.""" + from app.routers import domain + + # Pre-populate cache + domain.cache.set( + "cached.com", + "com", + False, + "Domain Name: cached.com\nCreation Date: 2021-01-01T00:00:00Z\nRegistrar: Test" + ) + + # Request should use cache + response = client.get("/domain/whois?domain=cached.com") + assert response.status_code == 200 + data = response.json() + + # Should be minimal format + assert "domain" in data + assert "available" in data + assert data["domain"] == "cached.com" + assert data["available"] == False + +def test_whois_cache_hit_detailed_format(client, monkeypatch): + """Test that cached data returns detailed format with details=1.""" + from app.routers import domain 
+ + # Pre-populate cache + domain.cache.set( + "cached.com", + "com", + False, + "Domain Name: cached.com\nCreation Date: 2021-01-01T00:00:00Z\nRegistrar: Test" + ) + + # Request should use cache with details=1 + response = client.get("/domain/whois?domain=cached.com&details=1") + assert response.status_code == 200 + data = response.json() + + # Should be detailed format + assert "domain" in data + assert "available" in data + assert "raw" in data + assert "tld" in data + assert data["domain"] == "cached.com" + assert data["tld"] == "com" + assert "cached.com" in data["raw"] + diff --git a/tests/test_cache_persistence.py b/tests/test_cache_persistence.py index 68c7faa..b9772eb 100644 --- a/tests/test_cache_persistence.py +++ b/tests/test_cache_persistence.py @@ -21,7 +21,7 @@ def test_cache_persistence(tmp_path): assert entry is not None # parsed fields should be present and match expectations assert entry.get("registrar") == "OVH sas" - assert entry.get("creation_date") == "2002-05-13T18:12:06Z" - assert entry.get("pendingDelete") in (0, 1, False, True) + assert entry.get("created_at") == "2002-05-13T18:12:06Z" + assert entry.get("pending_delete") in (0, 1, False, True) # Normalize to boolean check - assert bool(int(entry.get("pendingDelete"))) is False + assert bool(int(entry.get("pending_delete"))) is False diff --git a/tests/test_whois_parsing.py b/tests/test_whois_parsing.py index 0c5f717..6db5e60 100644 --- a/tests/test_whois_parsing.py +++ b/tests/test_whois_parsing.py @@ -1,6 +1,6 @@ import os import pytest -from app.services.whois import parse_whois +from app.services.whois import parse_whois, WhoisService def test_parse_whois_cadeaux_com(): path = os.path.join(os.path.dirname(__file__), "data", "whois-cadeaux.com") @@ -9,10 +9,10 @@ def test_parse_whois_cadeaux_com(): tld = "com" result = parse_whois(raw, tld) assert result["statut"] is not None, f"statut should not be None, got {result['statut']}" - assert result["creation_date"] == 
"2002-05-13T18:12:06Z" + assert result["created_at"] == "2002-05-13T18:12:06Z" assert result["registrar"] == "OVH sas" - assert result["pendingDelete"] == False - assert result["redemptionPeriod"] == False + assert result["pending_delete"] == False + assert result["redemption_period"] == False def test_parse_whois_assiste_com(): path = os.path.join(os.path.dirname(__file__), "data", "whois-assiste.com") @@ -21,7 +21,35 @@ def test_parse_whois_assiste_com(): tld = "com" result = parse_whois(raw, tld) assert result["statut"] is not None, f"statut should not be None, got {result['statut']}" - assert result["creation_date"] == "2003-09-15T11:32:57Z" + assert result["created_at"] == "2003-09-15T11:32:57Z" assert result["registrar"] == "Gandi SAS" - assert result["pendingDelete"] == True, f"pendingDelete should be True, got {result['pendingDelete']}" - assert result["redemptionPeriod"] == False + assert result["pending_delete"] == True, f"pending_delete should be True, got {result['pending_delete']}" + assert result["redemption_period"] == False + +def test_parse_whois_argent_fr(): + path = os.path.join(os.path.dirname(__file__), "data", "whois-argent.fr") + with open(path, encoding="utf-8") as f: + raw = f.read() + tld = "fr" + result = parse_whois(raw, tld) + assert result["statut"] == "ACTIVE", f"statut should be ACTIVE, got {result['statut']}" + assert result["created_at"] == "2000-07-10T22:00:00Z" + assert result["registrar"] == "OVH" + assert result["pending_delete"] == False + assert result["redemption_period"] == False + +def test_is_available_nodomain_fr(): + """Test that nodomain.fr is detected as available (NOT FOUND).""" + path = os.path.join(os.path.dirname(__file__), "data", "whois-nodomain.fr") + with open(path, encoding="utf-8") as f: + raw = f.read() + service = WhoisService() + assert service.is_available(raw, "fr") == True, "nodomain.fr should be detected as available" + +def test_is_available_argent_fr(): + """Test that argent.fr is detected as not 
available (exists).""" + path = os.path.join(os.path.dirname(__file__), "data", "whois-argent.fr") + with open(path, encoding="utf-8") as f: + raw = f.read() + service = WhoisService() + assert service.is_available(raw, "fr") == False, "argent.fr should be detected as not available"