Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions Cisco Secure Access/Samples/Identities/sgt-sync/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
A simple sync tool to bring Security Group Tags from ISE to Secure Access.

## Usage
main.py [-h] [--list-ise | --list-sa | --list-sa-inactive | --diff-only]

options:
-h, --help show this help message and exit
--list-ise List all Security Group Tags found in Cisco Identity Services Engine (ISE).
--list-sa List all Security Group Tags found in Cisco Secure Access (active and inactive).
--list-sa-inactive List only the INACTIVE Security Group Tags found in Cisco Secure Access.
--diff-only Show the difference between ISE and Secure Access SGTs without performing any synchronization (no changes applied).

## Environmental Variables
| Variable | Comment |
|----------------|-------------------------|
ISE-SERVER | IP address or FQDN
ISE-USER | ISE ERS Admin Username
ISE-PASS | ISE ERS Admin Password
SA-KEY | Secure Access API Key
SA-SECRET | Secure Access API Secret

152 changes: 152 additions & 0 deletions Cisco Secure Access/Samples/Identities/sgt-sync/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
"""
Copyright (c) 2025 Cisco and/or its affiliates.
This software is licensed to you under the terms of the Cisco Sample
Code License, Version 1.1 (the "License"). You may obtain a copy of the
License at

https://developer.cisco.com/docs/licenses

All use of the material herein must be in accordance with the terms of
the License. All rights not expressly granted by the License are
reserved. Unless required by applicable law or agreed to separately in
writing, software distributed under the License is distributed on an "AS
IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
or implied.
"""

# main.py
import sys
import argparse
from sgt_sync.config import Config
from sgt_sync.logging_config import setup_logging
from sgt_sync.clients.ise_client import IseClient, IseClientError
from sgt_sync.clients.secure_access_client import (
SecureAccessClient,
SecureAccessClientError,
)
from sgt_sync.synchronizer import SgtSynchronizer
from sgt_sync.models.sgt import SecurityGroupTag

# Configure logging for the entire application
logger = setup_logging()


def print_sgts(title: str, sgts: list[SecurityGroupTag]):
"""Helper function to print a list of SGTs in a readable format."""
if not sgts:
logger.info(f"No {title} SGTs found.")
return

logger.info(f"\n--- {title} ({len(sgts)} SGTs) ---")
for sgt in sgts:
logger.info(
f" Key: {sgt.key}, Label: '{sgt.label}', Tag ID: {sgt.tag_id}, Status: {sgt.status}"
)
logger.info(f"--- End of {title} ---")


def main():
"""Main function to run the SGT synchronization or perform CLI actions."""
parser = argparse.ArgumentParser(
description="Synchronize Security Group Tags from ISE to Cisco Secure Access, or perform diagnostic actions."
)

# Create a mutually exclusive group for commands that should not run together
group = parser.add_mutually_exclusive_group()
group.add_argument(
"--list-ise",
action="store_true",
help="List all Security Group Tags found in Cisco Identity Services Engine (ISE).",
)
group.add_argument(
"--list-sa",
action="store_true",
help="List all Security Group Tags found in Cisco Secure Access (active and inactive).",
)
group.add_argument(
"--list-sa-inactive",
action="store_true",
help="List only the INACTIVE Security Group Tags found in Cisco Secure Access.",
)
group.add_argument(
"--diff-only",
action="store_true",
help="Show the difference between ISE and Secure Access SGTs without performing any synchronization (no changes applied).",
)
args = parser.parse_args()

try:
# Load and validate configuration
Config.validate()
config = Config()
logger.info("Configuration loaded and validated.")

# Initialize clients
ise_client = IseClient(config)
sa_client = SecureAccessClient(config)
logger.info("API clients initialized.")

# Handle CLI arguments
if args.list_ise:
logger.info("Fetching SGTs from ISE...")
ise_sgts = ise_client.get_sgts()
print_sgts("ISE SGTs", ise_sgts)
sys.exit(0)

elif args.list_sa:
logger.info("Fetching SGTs from Secure Access (all statuses)...")
sa_sgts = sa_client.get_sgts()
print_sgts("Secure Access SGTs (All)", sa_sgts)
sys.exit(0)

elif args.list_sa_inactive:
logger.info("Fetching INACTIVE SGTs from Secure Access...")
all_sa_sgts = sa_client.get_sgts()
inactive_sa_sgts = [sgt for sgt in all_sa_sgts if sgt.status == "inactive"]
print_sgts("Secure Access SGTs (Inactive)", inactive_sa_sgts)
sys.exit(0)

elif args.diff_only:
logger.info(
"Performing SGT difference analysis (diff-only mode). No changes will be applied."
)
ise_sgts = ise_client.get_sgts()
sa_sgts = sa_client.get_sgts()

synchronizer = SgtSynchronizer(ise_client, sa_client)
sgts_to_add_update, sgts_to_mark_inactive = synchronizer.diff_sgts(
ise_sgts, sa_sgts
)

print_sgts(
"SGTs to Add/Update in Secure Access (would be added/modified)",
sgts_to_add_update,
)
print_sgts(
"SGTs to Mark Inactive in Secure Access (would be set to inactive)",
sgts_to_mark_inactive,
)
sys.exit(0)

# Full synchronization if no specific CLI arguments are provided
else:
logger.info(
"No specific CLI arguments provided. Proceeding with full SGT synchronization."
)
synchronizer = SgtSynchronizer(ise_client, sa_client)
synchronizer.sync_sgts()
logger.info("SGT synchronization finished successfully.")

except ValueError as e:
logger.critical(f"Configuration Error: {e}")
sys.exit(1)
except (IseClientError, SecureAccessClientError) as e:
logger.critical(f"API Client Error: {e}")
sys.exit(1)
except Exception as e:
logger.critical(f"An unexpected error occurred: {e}", exc_info=True)
sys.exit(1)


if __name__ == "__main__":
main()
12 changes: 12 additions & 0 deletions Cisco Secure Access/Samples/Identities/sgt-sync/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[project]
name = "sgt-sync"
version = "0.1.0"
description = "A simple proof-of-concept sync tool to bring Security Group Tags from ISE to Secure Access"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
"argparse>=1.4.0",
"dotenv>=0.9.9",
"httpx>=0.28.1",
"requests-auth>=8.0.0",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# sgt_sync/clients/ise_client.py
import httpx
import json
import logging
from requests.auth import HTTPBasicAuth
from typing import List

from ..config import Config
from ..models.sgt import SecurityGroupTag

logger = logging.getLogger("sgt_sync.ise_client")

class IseClientError(Exception):
"""Custom exception for ISE client errors."""
pass

class IseClient:
"""
Client for interacting with Cisco Identity Services Engine (ISE) ERS APIs.
Handles SGT fetching.
"""
def __init__(self, config: Config):
self.config = config
self.auth = HTTPBasicAuth(self.config.ISE_USER, self.config.ISE_PASS)
self.headers = {
"Accept": "application/JSON",
"Content-Type": "application/JSON",
}
self.client = httpx.Client(verify=self.config.VERIFY_SSL)

def get_sgts(self) -> List[SecurityGroupTag]:
"""
Retrieve all Security Group Tags from ISE using the ERS API.
Handles pagination and fetches full SGT details by following links.
"""
logger.info(f"Retrieving ISE SGTs from {self.config.BASE_ISE_ERS_URL}/sgt...")
ise_sgt_list: List[SecurityGroupTag] = []
url = f"{self.config.BASE_ISE_ERS_URL}/sgt"

current_start_index = 0
page_size = 100
total_sgts = -1

try:
while total_sgts == -1 or current_start_index < total_sgts:
params = {"size": page_size, "startIndex": current_start_index}
logger.debug(
f"Fetching ISE SGTs with startIndex={current_start_index}, size={page_size}"
)

response = self.client.get(url, auth=self.auth, params=params, headers=self.headers)
response.raise_for_status()

data = response.json()
search_result = data.get("SearchResult", {})
resources = search_result.get("resources", [])

if not resources:
logger.debug("No more resources found in ISE response.")
break

for resource in resources:
sgt_detail_link = resource.get("link", {}).get("href")

if sgt_detail_link:
sgt_detail_response = self.client.get(
sgt_detail_link, auth=self.auth, headers=self.headers
)
sgt_detail_response.raise_for_status()

sgt_data = sgt_detail_response.json().get("Sgt")

if sgt_data:
try:
ise_sgt_list.append(SecurityGroupTag.from_ise_data(sgt_data))
except ValueError as ve:
logger.warning(f"Skipping malformed ISE SGT data: {sgt_data} - {ve}")
else:
logger.warning(
f"Could not find link for ISE SGT resource: {resource}"
)

total_sgts = search_result.get("total", len(ise_sgt_list))
current_start_index += len(resources)
logger.debug(
f"Current ISE SGTs retrieved: {len(ise_sgt_list)}, Total expected: {total_sgts}"
)

logger.info(f"Successfully retrieved {len(ise_sgt_list)} SGTs from ISE.")
return ise_sgt_list
except httpx.HTTPStatusError as e:
logger.error(
f"Failed to retrieve ISE SGTs (HTTP Status {e.response.status_code}): {e.response.text}"
)
raise IseClientError("Failed to retrieve ISE SGTs.") from e
except json.JSONDecodeError as e:
logger.error(f"Failed to decode JSON response from ISE: {e}")
raise IseClientError("Invalid JSON response from ISE.") from e
except httpx.RequestError as e:
logger.error(f"An error occurred while requesting ISE SGTs: {e}")
raise IseClientError("Network error during ISE SGT retrieval.") from e
Loading