From 6a084a30c494a279e44a3988e0108b424d8071ab Mon Sep 17 00:00:00 2001 From: skcert Date: Mon, 9 Feb 2026 12:09:14 +0200 Subject: [PATCH] Add SQL Handler for EAB --- docs/eab.md | 108 +++++++++- docs/external_database_support.md | 51 +++++ examples/eab_handler/sql_handler.py | 316 ++++++++++++++++++++++++++++ test/test_eabsql_handler.py | 231 ++++++++++++++++++++ 4 files changed, 705 insertions(+), 1 deletion(-) create mode 100644 examples/eab_handler/sql_handler.py create mode 100644 test/test_eabsql_handler.py diff --git a/docs/eab.md b/docs/eab.md index 832e3c682..1ad0625ef 100644 --- a/docs/eab.md +++ b/docs/eab.md @@ -12,7 +12,7 @@ To enable EAB, the Certificate Authority (CA) operator must provide both the ACM Key identifiers are included in reports generated by the [Housekeeping](housekeeping.md) class. -By deafault `acme2certifier` validates, during each ACME transaction, whether the EAB credentials used to create the ACME account remain valid. If this check fails, `acme2certifier` stops processing the transaction. This check can be disabled by the configuration option `eabkid_check_disable` in `a + +### Usage + +In the simplest scenario, the database will have one account that all the keys are related to. + +```sql +INSERT INTO account (name, contact) + VALUES ('myaccount', 'contact@myaccount.com'); +``` + +The `profile` column in `credentials` table should contain JSON data in the same format as it is used in JSON Handler. + +Example: + +```sql +INSERT INTO credentials (account_id, key_id, description, profile, status) + VALUES ( + (SELECT id FROM account WHERE account.name = 'myaccount'), + 'keyid_03', + 'mykey', + '{ + "hmac": "YW5kX2ZpbmFsbHlfdGhlX2xhc3RfaG1hY19rZXlfd2hpY2hfaXNfbG9uZ2VyX3RoYW5fMjU2X2JpdHNfYW5kX3Nob3VsZF93b3Jr", + "authorization": { + "prevalidated_domainlist": ["www.example.com"] + } + }', + 1 + ); +``` + + + +### Activate Handler + +To activate this handler, configure the `EABhandler` section in `acme_srv.cfg` as follows. For `db_system`, enter either `mssql` or `postgres`. + +```ini +[EABhandler] +eab_profiling: True +eab_handler_file: examples/eab_handler/sql_handler.py +db_system: mssql, postgres +db_host: +db_name: +db_user: +db_password: +``` + ## Keyfile Verification To check the consistency of the keyfile, use the `tools/eab_chk.py` utility: diff --git a/docs/external_database_support.md b/docs/external_database_support.md index d65604cb9..2839fac06 100644 --- a/docs/external_database_support.md +++ b/docs/external_database_support.md @@ -66,6 +66,35 @@ GRANT postgres TO acme2certifier; sudo apt-get install python3-django python3-psycopg2 ``` +### When using SQL Server (Experimental) + +_SQL Server support is experimental, and is not tested in release regression like the other two databases._ + +Note that this part of the guide is written for **Red Hat Enterprise Linux 9**. + +It is assumed that SQL Server is already installed and running. + +Open SQL Server Management Studio. + +- Create the acme2certifier database and database user: + +```SQL +CREATE DATABASE acme2certifier; +CREATE LOGIN acme2certifier WITH PASSWORD = 'a2c+passwd'; +CREATE USER acme2certifier FOR LOGIN acme2certifier; +``` + +- From Object Explorer, open acme2certifier, Security, Logins, acme2certifier Properties. Then, from User Mapping, map the user to the database and give necessary roles. From Server Roles, give public and sysadmin roles. In essence, grant all access to the database for the acme2certifier user. + +- Install missing python modules + +```bash +pip install mssql-django pyodbc +sudo dnf install unixODBC +``` + +- Follow [these instructions](https://learn.microsoft.com/en-us/sql/connect/odbc/linux-mac/installing-the-microsoft-odbc-driver-for-sql-server?view=sql-server-ver15&tabs=redhat18-install%2Credhat17-install%2Cdebian8-install%2Credhat7-13-install%2Crhel7-offline#17) to install Microsoft ODBC 17. + ## Install and Configure acme2certifier - Download the [latest deb package](https://github.com/grindsa/acme2certifier/releases) @@ -166,6 +195,28 @@ DATABASES = { } ``` +### Connecting to SQL Server + +- Modify `/var/www/acme2certifier/acme2certifier/settings.py` and configure your database connection as below: + +```python +DATABASES = { + "default": { + "ENGINE": "mssql", + "NAME": "acme2certifier", + "USER": "acme2certifier", + "PASSWORD": "a2c+passwd", + "HOST": "sqlserverdbsrv,1433", + "PORT": "", + "OPTIONS": { + "driver": "ODBC Driver 17 for SQL Server" + }, + } +} +``` + +- You may also need to disable some SELinux settings for Apache, depending on your server configuration. + ## Finalize acme2cerifier configuration - Create a Django migration set, apply the migrations, and load fixtures: Modify the [configuration file](acme_srv.md) `/var/www/acme2certifier/volume/acme_srv.cfg`according to your needs. If your CA handler needs runtime information (configuration files, keys, certificate bundles, etc.) to be shared between the nodes, ensure they are loaded from `/var/www/acme2certifier/volume`. Below is an example for the `[CAhandler]` section of the openssl-handler I use during my tests: diff --git a/examples/eab_handler/sql_handler.py b/examples/eab_handler/sql_handler.py new file mode 100644 index 000000000..d2e17ff2f --- /dev/null +++ b/examples/eab_handler/sql_handler.py @@ -0,0 +1,316 @@ +#!/usr/bin/python + +# -*- coding: utf-8 -*- +"""eab sql handler""" + +from __future__ import print_function + +from logging import Logger +import psycopg2 +import re +from mssql_python import connect +from typing import Dict, List, Optional, Tuple + +from acme_srv.helper import load_config, csr_cn_get, csr_san_get + + +class EABhandler(object): + """EAB SQL handler""" + + def __init__(self, logger: Logger): + self.logger = logger + + self.db_system = None + self.db_host = None + self.db_name = None + self.db_user = None + self.db_password = None + + def __enter__(self): + """Makes EABhandler a Context Manager""" + self._config_load() + return self + + def __exit__(self, *args): + """Close the connection at the end of the context""" + + def _config_load(self): + """Load config from file""" + self.logger.debug("EABhandler._config_load()") + + config_dic = load_config(self.logger, "EABhandler") + + self.db_system = config_dic.get( + "EABhandler", "db_system", fallback=self.db_system + ) + self.db_host = config_dic.get("EABhandler", "db_host", fallback=self.db_host) + self.db_name = config_dic.get("EABhandler", "db_name", fallback=self.db_name) + self.db_user = config_dic.get("EABhandler", "db_user", fallback=self.db_user) + self.db_password = config_dic.get( + "EABhandler", "db_password", fallback=self.db_password + ) + + self.logger.debug("EABhandler._config_load() ended") + + def _chk_san_lists_get(self, csr: str) -> Tuple[List[str], List[bool]]: + """Check lists""" + self.logger.debug("EABhandler._chk_san_lists_get()") + + # get sans and build a list + _san_list = csr_san_get(self.logger, csr) + + check_list = [] + san_list = [] + + if _san_list: + for san in _san_list: + try: + # SAN list must be modified/filtered) + (_san_type, san_value) = san.lower().split(":") + san_list.append(san_value) + except Exception: + # force check to fail as something went wrong during parsing + check_list.append(False) + self.logger.info( + "SAN list parsing failed at entry: {0}".format(san) + ) + + self.logger.debug("EABhandler._chk_san_lists_get() ended") + return (san_list, check_list) + + def _cn_add(self, csr: str, san_list: List[str]) -> Tuple[List[str], str]: + """Add CN if required""" + self.logger.debug("EABhandler._cn_add()") + + # get common name and attach it to san_list + cn_ = csr_cn_get(self.logger, csr) + + if cn_: + cn_ = cn_.lower() + if cn_ not in san_list: + # append cn to san_list + self.logger.debug("EABhandler._csr_check(): append cn to san_list") + san_list.append(cn_) + + self.logger.debug("EABhandler._cn_add() ended") + return san_list + + def _list_regex_check(self, entry: str, list_: List[str]) -> bool: + """Check entry against regex""" + self.logger.debug("EABhandler._list_regex_check()") + + check_result = False + for regex in list_: + if regex.startswith("*."): + regex = regex.replace("*.", ".") + regex_compiled = re.compile(regex) + if bool(regex_compiled.search(entry)): + # parameter is in set flag accordingly and stop loop + check_result = True + + self.logger.debug( + "EABhandler._list_regex_check() ended with: {0}".format(check_result) + ) + return check_result + + def _wllist_check(self, entry: str, list_: List[str], toggle: bool = False) -> bool: + """Check string against list""" + self.logger.debug("EABhandler._wllist_check({0}:{1})".format(entry, toggle)) + self.logger.debug("check against list: {0}".format(list_)) + + # default setting + check_result = False + + if entry: + if list_: + check_result = self._list_regex_check(entry, list_) + else: + # empty list, flip parameter to make the check successful + check_result = True + + if toggle: + # toggle result if this is a blocked_domainlist + check_result = not check_result + + self.logger.debug( + "EABhandler._wllist_check() ended with: {0}".format(check_result) + ) + return check_result + + def _allowed_domains_check(self, csr: str, domain_list: List[str]) -> str: + """Check allowed domains""" + self.logger.debug("EABhandler.allowed_domains_check()") + + (san_list, check_list) = self._chk_san_lists_get(csr) + (san_list) = self._cn_add(csr, san_list) + + # go over the san list and check each entry + for san in san_list: + check_list.append(self._wllist_check(san, domain_list)) + + if check_list: + # cover a cornercase with empty checklist (no san, no cn) + if False in check_list: + result = "Either CN or SANs are not allowed by profile" + else: + result = False + + self.logger.debug("EABhandler.allowed_domains_check() ended with: %s", result) + return result + + def eab_kid_get(self, csr: str, revocation=False) -> str: + """Get eab kid from database based on csr""" + self.logger.debug("EABhandler.eab_kid_get()") + + try: + # look up eab_kid from database based on csr + from acme_srv.db_handler import DBstore # pylint: disable=c0415 + + if revocation: + # this is a lookup for a revocation request + search_key = "cert_raw" + else: + # this is a lookup for an enrollment request + search_key = "csr" + + dbstore = DBstore(False, self.logger) + result_dic = dbstore.certificate_lookup( + search_key, + csr, + vlist=[ + "name", + "order__name", + "order__account__name", + "order__account__eab_kid", + ], + ) + if result_dic and "order__account__eab_kid" in result_dic: + eab_kid = result_dic["order__account__eab_kid"] + else: + eab_kid = None + + except Exception as err: + self.logger.error("Database error while retrieving eab_kid: %s", err) + eab_kid = None + + self.logger.debug("EABhandler.eab_kid_get() ended with: %s", eab_kid) + return eab_kid + + def eab_profile_get(self, csr: str, revocation=False) -> str: + """Get eab profile""" + self.logger.debug("EABhandler._eab_profile_get()") + + # load profiles from eab credentials database + profiles_dic = self.key_file_load() + + # get eab_kid from database + eab_kid = self.eab_kid_get(csr, revocation=revocation) + + # get profile from profiles_dic + if ( + profiles_dic + and eab_kid + and eab_kid in profiles_dic + and "cahandler" in profiles_dic[eab_kid] + ): + profile_dic = profiles_dic[eab_kid]["cahandler"] + else: + profile_dic = {} + + self.logger.debug( + "EABhandler._eab_profile_get() ended with: %s", bool(profile_dic) + ) + return profile_dic + + def key_file_load(self) -> Dict[str, str]: + """Load profiles from eab credentials database""" + self.logger.debug("EABhandler.key_file_load()") + + if self.db_host and self.db_name and self.db_user and self.db_password: + data_dic = {} + + # query all active (status = 1) profiles for configured eab account + SQL_QUERY = "SELECT key_id, profile FROM credentials WHERE STATUS = 1;" + + if self.db_system == "mssql": + try: + # create sql server connection string + conn_str = ( + "Server=" + + self.db_host + + ";Database=" + + self.db_name + + ";Encrypt=yes;UID=" + + self.db_user + + ";PWD=" + + self.db_password + + ";TrustServerCertificate=yes" + ) + conn = connect(conn_str) + + cursor = conn.cursor() + cursor.execute(SQL_QUERY) + + # forms data_dic object with the same structure as in kid_profile_handler + rows = cursor.fetchall() + for row in rows: + data_dic[row.key_id] = row.profile + + conn.close() + + except Exception as err: + self.logger.error("EABhandler.key_file_load() error: %s", err) + + elif self.db_system == "postgres": + try: + conn = psycopg2.connect( + host=self.db_host, + dbname=self.db_name, + user=self.db_user, + password=self.db_password, + ) + + cursor = conn.cursor() + cursor.execute(SQL_QUERY) + + # forms data_dic object with the same structure as in kid_profile_handler + rows = cursor.fetchall() + for row in rows: + data_dic[str(row[0])] = str(row[1]) + + conn.close() + + except Exception as err: + self.logger.error("EABhandler.key_file_load() error: %s", err) + + self.logger.debug("EABhandler.key_file.load() ended: {%s}", bool(data_dic)) + return data_dic + + def mac_key_get(self, key_id: str) -> Optional[str]: + """Check external account binding""" + self.logger.debug("EABhandler.mac_key_get(%s)", key_id) + + mac_key = None + + try: + if ( + key_id + and self.db_host + and self.db_name + and self.db_user + and self.db_password + ): + data_dic = self.key_file_load() + + if key_id in data_dic: + mac_key = data_dic[key_id] + else: + self.logger.error("EABhandler.mac_key_get() error: key_id not found") + + except Exception as err: + self.logger.error( + "Failed to retrieve MAC key for key_id '%s': %s", key_id, err + ) + + self.logger.debug("EABhandler.mac_key_get() ended with %s", bool(mac_key)) + return mac_key diff --git a/test/test_eabsql_handler.py b/test/test_eabsql_handler.py new file mode 100644 index 000000000..1aede84b8 --- /dev/null +++ b/test/test_eabsql_handler.py @@ -0,0 +1,231 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +"""unittests for acme2certifier""" +# pylint: disable= C0415, W0212 +import unittest +import sys +import os +from unittest.mock import patch, MagicMock +import configparser + +sys.path.insert(0, ".") +sys.path.insert(1, "..") + + +class TestACMEHandler(unittest.TestCase): + """test class for sql_handler""" + + def setUp(self): + """setup unit test""" + import logging + + logging.basicConfig(level=logging.CRITICAL) + self.logger = logging.getLogger("test_a2c") + from examples.eab_handler.sql_handler import EABhandler + + self.eabhandler = EABhandler(self.logger) + self.dir_path = os.path.dirname(os.path.realpath(__file__)) + + + def test_001_default(self): + """default test which always passes""" + self.assertEqual("foo", "foo") + + @patch("examples.eab_handler.sql_handler.EABhandler._config_load") + def test_002__enter__(self, mock_config_load): + """test enter called""" + mock_config_load.return_value = True + self.eabhandler.__enter__() + self.assertTrue(mock_config_load.called) + + @patch("examples.eab_handler.sql_handler.load_config") + def test_003_config_load(self, mock_config_load): + """test _config_load - empty dictionary""" + parser = configparser.ConfigParser() + mock_config_load.return_value = parser + self.eabhandler._config_load() + self.assertFalse(self.eabhandler.db_system) + self.assertFalse(self.eabhandler.db_host) + self.assertFalse(self.eabhandler.db_name) + self.assertFalse(self.eabhandler.db_user) + self.assertFalse(self.eabhandler.db_password) + + @patch("examples.eab_handler.sql_handler.load_config") + def test_004_config_load(self, mock_load_config_load): + """test _config_load - bogus values""" + parser = configparser.ConfigParser() + parser["foo"] = {"foo": "bar"} + mock_load_config_load.return_value = parser + self.eabhandler._config_load() + self.assertFalse(self.eabhandler.db_system) + self.assertFalse(self.eabhandler.db_host) + self.assertFalse(self.eabhandler.db_name) + self.assertFalse(self.eabhandler.db_user) + self.assertFalse(self.eabhandler.db_password) + + @patch("examples.eab_handler.sql_handler.load_config") + def test_005_config_load(self, mock_config_load): + """test _config_load - bogus values""" + parser = configparser.ConfigParser() + parser["EABhandler"] = {"foo": "bar"} + mock_config_load.return_value = parser + self.eabhandler._config_load() + self.assertFalse(self.eabhandler.db_system) + self.assertFalse(self.eabhandler.db_host) + self.assertFalse(self.eabhandler.db_name) + self.assertFalse(self.eabhandler.db_user) + self.assertFalse(self.eabhandler.db_password) + + @patch("examples.eab_handler.sql_handler.load_config") + def test_006_config_load(self, mock_config_load): + """test _config_load - valid values""" + parser = configparser.ConfigParser() + parser["EABhandler"] = {"db_system": "db_system"} + mock_config_load.return_value = parser + self.eabhandler._config_load() + self.assertEqual("db_system", self.eabhandler.db_system) + self.assertFalse(self.eabhandler.db_host) + self.assertFalse(self.eabhandler.db_name) + self.assertFalse(self.eabhandler.db_user) + self.assertFalse(self.eabhandler.db_password) + + @patch("examples.eab_handler.sql_handler.load_config") + def test_007_config_load(self, mock_config_load): + """test _config_load - valid values""" + parser = configparser.ConfigParser() + parser["EABhandler"] = { + "db_system": "db_system", + "db_host": "db_host", + "db_name": "db_name", + "db_user": "db_user", + "db_password": "db_password" + } + mock_config_load.return_value = parser + self.eabhandler._config_load() + self.assertEqual("db_system", self.eabhandler.db_system) + self.assertEqual("db_host", self.eabhandler.db_host) + self.assertEqual("db_name", self.eabhandler.db_name) + self.assertEqual("db_user", self.eabhandler.db_user) + self.assertEqual("db_password", self.eabhandler.db_password) + + def test_008_mac_key_get(self): + """test mac_key_get without db parameters specified""" + self.assertFalse(self.eabhandler.mac_key_get(None)) + + @patch("examples.eab_handler.sql_handler.EABhandler._wllist_check") + @patch("examples.eab_handler.sql_handler.EABhandler._cn_add") + @patch("examples.eab_handler.sql_handler.EABhandler._chk_san_lists_get") + def test_009_allowed_domains_check(self, mock_san, mock_cn, mock_wlc): + """test EABhanlder._allowed_domains_check()""" + mock_san.return_value = (["foo"], []) + mock_cn.return_value = ["foo", "bar"] + mock_wlc.side_effect = [True, True] + self.assertFalse( + self.eabhandler._allowed_domains_check("csr", ["domain", "list"]) + ) + + @patch("examples.eab_handler.sql_handler.EABhandler._wllist_check") + @patch("examples.eab_handler.sql_handler.EABhandler._cn_add") + @patch("examples.eab_handler.sql_handler.EABhandler._chk_san_lists_get") + def test_010_allowed_domains_check(self, mock_san, mock_cn, mock_wlc): + """test EABhanlder._allowed_domains_check()""" + mock_san.return_value = (["foo"], [False]) + mock_cn.return_value = ["foo", "bar"] + mock_wlc.side_effect = [True, True] + self.assertEqual( + "Either CN or SANs are not allowed by profile", + self.eabhandler._allowed_domains_check("csr", ["domain", "list"]), + ) + + @patch("examples.eab_handler.sql_handler.EABhandler._wllist_check") + @patch("examples.eab_handler.sql_handler.EABhandler._cn_add") + @patch("examples.eab_handler.sql_handler.EABhandler._chk_san_lists_get") + def test_011_allowed_domains_check(self, mock_san, mock_cn, mock_wlc): + """test EABhanlder._allowed_domains_check()""" + mock_san.return_value = (["foo"], []) + mock_cn.return_value = ["foo", "bar"] + mock_wlc.side_effect = [False, True] + self.assertEqual( + "Either CN or SANs are not allowed by profile", + self.eabhandler._allowed_domains_check("csr", ["domain", "list"]), + ) + + @patch("examples.eab_handler.sql_handler.EABhandler.key_file_load") + def test_012_eab_profile_get(self, mock_key_file_load): + """test EABhandler._eab_profile_get()""" + mock_key_file_load.return_value = { + "eab_kid": {"cahandler": {"foo_parameter": "bar_parameter"}} + } + models_mock = MagicMock() + models_mock.DBstore().certificate_lookup.return_value = { + "foo": "bar", + "order__account__eab_kid": "eab_kid", + } + modules = {"acme_srv.db_handler": models_mock} + patch.dict("sys.modules", modules).start() + self.assertEqual( + {"foo_parameter": "bar_parameter"}, self.eabhandler.eab_profile_get("csr") + ) + + @patch("examples.eab_handler.sql_handler.EABhandler.key_file_load") + def test_013_eab_profile_get(self, mock_key_file_load): + """test EABhandler._eab_profile_get()""" + mock_key_file_load.return_value = { + "eab_kid": {"cahandler_invalid": {"foo_parameter": "bar_parameter"}} + } + models_mock = MagicMock() + models_mock.DBstore().certificate_lookup.return_value = { + "foo": "bar", + "order__account__eab_kid": "eab_kid", + } + modules = {"acme_srv.db_handler": models_mock} + patch.dict("sys.modules", modules).start() + self.assertFalse(self.eabhandler.eab_profile_get("csr")) + + @patch("examples.eab_handler.sql_handler.EABhandler.key_file_load") + def test_014_eab_profile_get(self, mock_key_file_load): + """test EABhandler._eab_profile_get()""" + mock_key_file_load.return_value = { + "eab_kid": {"cahandler1": {"foo_parameter": "bar_parameter"}} + } + models_mock = MagicMock() + models_mock.DBstore().certificate_lookup.return_value = { + "foo": "bar", + "1order__account__eab_kid": "eab_kid", + } + modules = {"acme_srv.db_handler": models_mock} + patch.dict("sys.modules", modules).start() + self.assertFalse(self.eabhandler.eab_profile_get("csr")) + + @patch("examples.eab_handler.sql_handler.EABhandler.key_file_load") + def test_015_eab_profile_get(self, mock_key_file_load): + """test EABhandler._eab_profile_get()""" + mock_key_file_load.return_value = { + "eab_kid": {"cahandler": {"foo_parameter": "bar_parameter"}} + } + models_mock = MagicMock() + models_mock.DBstore().certificate_lookup.return_value = { + "foo": "bar", + "order__account__eab_kid": "eab_kid1", + } + modules = {"acme_srv.db_handler": models_mock} + patch.dict("sys.modules", modules).start() + self.assertFalse(self.eabhandler.eab_profile_get("csr")) + + @patch("examples.eab_handler.sql_handler.EABhandler.key_file_load") + def test_016_eab_profile_get(self, mock_prof): + """test EABhandler._eab_profile_get()""" + mock_prof.return_value = { + "eab_kid": {"cahandler": {"foo_parameter": "bar_parameter"}} + } + models_mock = MagicMock() + models_mock.DBstore().certificate_lookup.side_effect = Exception("ex_db_lookup") + modules = {"acme_srv.db_handler": models_mock} + patch.dict("sys.modules", modules).start() + with self.assertLogs("test_a2c", level="INFO") as lcm: + self.assertFalse(self.eabhandler.eab_profile_get("csr")) + self.assertIn( + "ERROR:test_a2c:Database error while retrieving eab_kid: ex_db_lookup", + lcm.output, + ) +