diff --git a/.github/workflows/test-build.yml b/.github/workflows/test-build.yml new file mode 100644 index 0000000..28e98d4 --- /dev/null +++ b/.github/workflows/test-build.yml @@ -0,0 +1,54 @@ +# Created by GitHub Copilot CLI on 2026-02-06 +name: Test Build + +on: + workflow_dispatch: + +jobs: + init: + name: Initialize build + runs-on: ubuntu-latest + outputs: + architectures: ${{ steps.info.outputs.architectures }} + steps: + - name: Checkout the repository + uses: actions/checkout@v4 + - name: Get information + id: info + uses: home-assistant/actions/helpers/info@master + + test-build: + name: Test build for ${{ matrix.architecture }} + needs: init + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + architecture: ${{ fromJson(needs.init.outputs.architectures) }} + steps: + - name: Checkout the repository + uses: actions/checkout@v4 + - name: Set up QEMU + uses: docker/setup-qemu-action@v2 + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v2 + - name: Get base image from build.yaml + id: image + shell: bash + run: | + if [ "${{ matrix.architecture }}" = "amd64" ]; then + BUILD_FROM=$(grep -A 2 "^build_from:" build.yaml | grep "amd64:" | awk '{print $2}' | tr -d '"') + elif [ "${{ matrix.architecture }}" = "aarch64" ]; then + BUILD_FROM=$(grep -A 2 "^build_from:" build.yaml | grep "aarch64:" | awk '{print $2}' | tr -d '"') + fi + echo "BUILD_FROM=$BUILD_FROM" >> $GITHUB_OUTPUT + - name: Build image for ${{ matrix.architecture }} + uses: docker/build-push-action@v4 + with: + context: . + file: ./Dockerfile + platforms: ${{ matrix.architecture == 'amd64' && 'linux/amd64' || 'linux/arm64' }} + build-args: | + BUILD_FROM=${{ steps.image.outputs.BUILD_FROM }} + push: false + tags: homedetector:test-${{ matrix.architecture }} diff --git a/.github/workflows/test-python.yml b/.github/workflows/test-python.yml new file mode 100644 index 0000000..e385701 --- /dev/null +++ b/.github/workflows/test-python.yml @@ -0,0 +1,49 @@ +# Created by GitHub Copilot CLI on 2026-02-06 +name: Python Tests + +on: + workflow_dispatch: + +jobs: + test: + name: Run Python tests with tox + runs-on: ubuntu-latest + steps: + - name: Checkout the repository + uses: actions/checkout@v4 + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: '3.13' + - name: Install tox + run: pip install tox + - name: Run tox tests + run: tox + - name: Upload test reports + if: always() + uses: actions/upload-artifact@v3 + with: + name: test-reports + path: | + tests/admin/report.md + tests/admin/coverage.md + tests/dns/report.md + tests/dns/coverage.md + retention-days: 30 + - name: Display test results + if: always() + shell: bash + run: | + echo "## Admin Tests" + echo "### Report" + cat tests/admin/report.md + echo "" + echo "### Coverage" + cat tests/admin/coverage.md + echo "" + echo "## DNS Tests" + echo "### Report" + cat tests/dns/report.md + echo "" + echo "### Coverage" + cat tests/dns/coverage.md diff --git a/.gitignore b/.gitignore index aca27a8..cbd901c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,9 @@ # Custom Project Ignores *.db* +# For now, let's ignore VSCode settings +.vscode/ + # Opencanry twistd.pid opencanary.log @@ -57,6 +60,8 @@ coverage.xml .hypothesis/ .pytest_cache/ cover/ +coverage.md +tests/*/report.md # Translations *.mo diff --git a/Agents.md b/AGENTS.md similarity index 70% rename from Agents.md rename to AGENTS.md index 49aa0d6..f651dc9 100644 --- a/Agents.md +++ b/AGENTS.md @@ -1,6 +1,7 @@ # Agent Instructions for HomeDetector + This document provides guidance for AI agents on how to understand, modify, and contribute to the HomeDetector project. @@ -50,10 +51,21 @@ The system is composed of three main Python-based components that run concurrent ## Development Workflow +### Virtual Environment + +This project uses `uv` to manage the Python virtual environment. All `python` and `pip` commands MUST be executed using the project's virtual environment located in the `.venv` directory. This ensures consistency and avoids conflicts with system-wide packages. + +For example, use `.venv/bin/python` and `.venv/bin/pip`. + ### Setup 1. The application is designed to run in a Home Assistant environment or a Docker container. -2. Dependencies for each component are listed in `requirements.txt` files within their respective directories (`admin/`, `dns/`, `opencanary/`). To set up a local development environment, you would typically create a Python virtual environment and install these dependencies. +2. Dependencies for each component are listed in `requirements.txt` files within their respective directories (`admin/`, `dns/`, `opencanary/`). To set up a local development environment, you must create a Python virtual environment using `uv` and install these dependencies into it. + + ```bash + uv venv + uv pip install -r admin/requirements.txt -r dns/requirements.txt -r opencanary/requirements.txt + ``` 3. The central database is `hd.db`, located in the `/config/` directory in the container, or the project root during local development. ### Running the Application @@ -76,6 +88,53 @@ The `run.sh` script is the main entry point. It launches the three key processes * The main pages are `admin.j2` (Alerts), `dns.j2` (DNS logs), and `tuning.j2` (Network/Host configuration). * JavaScript functionality relies heavily on jQuery and the `bootstrap-table` plugin, which fetches data from the JSON API endpoints defined in `admin/web.py` (e.g., `/admin/data/alerts`). To modify frontend behavior, you will likely need to interact with the `bootstrap-table` JavaScript API. +### Testing + +The project uses `pytest` for unit and integration testing, and `tox` to automate the process. Tests are located in the `tests/` directory, with subdirectories for each component (`admin`, `dns`). + +* **Admin Tests (`tests/admin/`)**: These tests cover the `admin/web.py` component. They use `pytest-twisted` to handle the asynchronous nature of the `twisted` web server. +* **DNS Tests (`tests/dns/`)**: These tests cover the `dns/listener.py` component. + +Each test directory contains its own `requirements.txt` to specify its Python dependencies. + +To run all tests and generate coverage reports, simply run `tox` from the project root: +```bash +tox +``` + +This will create separate virtual environments for each test suite, install the necessary dependencies, and run the tests. Coverage reports will be generated in the `htmlcov/` directory. + +You can also run a specific test environment: +```bash +tox -e test-admin +tox -e test-dns +``` + +If you want to run the tests manually, you can still do so: +```bash +# Example for running admin tests +cd tests/admin +pip install -r requirements.txt +pytest +``` + +### Building the Docker Image + +To build the Docker image locally using podman, use the build script located in the `tests/` directory: + +```bash +tests/build.sh +``` + +This script automatically: +1. Detects the system architecture (x86_64/amd64 or aarch64/arm64) +2. Reads the appropriate base image from `build.yaml` based on your architecture +3. Builds the Docker image with podman using the `homedetector:latest` tag and the correct `BUILD_FROM` argument + +The base images are defined in `build.yaml`: +- **amd64**: `ghcr.io/home-assistant/amd64-base:3.22` +- **aarch64**: `ghcr.io/home-assistant/aarch64-base:3.22` + ### Database * The database schema is defined and initialized in both `dns/listener.py` and `admin/web.py`. @@ -97,7 +156,7 @@ Placement guidelines by file type: - Markdown: place the attribution HTML comment immediately below the main title heading (the top-level `#` line). - Python: module docstring and/or function docstring. - CSS/JavaScript: file header comment and/or function comment. -- Shell/YAML: file header comment. +- Shell/YAML (including GitHub Workflows): file header comment as the first line. Examples: - "Created by Copilot using model gpt-4o on 2026-01-28" diff --git a/admin/requirements.txt b/admin/requirements.txt index 70355f9..d29485f 100644 --- a/admin/requirements.txt +++ b/admin/requirements.txt @@ -1,2 +1,2 @@ -Twisted==25.5.0 -Jinja2==3.1.6 +Twisted==24.11.0 +Jinja2==3.0.1 diff --git a/dns/requirements.txt b/dns/requirements.txt index 9f066ba..67aa1e4 100644 --- a/dns/requirements.txt +++ b/dns/requirements.txt @@ -1,3 +1,3 @@ dnslib==0.9.24 netaddr==1.2.1 -requests==2.32.5 +requests==2.31.0 diff --git a/tests/admin/requirements.txt b/tests/admin/requirements.txt new file mode 100644 index 0000000..1b99ced --- /dev/null +++ b/tests/admin/requirements.txt @@ -0,0 +1,4 @@ +pytest +pytest-twisted +pytest-cov +tox \ No newline at end of file diff --git a/tests/admin/test_web.py b/tests/admin/test_web.py new file mode 100644 index 0000000..008fcff --- /dev/null +++ b/tests/admin/test_web.py @@ -0,0 +1,616 @@ +# Test for admin/web.py +# Created by Gemini using model gemini-1.5-pro-001 on 2026-02-05 +# Modified by Gemini using model gemini-1.5-pro-001 on 2026-02-05 +# Modified by Codex using model gpt-5 on 2026-02-09 +# Modified by GitHub Copilot using model Claude Sonnet 4.5 on 2026-02-11 +""" +Unit tests for the admin/web.py module. + +This test suite covers: +- Web page rendering (admin UI pages, DNS logs, tuning pages) +- Data API endpoints for alerts, DNS domains/queries, and network/host configuration +- Webhook handling for receiving alerts from DNS listener and OpenCanary +- Database operations via sql_action helper +- Home Assistant integration (notifications and webhooks) +- Helper functions for request parsing and URL generation + +The tests use pytest and pytest-twisted for testing the Twisted-based web server. +Dummy classes simulate HTTP requests and responses without requiring a running server. +""" + +import admin.web as web +import json +from types import SimpleNamespace + + +# ============================================================================ +# Mock/Dummy Classes for Testing +# ============================================================================ +# These classes simulate Twisted web server components without needing a +# running server, allowing isolated testing of request/response logic. + + +class DummyPeer: + """Mock peer object representing the remote client's network address.""" + def __init__(self, host): + self.host = host + + +class DummyTransport: + """Mock transport layer that provides peer connection information.""" + def __init__(self, host): + self._host = host + + def getPeer(self): + """Return the peer address for the connection.""" + return DummyPeer(self._host) + + +class DummyContent: + """Mock content object for request body data.""" + def __init__(self, data=b""): + self._data = data + + def getvalue(self): + """Return the request body content as bytes.""" + return self._data + + +class DummyRequest: + """ + Mock HTTP request object that simulates Twisted's Request interface. + + Args: + args: Query string parameters as a dict of bytes to list of bytes + url: The request URL + ingress: The X-Ingress-Path header value (for Home Assistant ingress support) + content: Request body as bytes (for POST data) + peer_host: IP address of the client making the request + """ + def __init__(self, args=None, url="http://example.com/admin", ingress=None, content=b"", peer_host="127.0.0.1"): + self.args = args or {} + self._url = url + self._ingress = ingress + self.headers = {} + self.content = DummyContent(content) + self.transport = DummyTransport(peer_host) + + def getHeader(self, name): + """Get a request header value.""" + if name == "X-Ingress-Path": + return self._ingress + return None + + def URLPath(self): + """Return the request URL.""" + return self._url + + def setHeader(self, name, value): + """Set a response header.""" + self.headers[name] = value + + +class DummyResponse: + """Mock HTTP response object for simulating responses from external services.""" + def __init__(self, status_code=200, content=b"ok"): + self.status_code = status_code + self.content = content + + +# ============================================================================ +# Test Helper Functions +# ============================================================================ + + +def setup_data_db(tmp_path, monkeypatch): + """ + Initialize a test database with the required schema. + + Creates all necessary tables and views in a temporary SQLite database: + - alerts: Stores security alerts from DNS and honeypot + - domains: Tracks DNS domains and their access patterns + - queries: Logs individual DNS queries + - networks: Defines network scopes for monitoring + - hosts: Stores known hosts within monitored networks + - Views for joining related data + + Args: + tmp_path: Pytest fixture providing a temporary directory + monkeypatch: Pytest fixture for modifying module attributes + """ + monkeypatch.setattr(web, "CONFIG_DB_PATH", str(tmp_path)) + monkeypatch.setattr(web, "CONFIG_DB_NAME", "test.db") + + web.sql_action( + f'CREATE TABLE "{web.DB_T_ALERTS}" ("id" TEXT, "timestamp" TEXT, "type" TEXT, "src_ip" TEXT, "message" TEXT, "unread" INTEGER)', + (), + ) + web.sql_action( + f'CREATE TABLE "{web.DB_T_DOMAINS}" ("id" TEXT, "domain" TEXT, "counter" INTEGER, "scope" TEXT, "action" TEXT, "last_seen" TEXT)', + (), + ) + web.sql_action( + f'CREATE TABLE "{web.DB_T_QUERIES}" ("id" TEXT, "src" TEXT, "scope_id" TEXT, "query" TEXT, "query_type" TEXT, "counter" INTEGER, "action" TEXT, "last_seen" TEXT, "domain_id" TEXT)', + (), + ) + web.sql_action( + f'CREATE TABLE "{web.DB_T_NETWORKS}" ("id" TEXT, "ip" TEXT, "type" TEXT, "action" TEXT, "created" TEXT, "name" TEXT)', + (), + ) + web.sql_action( + f'CREATE TABLE "{web.DB_T_HOSTS}" ("id" TEXT, "ip" TEXT, "scope_id" TEXT, "name" TEXT)', + (), + ) + web.sql_action(web.DB_SCHEMA_V_DOMAINS, ()) + web.sql_action(web.DB_SCHEMA_V_QUERIES, ()) + + +# ============================================================================ +# Tests for Web Page Rendering +# ============================================================================ + + +def test_webroot_page_render_get(): + """Test that the root page renders with expected content.""" + page = web.WebRootPage() + output = page.render_GET(DummyRequest()) + assert b"There is no spoon" in output + + +def test_webroot_get_child_returns_page(): + """Test that the root returns a WebRootPage for any path.""" + root = web.WebRoot() + child = root.getChild(b"anything", DummyRequest()) + assert isinstance(child, web.WebRootPage) + + +def test_admin_page_render_get(): + """Test that the admin alerts page renders with the correct title and API endpoint.""" + page = web.AdminPage() + output = page.render_GET(DummyRequest(url="http://example.com/admin")) + assert b"Home Detector -> Alerts" in output + assert b"admin/data/alerts" in output + + +# ============================================================================ +# Tests for Request Helper Functions +# ============================================================================ + + +def test_get_limit_offset_defaults(): + """Test that get_limit_offset returns default values when params are missing.""" + request = DummyRequest() + assert web.get_limit_offset(request) == (10, 0) + + +def test_get_limit_offset_values(): + """Test that get_limit_offset correctly parses limit and offset params.""" + request = DummyRequest(args={b"limit": [b"25"], b"offset": [b"5"]}) + assert web.get_limit_offset(request) == (25, 5) + + +def test_get_sort_n_order(): + """ + Test that get_sort_n_order correctly parses and validates sort parameters. + + - Valid sort/order values should be returned as-is + - Invalid values should fall back to defaults + """ + request = DummyRequest(args={b"sort": [b"counter"], b"order": [b"asc"]}) + sort, order = web.get_sort_n_order(request, default_sort="last_seen", allow_list=["counter", "last_seen"]) + assert sort == "counter" + assert order == "asc" + + request = DummyRequest(args={b"sort": [b"bad"], b"order": [b"bad"]}) + sort, order = web.get_sort_n_order(request, default_sort="last_seen", allow_list=["counter", "last_seen"]) + assert sort == "last_seen" + assert order == "DESC" + + +def test_get_host_url(): + """ + Test URL extraction for base host URLs. + + - Standard requests: Extract scheme + domain from URL + - Ingress requests: Use X-Ingress-Path header (for Home Assistant) + """ + request = DummyRequest(url="http://example.com/admin") + assert web.get_host_url(request, "admin") == "http://example.com/" + + ingress_request = DummyRequest(ingress="/ingress") + assert web.get_host_url(ingress_request, "admin") == "/ingress/" + + +def test_get_host_url_no_match(): + """Test that get_host_url returns empty string when the page name doesn't match the URL.""" + request = DummyRequest(url="http://example.com/dns") + assert web.get_host_url(request, "admin") == "" + + +# ============================================================================ +# Tests for Database Operations +# ============================================================================ + + +def test_sql_action_roundtrip(tmp_path, monkeypatch): + """ + Test basic database operations: create table, insert data, and select data. + + Verifies that sql_action can perform DDL and DML operations and return results. + """ + monkeypatch.setattr(web, "CONFIG_DB_PATH", str(tmp_path)) + monkeypatch.setattr(web, "CONFIG_DB_NAME", "test.db") + + assert web.sql_action("CREATE TABLE sample (id INTEGER PRIMARY KEY, name TEXT)", ()) is not False + assert web.sql_action("INSERT INTO sample (name) VALUES (?)", ("alpha",)) is not False + rows = web.sql_action("SELECT name FROM sample", ()) + assert rows == [("alpha",)] + + +def test_sql_action_requires_params(): + """Test that sql_action rejects queries without a params tuple (security measure).""" + assert web.sql_action("SELECT 1", None) is False + + +def test_bootstrap_creates_schema(tmp_path, monkeypatch): + """ + Test that bootstrap() successfully creates the database schema. + + Also verifies that bootstrap is idempotent (can be run multiple times). + """ + monkeypatch.setattr(web, "CONFIG_DB_PATH", str(tmp_path)) + monkeypatch.setattr(web, "CONFIG_DB_NAME", "test.db") + monkeypatch.setattr( + web, + "DB_SCHEMA", + [("simple", 'CREATE TABLE "simple" ("id" INTEGER)')], + ) + + assert web.bootstrap() is True + assert web.bootstrap() is True + + +# ============================================================================ +# Tests for Data API Endpoints +# ============================================================================ + + +def test_data_alerts_page_get(tmp_path, monkeypatch): + """ + Test the alerts data API endpoint. + + Verifies that: + - Alerts are returned in JSON format + - Search filtering works correctly + - Result count matches filtered data + """ + setup_data_db(tmp_path, monkeypatch) + web.sql_action( + f'INSERT INTO "{web.DB_T_ALERTS}" ("id", "timestamp", "type", "src_ip", "message", "unread") VALUES (?, ?, ?, ?, ?, ?)', + ("a1", "2026-02-09T00:00:00+00:00", "dns-domain", "1.2.3.4", "Alert A", 1), + ) + web.sql_action( + f'INSERT INTO "{web.DB_T_ALERTS}" ("id", "timestamp", "type", "src_ip", "message", "unread") VALUES (?, ?, ?, ?, ?, ?)', + ("a2", "2026-02-09T01:00:00+00:00", "dns-query", "5.6.7.8", "Alert B", 1), + ) + + request = DummyRequest( + args={ + b"limit": [b"10"], + b"offset": [b"0"], + b"search": [b"Alert A"], + } + ) + output = web.DataAlertsPage().render_GET(request) + data = json.loads(output.decode("utf-8")) + assert data["total"] == 1 + assert data["rows"][0]["id"] == "a1" + + +def test_data_dns_domains_page_get_and_post(tmp_path, monkeypatch): + """ + Test the DNS domains data API endpoint (GET and POST). + + GET test: Verifies domains are returned with correct data + POST test: Verifies inline editing updates domain actions (e.g., pass -> block) + """ + setup_data_db(tmp_path, monkeypatch) + web.sql_action( + f'INSERT INTO "{web.DB_T_NETWORKS}" ("id", "ip", "type", "action", "created", "name") VALUES (?, ?, ?, ?, ?, ?)', + ("n1", "192.168.1.0/24", "network", "learn", "2026-02-09T00:00:00+00:00", "lan"), + ) + web.sql_action( + f'INSERT INTO "{web.DB_T_DOMAINS}" ("id", "domain", "counter", "scope", "action", "last_seen") VALUES (?, ?, ?, ?, ?, ?)', + ("d1", "example.com", 1, "n1", "pass", "2026-02-09T00:00:00+00:00"), + ) + + output = web.DataDNSDomainsPage().render_GET(DummyRequest(args={b"limit": [b"10"], b"offset": [b"0"]})) + data = json.loads(output.decode("utf-8")) + assert data["total"] == 1 + assert data["rows"][0]["domain"] == "example.com" + + post_request = DummyRequest(args={b"name": [b"action"], b"value": [b"block"], b"pk": [b"d1"]}) + assert web.DataDNSDomainsPage().render_POST(post_request) == b"Ok" + rows = web.sql_action(f'SELECT "action" FROM "{web.DB_T_DOMAINS}" WHERE id = ?', ("d1",)) + assert rows[0][0] == "block" + + +def test_data_dns_queries_page_get_and_post(tmp_path, monkeypatch): + """ + Test the DNS queries data API endpoint (GET and POST). + + GET test: Verifies individual DNS queries are returned + POST test: Verifies inline editing updates query actions + """ + setup_data_db(tmp_path, monkeypatch) + web.sql_action( + f'INSERT INTO "{web.DB_T_NETWORKS}" ("id", "ip", "type", "action", "created", "name") VALUES (?, ?, ?, ?, ?, ?)', + ("n1", "192.168.1.0/24", "network", "learn", "2026-02-09T00:00:00+00:00", "lan"), + ) + web.sql_action( + f'INSERT INTO "{web.DB_T_DOMAINS}" ("id", "domain", "counter", "scope", "action", "last_seen") VALUES (?, ?, ?, ?, ?, ?)', + ("d1", "example.com", 1, "n1", "pass", "2026-02-09T00:00:00+00:00"), + ) + web.sql_action( + f'INSERT INTO "{web.DB_T_QUERIES}" ("id", "src", "scope_id", "query", "query_type", "counter", "action", "last_seen", "domain_id") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)', + ("q1", "1.2.3.4", "n1", "example.com", "A", 1, "pass", "2026-02-09T00:00:00+00:00", "d1"), + ) + + output = web.DataDNSQueriesPage().render_GET(DummyRequest(args={b"limit": [b"10"], b"offset": [b"0"]})) + data = json.loads(output.decode("utf-8")) + assert data["total"] == 1 + assert data["rows"][0]["query"] == "example.com" + + post_request = DummyRequest(args={b"name": [b"action"], b"value": [b"block"], b"pk": [b"q1"]}) + assert web.DataDNSQueriesPage().render_POST(post_request) == b"Ok" + rows = web.sql_action(f'SELECT "action" FROM "{web.DB_T_QUERIES}" WHERE id = ?', ("q1",)) + assert rows[0][0] == "block" + + +def test_data_tuning_hosts_page_get_and_post(tmp_path, monkeypatch): + """ + Test the hosts tuning data API endpoint (GET and POST). + + GET test: Verifies known hosts are listed + POST test: Verifies host names can be updated via inline editing + """ + setup_data_db(tmp_path, monkeypatch) + web.sql_action( + f'INSERT INTO "{web.DB_T_HOSTS}" ("id", "ip", "scope_id", "name") VALUES (?, ?, ?, ?)', + ("h1", "10.0.0.10", "n1", "device"), + ) + + output = web.DataTuningHostPage().render_GET(DummyRequest(args={b"limit": [b"10"], b"offset": [b"0"]})) + data = json.loads(output.decode("utf-8")) + assert data["total"] == 1 + assert data["rows"][0]["ip"] == "10.0.0.10" + + post_request = DummyRequest(args={b"name": [b"name"], b"value": [b"router"], b"pk": [b"h1"]}) + assert web.DataTuningHostPage().render_POST(post_request) == b"Ok" + rows = web.sql_action(f'SELECT "name" FROM "{web.DB_T_HOSTS}" WHERE id = ?', ("h1",)) + assert rows[0][0] == "router" + + +def test_data_tuning_networks_page_get_and_post(tmp_path, monkeypatch): + """ + Test the network tuning data API endpoint (GET and POST). + + GET test: Verifies network scopes are listed + POST test: Verifies network actions can be updated (e.g., learn -> block) + """ + setup_data_db(tmp_path, monkeypatch) + web.sql_action( + f'INSERT INTO "{web.DB_T_NETWORKS}" ("id", "ip", "type", "action", "created", "name") VALUES (?, ?, ?, ?, ?, ?)', + ("n1", "192.168.1.0/24", "network", "learn", "2026-02-09T00:00:00+00:00", "lan"), + ) + + output = web.DataTuningNetworkPage().render_GET(DummyRequest(args={b"limit": [b"10"], b"offset": [b"0"]})) + data = json.loads(output.decode("utf-8")) + assert data["total"] == 1 + assert data["rows"][0]["ip"] == "192.168.1.0/24" + + post_request = DummyRequest(args={b"name": [b"action"], b"value": [b"block"], b"pk": [b"n1"]}) + assert web.DataTuningNetworkPage().render_POST(post_request) == b"Ok" + rows = web.sql_action(f'SELECT "action" FROM "{web.DB_T_NETWORKS}" WHERE id = ?', ("n1",)) + assert rows[0][0] == "block" + + +def test_data_roots(): + """ + Test that the data API root resources route to the correct child pages. + + Tests the resource tree structure: + - /data/alerts -> DataAlertsPage + - /data/dns/domains -> DataDNSDomainsPage + - /data/dns/queries -> DataDNSQueriesPage + - /data/tuning/networks -> DataTuningNetworkPage + - /data/tuning/hosts -> DataTuningHostPage + """ + dns_root = web.DataDNSRoot() + assert isinstance(dns_root.getChild(b"domains", DummyRequest()), web.DataDNSDomainsPage) + assert isinstance(dns_root.getChild(b"queries", DummyRequest()), web.DataDNSQueriesPage) + assert dns_root.render_GET(DummyRequest()) == b'{"data":"dns"}' + + tuning_root = web.DataTuningRoot() + assert isinstance(tuning_root.getChild(b"networks", DummyRequest()), web.DataTuningNetworkPage) + assert isinstance(tuning_root.getChild(b"hosts", DummyRequest()), web.DataTuningHostPage) + assert tuning_root.render_GET(DummyRequest()) == b'{"data":"tuning"}' + + data_root = web.DataRoot() + assert isinstance(data_root.getChild(b"alerts", DummyRequest()), web.DataAlertsPage) + assert isinstance(data_root.getChild(b"dns", DummyRequest()), web.DataDNSRoot) + assert isinstance(data_root.getChild(b"tuning", DummyRequest()), web.DataTuningRoot) + assert data_root.render_GET(DummyRequest()) == b'{"data":"root"}' + + +# ============================================================================ +# Tests for Webhook Alert Handling +# ============================================================================ + + +def test_webhook_render_get(): + """Test that GET requests to /notify return a humorous message.""" + output = web.Webhook().render_GET(DummyRequest()) + assert b"Kill All Humans!" in output + + +def test_webhook_blocks_non_localhost(monkeypatch): + """Test that webhook rejects POST requests from non-localhost sources for security.""" + monkeypatch.setattr(web, "DEBUG_MODE", False) + output = web.Webhook().render_POST(DummyRequest(content=b"{}", peer_host="10.0.0.1")) + assert output == b"No \n" + + +def test_webhook_invalid_json_returns_decode_failed(tmp_path, monkeypatch): + """Test that webhook handles malformed JSON gracefully.""" + setup_data_db(tmp_path, monkeypatch) + monkeypatch.setattr(web, "DEBUG_MODE", False) + output = web.Webhook().render_POST(DummyRequest(content=b"not-json")) + assert output == b"Decode Failed " + + +def test_webhook_dns_logdata_does_not_insert(tmp_path, monkeypatch): + """Test that DNS log messages (non-alerts) are not inserted into the alerts table.""" + setup_data_db(tmp_path, monkeypatch) + data = {"type": "dns", "logdata": {"msg": "DNS started"}} + output = web.Webhook().render_POST(DummyRequest(content=json.dumps(data).encode("utf-8"))) + assert output == b"ok \n" + rows = web.sql_action(f'SELECT id FROM "{web.DB_T_ALERTS}"', ()) + assert rows == [] + + +def test_webhook_dns_domain_and_query_insert(tmp_path, monkeypatch): + """ + Test that DNS anomaly alerts are properly inserted into the database. + + Tests both alert types: + - dns-domain: New domain detected + - dns-query: Anomalous query detected + """ + setup_data_db(tmp_path, monkeypatch) + monkeypatch.setattr(web, "HA_NOTIFY", False, raising=False) + monkeypatch.setattr(web, "HA_WEBHOOK", None, raising=False) + + data = { + "type": "dns", + "alert_type": "dns-domain", + "timestamp": "2026-02-09T00:00:00+00:00", + "src_ip": "1.2.3.4", + "domain": "example.com", + "query": "example.com", + } + output = web.Webhook().render_POST(DummyRequest(content=json.dumps(data).encode("utf-8"))) + assert output == b"ok \n" + + data = { + "type": "dns", + "alert_type": "dns-query", + "timestamp": "2026-02-09T00:00:01+00:00", + "src_ip": "1.2.3.4", + "domain": "example.com", + "query": "example.com", + } + output = web.Webhook().render_POST(DummyRequest(content=json.dumps(data).encode("utf-8"))) + assert output == b"ok \n" + + rows = web.sql_action(f'SELECT type, message FROM "{web.DB_T_ALERTS}" ORDER BY timestamp', ()) + assert rows[0][0] == "dns-domain" + assert "Domain anomaly example.com from 1.2.3.4" in rows[0][1] + assert rows[1][0] == "dns-query" + assert "Query anomaly example.com from 1.2.3.4" in rows[1][1] + + +def test_webhook_canary_inserts(tmp_path, monkeypatch): + """ + Test that OpenCanary honeypot alerts are properly processed and inserted. + + Verifies: + - Alert is inserted with correct type (canary-p{port}) + - Message includes source IP and honeypot credentials + """ + setup_data_db(tmp_path, monkeypatch) + monkeypatch.setattr(web, "HA_NOTIFY", False, raising=False) + monkeypatch.setattr(web, "HA_WEBHOOK", None, raising=False) + + canary = { + "utc_time": "2026-02-09T01:02:03", + "dst_port": 22, + "src_host": "5.6.7.8", + "honeycred": True, + "logdata": {"USERAGENT": "curl/1.0", "USERNAME": "root", "PASSWORD": "toor"}, + } + data = {"type": "opencanary", "message": json.dumps(canary)} + output = web.Webhook().render_POST(DummyRequest(content=json.dumps(data).encode("utf-8"))) + assert output == b"ok \n" + + rows = web.sql_action(f'SELECT type, message FROM "{web.DB_T_ALERTS}"', ()) + assert rows[0][0] == "canary-p22" + assert "HoneyPot Login from 5.6.7.8" in rows[0][1] + assert "Honey Credentials root & toor" in rows[0][1] + + +# ============================================================================ +# Tests for Home Assistant Integration +# ============================================================================ + + +def test_post_to_ha_hook_success(monkeypatch): + """ + Test successful webhook posting to Home Assistant. + + Verifies: + - Correct URL is constructed with webhook ID + - Authorization header includes supervisor token + - Returns True on successful POST + """ + calls = {} + + def fake_post(json, url, headers, timeout): + calls["json"] = json + calls["url"] = url + calls["headers"] = headers + calls["timeout"] = timeout + return DummyResponse(status_code=200) + + monkeypatch.setenv("SUPERVISOR_TOKEN", "token") + monkeypatch.setattr(web, "requests", SimpleNamespace(post=fake_post), raising=False) + + assert web.post_to_ha_hook({"msg": "hi"}, webhook_id="abc") is True + assert calls["url"].endswith("/abc") + assert calls["headers"]["Authorization"] == "Bearer token" + + +def test_post_to_ha_hook_failure(monkeypatch): + """Test that post_to_ha_hook handles HTTP errors gracefully and returns False.""" + def fake_post(**_kwargs): + raise RuntimeError("boom") + + monkeypatch.setattr(web, "requests", SimpleNamespace(post=fake_post), raising=False) + + assert web.post_to_ha_hook({"msg": "hi"}, webhook_id="abc") is False + + +def test_post_to_ha_notify(monkeypatch): + """ + Test posting persistent notifications to Home Assistant. + + Verifies: + - Notification title includes alert type + - Notification message is passed through correctly + - Returns True on success + """ + calls = {} + + def fake_post(json, url, headers, timeout): + calls["json"] = json + calls["url"] = url + calls["headers"] = headers + calls["timeout"] = timeout + return DummyResponse(status_code=200) + + monkeypatch.setenv("SUPERVISOR_TOKEN", "token") + monkeypatch.setattr(web, "requests", SimpleNamespace(post=fake_post), raising=False) + + assert web.post_to_ha_notify({"type": "opencanary", "message": "hello"}) is True + assert "Home Detector Alert | opencanary" in calls["json"]["title"] + assert calls["json"]["message"] == "hello" diff --git a/tests/build.sh b/tests/build.sh new file mode 100755 index 0000000..6932bb6 --- /dev/null +++ b/tests/build.sh @@ -0,0 +1,43 @@ +#!/bin/bash +# Build script for HomeDetector using podman +# Created by GitHub Copilot CLI on 2026-02-06 + +set -e + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +PROJECT_ROOT="$(dirname "$SCRIPT_DIR")" + +# Detect architecture +ARCH="${1:-$(uname -m)}" +case "$ARCH" in + x86_64) + ARCH_KEY="amd64" + ;; + aarch64|arm64) + ARCH_KEY="aarch64" + ;; + *) + echo "Unsupported architecture: $ARCH" + exit 1 + ;; +esac + +echo "Building for architecture: $ARCH_KEY" + +# Extract BUILD_FROM from build.yaml +BUILD_FROM=$(grep -A 2 "^build_from:" "$PROJECT_ROOT/build.yaml" | grep "$ARCH_KEY:" | awk '{print $2}' | tr -d '"') + +if [ -z "$BUILD_FROM" ]; then + echo "Error: Could not find base image for architecture $ARCH_KEY in build.yaml" + exit 1 +fi + +echo "Using base image: $BUILD_FROM" + +# Build the Docker image +podman build \ + --build-arg BUILD_FROM="$BUILD_FROM" \ + -t homedetector-$ARCH_KEY:latest \ + "$PROJECT_ROOT" + +echo "Build completed successfully!" diff --git a/tests/dns/requirements.txt b/tests/dns/requirements.txt new file mode 100644 index 0000000..8b0af44 --- /dev/null +++ b/tests/dns/requirements.txt @@ -0,0 +1,3 @@ +pytest +pytest-cov +tox \ No newline at end of file diff --git a/tests/dns/test_listener.py b/tests/dns/test_listener.py new file mode 100644 index 0000000..f0d5205 --- /dev/null +++ b/tests/dns/test_listener.py @@ -0,0 +1,387 @@ +# Test for dns/listener.py +# Created by Gemini using model gemini-1.5-pro-001 on 2026-02-05 +# Modified by Gemini using model gemini-1.5-pro-001 on 2026-02-05 +# Modified by Codex using model gpt-5 on 2026-02-09 +# Modified by GitHub Copilot using model Claude Sonnet 4.5 on 2026-02-11 +""" +Unit tests for the dns/listener.py module. + +This test suite covers: +- DNS interceptor initialization and configuration +- ID generation for unique resource identification (hashing) +- Packet filtering and action decisions (pass/block) +- Network scope handling (host, network CIDR, IP ranges) +- Learning mode with automatic transition to blocking after duration expires +- DNS query tracking in SQLite database +- Domain tracking and counter updates +- Resolver configuration from various sources (resolv.conf, environment variables) +- Domain resolution and SOA (Start of Authority) requests + +The DNS listener is a custom DNS server that learns normal traffic patterns +and can detect/block anomalous DNS queries from IoT devices. +""" + +from __future__ import annotations + +from io import StringIO +import datetime + +import pytest + +import dns.listener as listener + + +# ============================================================================ +# Test Fixtures +# ============================================================================ +# These fixtures set up the test environment with a temporary database and +# a configured DNSInterceptor instance. They ensure clean state for each test +# and proper cleanup of database connections. + + +@pytest.fixture +def db_path(tmp_path, monkeypatch): + """ + Create a temporary test database with the required schema. + + This fixture: + - Configures the listener module to use a temporary database path + - Calls bootstrap() to create all necessary tables + - Returns the path to the database file for tests that need direct access + + Args: + tmp_path: Pytest fixture providing a temporary directory + monkeypatch: Pytest fixture for modifying module attributes + + Returns: + Path object pointing to the test database file + """ + monkeypatch.setattr(listener, "CONFIG_DB_PATH", str(tmp_path)) + monkeypatch.setattr(listener, "CONFIG_DB_NAME", "test.db") + assert listener.bootstrap(listener.logger) + return tmp_path / "test.db" + + +@pytest.fixture +def interceptor(db_path): + """ + Create a DNSInterceptor instance for testing. + + This fixture: + - Creates a DNSInterceptor with test configuration (upstream DNS: 8.8.8.8) + - Configures local network scope (127.0.0.1 as a monitored host) + - Ensures the database connection is properly closed after tests + + The DNSInterceptor is the core component that: + - Intercepts DNS queries from monitored networks + - Learns normal traffic patterns during learning mode + - Blocks or allows queries based on configured rules + - Tracks queries and domains in the database + + Args: + db_path: Path to the test database (from db_path fixture) + + Yields: + DNSInterceptor instance ready for testing + """ + instance = listener.DNSInterceptor( + upstream=["8.8.8.8"], + dnsi_logger=listener.logger, + local_ips=[{"address": "127.0.0.1", "type": "host"}], + ) + try: + yield instance + finally: + instance.sql_connection.close() + + +# ============================================================================ +# Tests for ID Generation and Packet Filtering +# ============================================================================ + + +def test_create_id_and_pass_packet(interceptor): + """ + Test ID generation and packet filtering logic. + + createID(): + - Creates a SHA-256 hash from a list of strings to uniquely identify + DNS queries, domains, and network scopes + - Returns an error message if input is not a list + + passThePacket(): + - Determines whether a DNS query should be allowed through based on action + - Returns False for "block" action (query is dropped) + - Returns True for "pass" action (query is forwarded to upstream DNS) + """ + expected = "91fd4603f81dbd9d772bb73cc8bb682dc287be3d492ca5e275a7dfd7111de212" + assert interceptor.createID(["alpha", "beta"]) == expected + assert interceptor.createID("alpha") == "ERROR: Input not list" + assert interceptor.passThePacket("block") is False + assert interceptor.passThePacket("pass") is True + + +# ============================================================================ +# Tests for Network Scope Handling +# ============================================================================ + + +def test_getscope_variants(interceptor): + """ + Test network scope generation for different address types. + + The DNS listener can monitor three types of network scopes: + 1. Host: Single IP address (e.g., 192.168.1.10) + 2. Network: CIDR notation (e.g., 192.168.2.0/24) + 3. Range: IP range (e.g., 192.168.3.10-192.168.3.20) + + Each scope type uses netaddr library to generate an IPSet that can + efficiently check if a source IP belongs to the monitored scope. + """ + host_scope = interceptor.getscope("host", "192.168.1.10") + assert "192.168.1.10" in host_scope + + net_scope = interceptor.getscope("network", "192.168.2.0/24") + assert "192.168.2.42" in net_scope + + range_scope = interceptor.getscope("range", "192.168.3.10-192.168.3.20") + assert "192.168.3.10" in range_scope + assert "192.168.3.20" in range_scope + + +# ============================================================================ +# Tests for Learning Mode +# ============================================================================ + + +def test_learning_mode_revalidation_updates_action(interceptor, monkeypatch): + """ + Test that learning mode automatically transitions to blocking after expiration. + + Learning mode allows the DNS listener to observe normal DNS traffic patterns + for a configurable duration (LEARNING_DURATION). After this period expires, + the network scope transitions from "learn" to "block" mode. + + In blocking mode: + - Previously seen domains continue to pass + - New/unknown domains trigger alerts and are blocked + + This test: + 1. Sets LEARNING_DURATION to 0 (immediate expiration) + 2. Creates a network scope with creation time 1 day in the past + 3. Calls learningModeReValidation() to check if learning period expired + 4. Verifies the action changed from "learn" to "block" + 5. Verifies TTL (time-to-live) for the decision is returned + """ + monkeypatch.setattr(listener, "LEARNING_DURATION", 0) + scope_id = interceptor.createID(["host", "10.10.10.10"]) + created = (datetime.datetime.now(datetime.UTC) - datetime.timedelta(days=1)).isoformat(timespec="seconds") + with interceptor.lock: + cursor = interceptor.sql_connection.cursor() + cursor.execute( + f'INSERT INTO "{listener.DB_T_NETWORKS}" ("id", "ip", "type", "action", "created") VALUES (?, ?, ?, ?, ?)', + (scope_id, "10.10.10.10", "host", "learn", created), + ) + interceptor.sql_connection.commit() + cursor.close() + + action, ttl = interceptor.learningModeReValidation(scope_id, "learn", created) + assert action == "block" + assert ttl is not None + + +# ============================================================================ +# Tests for DNS Query Tracking +# ============================================================================ + + +def test_find_sql_query_id_and_sql_dns_query(interceptor): + """ + Test DNS query tracking in the database. + + The interceptor tracks individual DNS queries in the database: + - Each unique combination of (source IP, domain, query type) gets a unique ID + - First occurrence: Returns None for sql_id, initializes counter to 1 + - Subsequent occurrences: Returns existing sql_id, increments counter + + This test verifies the complete query tracking workflow: + 1. Check for non-existent query (returns None, counter=1, action="pass") + 2. Insert new query into database via sqlDNSquery() + 3. Verify query now exists with correct ID and action + 4. Update existing query (simulate second occurrence) + 5. Verify counter incremented to 2 + + The counter helps identify frequently-queried domains, which can be useful + for tuning detection rules. + """ + query_id = interceptor.createID(["1.2.3.4", "example.com", "A"]) + sql_id, counter, action, domain_id = interceptor.findSQLQueryID(query_id, learning_mode=True) + assert sql_id is None + assert counter == 1 + assert action == "pass" + assert domain_id is None + + sql_id, action = interceptor.sqlDNSquery( + { + "result": None, + "id": query_id, + "counter": 1, + "action": "pass", + "scope_id": "scope", + "src": "1.2.3.4", + "query": "example.com", + "query_type": "A", + "last_seen": datetime.datetime.now(datetime.UTC).isoformat(timespec="seconds"), + "domain_id": None, + } + ) + assert sql_id == query_id + assert action == "pass" + + sql_id, counter, action, domain_id = interceptor.findSQLQueryID(query_id, learning_mode=True) + assert sql_id == query_id + assert counter == 1 + assert action == "pass" + assert domain_id == "None" + + interceptor.sqlDNSquery( + { + "result": sql_id, + "id": query_id, + "counter": counter, + "action": action, + "scope_id": "scope", + "src": "1.2.3.4", + "query": "example.com", + "query_type": "A", + "last_seen": datetime.datetime.now(datetime.UTC).isoformat(timespec="seconds"), + "domain_id": None, + } + ) + sql_id, counter, action, domain_id = interceptor.findSQLQueryID(query_id, learning_mode=True) + assert counter == 2 + + +# ============================================================================ +# Tests for Domain Tracking +# ============================================================================ + + +def test_sql_domains_insert_and_update(interceptor): + """ + Test domain tracking with counter updates. + + The interceptor tracks domains separately from individual queries: + - Domains are scoped to network ranges (same domain in different scopes + is tracked separately) + - First occurrence: Insert new domain with counter=1 + - Subsequent occurrences: Update last_seen timestamp and increment counter + + This test verifies: + 1. New domain is inserted and returns a domain_id + 2. Second occurrence of same domain returns the same domain_id + 3. Counter is incremented to 2 after the second call + + Domain counters help identify: + - Frequently accessed domains (likely legitimate) + - Rarely accessed domains (potentially suspicious) + """ + scope_id = interceptor.createID(["host", "127.0.0.1"]) + action, domain_id = interceptor.sqlDomains("example.com", scope_id, "pass", learning_mode=True) + assert action == "pass" + assert domain_id is not None + + action, domain_id_again = interceptor.sqlDomains("example.com", scope_id, "pass", learning_mode=True) + assert domain_id_again == domain_id + sql_cursor = interceptor.sql_connection.cursor() + rows = sql_cursor.execute( + f'SELECT "counter" FROM "{listener.DB_T_DOMAINS}" WHERE id = ?', (domain_id,) + ).fetchall() + sql_cursor.close() + assert rows[0][0] == 2 + + +# ============================================================================ +# Tests for Resolver Configuration +# ============================================================================ + + +def test_read_resolve_conf_parses_nameservers(monkeypatch): + """ + Test parsing of /etc/resolv.conf for upstream DNS servers. + + The DNS listener needs to know which upstream DNS servers to forward + queries to after performing its security checks. + + This test verifies that readResolveConf(): + - Parses nameserver lines from resolv.conf + - Ignores comments and empty lines + - Handles leading whitespace + - Returns a list of resolver IP addresses + """ + def fake_open(*_args, **_kwargs): + return StringIO("nameserver 1.1.1.1\n# comment\n nameserver 8.8.8.8\n") + + monkeypatch.setattr("builtins.open", fake_open) + resolvers = listener.readResolveConf([], listener.logger) + assert resolvers == ["1.1.1.1", "8.8.8.8"] + + +def test_get_resolvers_from_upstream(monkeypatch): + """ + Test resolver configuration from UPSTREAM_RESOLVERS and environment variables. + + The DNS listener can be configured to use specific upstream DNS servers + rather than using the system's resolv.conf. This is useful in containerized + environments like Home Assistant. + + This test verifies: + 1. Resolvers are read from UPSTREAM_RESOLVERS configuration + 2. Server names (like "home-assistant") are resolved via environment variables + 3. Port numbers are parsed correctly (or default to 53) + 4. Invalid ports fall back to default port 53 + 5. Final format is "ip:port" + """ + monkeypatch.setattr( + listener, + "UPSTREAM_RESOLVERS", + [ + {"server": "9.9.9.9", "port": "54"}, + {"server": "home-assistant", "port": "invalid"}, + ], + ) + monkeypatch.setenv("homeassistant", "10.0.0.1") + resolvers = listener.getResolvers(listener.logger) + assert resolvers == ["9.9.9.9:54", "10.0.0.1:53"] + + +# ============================================================================ +# Tests for Domain Resolution +# ============================================================================ + + +def test_find_domain_returns_none_on_no_response(interceptor, monkeypatch): + """ + Test domain lookup failure handling. + + When a DNS query is received, the interceptor performs a SOA (Start of Authority) + request to verify the domain exists and extract the authoritative domain name. + + If the SOA request fails (no response, timeout, NXDOMAIN): + - Domain is returned as None + - Action is set to SOA_FAIL_ACTION (typically "block" for security) + - Domain ID is None (not tracked) + + This test: + 1. Mocks sendSOArequest to return None (simulating failure) + 2. Calls findDomain() to look up a domain + 3. Verifies failure is handled correctly with SOA_FAIL_ACTION + + This prevents malicious queries to non-existent or unreachable domains. + """ + monkeypatch.setattr(interceptor, "sendSOArequest", lambda *_args, **_kwargs: None) + scope_id = interceptor.local_networks[0]["id"] + domain, action, domain_id = interceptor.findDomain("example.com", scope_id, True) + assert domain is None + assert action == listener.SOA_FAIL_ACTION + assert domain_id is None diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..74dc4f2 --- /dev/null +++ b/tox.ini @@ -0,0 +1,40 @@ +# tox (https://tox.readthedocs.io/) is a tool for automating testing in multiple environments. +# Created by Gemini using model gemini-1.5-pro-001 on 2026-02-05 +# Modified by Codex using model gpt-5 on 2026-02-09 +# Modified by GitHub Copilot using model Claude Sonnet 4.5 on 2026-02-11 + +[tox] +minversion = 2.0 +envlist = test-admin, test-dns +isolated_build = True + +[testenv] +allowlist_externals = + /usr/bin/find + /bin/sh + sh +commands_pre = + find . -name "*.pyc" -delete + sh -c "pip install -r {toxinidir}/opencanary/requirements.txt" + sh -c "pip install -r {toxinidir}/dns/requirements.txt" + sh -c "pip install -r {toxinidir}/admin/requirements.txt" +setenv = + PYTHONPATH = {toxinidir} +passenv = + PYTHONPATH + +[testenv:test-admin] +description = Run tests for the Admin interface +deps = + -r{toxinidir}/tests/admin/requirements.txt + pytest-md +commands = + pytest --cov=admin --cov-report=markdown:tests/admin/coverage.md --md=tests/admin/report.md tests/admin/ + +[testenv:test-dns] +description = Run tests for the DNS listener +deps = + -r{toxinidir}/tests/dns/requirements.txt + pytest-md +commands = + pytest --cov=dns --cov-report=markdown:tests/dns/coverage.md --md=tests/dns/report.md tests/dns/