A modular monitoring system with a plugin architecture that collects metrics from multiple agents, forwards them through a gRPC server to Kafka, and enables real-time analysis. Configuration is managed dynamically via etcd.
# Start Kafka, Zookeeper, and etcd
docker compose up -d
pip install -r requirements.txt
Note: If you encounter protobuf compatibility issues with etcd3, set this environment variable:
export PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python
# Add to ~/.zshrc or ~/.bashrc for persistence
# Option 1: Manually set up configuration for an agent
python setup_etcd_config.py --hostname agent-001 --interval 5
# Option 2: Let the agent auto-initialize with defaults
# The agent will automatically store default configuration to etcd if none exists
# Terminal 1: Start gRPC Server
python3 run_server.py
# Or with environment variables:
# export GRPC_SERVER_PORT=50051
# export KAFKA_BOOTSTRAP_SERVERS=localhost:9092
# python3 run_server.py
# Terminal 2: Start Analysis App
python3 run_analysis.py get-metrics
# Or with environment variables:
# export KAFKA_BOOTSTRAP_SERVERS=localhost:9092
# python3 run_analysis.py get-metrics
# Terminal 3: Start Agent
python3 run_agent.py --hostname agent-001 --etcd-host localhost --etcd-port 2379
# Or with environment variables:
# export GRPC_SERVER_HOST=localhost
# export GRPC_SERVER_PORT=50051
# export ETCD_HOST=localhost
# export ETCD_PORT=2379
# python3 run_agent.py --hostname agent-001
# Note: Agent will auto-create default config in etcd if none exists
Architecture:
┌─────────────────┐                 ┌─────────────────┐                 ┌────────────────────┐
│  Monitor Agent  │───── gRPC ─────►│   gRPC Server   │───── Kafka ────►│    Analysis App    │
│                 │    (Stream)     │                 │    (Broker)     │                    │
│ • Collects data │                 │ • Forwards data │                 │ • Analyzes data    │
│ • Plugin system │                 │                 │                 │ • Prints to stdout │
│ • etcd config   │                 │                 │                 │                    │
└─────────────────┘                 └─────────────────┘                 └────────────────────┘
         │
         ▼
┌─────────────────┐
│      etcd       │
│  Configuration  │
│   Management    │
└─────────────────┘
Key Features:
- Plugin Architecture: Extensible plugin system for data processing
- Dynamic Configuration: Real-time configuration updates via etcd
- Unidirectional Streaming: Agent sends metrics to server via gRPC
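The per-interval loop implied by the diagram (collect a sample, pass it through each configured plugin in order, send whatever survives) can be sketched as below. This is a hypothetical illustration, not the code in agent/agent.py: run_pipeline, tick, the dict-based samples, and the send callback (standing in for the gRPC stream) are all assumptions.

```python
from typing import Callable, Iterable, Optional

# Hypothetical stand-in for one metrics sample; the real agent passes protobuf requests.
Sample = dict

# Mirrors the run() contract from the plugin template: return the (possibly
# modified) sample, or None to drop it.
PluginFn = Callable[[Sample], Optional[Sample]]


def run_pipeline(sample: Sample, plugins: Iterable[PluginFn]) -> Optional[Sample]:
    """Pass a sample through each plugin in order; stop as soon as one drops it."""
    current: Optional[Sample] = sample
    for plugin in plugins:
        current = plugin(current)
        if current is None:
            return None  # dropped, e.g. by a filter or deduplication plugin
    return current


def tick(collect: Callable[[], Sample], plugins: Iterable[PluginFn],
         send: Callable[[Sample], None]) -> bool:
    """One collection cycle: collect, run plugins, send if not dropped.

    Returns True if a sample was sent.
    """
    result = run_pipeline(collect(), plugins)
    if result is None:
        return False
    send(result)
    return True


def add_tag(sample: Sample) -> Sample:
    return {**sample, "tagged": True}  # transform-style plugin


def drop_idle(sample: Sample) -> Optional[Sample]:
    return sample if sample["cpu"] >= 5.0 else None  # filter-style plugin


if __name__ == "__main__":
    sent = []
    tick(lambda: {"cpu": 42.0}, [add_tag, drop_idle], sent.append)
    tick(lambda: {"cpu": 1.0}, [add_tag, drop_idle], sent.append)
    print(sent)  # [{'cpu': 42.0, 'tagged': True}]
```

Stopping at the first plugin that returns None keeps later plugins from ever seeing a dropped sample, so their order in the configuration matters.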
lab_ds/
├── agent/                    # Monitoring agent module
│   ├── agent.py              # Main agent orchestrator
│   ├── collect.py            # Metric collection module
│   ├── grpc.py               # gRPC communication module
│   ├── etcd_config.py        # etcd configuration manager
│   ├── plugin_manager.py     # Plugin loading and management
│   └── plugins/              # Plugin implementations
│       ├── base.py           # Base plugin class
│       └── deduplication.py  # Example deduplication plugin
├── grpc_server/              # gRPC server + Kafka producer
│   ├── server.py             # gRPC server implementation
│   └── kafka_producer.py     # Kafka producer service
├── analysis_app/             # Kafka consumer + analysis
│   └── consumer.py           # Analysis application
├── shared/                   # Protocol definitions & config
│   ├── monitoring.proto      # gRPC protocol definition
│   ├── config.py             # Kafka topics configuration
│   └── monitoring_pb2*.py    # Generated protobuf files
├── setup_etcd_config.py      # Helper script for etcd config
├── run_agent.py              # Run agent
├── run_server.py             # Run server
└── run_analysis.py           # Run analysis app
Running the Agent:
python3 run_agent.py \
  --hostname agent-001 \
  --server localhost:50051 \
  --etcd-host localhost \
  --etcd-port 2379 \
  --config-key /monitor/config/agent-001  # Optional, defaults to /monitor/config/<hostname>
Or use environment variables:
export GRPC_SERVER_HOST=localhost
export GRPC_SERVER_PORT=50051
export ETCD_HOST=localhost
export ETCD_PORT=2379
python3 run_agent.py --hostname agent-001
Note: If no configuration exists in etcd for the agent, it automatically initializes with default values and stores them to etcd.
Running the gRPC Server:
python3 run_server.py
Or use environment variables:
export GRPC_SERVER_PORT=50051
export KAFKA_BOOTSTRAP_SERVERS=localhost:9092
python3 run_server.py
Running the Analysis App:
python3 run_analysis.py get-metrics \
  --hostname <hostname> \
  --timeout 10
Or use environment variables:
export KAFKA_BOOTSTRAP_SERVERS=localhost:9092
python3 run_analysis.py get-metrics --group-id my-team --timeout 10
etcd Configuration Setup:
Option 1: Manual Setup (Recommended for production)
python setup_etcd_config.py \
--hostname <hostname> \
--interval 5 \
--metrics cpu memory "disk read" "disk write" "net in" "net out" \
--plugins agent.plugins.deduplication.DeduplicationPlugin
Option 2: Auto-initialization (Agent creates defaults automatically)
# Just start the agent - it will create default config in etcd if none exists
python3 run_agent.py --hostname agent-001
In either case, the etcd address can be supplied via environment variables:
export ETCD_HOST=localhost
export ETCD_PORT=2379
python setup_etcd_config.py --hostname agent-001 --interval 5
Plugin System:
The agent supports a plugin architecture for extensible data processing. Plugins can:
- Filter metrics
- Transform data
- Drop duplicate data (deduplication plugin example)
- Add custom processing logic
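As an illustration of the "drop duplicate data" case, a deduplication plugin could compare each sample with the last one it let through and drop it when no metric moved by more than a tolerance. This is a self-contained sketch, not the shipped agent/plugins/deduplication.py: the BasePlugin stand-in, the dict-based sample, and the tolerance option are assumptions made so the example runs on its own.

```python
from typing import Optional


class BasePlugin:
    """Minimal stand-in for agent.plugins.base.BasePlugin (assumed interface)."""

    def initialize(self, config=None):
        pass

    def run(self, metrics_request):
        return metrics_request

    def finalize(self):
        pass


class DedupSketchPlugin(BasePlugin):
    """Drops a sample if every metric is within `tolerance` of the last sent one."""

    def initialize(self, config=None):
        config = config or {}
        self.tolerance = float(config.get("tolerance", 0.0))  # hypothetical option
        self.last_sent: Optional[dict] = None

    def _is_duplicate(self, sample: dict) -> bool:
        if self.last_sent is None or sample.keys() != self.last_sent.keys():
            return False
        return all(abs(sample[k] - self.last_sent[k]) <= self.tolerance for k in sample)

    def run(self, metrics_request: dict) -> Optional[dict]:
        if self._is_duplicate(metrics_request):
            return None  # None tells the agent to drop this sample
        self.last_sent = dict(metrics_request)  # remember what was actually sent
        return metrics_request

    def finalize(self):
        self.last_sent = None
```

Comparing against the last sent sample (rather than the last seen one) means a slow drift still gets reported once it accumulates past the tolerance.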
Creating a Plugin:
- Create a new plugin class in agent/plugins/ (for example, agent/plugins/my_plugin.py):
from agent.plugins.base import BasePlugin
from shared import monitoring_pb2


class MyPlugin(BasePlugin):
    def initialize(self, config=None):
        # Initialize plugin
        pass

    def run(self, metrics_request):
        # Process metrics_request
        # Return the (possibly modified) request, or None to drop it
        return metrics_request

    def finalize(self):
        # Cleanup
        pass

- Add the plugin to the etcd configuration:
{
"interval": 5,
"metrics": ["cpu", "memory"],
"plugins": ["agent.plugins.my_plugin.MyPlugin"]
}
Configuration is stored in etcd at /monitor/config/<hostname>:
{
"interval": 5,
"metrics": [
"cpu",
"memory",
"disk read",
"disk write",
"net in",
"net out"
],
"plugins": [
"agent.plugins.deduplication.DeduplicationPlugin"
]
}
When an agent starts, the EtcdConfigManager automatically:
- Checks for an existing configuration in etcd at /monitor/config/<hostname>
- If the config exists: loads and uses it
- If the config doesn't exist:
  - Uses the default configuration values
  - Stores the default config to etcd (so it's visible and can be modified later)
This means:
- You can start agents without pre-configuring etcd
- Default configs are automatically persisted to etcd
- You can modify the auto-created config later via setup_etcd_config.py or etcdctl
- The store_config() method of the agent's EtcdConfigManager can be used to update the config programmatically
Default Configuration Values:
- interval: 5 seconds
- metrics: ["cpu", "memory", "disk read", "disk write", "net in", "net out"]
- plugins: [] (empty by default)
- thresholds: predefined thresholds for alerts
  - min_cpu: 5.0%
  - min_memory: 5.0%
  - window_size: 5
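The load-or-initialize behaviour described above can be sketched against any key-value client offering get(key) and put(key, value). In python-etcd3, client.get(key) returns a (value, metadata) tuple whose value is None when the key is missing, and that is the shape assumed here. load_or_init_config and InMemoryKV are hypothetical names, and nesting min_cpu, min_memory and window_size under "thresholds" is a guess at the schema.

```python
import copy
import json
from typing import Optional, Tuple

CONFIG_PREFIX = "/monitor/config/"

# Defaults as listed above; the "thresholds" nesting is an assumption.
DEFAULT_CONFIG = {
    "interval": 5,
    "metrics": ["cpu", "memory", "disk read", "disk write", "net in", "net out"],
    "plugins": [],
    "thresholds": {"min_cpu": 5.0, "min_memory": 5.0, "window_size": 5},
}


class InMemoryKV:
    """Test double with the same get/put shape as a python-etcd3 client."""

    def __init__(self):
        self.data = {}

    def get(self, key: str) -> Tuple[Optional[bytes], None]:
        # etcd3's get() returns (value, metadata); metadata is not modelled here.
        return self.data.get(key), None

    def put(self, key: str, value: str) -> None:
        self.data[key] = value.encode("utf-8")


def load_or_init_config(client, hostname: str, key: Optional[str] = None) -> dict:
    """Return the stored config, or persist the defaults and return them."""
    key = key or CONFIG_PREFIX + hostname  # mirrors the --config-key override
    raw, _meta = client.get(key)
    if raw is not None:
        return json.loads(raw)
    config = copy.deepcopy(DEFAULT_CONFIG)  # never hand out the shared defaults
    client.put(key, json.dumps(config))
    return config
```

Against a running server, an etcd3.client(host="localhost", port=2379) instance could be passed instead of InMemoryKV().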
Configuration changes in etcd are automatically detected and applied:
- Interval: Updated in real-time
- Metrics: Updated immediately
- Plugins: Reloaded dynamically
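To apply these three kinds of change, a watch callback first has to work out what actually changed between the old and new configuration. The sketch below covers only that diff step; diff_config and the action names are hypothetical, and hooking it up to etcd (for example through python-etcd3's add_watch_callback) is omitted so it runs without a server.

```python
from typing import List


def diff_config(old: dict, new: dict) -> List[str]:
    """Return the update actions needed to move from `old` to `new` config."""
    actions = []
    if old.get("interval") != new.get("interval"):
        actions.append("reschedule")  # change the collection interval
    if old.get("metrics") != new.get("metrics"):
        actions.append("update_metrics")  # change which metrics are collected
    if old.get("plugins") != new.get("plugins"):
        actions.append("reload_plugins")  # finalize old plugins, load the new list
    return actions
```

Reloading plugins only when the plugin list itself changes avoids discarding plugin state (such as a deduplication plugin's last sample) on an unrelated interval edit.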
Automatic Configuration Initialization: When an agent starts, it automatically checks for configuration in etcd. If no configuration exists, the agent will:
- Use default configuration values
- Automatically store the default configuration to etcd (so it's visible and can be modified later)
This means you can start an agent without pre-configuring etcd, and it will initialize itself with sensible defaults.
Manual Configuration Setup:
# Using the setup script
python setup_etcd_config.py --hostname agent-001 --interval 10
# Or directly with etcdctl (if etcd is running in Docker)
docker exec -it etcd etcdctl put /monitor/config/agent-001 '{"interval": 10, "metrics": ["cpu", "memory"], "plugins": []}'
Note: The agent's EtcdConfigManager includes a store_config() method that can be used to programmatically store configuration to etcd.
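A rough Python equivalent of what setup_etcd_config.py or store_config() has to do is: build the JSON document, sanity-check it, and write it under the agent's key. The sketch assumes the python-etcd3 client (etcd3.client(host=..., port=...) and client.put(key, value)); build_config, store_config_sketch, and the validation rules are hypothetical, not the project's own.

```python
import json
from typing import Iterable, Optional

# Metric names as listed in this README.
KNOWN_METRICS = {"cpu", "memory", "disk read", "disk write", "net in", "net out"}


def build_config(interval: int, metrics: Iterable[str],
                 plugins: Optional[Iterable[str]] = None) -> dict:
    """Assemble and sanity-check a config document for one agent."""
    metrics = list(metrics)
    if interval <= 0:
        raise ValueError("interval must be a positive number of seconds")
    unknown = set(metrics) - KNOWN_METRICS
    if unknown:
        raise ValueError(f"unknown metrics: {sorted(unknown)}")
    return {"interval": interval, "metrics": metrics, "plugins": list(plugins or [])}


def store_config_sketch(hostname: str, config: dict,
                        host: str = "localhost", port: int = 2379) -> None:
    """Write the config to /monitor/config/<hostname> (needs the etcd3 package)."""
    import etcd3  # imported lazily so build_config works without etcd3 installed

    client = etcd3.client(host=host, port=port)
    client.put(f"/monitor/config/{hostname}", json.dumps(config))
```

Validating before writing matters because a running agent picks up the new value through its watch, so a malformed document would reach it immediately.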
Collected Metrics:
- CPU usage (%)
- Memory usage (%)
- Memory used/total (MB)
- Disk read/write (MB/s)
- Network in/out (MB/s)
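Disk and network figures are rates, so they come from the difference between two readings of cumulative byte counters. A minimal sketch assuming psutil (listed under the requirements), which provides cpu_percent(), virtual_memory(), disk_io_counters() and net_io_counters(); the function and field names and the 1 MB = 1024 × 1024 bytes convention are assumptions, and collect.py may do this differently.

```python
import time

MB = 1024 * 1024  # assumed convention: 1 MB = 2**20 bytes


def rate_mb_per_s(prev_bytes: int, curr_bytes: int, elapsed_s: float) -> float:
    """Convert two cumulative byte counters into an MB/s rate."""
    if elapsed_s <= 0:
        return 0.0
    # Clamp at zero in case a counter was reset between readings.
    return max(curr_bytes - prev_bytes, 0) / MB / elapsed_s


def collect_sample(interval_s: float = 1.0) -> dict:
    """Take one sample of the metrics listed above (requires psutil)."""
    import psutil  # lazy import so rate_mb_per_s is usable without psutil

    disk0, net0, t0 = psutil.disk_io_counters(), psutil.net_io_counters(), time.monotonic()
    cpu = psutil.cpu_percent(interval=interval_s)  # blocks for interval_s seconds
    disk1, net1, t1 = psutil.disk_io_counters(), psutil.net_io_counters(), time.monotonic()
    vm = psutil.virtual_memory()
    elapsed = t1 - t0
    return {
        "cpu": cpu,
        "memory": vm.percent,
        "memory_used_mb": vm.used / MB,
        "memory_total_mb": vm.total / MB,
        "disk_read_mb_s": rate_mb_per_s(disk0.read_bytes, disk1.read_bytes, elapsed),
        "disk_write_mb_s": rate_mb_per_s(disk0.write_bytes, disk1.write_bytes, elapsed),
        "net_in_mb_s": rate_mb_per_s(net0.bytes_recv, net1.bytes_recv, elapsed),
        "net_out_mb_s": rate_mb_per_s(net0.bytes_sent, net1.bytes_sent, elapsed),
    }
```

On systems without disk counters, psutil.disk_io_counters() can return None, which a real collector would need to handle.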
Kafka Topic:
- monitoring-data: agent metrics → analysis app (via the gRPC server)
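On the consuming side, the analysis app reads this topic and can narrow the stream to one host (the --hostname option). The sketch assumes the kafka-python package (KafkaConsumer with group_id and consumer_timeout_ms), assumes --timeout means seconds without new messages, and assumes each message value is a JSON object with a "hostname" field. The real encoding is defined in grpc_server/kafka_producer.py and may be protobuf instead, so decode_and_filter, consume, and the "analysis" group id are hypothetical.

```python
import json
from typing import Optional


def decode_and_filter(value: bytes, hostname: Optional[str] = None) -> Optional[dict]:
    """Decode one message (assumed JSON) and keep it only if it matches hostname."""
    try:
        record = json.loads(value)
    except ValueError:  # covers JSONDecodeError and UnicodeDecodeError
        return None
    if not isinstance(record, dict):
        return None
    if hostname is not None and record.get("hostname") != hostname:
        return None
    return record


def consume(bootstrap: str = "localhost:9092", group_id: str = "analysis",
            hostname: Optional[str] = None, timeout_s: float = 10.0) -> None:
    """Print matching records until no message arrives for timeout_s seconds."""
    from kafka import KafkaConsumer  # kafka-python; imported lazily

    consumer = KafkaConsumer(
        "monitoring-data",
        bootstrap_servers=bootstrap,
        group_id=group_id,
        consumer_timeout_ms=int(timeout_s * 1000),  # iteration ends after this idle time
    )
    try:
        for message in consumer:
            record = decode_and_filter(message.value, hostname)
            if record is not None:
                print(record)
    finally:
        consumer.close()
```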
Requirements:
- Python 3.7+
- Docker (for Kafka, Zookeeper, and etcd)
- psutil (for real metrics mode)
Service Endpoints:
- Kafka: localhost:9092
- Kafka UI: http://localhost:8080
- etcd: localhost:2379
For protobuf compatibility with etcd3:
export PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python
All ports and hosts can be configured via environment variables, making it easy to deploy in different environments:
gRPC Server Configuration:
- GRPC_SERVER_PORT - Port for the gRPC server (default: 50051)
- GRPC_SERVER_HOST - Host for the gRPC server (default: localhost)
Kafka Configuration:
- KAFKA_BOOTSTRAP_SERVERS - Kafka bootstrap servers address (default: localhost:9092)
etcd Configuration:
- ETCD_HOST - etcd server hostname (default: localhost)
- ETCD_PORT - etcd server port (default: 2379)
Example Usage:
# Set environment variables
export GRPC_SERVER_PORT=50051
export GRPC_SERVER_HOST=localhost
export KAFKA_BOOTSTRAP_SERVERS=localhost:9092
export ETCD_HOST=localhost
export ETCD_PORT=2379
# Run services (they will use the environment variables)
python3 run_server.py
python3 run_agent.py --hostname agent-001
python3 run_analysis.py get-metrics
Note: Command-line arguments override environment variables if both are provided. Environment variables provide defaults when command-line arguments are not specified.
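That precedence (command-line argument, then environment variable, then built-in default) falls out naturally when argparse defaults are read from the environment. A sketch for the agent-side options, using the variable names and defaults documented above; build_parser is a hypothetical helper, not necessarily how run_agent.py is written.

```python
import argparse
import os
from typing import Mapping, Optional


def build_parser(env: Optional[Mapping[str, str]] = None) -> argparse.ArgumentParser:
    """Parser whose defaults come from the environment, falling back to built-ins."""
    env = os.environ if env is None else env
    host = env.get("GRPC_SERVER_HOST", "localhost")
    port = env.get("GRPC_SERVER_PORT", "50051")
    parser = argparse.ArgumentParser(description="monitor agent (sketch)")
    parser.add_argument("--hostname", required=True)
    # An explicit --server on the command line replaces this env-derived default.
    parser.add_argument("--server", default=f"{host}:{port}")
    parser.add_argument("--etcd-host", default=env.get("ETCD_HOST", "localhost"))
    parser.add_argument("--etcd-port", type=int, default=int(env.get("ETCD_PORT", "2379")))
    return parser
```

Passing env explicitly keeps the helper testable without touching the real process environment.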
# Terminal 1: Start gRPC Server
python3 run_server.py
# Terminal 2: Start Analysis App
python3 run_analysis.py get-metrics
# Terminal 3: Start agent (config will be auto-created if missing)
python3 run_agent.py --hostname agent-001
# Or manually set up config first:
# python setup_etcd_config.py --hostname agent-001
# python3 run_agent.py --hostname agent-001
Running Multiple Agents:
# Option 1: Let agents auto-initialize with defaults
python3 run_agent.py --hostname agent-001 &
python3 run_agent.py --hostname agent-002 &
python3 run_agent.py --hostname agent-003 &
# Option 2: Manually set up configurations first
python setup_etcd_config.py --hostname agent-001 --interval 5
python setup_etcd_config.py --hostname agent-002 --interval 10
python setup_etcd_config.py --hostname agent-003 --interval 15
python3 run_agent.py --hostname agent-001 &
python3 run_agent.py --hostname agent-002 &
python3 run_agent.py --hostname agent-003 &
Dynamic Configuration Update:
# Update configuration while the agent is running
python setup_etcd_config.py --hostname agent-001 --interval 10 --metrics cpu memory
# The agent automatically detects and applies the change
Adding a Custom Plugin:
# Add a custom plugin to the configuration
python setup_etcd_config.py \
--hostname agent-001 \
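--plugins agent.plugins.deduplication.DeduplicationPlugin agent.plugins.my_plugin.MyPlugin
Monitoring:
# Open the Kafka UI in a browser (macOS; use xdg-open on Linux)
open http://localhost:8080
# Inspect the agent's config with etcdctl (if etcd is in Docker)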
docker exec -it etcd etcdctl get /monitor/config/agent-001
# Or watch for changes
docker exec -it etcd etcdctl watch /monitor/config/agent-001
The agent logs show:
- Configuration loading from etcd
- Plugin loading
- Configuration updates
- Metrics collection
To generate the Python protobuf files from the .proto definition, use the provided script:
./generate_protobuf.sh
Or run protoc manually:
python3 -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. shared/monitoring.proto
This generates:
- shared/monitoring_pb2.py - Message classes
- shared/monitoring_pb2_grpc.py - gRPC service classes
Project Modules:
- agent/: Modular agent with collect, grpc, and plugins modules
- grpc_server/: gRPC server that forwards metrics to Kafka
- analysis_app/: Kafka consumer that displays metrics
- shared/: Protocol definitions and shared configuration
Adding New Plugins:
- Create a plugin class extending BasePlugin
- Implement the initialize(), run(), and finalize() methods
- Add the plugin path to the etcd configuration
- The plugin is loaded automatically
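Loading a plugin from a dotted path such as agent.plugins.deduplication.DeduplicationPlugin typically means splitting off the class name, importing the module, and instantiating the class. A sketch with importlib; load_plugin is a hypothetical name, and plugin_manager.py may handle errors or pass plugin-specific config differently.

```python
import importlib


def load_plugin(dotted_path: str, config=None):
    """Import `package.module.ClassName`, instantiate it, and call initialize()."""
    module_path, _, class_name = dotted_path.rpartition(".")
    if not module_path:
        raise ValueError(f"expected 'module.ClassName', got {dotted_path!r}")
    module = importlib.import_module(module_path)  # ImportError if the module is missing
    plugin_cls = getattr(module, class_name)  # AttributeError if the class is missing
    plugin = plugin_cls()
    if hasattr(plugin, "initialize"):
        plugin.initialize(config)
    return plugin
```

Because the path is resolved with a normal import, a plugin module must be importable from the agent's working directory (here, the project root containing the agent package).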
Troubleshooting:
etcd connection issues:
- Ensure etcd is running: docker ps | grep etcd
- Check the etcd port: localhost:2379
- Verify network connectivity
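A plain TCP connect tells you whether anything is listening on the etcd port before you dig into agent logs. This stdlib-only sketch (port_open is a hypothetical helper) proves only reachability, not that the listener is a healthy etcd; running etcdctl endpoint health inside the container answers that.

```python
import socket


def port_open(host: str = "localhost", port: int = 2379, timeout_s: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout_s."""
    try:
        with socket.create_connection((host, port), timeout=timeout_s):
            return True
    except OSError:  # refused, unreachable, timed out, or name resolution failure
        return False


if __name__ == "__main__":
    print("etcd reachable:", port_open("localhost", 2379))
```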
If you see protobuf errors with etcd3:
export PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python
If configuration changes are not applied:
- Verify the etcd watch is active (check the agent logs)
- Ensure the configuration key matches the hostname (default: /monitor/config/<hostname>)
- Check the etcd connection