Merged
37 commits
bb2daa8
parser: add DBWriter helper (phase 1) - batch inserts and validation …
cchwala Jan 22, 2026
b1d6874
parser: add FileManager (phase 2) - archiving and quarantine utilities
cchwala Jan 22, 2026
95555bc
parser: add BaseParser and CSV parsers (phase 3) - rawdata & metadata…
cchwala Jan 22, 2026
1a11497
parser: add ParserRegistry and FileWatcher (phase 4) - mapping and wa…
cchwala Jan 22, 2026
2a11e87
parser: orchestration and entrypoint (phase 5) - ParserService and st…
cchwala Jan 22, 2026
9f8cfac
parser: update requirements (phase 6) - add watchdog and dateutil
cchwala Jan 22, 2026
e426ab8
tests: add parser unit tests (phase 7)
cchwala Jan 22, 2026
05f779f
docker-compose: mount SFTP uploads and add parser archive/quarantine …
cchwala Jan 22, 2026
03c5a51
parser: accept rawdata even if metadata missing; log truncated missin…
cchwala Jan 22, 2026
b126c25
parser: accept rawdata even if metadata missing; log truncated missin…
cchwala Jan 22, 2026
c56e20d
parser: timezone-aware quarantine timestamp; env-driven config; DB co…
cchwala Jan 22, 2026
b78eb25
refactor: simplify parser code and reduce complexity (~75 LOC)
cchwala Jan 22, 2026
1c33474
test: add unit tests for CSV and DBWriter functionality
cchwala Jan 22, 2026
85102fc
docs: add README for Parser Service with features, architecture, and …
cchwala Jan 22, 2026
8da319f
ci: add parser unit tests workflow
cchwala Jan 22, 2026
54a9daa
docs: simplify parser README for conciseness
cchwala Jan 22, 2026
adb83cb
docs: enhance README with detailed features, architecture, and config…
cchwala Jan 22, 2026
db8578f
ci: add pytest and pytest-cov to parser requirements
cchwala Jan 22, 2026
6b8e769
ci: remove pytest timeout from pyproject to avoid warning when pytest…
cchwala Jan 22, 2026
0f63261
fix: enhance connection check in DBWriter and improve CSV parser hand…
cchwala Jan 22, 2026
0cd834f
test: add basic tests for FileWatcher and FileUploadHandler functiona…
cchwala Jan 22, 2026
a72b809
refactor: restructure demo_csv_data parser, add example files, and up…
cchwala Jan 22, 2026
7f81836
feat: update CML metadata handling in data generator and validation f…
cchwala Jan 22, 2026
f2fe166
refactor: move demo_csv_data to parsers/, update metadata example, an…
cchwala Jan 22, 2026
a81e33b
Refactor parser to use function-based approach
cchwala Jan 22, 2026
de650ff
refactor: streamline parser service and extract CML processing logic …
cchwala Jan 22, 2026
f984a57
Add sublink-specific metadata support with composite primary key
cchwala Jan 22, 2026
bf4ab7a
add length column to cml_metadata and update related components
cchwala Jan 22, 2026
4efc328
fix: add wait mechanisms to e2e tests for MNO simulator data generation
cchwala Jan 22, 2026
155d169
docs: update documentation to reflect composite key schema and consol…
cchwala Jan 22, 2026
2858cd7
fix: start database and parser services in CI e2e workflow
cchwala Jan 22, 2026
8bd26da
ci: add comprehensive diagnostics to e2e workflow
cchwala Jan 22, 2026
b8a6784
fix: explicitly set parser directory paths in docker-compose
cchwala Jan 22, 2026
00173ec
"Fix E2E test timeouts and add local test script
cchwala Jan 22, 2026
8c4b7ba
fix: handle composite key in metadata queries and update health checks
cchwala Jan 23, 2026
58d3eaf
refactor: add automatic database reconnection and consolidate batch i…
cchwala Jan 23, 2026
46524be
fix: update webserver test to check database access instead of archiv…
cchwala Jan 23, 2026
62 changes: 55 additions & 7 deletions .github/workflows/test_integration_e2e.yml
@@ -6,6 +6,7 @@ on:
paths:
- 'tests/**'
- 'docker-compose.yml'
- 'parser/**'
- 'webserver/**'
- 'mno_data_source_simulator/**'
- 'database/**'
@@ -15,6 +16,7 @@ on:
paths:
- 'tests/**'
- 'docker-compose.yml'
- 'parser/**'
- 'webserver/**'
- 'mno_data_source_simulator/**'
- 'database/**'
@@ -60,20 +62,24 @@ jobs:

- name: Start services
run: |
docker compose up -d sftp_receiver webserver mno_simulator
docker compose up -d database sftp_receiver parser webserver mno_simulator
sleep 10

- name: Wait for services to be ready
run: |
# Wait for database to be ready
echo "Waiting for database..."
timeout 60 bash -c 'until docker compose exec -T database pg_isready -U postgres; do sleep 1; done'
timeout 60 bash -c 'until docker compose exec -T database pg_isready -U myuser; do sleep 1; done'
echo "Database is ready"

# Give webserver time to connect to database
sleep 5
# Give services time to start and connect
sleep 10

# Check service status
echo "Checking service status..."
docker compose ps

# Wait for webserver to respond (ignore HTTP status, just check if it's listening)
# Wait for webserver to respond
echo "Waiting for webserver..."
timeout 60 bash -c 'until curl -s http://localhost:5000/ >/dev/null 2>&1; do echo "Retrying..."; sleep 2; done'
echo "Webserver is ready"
@@ -83,6 +89,18 @@ jobs:
timeout 30 bash -c 'until nc -z localhost 2222; do sleep 1; done'
echo "SFTP server is ready"

# Give MNO simulator time to generate first batch of data
echo "Waiting for MNO simulator first generation cycle (40 seconds)..."
sleep 40

# Check if files appeared in SFTP
echo "Checking SFTP uploads directory..."
docker compose exec -T sftp_receiver ls -la /home/cml_user/uploads/ || echo "Could not list SFTP directory"

# Check if parser sees the files
echo "Checking parser incoming directory..."
docker compose exec -T parser ls -la /app/data/incoming/ || echo "Could not list parser directory"

echo "All services are ready"

- name: Run E2E integration tests
@@ -92,14 +110,44 @@ jobs:
- name: Show logs on failure
if: failure()
run: |
echo "=== Service Status ==="
docker compose ps

echo ""
echo "=== Database Logs ==="
docker compose logs database

echo ""
echo "=== SFTP Receiver Logs ==="
docker compose logs sftp_receiver

echo ""
echo "=== Parser Logs ==="
docker compose logs parser

echo ""
echo "=== Webserver Logs ==="
docker compose logs webserver

echo ""
echo "=== MNO Simulator Logs ==="
docker compose logs mno_simulator
echo "=== Database Logs ==="
docker compose logs database

echo ""
echo "=== SFTP Directory Contents ==="
docker compose exec -T sftp_receiver ls -la /home/cml_user/uploads/ || echo "Could not access SFTP directory"

echo ""
echo "=== Parser Incoming Directory Contents ==="
docker compose exec -T parser ls -la /app/data/incoming/ || echo "Could not access parser directory"

echo ""
echo "=== Parser Archived Directory Contents ==="
docker compose exec -T parser ls -la /app/data/archived/ || echo "Could not access parser archived directory"

echo ""
echo "=== Parser Quarantine Directory Contents ==="
docker compose exec -T parser ls -la /app/data/quarantine/ || echo "Could not access parser quarantine directory"

- name: Cleanup
if: always()
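The e2e tests themselves are not shown in this diff. For context, here is a minimal sketch of the kind of wait-and-assert check such a test might run against the pipeline, assuming psycopg2 is installed and the database is reachable on localhost:5432 with the credentials from docker-compose.yml; the host port mapping and this helper are assumptions, not part of the PR.

# Hypothetical e2e helper: poll the database until the parser has ingested rows.
# Connection details mirror docker-compose.yml; the localhost:5432 mapping is assumed.
import time
import psycopg2

def wait_for_rows(table: str, timeout_s: int = 120, poll_s: int = 5) -> int:
    """Poll `table` until it contains at least one row or the timeout expires."""
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        with psycopg2.connect(
            host="localhost", port=5432,
            dbname="mydatabase", user="myuser", password="mypassword",
        ) as conn, conn.cursor() as cur:
            # Table name comes from the test itself, so interpolation is safe here.
            cur.execute(f"SELECT count(*) FROM {table}")
            count = cur.fetchone()[0]
            if count > 0:
                return count
        time.sleep(poll_s)
    raise TimeoutError(f"No rows appeared in {table} within {timeout_s}s")

if __name__ == "__main__":
    print("cml_data rows:", wait_for_rows("cml_data"))
    print("cml_metadata rows:", wait_for_rows("cml_metadata"))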
43 changes: 43 additions & 0 deletions .github/workflows/test_parser.yml
@@ -0,0 +1,43 @@
name: Parser Unit Tests

on:
push:
branches: [ main ]
paths:
- 'parser/**'
- '.github/workflows/test_parser.yml'
pull_request:
branches: [ main ]
paths:
- 'parser/**'

jobs:
unit-tests:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.11'

- name: Install dependencies
run: |
cd parser
pip install -r requirements.txt

- name: Run parser unit tests with coverage
run: |
cd parser
pytest tests/ -v --cov=. --cov-report=xml --cov-report=term

- name: Upload coverage reports to Codecov
uses: codecov/codecov-action@v4
with:
file: ./parser/coverage.xml
flags: parser
fail_ci_if_error: false
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
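The workflow runs whatever lives under parser/tests/. As an illustration of the style of unit test it would pick up, here is a minimal pytest sketch for a CSV parsing function; the function parse_rawdata_csv and its behaviour are hypothetical stand-ins, not code from this PR.

# Hypothetical unit test in parser/tests/; function and column names are illustrative only.
import io
import pandas as pd
import pytest

def parse_rawdata_csv(buf) -> pd.DataFrame:
    """Stand-in for the parser's CSV reader: requires time/cml_id/sublink_id columns."""
    df = pd.read_csv(buf, parse_dates=["time"])
    missing = {"time", "cml_id", "sublink_id"} - set(df.columns)
    if missing:
        raise ValueError(f"missing columns: {sorted(missing)}")
    return df

def test_parse_rawdata_csv_valid():
    csv = "time,cml_id,sublink_id,rsl\n2026-01-22T00:00:00Z,cml_1,sublink_1,-45.2\n"
    df = parse_rawdata_csv(io.StringIO(csv))
    assert len(df) == 1
    assert df.loc[0, "cml_id"] == "cml_1"

def test_parse_rawdata_csv_missing_column():
    with pytest.raises(ValueError):
        parse_rawdata_csv(io.StringIO("time,cml_id\n2026-01-22T00:00:00Z,cml_1\n"))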
9 changes: 7 additions & 2 deletions database/init.sql
@@ -7,11 +7,16 @@ CREATE TABLE cml_data (
);

CREATE TABLE cml_metadata (
cml_id TEXT PRIMARY KEY,
cml_id TEXT NOT NULL,
sublink_id TEXT NOT NULL,
site_0_lon REAL,
site_0_lat REAL,
site_1_lon REAL,
site_1_lat REAL
site_1_lat REAL,
frequency REAL,
polarization TEXT,
length REAL,
PRIMARY KEY (cml_id, sublink_id)
);

SELECT create_hypertable('cml_data', 'time');
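With the primary key now spanning (cml_id, sublink_id), a writer that re-ingests a metadata file has to upsert on the composite key rather than on cml_id alone. A sketch of what that statement could look like; the PR's actual DBWriter code is not shown in this diff, and the ON CONFLICT approach here is an assumption.

# Hypothetical upsert against the new composite-key schema; not the PR's DBWriter code.
import psycopg2

UPSERT_SQL = """
    INSERT INTO cml_metadata (
        cml_id, sublink_id, site_0_lon, site_0_lat, site_1_lon, site_1_lat,
        frequency, polarization, length
    )
    VALUES (%(cml_id)s, %(sublink_id)s, %(site_0_lon)s, %(site_0_lat)s,
            %(site_1_lon)s, %(site_1_lat)s, %(frequency)s, %(polarization)s, %(length)s)
    ON CONFLICT (cml_id, sublink_id) DO UPDATE SET
        frequency = EXCLUDED.frequency,
        polarization = EXCLUDED.polarization,
        length = EXCLUDED.length
"""

def upsert_metadata(conn, rows: list[dict]) -> None:
    """Insert or update one metadata row per (cml_id, sublink_id)."""
    with conn.cursor() as cur:
        cur.executemany(UPSERT_SQL, rows)
    conn.commit()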
12 changes: 12 additions & 0 deletions docker-compose.yml
@@ -25,6 +25,16 @@ services:
- database
environment:
- DATABASE_URL=postgresql://myuser:mypassword@database:5432/mydatabase
- PARSER_INCOMING_DIR=/app/data/incoming
- PARSER_ARCHIVED_DIR=/app/data/archived
- PARSER_QUARANTINE_DIR=/app/data/quarantine
- PARSER_ENABLED=true
- PROCESS_EXISTING_ON_STARTUP=true
- LOG_LEVEL=INFO
volumes:
- sftp_uploads:/app/data/incoming
- parser_archived:/app/data/archived
- parser_quarantine:/app/data/quarantine


metadata_processor:
@@ -150,4 +160,6 @@ volumes:
grafana_data:
mno_data_to_upload:
mno_data_uploaded:
parser_archived:
parser_quarantine:
# minio_data: # Uncomment if using MinIO
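The parser's directories and behaviour are now driven entirely by the environment variables declared above. A minimal sketch of how a service could load them; the variable names come from the compose file, but the ParserConfig dataclass is a hypothetical illustration, not the parser's actual implementation.

# Hypothetical config loader; env var names match docker-compose.yml, the class itself
# does not necessarily match the parser's real code.
import os
from dataclasses import dataclass
from pathlib import Path

@dataclass(frozen=True)
class ParserConfig:
    incoming_dir: Path
    archived_dir: Path
    quarantine_dir: Path
    enabled: bool
    process_existing_on_startup: bool
    log_level: str

def load_config() -> ParserConfig:
    return ParserConfig(
        incoming_dir=Path(os.environ.get("PARSER_INCOMING_DIR", "/app/data/incoming")),
        archived_dir=Path(os.environ.get("PARSER_ARCHIVED_DIR", "/app/data/archived")),
        quarantine_dir=Path(os.environ.get("PARSER_QUARANTINE_DIR", "/app/data/quarantine")),
        enabled=os.environ.get("PARSER_ENABLED", "true").lower() == "true",
        process_existing_on_startup=os.environ.get("PROCESS_EXISTING_ON_STARTUP", "true").lower() == "true",
        log_level=os.environ.get("LOG_LEVEL", "INFO"),
    )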
9 changes: 7 additions & 2 deletions mno_data_source_simulator/config.yml
@@ -21,14 +21,19 @@ sftp:
known_hosts_path: "/app/ssh_keys/known_hosts"
remote_path: "/uploads"
# Upload frequency in seconds
upload_frequency_seconds: 60
upload_frequency_seconds: 30
# Connection timeout in seconds
connection_timeout: 30

# Data generation configuration
generator:
# How often to generate new data points (in seconds)
generation_frequency_seconds: 60
generation_frequency_seconds: 30
# Number of timestamps to include in each generated file
# (timestamps will be spaced by time_resolution_seconds)
timestamps_per_file: 3
# Time resolution between timestamps within a file (in seconds)
time_resolution_seconds: 10
# Directory where generated CSV files will be written
output_dir: "data_to_upload"

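With these settings, each generation cycle produces one file roughly every 30 seconds containing three timestamps spaced 10 seconds apart, so consecutive files cover contiguous 30-second windows. A quick sketch of the arithmetic, mirroring the loop added in main.py further below; values are hard-coded here purely for illustration.

# Illustration of the new generator timing; constants mirror config.yml.
from datetime import datetime, timedelta

generation_frequency_seconds = 30
timestamps_per_file = 3
time_resolution_seconds = 10

cycle_start = datetime(2026, 1, 22, 12, 0, 0)
timestamps = [
    cycle_start + timedelta(seconds=i * time_resolution_seconds)
    for i in range(timestamps_per_file)
]
print(timestamps)  # 12:00:00, 12:00:10, 12:00:20 -> next cycle begins at 12:00:30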
76 changes: 30 additions & 46 deletions mno_data_source_simulator/data_generator.py
@@ -163,28 +163,30 @@ def generate_data(

def get_metadata_dataframe(self) -> pd.DataFrame:
"""
Get CML metadata as a pandas DataFrame.

Extracts all metadata coordinates from the NetCDF dataset
(excluding dimension coordinates like time, cml_id, sublink_id).

Returns
-------
pd.DataFrame
DataFrame with CML metadata.
Get CML metadata as a pandas DataFrame, with one row per (cml_id, sublink_id).
Includes: cml_id, sublink_id, site_0_lon, site_0_lat, site_1_lon, site_1_lat, frequency, polarization, length
"""
# Identify metadata coordinates (non-dimension coordinates)
dimension_coords = set(self.dataset.sizes.keys())
all_coords = set(self.dataset.coords.keys())
metadata_coord_names = list(all_coords - dimension_coords)

# Extract metadata as DataFrame
metadata_df = self.dataset[metadata_coord_names].to_dataframe()

# Sort by index to ensure deterministic order across different systems
metadata_df = metadata_df.sort_index()

return metadata_df
# Extract all coordinates and variables needed for metadata
# Assume sublink_id is a dimension, so we need to reset index to get it as a column
# This will produce one row per (cml_id, sublink_id)
required_columns = [
"cml_id",
"sublink_id",
"site_0_lon",
"site_0_lat",
"site_1_lon",
"site_1_lat",
"frequency",
"polarization",
"length",
]
# Convert to DataFrame
df = self.dataset[required_columns].to_dataframe().reset_index()
# Remove duplicate columns if present
df = df.loc[:, ~df.columns.duplicated()]
# Sort for deterministic output
df = df.sort_values(["cml_id", "sublink_id"]).reset_index(drop=True)
return df

def generate_data_and_write_csv(
self,
@@ -252,28 +254,14 @@ def generate_data_and_write_csv(

def write_metadata_csv(self, filepath: str = None) -> str:
"""
Write CML metadata to a CSV file.

Parameters
----------
filepath : str, optional
Full path to the output CSV file. If not provided, generates
a filename with timestamp in the output directory.

Returns
-------
str
Path to the generated metadata CSV file.
Write CML metadata to a CSV file, with one row per (cml_id, sublink_id).
Database schema now expects one row per (cml_id, sublink_id) to preserve
sublink-specific metadata like frequency and polarization.
"""
# Get metadata as DataFrame
metadata_df = self.get_metadata_dataframe()

# Reset index to include cml_id and sublink_id as columns
# This ensures the sorted order is preserved in the CSV
metadata_df = metadata_df.reset_index()

# Reorder columns: cml_id, sublink_id, site_0 (lon, lat), site_1 (lon, lat), frequency, polarization, length
column_order = [
# Keep only the columns needed for the database
db_columns = [
"cml_id",
"sublink_id",
"site_0_lon",
@@ -284,22 +272,18 @@ def write_metadata_csv(self, filepath: str = None) -> str:
"polarization",
"length",
]
# Only include columns that exist in the dataframe
column_order = [col for col in column_order if col in metadata_df.columns]
metadata_df = metadata_df[column_order]
# Filter to database columns (no deduplication needed)
metadata_df = metadata_df[db_columns]

# Generate filepath if not provided
if filepath is None:
timestamp_str = pd.Timestamp.now().strftime("%Y%m%d_%H%M%S")
filename = f"cml_metadata_{timestamp_str}.csv"
filepath = self.output_dir / filename

# Write to CSV
metadata_df.to_csv(filepath, index=False)
logger.info(
f"Generated metadata CSV file: {filepath} ({len(metadata_df)} rows)"
)

return str(filepath)

def close(self):
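The reworked get_metadata_dataframe relies on xarray broadcasting per-CML variables across the sublink dimension when converting to a DataFrame. A self-contained toy example of that pattern; the dataset below is made up, and only the to_dataframe/reset_index mechanics are the point.

# Toy dataset illustrating the one-row-per-(cml_id, sublink_id) conversion used above.
import xarray as xr

ds = xr.Dataset(
    data_vars={
        "frequency": (("cml_id", "sublink_id"), [[38.0, 38.5], [25.0, 25.5]]),
        "polarization": (("cml_id", "sublink_id"), [["V", "H"], ["V", "H"]]),
        "length": (("cml_id",), [3.2, 5.1]),          # per-CML, broadcast to both sublinks
        "site_0_lon": (("cml_id",), [11.50, 11.62]),  # per-CML, broadcast to both sublinks
    },
    coords={
        "cml_id": ["cml_1", "cml_2"],
        "sublink_id": ["sublink_1", "sublink_2"],
    },
)

df = (
    ds[["frequency", "polarization", "length", "site_0_lon"]]
    .to_dataframe()
    .reset_index()
    .sort_values(["cml_id", "sublink_id"])
    .reset_index(drop=True)
)
print(df)  # 4 rows: one per (cml_id, sublink_id), per-CML columns repeated per sublink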
35 changes: 34 additions & 1 deletion mno_data_source_simulator/main.py
@@ -9,6 +9,7 @@
import os
import sys
from pathlib import Path
from datetime import datetime, timedelta
import yaml

from data_generator import CMLDataGenerator
@@ -114,13 +115,45 @@ def main():
upload_frequency = config["sftp"]["upload_frequency_seconds"]
last_upload_time = time.time()

# Get generation configuration
timestamps_per_file = config["generator"].get("timestamps_per_file", 1)
time_resolution_seconds = config["generator"].get("time_resolution_seconds", 60)

# Generate metadata file at startup (metadata is static)
try:
metadata_file = generator.write_metadata_csv()
logger.info(f"Generated metadata file: {metadata_file}")

# If SFTP uploader is available, upload the metadata file immediately
if sftp_uploader:
try:
uploaded_count = sftp_uploader.upload_pending_files()
if uploaded_count > 0:
logger.info(f"Uploaded {uploaded_count} file(s) including metadata")
last_upload_time = time.time()
except Exception as e:
logger.error(f"Failed to upload initial metadata: {e}")
except Exception as e:
logger.error(f"Failed to generate metadata file: {e}")

try:
logger.info("Entering main loop")

while True:
try:
# Generate timestamps for this cycle
current_time = datetime.now()
if timestamps_per_file > 1:
# Generate multiple timestamps with specified resolution
timestamps = [
current_time + timedelta(seconds=i * time_resolution_seconds)
for i in range(timestamps_per_file)
]
else:
timestamps = None # Will use current time

# Generate data and write to CSV file
csv_file = generator.generate_data_and_write_csv()
csv_file = generator.generate_data_and_write_csv(timestamps=timestamps)
logger.info(f"Generated CSV file: {csv_file}")

# Check if it's time to upload
2 changes: 1 addition & 1 deletion mno_data_source_simulator/tests/test_generator.py
@@ -126,7 +126,7 @@ def test_metadata_csv_generation(test_dir):
# Load and validate CSV content
loaded_df = pd.read_csv(filepath)

# Check required columns exist (including cml_id and sublink_id)
# Check required columns exist (matching database schema)
required_columns = [
"cml_id",
"sublink_id",