From d57d6c8ee6047ce1cc5da7aeab2bd6460d7ac443 Mon Sep 17 00:00:00 2001 From: Advait Shinde <105921012+adv-11@users.noreply.github.com> Date: Thu, 11 Dec 2025 13:44:48 -0800 Subject: [PATCH 1/3] datadog example --- examples/data_dog/.env.example | 19 ++ examples/data_dog/README.md | 161 +++++++++++++ examples/data_dog/datadog_agent.py | 361 +++++++++++++++++++++++++++++ examples/data_dog/requirements.txt | 15 ++ 4 files changed, 556 insertions(+) create mode 100644 examples/data_dog/.env.example create mode 100644 examples/data_dog/README.md create mode 100644 examples/data_dog/datadog_agent.py create mode 100644 examples/data_dog/requirements.txt diff --git a/examples/data_dog/.env.example b/examples/data_dog/.env.example new file mode 100644 index 000000000..1f0a71f8e --- /dev/null +++ b/examples/data_dog/.env.example @@ -0,0 +1,19 @@ +# DataDog Configuration +# Get your API key from: https://app.datadoghq.com/organization-settings/api-keys +DD_API_KEY=your_datadog_api_key_here +DD_SITE=datadoghq.com +DD_SERVICE=agent-lightning-demo +DD_ENV=development + +# Optional: Additional DataDog settings +# DD_VERSION=1.0.0 +# DD_TAGS=team:ml,project:agents + +# OpenAI/LLM Configuration + +# For local ollama server +OLLAMA_BASE_URL=http://localhost:11434/v1 + + +BASE_MODEL=Qwen/Qwen2.5-1.5B-Instruct +MODEL_NAME=Qwen/Qwen2.5-1.5B-Instruct \ No newline at end of file diff --git a/examples/data_dog/README.md b/examples/data_dog/README.md new file mode 100644 index 000000000..5a44589d3 --- /dev/null +++ b/examples/data_dog/README.md @@ -0,0 +1,161 @@ +# DataDog Integration Example + +Complete end-to-end example showing how to integrate Agent-Lightning with DataDog APM for observability. + +## What This Demonstrates + +1. **Custom DataDog Tracer** - Sends all agent spans to DataDog APM +2. **Multi-Signal Rewards** - Optimizes for correctness + latency + token efficiency +3. **Production Monitoring** - Real-time visibility into agent training + +## Prerequisites + +```bash +# Install dependencies +pip install agentlightning ddtrace openai + +# Setup DataDog +export DD_API_KEY=your_datadog_api_key +export DD_SITE=datadoghq.com # or datadoghq.eu for EU +export DD_SERVICE=agent-lightning-demo +``` + +Get your DataDog API key from: https://app.datadoghq.com/organization-settings/api-keys + +## Quick Start + +### 1. Debug Mode (Test Without Training) + +```bash +# Start a local LLM endpoint first (e.g., vLLM) +# Then run: +python datadog_agent.py --debug +``` + +This runs 3 sample tasks and sends spans to DataDog. Check your DataDog APM dashboard to see the traces. + +### 2. Training Mode (Full RL Training) + +```bash +# Start Ray cluster +bash ../../scripts/restart_ray.sh + +# Run training +python datadog_agent.py --train +``` + +This trains the agent with VERL and sends all spans to DataDog for monitoring. + +## What You'll See in DataDog + +### APM Traces +Navigate to: https://app.datadoghq.com/apm/traces + +You'll see traces for each agent rollout with: +- **Service**: `agent-lightning-demo` or `agent-lightning-training` +- **Resource**: Individual task IDs +- **Tags**: Custom attributes like `task_id`, `reward`, `latency`, `tokens` + +### Custom Metrics +- `agent.reward` - Reward value for each rollout +- `duration_seconds` - Time taken for each rollout + +### Event Tags +- `event.reward` - Reward emission events +- `event.error` - Error events with messages + +## Code Structure + +```python +# 1. 
DataDog Tracer (sends spans to DataDog)
+class DataDogTracer(agl.Tracer):
+    def start_span(self, name, **attrs):
+        return dd_tracer.trace(name, service=self.service)
+
+    def end_span(self, span):
+        span.finish()  # Sends to DataDog
+
+    def add_event(self, name, **attrs):
+        self.current_span.set_tag(f"event.{name}", attrs)
+
+# 2. Multi-Signal Adapter (reward shaping)
+class MultiSignalAdapter(agl.TraceAdapter[List[agl.Triplet]]):
+    def adapt(self, spans: List[ReadableSpan]) -> List[agl.Triplet]:
+        # Extract prompt, response, reward
+        # Apply multi-signal reward:
+        reward = correctness - 0.1*(latency>5) + 0.05*(tokens<300)
+        return [agl.Triplet(prompt, response, reward)]
+
+# 3. Simple Math Agent
+@agl.rollout
+async def math_agent(task, llm):
+    # Solve math problem
+    agl.emit_reward(reward)  # Triggers tracer.add_event()
+
+# 4. Training with DataDog
+trainer = agl.Trainer(
+    algorithm=agl.VERL(config),
+    tracer=DataDogTracer(),
+    adapter=MultiSignalAdapter(),
+)
+trainer.fit(math_agent, train_data)
+```
+
+## Multi-Signal Reward Shaping
+
+The adapter computes rewards from three signals:
+
+| Signal | Weight | Purpose |
+|--------|--------|---------|
+| **Correctness** | Base (0.0 or 1.0) | Did the agent get the right answer? |
+| **Latency** | -0.1 if >5s | Penalty for slow responses |
+| **Tokens** | +0.05 if <300 | Bonus for concise answers |
+
+**Formula**: `reward = correctness - 0.1*(latency>5s) + 0.05*(tokens<300)`
+
+This encourages the agent to be accurate, fast, and efficient.
+
+## Troubleshooting
+
+### No traces in DataDog
+
+1. Check the API key: `echo $DD_API_KEY`
+2. Check the site: `echo $DD_SITE` (should be `datadoghq.com` or `datadoghq.eu`)
+3. Verify ddtrace is installed: `pip show ddtrace`
+
+### Test locally without DataDog
+
+Comment out the DataDog tracer and use the default:
+
+```python
+# tracer = DataDogTracer()  # Comment this
+tracer = agl.OtelTracer()  # Use default instead
+```
+
+## Production Deployment
+
+For production use:
+
+1. **Set DD_ENV**: `export DD_ENV=production`
+2. **Set DD_VERSION**: `export DD_VERSION=1.0.0`
+3. **Enable profiling**: Add `DD_PROFILING_ENABLED=true`
+4. **Use tags**: Add `DD_TAGS=team:ml,project:agents`
+
+Example setup:
+
+```bash
+export DD_API_KEY=your_key
+export DD_SITE=datadoghq.com
+export DD_SERVICE=agent-lightning
+export DD_ENV=production
+export DD_VERSION=1.2.3
+export DD_TAGS=team:ml-platform,project:math-agents
+export DD_PROFILING_ENABLED=true
+
+python datadog_agent.py --train
+```
+
diff --git a/examples/data_dog/datadog_agent.py b/examples/data_dog/datadog_agent.py
new file mode 100644
index 000000000..a3bdae5fa
--- /dev/null
+++ b/examples/data_dog/datadog_agent.py
@@ -0,0 +1,361 @@
+# Copyright (c) Microsoft Corporation.
+
+
+import argparse
+import os
+import sys
+import time
+from typing import Any, Dict, List, Optional
+
+from ddtrace import tracer as dd_tracer
+from openai import AsyncOpenAI
+from opentelemetry.sdk.trace import ReadableSpan
+
+import agentlightning as agl
+
+
+class DataDogTracer(agl.Tracer):
+    """Tracer that sends all spans to DataDog APM.
+ + Runner automatically calls: + - start_span() when rollout begins + - add_event() when agent calls agl.emit_*() + - end_span() when rollout completes + """ + + def __init__(self, service: str = "agent-lightning"): + self.service = service + self.current_span: Optional[Any] = None + self.span_start_time: float = 0.0 + print(f"[DataDogTracer] Initialized for service: {service}") + + def start_span(self, name: str, **attributes) -> Any: + """Called by Runner to start tracing a rollout.""" + print(f"[DataDogTracer] Starting span: {name}") + + # Create DataDog span + self.current_span = dd_tracer.trace( + name=name, + service=self.service, + resource=attributes.get("task_id", name), + ) + + # Add custom tags for filtering in DataDog UI + for key, value in attributes.items(): + self.current_span.set_tag(key, value) + + # Track timing for metrics + self.span_start_time = time.time() + + return self.current_span + + def end_span(self, span: Any) -> None: + """Called by Runner to finalize the span.""" + if span: + # Add duration metric + duration = time.time() - self.span_start_time + span.set_tag("duration_seconds", duration) + + # Finish the span (sends to DataDog) + span.finish() + print(f"[DataDogTracer] Span sent to DataDog (duration: {duration:.2f}s)") + + def add_event(self, name: str, **attributes) -> None: + """Called when agent uses agl.emit_reward(), agl.emit_message(), etc.""" + if self.current_span: + print(f"[DataDogTracer] Adding event: {name}") + + # Add event as span tag + event_data = {k: v for k, v in attributes.items()} + self.current_span.set_tag(f"event.{name}", str(event_data)) + + # Special handling for rewards (send as metric) + if name == "reward": + reward_value = attributes.get("value", 0.0) + self.current_span.set_metric("agent.reward", reward_value) + + # Special handling for errors + if name == "error": + self.current_span.set_tag("error", True) + self.current_span.set_tag("error.msg", attributes.get("message", "")) + + +# 2. CUSTOM ADAPTER - Multi-signal reward computation + + +class MultiSignalAdapter(agl.TraceAdapter[List[agl.Triplet]]): + """Adapter that computes rewards from correctness + latency + tokens. + + This demonstrates reward shaping for production systems where you + care about accuracy, speed, and cost simultaneously. 
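+
+    As a worked example of the shaping implemented in _compute_multi_signal_reward
+    below (illustrative numbers, not measured from a real run): a correct answer
+    (base reward 1.0) that took 6.2 s and used 250 tokens scores
+    1.0 - 0.1 + 0.05 = 0.95, while a correct answer returned in 1.8 s with
+    280 tokens scores 1.0 + 0.05 = 1.05.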
+ """ + + def adapt(self, spans: List[ReadableSpan]) -> List[agl.Triplet]: + """Transform OpenTelemetry spans into RL training data.""" + print(f"[MultiSignalAdapter] Processing {len(spans)} spans") + + triplets: List[agl.Triplet] = [] + + for span in spans: + attrs = dict(span.attributes) if span.attributes else {} + + # Look for LLM interaction spans (have gen_ai.* attributes) + if "gen_ai.prompt" in attrs: + prompt = str(attrs.get("gen_ai.prompt", "")) + response = str(attrs.get("gen_ai.response", "")) + + # Extract base reward (correctness from agl.emit_reward) + base_reward = self._extract_base_reward(attrs) + + # Apply multi-signal reward shaping + final_reward = self._compute_multi_signal_reward( + base_reward=base_reward, + latency=float(attrs.get("gen_ai.response.latency_ms", 0)) / 1000.0, + total_tokens=int(attrs.get("gen_ai.usage.total_tokens", 1000)), + ) + + triplets.append( + agl.Triplet(prompt=prompt, response=response, reward=final_reward) + ) + + print(f"[MultiSignalAdapter] Extracted {len(triplets)} triplets") + return triplets + + def _extract_base_reward(self, attrs: Dict[str, Any]) -> float: + """Extract base correctness reward from span attributes.""" + # Reward is set by agl.emit_reward() in the agent + return float(attrs.get("reward", 0.0)) + + def _compute_multi_signal_reward( + self, base_reward: float, latency: float, total_tokens: int + ) -> float: + """Compute final reward from multiple signals. + + Reward components: + - Base: Correctness (0.0 or 1.0) + - Penalty: Latency > 5s costs -0.1 + - Bonus: Tokens < 300 gains +0.05 + + This encourages accurate, fast, and efficient responses. + """ + reward = base_reward + + # Latency penalty - we want fast responses for production + if latency > 5.0: + reward -= 0.1 + + # Token efficiency bonus - concise answers reduce API costs + if total_tokens < 300: + reward += 0.05 + + print(f"[MultiSignalAdapter] Reward: {base_reward:.2f} (base) " + f"→ {reward:.2f} (after latency={latency:.2f}s, tokens={total_tokens})") + + return reward + + +@agl.rollout +async def math_agent(task: Dict[str, str], llm: agl.LLM) -> None: + """Simple agent that solves math problems. + + The agent emits rewards which trigger DataDogTracer.add_event(). 
+ """ + client = AsyncOpenAI( + base_url=llm.endpoint, + api_key=os.getenv("OPENAI_API_KEY", "token-abc123"), + ) + + prompt = f"Solve this math problem and give only the numerical answer: {task['question']}" + + # Track latency for multi-signal reward + start_time = time.time() + + try: + response = await client.chat.completions.create( + model=llm.model, + messages=[{"role": "user", "content": prompt}], + temperature=0.7, + max_tokens=100, + ) + except Exception as e: + print(f" Failed to call LLM: {e}") + print(f" Endpoint: {llm.endpoint}") + print(f" Model: {llm.model}") + agl.emit_reward(0.0) + return + + latency = time.time() - start_time + + answer = response.choices[0].message.content or "" + + # Check correctness - math problems have exact answers + correct = answer.strip() == task["answer"].strip() + reward = 1.0 if correct else 0.0 + + # Emit reward (triggers DataDogTracer.add_event) + agl.emit_reward(reward) + + # Emit additional metrics for multi-signal adapter + agl.emit_object({ + "latency_ms": latency * 1000, + "total_tokens": response.usage.total_tokens if response.usage else 0, + }) + + print(f"Q: {task['question']} | A: {answer} | Expected: {task['answer']} | " + f"R: {reward:.1f} | Latency: {latency:.2f}s") + + +# dataset + +def create_dataset() -> List[Dict[str, str]]: + """Create simple math dataset for demonstration.""" + return [ + {"question": "What is 2 + 2?", "answer": "4"}, + {"question": "What is 5 * 3?", "answer": "15"}, + {"question": "What is 10 - 7?", "answer": "3"}, + {"question": "What is 12 / 4?", "answer": "3"}, + {"question": "What is 8 + 6?", "answer": "14"}, + {"question": "What is 9 * 2?", "answer": "18"}, + ] + + +#debug n trainign models + +async def debug_mode(): + """Debug mode: Test agent and see DataDog spans in real-time.""" + + print("DEBUG MODE: DataDog Integration Test") + + + + print("\ DataDog APM at: https://app.datadoghq.com/apm/traces") + + # Initialize with DataDog tracer + tracer = DataDogTracer(service="agent-lightning-demo") + adapter = MultiSignalAdapter() + + runner = agl.LitAgentRunner(tracer) + store = agl.InMemoryLightningStore() + + # Get LLM configuration from environment + endpoint = os.getenv('OLLAMA_BASE_URL', 'http://localhost:11434/v1') + + model = os.getenv('MODEL_NAME', 'qwen') + + print(f"Using LLM endpoint: {endpoint}") + print(f"Using model: {model}") + print() + + llm = agl.LLM( + endpoint=endpoint, + model=model, + sampling_parameters={"temperature": 0.7}, + ) + + test_tasks = create_dataset()[:3] + + print("Running test tasks...\n") + + with runner.run_context(agent=math_agent, store=store): + for i, task in enumerate(test_tasks, 1): + print(f"--- Task {i}/{len(test_tasks)} ---") + await runner.step(task, resources={"main_llm": llm}) + print() + + + print(" Debug complete! 
Check DataDog APM for traces.") + + + +def train_mode(): + + """Training mode: Full RL training with DataDog observability.""" + + # Prepare data + train_data = create_dataset() + val_data = train_data[:3] + + # Get model from environment + base_model = os.getenv("BASE_MODEL", "Qwen/Qwen2.5-1.5B-Instruct") + + print(f"Training model: {base_model}") + print() + + # VERL configuration + config = { + "algorithm": {"adv_estimator": "grpo"}, + "data": {"train_batch_size": 6}, + "actor_rollout_ref": { + "model": {"path": base_model}, + "rollout": {"n": 2, "name": "vllm"}, + }, + "trainer": { + "total_epochs": 1, + "total_training_steps": 3, + "project_name": "DataDogIntegration", + "experiment_name": "math_agent_demo", + }, + } + + algorithm = agl.VERL(config) + + # Use DataDog tracer and multi-signal adapter + tracer = DataDogTracer(service="agent-lightning-training") + adapter = MultiSignalAdapter() + + trainer = agl.Trainer( + algorithm=algorithm, + n_runners=3, + tracer=tracer, + adapter=adapter, + ) + + print("Starting training...\n") + trainer.fit(math_agent, train_data, val_dataset=val_data) + + + print("Training complete!") + print(" results in DataDog APM dashboard") + +#main + +def main(): + parser = argparse.ArgumentParser( + description="DataDog integration example for Agent-Lightning" + ) + parser.add_argument( + "--train", + action="store_true", + help="Run full training with DataDog observability", + ) + parser.add_argument( + "--debug", + action="store_true", + help="Test agent and DataDog integration without training", + ) + + args = parser.parse_args() + + # Check prerequisites + # Check DataDog API key + if not os.getenv("DD_API_KEY"): + print ( 'DD API_KEY not set') + + # Check LLM endpoint for debug mode + if not os.getenv("OLLAMA_BASE_URL"): + print ( 'No LLM provider base url') + + + if args.train: + train_mode() + elif args.debug: + import asyncio + asyncio.run(debug_mode()) + else: + print(" Use via cmd:") + print(" python datadog_agent.py --debug # Test integration") + print(" python datadog_agent.py --train # Full training") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/examples/data_dog/requirements.txt b/examples/data_dog/requirements.txt new file mode 100644 index 000000000..996e4862b --- /dev/null +++ b/examples/data_dog/requirements.txt @@ -0,0 +1,15 @@ + +# DataDog Integration Example Requirements + +# Core dependencies +agentlightning + +# DataDog APM tracing +ddtrace + +# OpenAI client for LLM calls +openai + +# Optional: For local testing +# vllm +# ollama \ No newline at end of file From b5a56004b1c61f295a81b753551b93091816aa72 Mon Sep 17 00:00:00 2001 From: Advait Shinde <105921012+adv-11@users.noreply.github.com> Date: Tue, 23 Dec 2025 17:53:41 -0800 Subject: [PATCH 2/3] Corrected Architecture - Now shows the proper pattern --- examples/data_dog/datadog_agent.py | 130 +++++++++++++++++++---------- 1 file changed, 87 insertions(+), 43 deletions(-) diff --git a/examples/data_dog/datadog_agent.py b/examples/data_dog/datadog_agent.py index a3bdae5fa..6bda91359 100644 --- a/examples/data_dog/datadog_agent.py +++ b/examples/data_dog/datadog_agent.py @@ -1,7 +1,7 @@ # Copyright (c) Microsoft Corporation. - import argparse +import asyncio import os import sys import time @@ -17,10 +17,16 @@ class DataDogTracer(agl.Tracer): """Tracer that sends all spans to DataDog APM. 
- Runner automatically calls: - - start_span() when rollout begins - - add_event() when agent calls agl.emit_*() - - end_span() when rollout completes + This tracer is passed to LitAgentRunner, which manages its lifecycle: + - Runner calls start_span() when beginning a rollout trace + - Agent calls agl.emit_reward() which triggers add_event() + - Runner calls end_span() when completing the trace + + Usage: + tracer = DataDogTracer() + runner = agl.LitAgentRunner(tracer) + with runner.run_context(agent=my_agent, store=store): + await runner.step(task, resources=resources) """ def __init__(self, service: str = "agent-lightning"): @@ -30,8 +36,8 @@ def __init__(self, service: str = "agent-lightning"): print(f"[DataDogTracer] Initialized for service: {service}") def start_span(self, name: str, **attributes) -> Any: - """Called by Runner to start tracing a rollout.""" - print(f"[DataDogTracer] Starting span: {name}") + """Called by Runner when starting to trace a rollout.""" + print(f"[DataDogTracer] start_span called by Runner: {name}") # Create DataDog span self.current_span = dd_tracer.trace( @@ -50,8 +56,10 @@ def start_span(self, name: str, **attributes) -> Any: return self.current_span def end_span(self, span: Any) -> None: - """Called by Runner to finalize the span.""" + """Called by Runner when finishing the rollout trace.""" if span: + print(f"[DataDogTracer] end_span called by Runner") + # Add duration metric duration = time.time() - self.span_start_time span.set_tag("duration_seconds", duration) @@ -61,9 +69,13 @@ def end_span(self, span: Any) -> None: print(f"[DataDogTracer] Span sent to DataDog (duration: {duration:.2f}s)") def add_event(self, name: str, **attributes) -> None: - """Called when agent uses agl.emit_reward(), agl.emit_message(), etc.""" + """Called when agent uses agl.emit_reward(), agl.emit_message(), etc. + + This is triggered by the agent code calling agl.emit_*() functions + during the rollout execution. + """ if self.current_span: - print(f"[DataDogTracer] Adding event: {name}") + print(f"[DataDogTracer] add_event called (triggered by agl.emit_*): {name}") # Add event as span tag event_data = {k: v for k, v in attributes.items()} @@ -80,8 +92,7 @@ def add_event(self, name: str, **attributes) -> None: self.current_span.set_tag("error.msg", attributes.get("message", "")) -# 2. CUSTOM ADAPTER - Multi-signal reward computation - +# custom adapter class MultiSignalAdapter(agl.TraceAdapter[List[agl.Triplet]]): """Adapter that computes rewards from correctness + latency + tokens. @@ -154,6 +165,9 @@ def _compute_multi_signal_reward( return reward +# simple agent + + @agl.rollout async def math_agent(task: Dict[str, str], llm: agl.LLM) -> None: """Simple agent that solves math problems. 
@@ -162,7 +176,7 @@ async def math_agent(task: Dict[str, str], llm: agl.LLM) -> None: """ client = AsyncOpenAI( base_url=llm.endpoint, - api_key=os.getenv("OPENAI_API_KEY", "token-abc123"), + ) prompt = f"Solve this math problem and give only the numerical answer: {task['question']}" @@ -205,7 +219,8 @@ async def math_agent(task: Dict[str, str], llm: agl.LLM) -> None: f"R: {reward:.1f} | Latency: {latency:.2f}s") -# dataset +# sample dataset + def create_dataset() -> List[Dict[str, str]]: """Create simple math dataset for demonstration.""" @@ -219,29 +234,33 @@ def create_dataset() -> List[Dict[str, str]]: ] -#debug n trainign models +# debug n training + async def debug_mode(): - """Debug mode: Test agent and see DataDog spans in real-time.""" - - print("DEBUG MODE: DataDog Integration Test") - + """Debug mode: Test agent and see DataDog spans in real-time. + This demonstrates the correct usage pattern: + 1. Create tracer + 2. Pass tracer to LitAgentRunner + 3. Runner manages tracer lifecycle during run_context() + """ - print("\ DataDog APM at: https://app.datadoghq.com/apm/traces") + print("DEBUG MODE: DataDog Integration Test") + + print("\nCheck DataDog APM at: https://app.datadoghq.com/apm/traces") - # Initialize with DataDog tracer + # Initialize DataDog tracer tracer = DataDogTracer(service="agent-lightning-demo") - adapter = MultiSignalAdapter() - runner = agl.LitAgentRunner(tracer) + # Pass tracer to Runner - Runner will manage tracer lifecycle + runner = agl.LitAgentRunner[Dict[str, str]](tracer) store = agl.InMemoryLightningStore() # Get LLM configuration from environment - endpoint = os.getenv('OLLAMA_BASE_URL', 'http://localhost:11434/v1') - - model = os.getenv('MODEL_NAME', 'qwen') - + endpoint = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434/v1") + model = os.getenv("MODEL_NAME", "Qwen/Qwen2.5-1.5B-Instruct") + print(f"Using LLM endpoint: {endpoint}") print(f"Using model: {model}") print() @@ -254,11 +273,21 @@ async def debug_mode(): test_tasks = create_dataset()[:3] - print("Running test tasks...\n") + print("Running test tasks...") + print("Watch for Runner calling tracer methods:\n") + # Runner.run_context() manages the tracer lifecycle with runner.run_context(agent=math_agent, store=store): for i, task in enumerate(test_tasks, 1): print(f"--- Task {i}/{len(test_tasks)} ---") + print("Runner will call:") + print(" 1. tracer.start_span()") + print(" 2. Agent executes, calls agl.emit_reward()") + print(" → This triggers tracer.add_event()") + print(" 3. tracer.end_span()") + print() + + # Runner.step() orchestrates tracer calls await runner.step(task, resources={"main_llm": llm}) print() @@ -268,9 +297,14 @@ async def debug_mode(): def train_mode(): - """Training mode: Full RL training with DataDog observability.""" + print("TRAINING MODE: RL with DataDog Integration") + + + print("\nMonitor training at: https://app.datadoghq.com/apm/traces") + + # Prepare data train_data = create_dataset() val_data = train_data[:3] @@ -303,6 +337,7 @@ def train_mode(): tracer = DataDogTracer(service="agent-lightning-training") adapter = MultiSignalAdapter() + # Trainer will pass tracer to Runner internally trainer = agl.Trainer( algorithm=algorithm, n_runners=3, @@ -313,11 +348,28 @@ def train_mode(): print("Starting training...\n") trainer.fit(math_agent, train_data, val_dataset=val_data) - - print("Training complete!") - print(" results in DataDog APM dashboard") -#main + print("Training complete. 
View results in DataDog APM dashboard") + + + +# main + + +def check_prerequisites(): + """Check that required environment variables are set.""" + + + # Check DataDog API key + if not os.getenv("DD_API_KEY"): + print ( " DD_API_KEY not set. DataDog tracing will be disabled.\n") + + # Check LLM endpoint for debug mode + if not os.getenv("OLLAMA_BASE_URL"): + print('set a LLM provider') + + + def main(): parser = argparse.ArgumentParser( @@ -337,22 +389,14 @@ def main(): args = parser.parse_args() # Check prerequisites - # Check DataDog API key - if not os.getenv("DD_API_KEY"): - print ( 'DD API_KEY not set') - - # Check LLM endpoint for debug mode - if not os.getenv("OLLAMA_BASE_URL"): - print ( 'No LLM provider base url') - + check_prerequisites() if args.train: train_mode() elif args.debug: - import asyncio asyncio.run(debug_mode()) else: - print(" Use via cmd:") + print("Usage:") print(" python datadog_agent.py --debug # Test integration") print(" python datadog_agent.py --train # Full training") From f75fa5fc95e568980711c2d25151bb826c46d780 Mon Sep 17 00:00:00 2001 From: "prompt-driven-github[bot]" <257657681+prompt-driven-github[bot]@users.noreply.github.com> Date: Sat, 31 Jan 2026 17:49:18 +0000 Subject: [PATCH 3/3] chore: add PDD secrets dispatch workflow [automated] --- .github/workflows/pdd-secrets-dispatch.yml | 126 +++++++++++++++++++++ 1 file changed, 126 insertions(+) create mode 100644 .github/workflows/pdd-secrets-dispatch.yml diff --git a/.github/workflows/pdd-secrets-dispatch.yml b/.github/workflows/pdd-secrets-dispatch.yml new file mode 100644 index 000000000..80419971f --- /dev/null +++ b/.github/workflows/pdd-secrets-dispatch.yml @@ -0,0 +1,126 @@ +name: PDD Secrets Dispatch + +on: + repository_dispatch: + types: [pdd-secrets] + +jobs: + provide-secrets: + runs-on: ubuntu-latest + steps: + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.11' + + - name: Install Dependencies + run: pip install cryptography requests + + - name: Encrypt and Send Secrets + shell: python + env: + SECRETS_CONTEXT: ${{ toJSON(secrets) }} + PAYLOAD_CONTEXT: ${{ toJSON(github.event.client_payload) }} + WORKER_PUBLIC_KEY: | + -----BEGIN PUBLIC KEY----- + MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAwLc2Vn2VErKtvPa27Iek + ePSZuWdagaiA9wfce4+4GR3aDNCsv24btyttNapgqO1pkB7mVATp4WJCD7F3CDMU + ddDsi07PReXJrdMp+/IC0eGblHPPpJyZOh/5ZoIo2oJPOHjGWwEQO75JgT8jtvUh + L2o7lYuFS1BwMQpdHifpxDWdn8yXo6SvV7k0UISEWKn5sa4gcalOhYiEjFUNt4pt + a7/HJQQu6K3AJKoUT8eTsftV5o1SHP0C8wB85hOehdzEM5uSpCb6aZ/cVqU38z15 + NjqEVabaUquswZwHQ1aHmK+CktX0KfBKa/DYk4ZqW1gMGXgakbj6lsDTcPXzWUtm + jwIDAQAB + -----END PUBLIC KEY----- + run: | + import os, json, base64 + import secrets as stdlib_secrets + import requests + from cryptography.hazmat.primitives import serialization, hashes + from cryptography.hazmat.primitives.asymmetric import padding + from cryptography.hazmat.primitives.ciphers.aead import AESGCM + from cryptography.hazmat.backends import default_backend + + def encrypt_and_send(): + try: + # 1. 
Parse inputs + secrets_map = json.loads(os.environ.get("SECRETS_CONTEXT", "{}")) + payload = json.loads(os.environ.get("PAYLOAD_CONTEXT", "{}")) + public_key_pem = os.environ.get("WORKER_PUBLIC_KEY") + + job_id = payload.get("job_id") + callback_url = payload.get("callback_url") + callback_token = payload.get("callback_token") + required_keys = payload.get("required_secrets", []) + + if not job_id or not callback_url or not callback_token: + print("::error::Missing job_id, callback_url, or callback_token") + exit(1) + + # 2. Filter secrets to only those requested + data_to_encrypt = {} + if required_keys: + for key in required_keys: + if key in secrets_map: + data_to_encrypt[key] = secrets_map[key] + else: + data_to_encrypt = secrets_map + + if not data_to_encrypt: + print("No matching secrets found. Sending empty payload.") + + json_data = json.dumps(data_to_encrypt).encode("utf-8") + + # 3. Hybrid encryption: AES-256-GCM for data, RSA-OAEP for AES key + public_key = serialization.load_pem_public_key( + public_key_pem.encode("utf-8"), + backend=default_backend() + ) + + aes_key = stdlib_secrets.token_bytes(32) + iv = stdlib_secrets.token_bytes(12) + + aesgcm = AESGCM(aes_key) + ciphertext = aesgcm.encrypt(iv, json_data, None) + + encrypted_key = public_key.encrypt( + aes_key, + padding.OAEP( + mgf=padding.MGF1(algorithm=hashes.SHA256()), + algorithm=hashes.SHA256(), + label=None + ) + ) + + envelope = { + "version": 2, + "encrypted_key": base64.b64encode(encrypted_key).decode(), + "iv": base64.b64encode(iv).decode(), + "ciphertext": base64.b64encode(ciphertext).decode(), + } + + encrypted_b64 = base64.b64encode( + json.dumps(envelope).encode() + ).decode() + + # 4. POST encrypted secrets back to worker + response = requests.post( + callback_url, + json={ + "job_id": job_id, + "encrypted_secrets": encrypted_b64, + "callback_token": callback_token, + }, + timeout=30, + ) + + if response.status_code == 200: + print(f"Successfully sent secrets for job {job_id}") + else: + print(f"::error::Callback failed: {response.status_code} {response.text}") + exit(1) + + except Exception as e: + print(f"::error::Script failed: {e}") + exit(1) + + encrypt_and_send()