20 changes: 20 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
__pycache__/
*.py[cod]
*.log
*.sqlite3
*.db
.env
.env.*
.venv/
.idea/
.vscode/
.mypy_cache/
.pytest_cache/
coverage.xml
htmlcov/
exports/
metrics.json
dist/
build/
*.egg-info/
db_utils/__pycache__/
87 changes: 86 additions & 1 deletion README.md
@@ -1 +1,86 @@
# Citadel
# Sentiment Analysis Pipeline

## Overview

This repository now ships a production-ready sentiment analysis pipeline that ingests
posts from social sources (Twitter and Reddit), scores their sentiment, persists the
results, exports structured data, and records runtime metrics. The pipeline was
refactored to emphasise reliability, observability, and maintainability.

## Key Features

- **Centralised configuration** via `AppConfig` with strict validation of required
environment variables, configurable timeouts, retry counts, and batching controls.
- **Structured logging** across the stack with DEBUG/INFO/WARNING/ERROR semantics.
- **Robust API access** featuring rate limiting, exponential backoff retries,
response validation, and sanitisation of fetched payloads.
- **Async orchestration** that fetches from multiple sources concurrently while
respecting rate limits and tracking batch progress.
- **Modular architecture** with dedicated components for fetchers, analysis,
persistence, exporting, and metrics.
- **Database reliability** through a repository that uses connection pooling,
schema migrations, context-managed operations, and duplicate detection.
- **Data hygiene** by sanitising text, validating timestamps, filtering duplicates,
and converting everything to a consistent timezone.
- **Operational insights** via metrics for processed posts, duplicates, failures,
and per-sentiment distribution alongside optional JSON/CSV exports.
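
The retry behaviour described above can be sketched roughly as follows (a minimal illustration only; `max_retries` and `backoff_factor` stand in for the `MAX_RETRIES` and `RETRY_BACKOFF_FACTOR` settings, and the real client additionally rate-limits and validates responses):

```python
import time


def fetch_with_retries(call, max_retries=3, backoff_factor=2, sleep=time.sleep):
    """Retry `call` with exponential backoff; re-raise after the final attempt."""
    for attempt in range(max_retries):
        try:
            return call()
        except Exception:
            if attempt == max_retries - 1:
                raise
            # Wait backoff_factor ** attempt seconds (1s, 2s, 4s, ...) before retrying.
            sleep(backoff_factor ** attempt)
```

The `sleep` parameter is injectable so tests can record waits instead of actually sleeping.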

## Environment Variables

| Variable | Description |
| --- | --- |
| `DATABASE_URL` | Database connection string. Supports `sqlite:///path/to.db` or `postgres://user:pass@host:port/db`. |
| `TWITTER_BEARER_TOKEN` | Required when enabling the Twitter fetcher. |
| `REDDIT_CLIENT_ID` / `REDDIT_CLIENT_SECRET` | Required when enabling the Reddit fetcher. |
| `SENTIMENT_SOURCES` | Comma-separated list of fetchers to enable (`twitter,reddit`). Defaults to both. |
| `REQUEST_TIMEOUT` | HTTP request timeout in seconds (default `10`). |
| `MAX_RETRIES` | Number of API retry attempts (default `3`). |
| `RETRY_BACKOFF_FACTOR` | Exponential backoff multiplier (default `2`). |
| `BATCH_SIZE` | Number of posts processed concurrently in each batch (default `25`). |
| `RATE_LIMIT_PER_MINUTE` | Maximum API calls per minute (default `60`). |
| `EXPORT_PATH` | Directory for exported datasets (default `exports`). |
| `METRICS_EXPORT_PATH` | Output path for metrics JSON (default `metrics.json`). |
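
A minimal environment for a local SQLite run might look like this (the token is a placeholder; adjust paths and credentials for your deployment):

```shell
export DATABASE_URL="sqlite:///sentiment.db"
export TWITTER_BEARER_TOKEN="replace-me"
export SENTIMENT_SOURCES="twitter"
export REQUEST_TIMEOUT=10
export MAX_RETRIES=3
export BATCH_SIZE=25
export EXPORT_PATH="exports"
```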

## Running the Pipeline

1. Ensure Python 3.11+ is available and install dependencies (e.g. `pip install -r requirements.txt`).
2. Export the required environment variables shown above.
3. Launch the CLI:

```bash
python -m sentiment.cli --query "ai" --limit 50 --export --log-level INFO
```

The CLI validates configuration, spins up the async pipeline, streams logs, and writes
metrics/exports on completion.

## Testing

Unit tests cover configuration, sanitisation, the sentiment analyser, repository,
and pipeline orchestration. Execute them with:

```bash
python -m unittest discover tests
```

## Project Structure

```
sentiment/
analyzer.py # Sentiment scoring service
cli.py # Command-line entry point
config.py # Dataclass-backed configuration
exporter.py # JSON/CSV export helpers
fetchers/ # Twitter & Reddit API clients derived from BaseFetcher
metrics.py # Metrics tracking utilities
pipeline.py # Async orchestrator
repository.py # DB access layer with pooling + migrations
sanitizer.py # Text/time validation & sanitisation helpers
```

## Notes

- Postgres deployments require the `psycopg2` driver (installed via the
  `psycopg2-binary` wheel in `requirements.txt`). SQLite is supported out of the box
  for local testing and is used inside the automated unit tests.
- Metrics are exported after each pipeline run and can be ingested by dashboards or
alerting tooling as needed.
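
The metrics file is plain JSON; based on the counters listed above, its shape is roughly the following. Only `posts_processed` and `duplicates_skipped` are confirmed by the CLI output; the remaining keys are illustrative stand-ins for the failure and per-sentiment counters:

```python
import json

# Illustrative metrics payload; key names beyond posts_processed and
# duplicates_skipped are assumptions mirroring the counters described above.
metrics = {
    "posts_processed": 50,
    "duplicates_skipped": 3,
    "failures": 1,
    "sentiment_distribution": {"positive": 21, "negative": 9, "neutral": 20},
}
print(json.dumps(metrics, indent=2))
```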
1 change: 1 addition & 0 deletions requirements.txt
@@ -0,0 +1 @@
psycopg2-binary>=2.9
6 changes: 6 additions & 0 deletions sentiment/__init__.py
@@ -0,0 +1,6 @@
"""High level sentiment analysis toolkit."""

from .config import AppConfig # noqa: F401
from .analyzer import SentimentAnalyzer # noqa: F401
from .repository import DataRepository # noqa: F401
from .pipeline import SentimentPipeline # noqa: F401
57 changes: 57 additions & 0 deletions sentiment/analyzer.py
@@ -0,0 +1,57 @@
"""Lightweight sentiment analyzer service."""

from __future__ import annotations

from dataclasses import dataclass, field
from typing import Set

from .config import AppConfig
from .models import SentimentLabel, SentimentResult


@dataclass
class SentimentAnalyzer:
    """Performs rule-based sentiment analysis on sanitized text."""

    config: AppConfig
    positive_words: Set[str] = field(default_factory=lambda: {
        "great",
        "awesome",
        "good",
        "love",
        "happy",
        "fantastic",
        "win",
        "excellent",
    })
    negative_words: Set[str] = field(default_factory=lambda: {
        "bad",
        "terrible",
        "hate",
        "sad",
        "awful",
        "fail",
        "poor",
        "angry",
    })

    def analyze(self, text: str) -> SentimentResult:
        """Return the sentiment label for the given text."""

        tokens = [token.lower() for token in text.split() if token]
        score = 0
        for token in tokens:
            if token in self.positive_words:
                score += 1
            elif token in self.negative_words:
                score -= 1

        normalized_score = score / max(len(tokens), 1)
        if normalized_score >= self.config.sentiment_positive_threshold:
            label = SentimentLabel.POSITIVE
        elif normalized_score <= self.config.sentiment_negative_threshold:
            label = SentimentLabel.NEGATIVE
        else:
            label = SentimentLabel.NEUTRAL

        return SentimentResult(score=normalized_score, label=label)
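
The scoring rule can be exercised standalone. The snippet below is a condensed restatement of the same arithmetic (signed keyword count divided by token count), not the class itself:

```python
def score_text(text, positive, negative):
    """Mirror of SentimentAnalyzer's scoring: signed word count over token count."""
    tokens = [t.lower() for t in text.split() if t]
    # True/False subtract as 1/0, so each token contributes +1, -1, or 0.
    raw = sum((t in positive) - (t in negative) for t in tokens)
    return raw / max(len(tokens), 1)
```

For example, `"great great bad"` against `{"great"}` / `{"bad"}` scores `(2 - 1) / 3`, which a positive threshold of, say, `0.05` would label positive.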
122 changes: 122 additions & 0 deletions sentiment/cli.py
@@ -0,0 +1,122 @@
"""Command line interface for the sentiment pipeline."""

from __future__ import annotations

import argparse
import asyncio
import logging
import sys
from typing import List

from .analyzer import SentimentAnalyzer
from .config import AppConfig
from .exceptions import ConfigurationError
from .exporter import DataExporter
from .fetchers.reddit import RedditFetcher
from .fetchers.twitter import TwitterFetcher
from .logging_utils import configure_logging
from .metrics import MetricsTracker
from .pipeline import SentimentPipeline
from .repository import DataRepository


async def async_main(args: argparse.Namespace) -> int:
    """Run the async pipeline entry point."""

    configure_logging(args.log_level)
    logger = logging.getLogger("sentiment.cli")

    try:
        config = AppConfig.from_env()
    except ConfigurationError as exc:
        logger.error("Configuration error: %s", exc)
        return 1

    fetchers: List = []
    if "twitter" in config.enabled_sources:
        try:
            fetchers.append(TwitterFetcher(config))
        except Exception as exc:
            logger.error("Twitter fetcher initialization failed: %s", exc)
    if "reddit" in config.enabled_sources:
        try:
            fetchers.append(RedditFetcher(config))
        except Exception as exc:
            logger.error("Reddit fetcher initialization failed: %s", exc)

    if not fetchers:
        logger.error("No fetchers available. Ensure credentials are configured correctly.")
        return 1

    repository = DataRepository(config)
    analyzer = SentimentAnalyzer(config)
    exporter = DataExporter(config.export_path)
    metrics = MetricsTracker()

    pipeline = SentimentPipeline(
        config=config,
        repository=repository,
        analyzer=analyzer,
        fetchers=fetchers,
        exporter=exporter,
        metrics=metrics,
    )

    try:
        result = await pipeline.run(
            query=args.query,
            limit=args.limit,
            sentiment_filter=args.sentiment,
            sort_by=args.sort_by,
            descending=args.descending,
            export=args.export,
        )
        logger.info(
            "Processed %s posts (duplicates: %s)",
            result.metrics["posts_processed"],
            result.metrics["duplicates_skipped"],
        )
        return 0
    finally:
        repository.close()


def build_parser() -> argparse.ArgumentParser:
    """Create an argument parser for the CLI."""

    parser = argparse.ArgumentParser(description="Social sentiment pipeline")
    parser.add_argument("--query", default="ai", help="Search query")
    parser.add_argument("--limit", type=int, default=25, help="Max items per fetcher")
    parser.add_argument(
        "--sentiment",
        nargs="*",
        choices=["positive", "negative", "neutral"],
        help="Filter output posts by sentiment",
    )
    parser.add_argument(
        "--sort-by",
        choices=["created_at", "author", "sentiment_score"],
        default="created_at",
        help="Sort column",
    )
    parser.add_argument("--descending", action="store_true", help="Sort descending")
    parser.add_argument("--export", action="store_true", help="Export processed posts")
    parser.add_argument(
        "--log-level",
        default="INFO",
        choices=["DEBUG", "INFO", "WARNING", "ERROR"],
        help="Log verbosity",
    )
    return parser


def main(argv: List[str] | None = None) -> int:
    """Synchronous CLI entry point."""

    parser = build_parser()
    args = parser.parse_args(argv)
    return asyncio.run(async_main(args))


if __name__ == "__main__":  # pragma: no cover - manual execution
    sys.exit(main())
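
The parser can be probed without touching the network, which is handy in tests. Below is a condensed replica of `build_parser` (same flags and defaults, help text trimmed) used to check argument handling:

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    # Condensed replica of sentiment.cli.build_parser, for illustration only.
    parser = argparse.ArgumentParser(description="Social sentiment pipeline")
    parser.add_argument("--query", default="ai")
    parser.add_argument("--limit", type=int, default=25)
    parser.add_argument("--sentiment", nargs="*", choices=["positive", "negative", "neutral"])
    parser.add_argument(
        "--sort-by",
        choices=["created_at", "author", "sentiment_score"],
        default="created_at",
    )
    parser.add_argument("--descending", action="store_true")
    parser.add_argument("--export", action="store_true")
    parser.add_argument("--log-level", default="INFO", choices=["DEBUG", "INFO", "WARNING", "ERROR"])
    return parser


args = build_parser().parse_args(["--query", "ml", "--export"])
```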