FastAPI application with PostgreSQL for Cow Monitoring.
- Requirements
- Quick Start with Docker
- Local Development Setup
- Running Tests
- Project Structure
- API Endpoints
- API Documentation
- Environment Variables
- Database Migrations
- Recreate Database (Destroy & Recreate)
- Utils
- Scalability
## Requirements

- Docker 20.10+
- Docker Compose 2.0+
- Python 3.11+
- PostgreSQL 16+ (running locally or accessible)
- pip (Python package manager)
## Quick Start with Docker

```bash
cd /home/albertovico/repos/cow
cp .env.example .env
docker compose --env-file .env up -d
```

The API will be available at:
- API: http://localhost:8000
- API Docs: http://localhost:8000/docs
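Once the containers are up, a quick smoke test from Python can confirm the API responds. This is a minimal sketch; it assumes the default port 8000 and that `httpx` is installed (e.g. `pip install httpx`):

```python
# Minimal smoke test against the running stack (assumes the default port 8000).
import httpx

response = httpx.get("http://localhost:8000/api/v1/cows/")
response.raise_for_status()
print("API is up, status:", response.status_code)
```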
Stop the services:

```bash
docker compose down
```

To remove volumes (including database data):

```bash
docker compose down -v
```

To restart only the API service:
```bash
docker compose --env-file .env restart api
```

## Local Development Setup

Create a virtual environment and install the dependencies:

```bash
python -m venv venv
source venv/bin/activate
pip install -e ".[dev]"
```

Make sure PostgreSQL is running locally and update the .env file:
```bash
cp .env.example .env
# Edit .env with your local database credentials
DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/cow_db
```

Apply the database migrations:

```bash
alembic upgrade head
```

Run the development server:

```bash
uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
```

The API will be available at http://localhost:8000.
## Running Tests

```bash
source venv/bin/activate
pytest -q
```

With coverage:

```bash
pytest tests/ --cov=app --cov-report=term-missing
```
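For orientation, a new test could look like the sketch below. It assumes FastAPI's `TestClient` and a reachable database; the file name and assertion are hypothetical, and the repo's actual tests may use different fixtures.

```python
# tests/test_cows.py (hypothetical file name, illustrative only)
from fastapi.testclient import TestClient

from app.main import app

client = TestClient(app)


def test_list_cows_returns_200():
    # The list endpoint should answer even when no cows exist yet.
    response = client.get("/api/v1/cows/")
    assert response.status_code == 200
```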
## Project Structure

```
cow/
├── app/                  # application package (FastAPI)
│   ├── __init__.py
│   ├── main.py           # FastAPI app entrypoint
│   ├── config.py         # application settings
│   ├── database.py       # DB connection/session helpers
│   ├── models.py         # SQLAlchemy models
│   ├── schemas.py        # Pydantic schemas
│   └── routes/           # API route modules
├── alembic/              # migration config and versions
│   ├── env.py
│   └── versions/
├── data/                 # sample inputs and generated reports
│   ├── input/
│   └── reports/
├── tests/                # unit tests
├── utils/                # helper scripts and notebooks
│   ├── clean_data.sh
│   ├── load_data.py
│   ├── read_sample_data.py
│   ├── explore_data.ipynb
│   └── explore_reports.ipynb
├── docker-compose.yml    # Docker Compose configuration
├── Dockerfile            # Application container
├── pyproject.toml        # Python dependencies and dev extras
├── alembic.ini           # Alembic CLI config
└── README.md
```
## API Endpoints

### Cows

- `POST /api/v1/cows/` - Create a new cow
- `GET /api/v1/cows/` - List all cows
- `GET /api/v1/cows/{cow_id}` - Get a specific cow

```
# get cow
http://localhost:8000/api/v1/cows/fa5625d5-d657-40a7-9de9-a52af87aef1f
```
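For example, listing cows and fetching one by ID from Python. This is a minimal sketch assuming `httpx` is installed, the Quick Start stack is running, and the list endpoint returns a JSON array:

```python
import httpx

BASE = "http://localhost:8000/api/v1"

# List all cows
cows = httpx.get(f"{BASE}/cows/").json()
print(f"{len(cows)} cows")

# Fetch a single cow by ID (sample UUID taken from the example above)
cow = httpx.get(f"{BASE}/cows/fa5625d5-d657-40a7-9de9-a52af87aef1f").json()
print(cow)
```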
### Sensors

- `POST /api/v1/sensors/` - Create a new sensor
- `GET /api/v1/sensors/` - List all sensors
- `GET /api/v1/sensors/{sensor_id}` - Get a specific sensor

```
# get sensor
http://localhost:8000/api/v1/sensors/3459d3dd-b662-40eb-931e-931701cbeef7
```
### Measurements

- `POST /api/v1/measurements/` - Create a new measurement
- `GET /api/v1/measurements/` - List all measurements
- `GET /api/v1/measurements/{measurement_id}` - Get a specific measurement

```
# list measurements by {sensor_id} and {cow_id}
http://localhost:8000/api/v1/measurements/?skip=0&limit=100&cow_id=fa5625d5-d657-40a7-9de9-a52af87aef1f&sensor_id=3459d3dd-b662-40eb-931e-931701cbeef7
```
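The same filters shown above (`skip`, `limit`, `cow_id`, `sensor_id`) can be passed as query parameters from code. A minimal sketch assuming `httpx` and that the endpoint returns a JSON array:

```python
import httpx

params = {
    "skip": 0,
    "limit": 100,
    "cow_id": "fa5625d5-d657-40a7-9de9-a52af87aef1f",
    "sensor_id": "3459d3dd-b662-40eb-931e-931701cbeef7",
}
measurements = httpx.get("http://localhost:8000/api/v1/measurements/", params=params).json()
print(f"{len(measurements)} measurements returned")
```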
### Reports

- `GET /api/v1/reports/weights` - Get cow weights report

```
http://localhost:8000/api/v1/reports/weights?date=2024-06-30
```

- `GET /api/v1/reports/milk` - Get milk production report

```
http://localhost:8000/api/v1/reports/milk?start_date=2024-06-01&end_date=2024-06-30
```
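Both report endpoints take their date filters as query parameters. A minimal sketch from Python, assuming `httpx` and the stack from the Quick Start:

```python
import httpx

BASE = "http://localhost:8000/api/v1/reports"

# Weights report for a single day
weights = httpx.get(f"{BASE}/weights", params={"date": "2024-06-30"}).json()

# Milk production report over a date range
milk = httpx.get(f"{BASE}/milk", params={"start_date": "2024-06-01", "end_date": "2024-06-30"}).json()

print(weights)
print(milk)
```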
## API Documentation

Once the application is running, visit:
- Swagger UI: http://localhost:8000/docs
- ReDoc: http://localhost:8000/redoc
## Environment Variables

| Variable | Description | Default |
|---|---|---|
| `DATABASE_URL` | PostgreSQL connection string | `postgresql+asyncpg://postgres:postgres@db:5432/cow_db` |
| `PROJECT_NAME` | API project name | `Cow API` |
| `VERSION` | API version | `0.1.0` |
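These variables are consumed by `app/config.py`. As a rough illustration only (the real file may be structured differently), a settings class based on `pydantic-settings` could look like this:

```python
# app/config.py (illustrative sketch only; the real settings class may differ)
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    # Defaults mirror the table above; environment variables and .env override them.
    DATABASE_URL: str = "postgresql+asyncpg://postgres:postgres@db:5432/cow_db"
    PROJECT_NAME: str = "Cow API"
    VERSION: str = "0.1.0"

    model_config = SettingsConfigDict(env_file=".env")


settings = Settings()
```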
## Database Migrations

Create a new migration:

```bash
alembic revision --autogenerate -m "description of changes"
```

Apply migrations:

```bash
alembic upgrade head
```

Roll back the last migration:

```bash
alembic downgrade -1
```
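For orientation, an autogenerated revision placed under `alembic/versions/` looks roughly like the sketch below; the revision IDs, table, and column used here are hypothetical.

```python
"""add notes column to cows (hypothetical example)"""
import sqlalchemy as sa
from alembic import op

# Revision identifiers generated by Alembic (values here are made up).
revision = "abc123def456"
down_revision = "000000000000"
branch_labels = None
depends_on = None


def upgrade() -> None:
    # Hypothetical change: add a nullable text column to the cows table.
    op.add_column("cows", sa.Column("notes", sa.Text(), nullable=True))


def downgrade() -> None:
    op.drop_column("cows", "notes")
```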
## Recreate Database (Destroy & Recreate)

Warning: these steps permanently delete the database data stored in the Docker volume. Only run them on development or throwaway environments unless you have backups.

- Stop containers and remove volumes (this destroys DB data):
```bash
# Stop services and remove volumes (DB data removed)
docker compose --env-file .env down -v
```

- Rebuild and start services (this recreates the DB container with an empty database):
```bash
docker compose --env-file .env up -d --build
```

- Wait for the database to become healthy (a simple `docker compose --env-file .env ps` check is enough).
- Apply migrations (run inside the `api` container so it uses the same environment):
```bash
# Run Alembic from the api container
docker compose --env-file .env exec api alembic upgrade head

# Or run locally if your local env points to the recreated DB
alembic upgrade head
```

- (Optional) Re-load initial data using the loader utilities:
```bash
python utils/load_data.py --data-dir data/input
```

## Utils

Small utility scripts live under `utils/`. Below are the primary helpers used during development and data loading.
Running these scripts requires the local development setup:

```bash
python -m venv venv
source venv/bin/activate
pip install -e ".[dev]"
```

### `utils/read_sample_data.py`

- Purpose: Find Parquet files and print a short preview and summary for each.
- Location: `utils/read_sample_data.py`
CLI usage:
```bash
python utils/read_sample_data.py data/input
```

### `utils/load_data.py`

- Purpose: Read `cows.parquet`, `sensors.parquet`, and `measurements.parquet` and POST them to the running API asynchronously.
- Location: `utils/load_data.py`
Important notes:
- The script POSTs cows and sensors by explicit IDs, and posts measurements to the measurements collection endpoint.
- Use `--dry-run` first to verify what will be sent (no network requests).
- The script takes roughly 20 minutes to load 500K measurements.
CLI usage:
```bash
# Dry-run (prints payloads only)
python utils/load_data.py --data-dir data/input --dry-run

# Real run (performs HTTP POSTs)
python utils/load_data.py --data-dir data/input
```

Options:

- `--data-dir` (default: `data/input`) - directory with `cows.parquet`, `sensors.parquet`, `measurements.parquet`.
- `--dry-run` - print HTTP payloads without sending requests.
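Conceptually, the asynchronous loading pattern behind `utils/load_data.py` looks roughly like the sketch below. This is not the actual implementation; it assumes `httpx`, and the payload shape shown in the comment is hypothetical.

```python
# Conceptual sketch of async batch loading (NOT the actual utils/load_data.py).
import asyncio

import httpx

API_URL = "http://localhost:8000/api/v1/measurements/"


async def post_measurements(measurements: list[dict], concurrency: int = 20) -> None:
    """POST each measurement payload, limiting the number of in-flight requests."""
    semaphore = asyncio.Semaphore(concurrency)

    async with httpx.AsyncClient(timeout=30.0) as client:

        async def post_one(payload: dict) -> None:
            async with semaphore:
                response = await client.post(API_URL, json=payload)
                response.raise_for_status()

        await asyncio.gather(*(post_one(m) for m in measurements))


# Example usage with a hypothetical payload shape:
# asyncio.run(post_measurements([{"cow_id": "...", "sensor_id": "...", "value": 25.3}]))
```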
### `utils/clean_data.sh`

- Purpose: Remove all rows from `measurements`, `sensors`, and `cows` in the development DB. It runs SQL directly against Postgres (faster and avoids API-level validation).
- Location: `utils/clean_data.sh`
WARNING: Destructive operation. Always run with `--dry-run` first and make sure you have backups if needed.
CLI usage:
```bash
# Show SQL to be executed (safe)
./utils/clean_data.sh --dry-run

# Execute against the default local DB
./utils/clean_data.sh

# Execute against a custom DB URL
./utils/clean_data.sh --db-url postgresql://user:pass@host:port/dbname
```

Options:

- `--db-url` - Postgres connection string (default: `postgresql+asyncpg://postgres:postgres@localhost:5433/cow_db`).
- `--dry-run` - print the SQL and exit.
Behavior details:
- If `psql` is installed locally, the script pipes the SQL into it.
- If `psql` is missing, the script attempts to exec into the running `db` container (`docker exec` / `docker-compose exec`) and runs `psql` there.
### `utils/explore_data.ipynb`

- Purpose: Free exploration of the `measurements`, `sensors`, and `cows` Parquet datasets, aimed at improving understanding of the data model.
- Location: `utils/explore_data.ipynb`
### `utils/explore_reports.ipynb`

- Purpose: Free exploration of the `milk` and `weights` CSV files. Includes observations and conclusions about the data, including a working hypothesis for spotting cows that may be ill.
- Location: `utils/explore_reports.ipynb`
## Scalability

- To handle a high volume of parallel calls efficiently, the API should move to a scalable cloud microservices platform (Kubernetes, serverless functions such as AWS Lambda).
- Instead of a single monolithic computing layer acting as a bottleneck, each endpoint would run as its own service, able to scale independently.
- A high volume of measurements pushed in parallel through the API could overload and lock the database.
- Instead of the API writing directly to the database, it should act as a producer that sends messages to a Kafka topic (or an equivalent service such as AWS Kinesis or Azure Event Hubs); see the sketch after this list.
- If ingestion throughput is very heavy, it is best to move to a NoSQL cloud database, at least for measurement data (e.g. DynamoDB, Cosmos DB, Cassandra).
- If the throughput is moderate, we could consider TimescaleDB, a Postgres extension optimized for time-series data.
- If we keep Postgres, we should move to a managed cloud host like AWS RDS for improved scalability, and also add read replicas.
- If we set up a Kafka topic, we should add a consumer that moves the data to a data lake as Avro/Parquet files, where it would be processed and modelled through the data lake layers.
- For cow and sensor data, the data lake ingestion processes should use the API to import into the landing layer in daily batches.
- If we need a hot path to analyse measurements in real time, we could consume the topic from a service like Spark Streaming on Databricks or AWS Firehose.
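To illustrate the producer pattern mentioned in the list above, the measurements endpoint could publish to a topic instead of writing to Postgres directly. A rough sketch assuming `aiokafka`; the topic name and payload fields are hypothetical:

```python
# Hypothetical sketch: publish incoming measurements to a Kafka topic
# instead of writing them to Postgres from the API.
import asyncio
import json

from aiokafka import AIOKafkaProducer

TOPIC = "cow.measurements"  # hypothetical topic name


async def publish_measurement(producer: AIOKafkaProducer, measurement: dict) -> None:
    # Key by cow_id (hypothetical field) so all messages for a cow land in the same partition.
    await producer.send_and_wait(
        TOPIC,
        key=str(measurement["cow_id"]).encode(),
        value=json.dumps(measurement).encode(),
    )


async def main() -> None:
    producer = AIOKafkaProducer(bootstrap_servers="localhost:9092")
    await producer.start()
    try:
        await publish_measurement(
            producer,
            {
                "cow_id": "fa5625d5-d657-40a7-9de9-a52af87aef1f",
                "sensor_id": "3459d3dd-b662-40eb-931e-931701cbeef7",
                "value": 25.3,
            },
        )
    finally:
        await producer.stop()


if __name__ == "__main__":
    asyncio.run(main())
```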