# Table Sync Orchestrator

A production-ready table discovery and synchronization orchestrator for streaming Change Data Capture (CDC) events from YugabyteDB to Google BigQuery. It automatically discovers annotated tables in YugabyteDB and manages their synchronization to BigQuery: every 30 seconds it scans all databases, schemas, and tables, creating BigQuery resources and CDC connectors as needed. Built from battle-tested, industry-standard components for maximum reliability and minimal maintenance.

## Features

- **Automatic Table Discovery**: Scans all databases/schemas/tables for bootstrap configuration comments
- **Intelligent Sync**: Only syncs when needed (new/changed/missing resources)
- **Bidirectional Sync**: Supports both YugabyteDB → BigQuery and BigQuery → YugabyteDB data flow
- **Real-time CDC**: Creates and manages Debezium connectors on Kafka Connect for real-time change capture
- **Auto-Provisioning**: Automatically creates BigQuery datasets and tables
- **Smart State Management**: Tracks table sync status in YugabyteDB
- **Lifecycle Management**: Handles table addition, modification, and removal
- **Production Ready**: Health checks, metrics, structured logging
- **Docker-ready**: Complete containerized setup with Docker Compose

## Design Principles

- **Battle-tested components only**: Uses proven libraries with 5+ years of production use
- **Zero custom protocols**: Standard Kafka + BigQuery APIs
- **Kubernetes-native**: Designed for cloud-native deployments
- **Minimal maintenance**: Self-healing, auto-scaling, graceful degradation
- **AI-assistant friendly**: Well-documented, standard patterns, extensive logging

## Architecture

```
YugabyteDB → Kafka CDC → This Processor → Google BigQuery
          [Prometheus Metrics + Health Checks]

┌──────────────────────────────────────────────────┐
│              Table Sync Orchestrator             │
├──────────────────────────────────────────────────┤
│ Every 30s:                                       │
│  1. Scan all databases/schemas/tables            │
│  2. Parse table annotations                      │
│  3. Compare with status table                    │
│  4. Create BigQuery resources if needed          │
│  5. Sync initial data if BigQuery was missing    │
│  6. Create/update CDC connectors                 │
│  7. Update status table                          │
└──────────────────────────────────────────────────┘
```

### Core Components

- **Kafka Consumer**: `kafka-python` (8+ years in production)
- **BigQuery Client**: `google-cloud-bigquery` (Google's official SDK)
- **Observability**: Prometheus + structured logging
- **Health**: Flask endpoints for Kubernetes probes
- **Resilience**: Tenacity for retry logic with exponential backoff

## Prerequisites

- Docker and Docker Compose
- Google Cloud Platform account with the BigQuery API enabled
- Service account with BigQuery Admin permissions
- Google Cloud Storage bucket for temporary data transfer

## Table Annotations

Tables are enabled for sync using comment annotations:

```sql
COMMENT ON TABLE mcp_openapi_ro.mcp_openapi_augmentations IS
'{"bootstrap":{"enabled":true, "bq": "yugabyte_backup.mcp_openapi_augmentations"}}';
```
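The discovery step can be sketched as a pure function over the catalog scan results. This is an illustrative sketch, not the orchestrator's actual code; the row shape and function name are assumptions.

```python
import json

def discover_sync_tables(rows):
    """Filter catalog rows down to tables with an enabled bootstrap annotation.

    `rows` is assumed to be (schema, table, comment) tuples, e.g. from
    querying pg_class joined with obj_description(). Comments that are not
    valid JSON, or whose bootstrap config is disabled, are skipped.
    """
    discovered = []
    for schema, table, comment in rows:
        if not comment:
            continue
        try:
            config = json.loads(comment)
        except ValueError:
            continue  # ordinary human-readable comment, not an annotation
        bootstrap = config.get("bootstrap", {})
        if bootstrap.get("enabled"):
            discovered.append((f"{schema}.{table}", bootstrap))
    return discovered
```

Treating non-JSON comments as "not annotated" rather than as errors keeps ordinary documentation comments harmless.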
## Component Layout

```
┌─────────────────┐   ┌──────────────────┐   ┌─────────────────┐
│   YugabyteDB    │   │  Kafka Connect   │   │  Google Cloud   │
│                 │   │   (Debezium)     │   │                 │
│ ┌─────────────┐ │   │ ┌──────────────┐ │   │ ┌─────────────┐ │
│ │ Tables      │ │   │ │ Connectors   │ │   │ │ BigQuery    │ │
│ │ with        │ │   │ │              │ │   │ │ Tables      │ │
│ │ Comments    │ │   │ └──────────────┘ │   │ └─────────────┘ │
│ └─────────────┘ │   │ ┌──────────────┐ │   │ ┌─────────────┐ │
│ ┌─────────────┐ │   │ │ Kafka        │ │   │ │ Cloud       │ │
│ │ State       │ │   │ │ Topics       │ │   │ │ Storage     │ │
│ │ Table       │ │   │ └──────────────┘ │   │ └─────────────┘ │
│ └─────────────┘ │   │                  │   │                 │
└─────────────────┘   └──────────────────┘   └─────────────────┘
         ▲                     ▲                      ▲
         │                     │                      │
         └─────────────────────┴──────────────────────┘
                     Table Sync Application
```

## Quick Start

### Docker

```bash
# Build the image
docker build -f Dockerfile.production -t table-sync-orchestrator .

# Run with environment variables
docker run -e YUGABYTEDB_HOST=localhost \
  -e YUGABYTEDB_PORT=5433 \
  -e KAFKA_CONNECT_URL=http://localhost:8083 \
  -e GOOGLE_APPLICATION_CREDENTIALS=/app/credentials.json \
  -v /path/to/credentials.json:/app/credentials.json \
  table-sync-orchestrator
```

### Kubernetes

```bash
kubectl apply -f deployment/production-cdc-processor.yaml
```
## Configuration

The orchestrator uses YAML configuration with environment variable substitution:

```yaml
# Scanning Configuration
scan_interval_seconds: 30

# YugabyteDB Connection
yugabytedb:
  host: yb-tserver-service.yugabyte.svc.cluster.local
  port: 5433
  user: yugabyte
  password: yugabyte

# BigQuery Configuration (project auto-derived from service account)
bigquery:
  credentials_path: /vault/secrets/gcp-key.json
  location: US

# Kafka Connect Configuration
kafka_connect:
  url: http://kafka-connect.kafka.svc.internal.lan:8083
```

### Environment Variables

Create a `.env` file (use `src/.env.example` as a template):

```bash
# YugabyteDB Configuration
DATABASE_URL=postgresql://yugabyte:yugabyte@localhost:5433/yugabyte

# Google Cloud Configuration
BIGQUERY_PROJECT_ID=your-gcp-project-id
GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json
TEMP_STORAGE_BUCKET=your-temp-storage-bucket

# Kafka & Debezium Configuration
KAFKA_BOOTSTRAP_SERVERS=localhost:9092
DEBEZIUM_CONNECTOR_URL=http://localhost:8083

# Application Configuration
SCAN_INTERVAL_SECONDS=30
LOG_LEVEL=INFO
BATCH_SIZE=10000
```
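The environment variable substitution mentioned above can be done with the standard library before the YAML is parsed. A minimal sketch (the function name and the leave-unset-untouched policy are assumptions, not the orchestrator's documented behavior):

```python
def render_config(raw_yaml, env):
    """Substitute ${VAR} references in a raw YAML string from an env mapping.

    Unset variables are left untouched, so misconfiguration stays visible
    in the parsed result instead of being silently replaced by "".
    """
    out = raw_yaml
    for key, value in env.items():
        out = out.replace("${%s}" % key, value)
    return out
```

In the real application `env` would be `os.environ`, and the rendered string would then be handed to a YAML parser.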
## Monitoring

### Health Checks

- **Health**: `http://localhost:8080/health`
- **Readiness**: `http://localhost:8080/ready`

### Metrics (Prometheus)

- **Endpoint**: `http://localhost:8000/metrics`
- `sync_tables_scanned_total`: Total tables scanned
- `sync_tables_synced_total`: Total tables synced
- `sync_errors_total`: Total sync errors
- `sync_scan_duration_seconds`: Scan duration histogram
- `sync_active_syncs`: Number of active syncs

## Setup

Set the required environment variables:

```bash
export BIGQUERY_PROJECT_ID="your-gcp-project"
export GOOGLE_APPLICATION_CREDENTIALS_PATH="/path/to/service-account.json"
export TEMP_STORAGE_BUCKET="your-temp-bucket"
```

Then run the setup script:

```bash
./setup.sh
```

## Development

### Local Setup

```bash
# Install dependencies
pip install -r requirements.production.txt

# Copy the environment template
cp src/.env.example .env

# Edit configuration
vim .env

# Run the orchestrator
python src/table_sync_orchestrator.py
```

### Building

```bash
# Use the provided build script
./scripts/build.sh

# Or build manually
docker build -f Dockerfile.production -t table-sync-orchestrator .
```

## Table Bootstrap Configuration

Add comments to YugabyteDB tables to configure synchronization:

```sql
COMMENT ON TABLE public.orders IS $$
{
  "bootstrap": {
    "enabled": true,
    "bq": "sales_raw.orders",
    "columns": "id,customer_id,status,total,created_at,updated_at"
  }
}
$$;
```

**Configuration Options:**

- `enabled` (required): Boolean to enable/disable sync
- `bq` (required): BigQuery destination in `dataset.table` format
- `columns` (optional): Explicit column order for COPY operations
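The bootstrap configuration options suggest a small validator. A sketch, with an invented function name and error messages — not the application's actual code:

```python
def parse_bootstrap(config):
    """Validate a parsed bootstrap config and split the BigQuery target.

    Returns (dataset, table, columns) where columns is a list or None.
    Raises ValueError for configs violating the rules described above.
    """
    if not isinstance(config.get("enabled"), bool):
        raise ValueError("'enabled' is required and must be a boolean")
    bq = config.get("bq")
    if not bq or bq.count(".") != 1:
        raise ValueError("'bq' is required in 'dataset.table' format")
    dataset, table = bq.split(".")
    columns = config.get("columns")
    if columns is not None:
        # Preserve the explicit column order used for COPY operations
        columns = [c.strip() for c in columns.split(",") if c.strip()]
    return dataset, table, columns
```

Failing fast on a malformed `bq` target prevents the orchestrator from creating resources in the wrong dataset.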
## Running with Docker Compose

Start the full stack:

```bash
docker-compose up -d
```

Monitor it:

```bash
# View application logs
docker-compose logs -f table-sync-app

# Check health status
docker-compose exec table-sync-app python health_check.py health

# View sync metrics
docker-compose exec table-sync-app python health_check.py metrics

# List tracked tables
docker-compose exec table-sync-app python health_check.py tables
```

## Directory Structure

```
├── src/
│   ├── table_sync_orchestrator.py    # Main orchestrator application
│   ├── __init__.py                   # Python package marker
│   └── .env.example                  # Environment template
├── config/
│   └── orchestrator.yaml             # Configuration template
├── deployment/
│   └── production-cdc-processor.yaml # Kubernetes deployment
├── scripts/
│   ├── build.sh                      # Build script
│   └── health_check.sh               # Health check script
├── .github/workflows/
│   └── build-production-cdc.yaml     # GitHub Actions build
├── Dockerfile.production             # Production container
└── requirements.production.txt       # Python dependencies
```
## Schema Initialization

The application automatically validates and prepares the YugabyteDB schema on startup.
### Automatic Schema Setup
On first startup, the application will:
1. **Test Database Connectivity**: Validate connection and basic permissions
2. **Check Database Capabilities**: Ensure JSONB support and logical replication
3. **Create State Tables**: Set up `table_sync_state` and `table_sync_metadata` tables
4. **Create Indexes**: Add performance indexes for efficient querying
5. **Validate Schema**: Test all operations to ensure everything works
### Manual Schema Testing
You can test the schema initialization independently:
```bash
# Test schema initialization
docker-compose exec table-sync-app python test_schema.py
# Expected output shows:
# - Database connectivity validation
# - Schema preparation steps
# - State table creation and testing
# - Performance validation
```
### Schema Components
The application creates these database objects:
- **`table_sync_state`**: Main state tracking table
- Tracks each table's sync configuration and status
- Stores bootstrap config as JSONB
- Maintains timestamps and status flags
- **`table_sync_metadata`**: Application metadata
- Stores schema version information
- Tracks initialization timestamps
- Future extensibility for app settings
- **Performance Indexes**: Optimized for common queries
- Bootstrap configuration lookups
- Status filtering
- Time-based queries
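The description above implies DDL roughly like the following. Every column name, type choice, and index here is an illustrative guess, not the application's actual schema:

```python
# Illustrative DDL matching the schema description; the real application's
# column names and index definitions may differ.
TABLE_SYNC_STATE_DDL = """
CREATE TABLE IF NOT EXISTS table_sync_state (
    table_identifier TEXT PRIMARY KEY,               -- schema.table
    bootstrap_config JSONB NOT NULL,                 -- parsed comment annotation
    config_hash      TEXT NOT NULL,                  -- change detection
    bigquery_ready   BOOLEAN NOT NULL DEFAULT FALSE, -- status flag
    pipeline_ready   BOOLEAN NOT NULL DEFAULT FALSE, -- status flag
    updated_at       TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS idx_sync_state_status
    ON table_sync_state (bigquery_ready, pipeline_ready);
CREATE INDEX IF NOT EXISTS idx_sync_state_updated
    ON table_sync_state (updated_at);
"""

TABLE_SYNC_METADATA_DDL = """
CREATE TABLE IF NOT EXISTS table_sync_metadata (
    key        TEXT PRIMARY KEY,   -- e.g. 'schema_version'
    value      TEXT NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
"""
```

Storing the bootstrap config as JSONB matches the documented requirement that the database support JSONB.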
### Schema Validation
The startup process validates:

- ✅ Database connectivity and permissions
- ✅ Required PostgreSQL extensions (uuid-ossp)
- ✅ JSONB support for configuration storage
- ✅ Logical replication capabilities for CDC
- ✅ Complete CRUD operations on state tables
- ✅ Index creation and performance optimization
If any validation fails, the application will log detailed error messages and exit gracefully.
## How It Works
### 1. Table Discovery
Every 30 seconds, the application scans all YugabyteDB tables for bootstrap configuration comments.
### 2. State Management
Table states are tracked in the `table_sync_state` table:
- Current configuration hash
- BigQuery table status
- Pipeline configuration status
- Last update timestamp
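The configuration hash used for change detection can be computed over a canonical JSON encoding, so key order in the comment annotation never triggers a spurious re-sync. A sketch (the hashing scheme is an assumption; the application's actual scheme is not documented here):

```python
import hashlib
import json

def config_hash(config):
    """Hash a bootstrap config dict canonically: sorted keys, compact separators."""
    canonical = json.dumps(config, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```

Comparing the stored hash with the freshly computed one is enough to decide whether the pipeline configuration needs updating.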
### 3. Synchronization Logic
#### New Table with Bootstrap Config
1. **BigQuery table doesn't exist**:
- Create BigQuery dataset (if needed)
- Create BigQuery table with matching schema
- Copy existing YugabyteDB data to BigQuery
- Set up Debezium connector for real-time CDC
2. **BigQuery table exists**:
- Copy BigQuery data to YugabyteDB (overwrite)
- Set up Debezium connector for real-time CDC
#### Configuration Changes
- **Bootstrap enabled**: Create sync pipeline
- **Bootstrap disabled**: Remove BigQuery table and pipeline
- **Config modified**: Update pipeline configuration
#### Table Removal
- Delete BigQuery table
- Remove Debezium connector
- Clean up state records
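The branching above can be summarized as a pure decision function. This is an illustrative sketch — the action names are invented, and it only approximates the config-modified case:

```python
def plan_actions(config, bigquery_exists, previously_tracked):
    """Return the ordered sync actions for one table, per the rules above."""
    if config is None:  # annotation removed or table dropped
        if previously_tracked:
            return ["delete_bigquery_table", "remove_connector", "clean_state"]
        return []
    if not config.get("enabled"):
        # Bootstrap disabled: tear down only if something was built
        return ["delete_bigquery_table", "remove_pipeline"] if previously_tracked else []
    if bigquery_exists:
        # BigQuery already holds data: restore it into YugabyteDB first
        return ["copy_bigquery_to_yugabytedb", "create_connector"]
    return [
        "create_dataset",
        "create_bigquery_table",
        "copy_yugabytedb_to_bigquery",
        "create_connector",
    ]
```

Keeping this logic side-effect free makes the hardest part of the orchestrator trivially unit-testable.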
### 4. Real-time Sync
Debezium connectors capture all changes (INSERT, UPDATE, DELETE) and stream them through Kafka to BigQuery.
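A connector registration payload for this step might look like the sketch below. It assumes the standard Debezium PostgreSQL connector (YugabyteDB's YSQL layer speaks the PostgreSQL logical-replication protocol); the image may instead ship a YugabyteDB-specific connector, and every value here is a placeholder:

```python
import json

def connector_payload(schema, table, slot_suffix):
    """Build a Kafka Connect registration body for one table (illustrative)."""
    name = f"cdc-{schema}-{table}"
    return {
        "name": name,
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "database.hostname": "yb-tserver-service.yugabyte.svc.cluster.local",
            "database.port": "5433",
            "database.user": "yugabyte",
            "database.password": "yugabyte",
            "database.dbname": "yugabyte",
            "plugin.name": "pgoutput",
            "slot.name": f"sync_{slot_suffix}",          # one slot per table
            "table.include.list": f"{schema}.{table}",   # capture only this table
            "topic.prefix": name,
        },
    }

# The payload would be POSTed to Kafka Connect's REST API, e.g.
# POST {KAFKA_CONNECT_URL}/connectors with this body as JSON.
```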
## Components
### Core Application (`src/app.py`)
- Main application loop
- Table discovery and state management
- Synchronization orchestration
### Database Manager (`src/app.py`)
- YugabyteDB connection pooling
- State table management
- Schema introspection
### BigQuery Manager (`src/app.py`)
- BigQuery table and dataset management
- Schema mapping from PostgreSQL to BigQuery
- Table lifecycle operations
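Schema mapping largely boils down to a PostgreSQL-to-BigQuery type table. The mapping below is a plausible subset for illustration, not the application's authoritative mapping:

```python
# A plausible subset of the PostgreSQL -> BigQuery type mapping (illustrative).
PG_TO_BQ = {
    "bigint": "INT64",
    "integer": "INT64",
    "smallint": "INT64",
    "numeric": "NUMERIC",
    "double precision": "FLOAT64",
    "real": "FLOAT64",
    "boolean": "BOOL",
    "text": "STRING",
    "character varying": "STRING",
    "uuid": "STRING",
    "jsonb": "JSON",
    "timestamp with time zone": "TIMESTAMP",
    "timestamp without time zone": "DATETIME",
    "date": "DATE",
    "bytea": "BYTES",
}

def map_column(pg_type):
    """Map a PostgreSQL column type to a BigQuery type, defaulting to STRING."""
    return PG_TO_BQ.get(pg_type.lower(), "STRING")
```

Defaulting unknown types to `STRING` is a lossless fallback: any value can be round-tripped through its text representation.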
### Data Transfer Manager (`src/data_transfer.py`)
- Bulk data transfer between YugabyteDB and BigQuery
- Uses Cloud Storage as intermediate staging
- Handles large datasets efficiently
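The staging flow can be expressed as three injected steps, which keeps the orchestration testable without real cloud clients. A sketch with invented function names:

```python
def staged_transfer(table, export_to_gcs, load_from_gcs, delete_blob):
    """Export table -> GCS, load GCS -> BigQuery, then always clean up staging.

    The three callables stand in for a database export plus GCS upload,
    a BigQuery load job, and a GCS blob delete, respectively.
    """
    blob_uri = export_to_gcs(table)
    try:
        rows = load_from_gcs(blob_uri)
    finally:
        delete_blob(blob_uri)  # never leave staging data behind, even on failure
    return rows
```

The `try/finally` guarantees the temporary bucket is cleaned up even when the load job fails.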
### Debezium Manager (`src/debezium_manager.py`)
- Debezium connector lifecycle management
- YugabyteDB publication management
- Kafka Connect API integration
### Health Check (`src/health_check.py`)
- System health monitoring
- Metrics collection
- Component status validation
## Monitoring & Observability
### Health Checks
```bash
# Overall system health
docker-compose exec table-sync-app python health_check.py health
# Response includes:
# - YugabyteDB connectivity
# - BigQuery connectivity
# - Debezium Connect API status
# - State table accessibility
```
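The equivalent HTTP probe endpoint can be sketched with only the standard library. The project reportedly uses Flask for this; the stand-in below just shows the contract an orchestrator like this exposes to Kubernetes — 200 with a JSON body when all dependency checks pass, 503 otherwise:

```python
import json
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer

def make_handler(checks):
    """Build a probe handler; `checks` maps component name -> zero-arg bool fn."""
    class ProbeHandler(BaseHTTPRequestHandler):
        def do_GET(self):
            if self.path != "/health":
                self.send_error(404)
                return
            results = {name: bool(fn()) for name, fn in checks.items()}
            status = 200 if all(results.values()) else 503
            body = json.dumps({
                "status": "ok" if status == 200 else "degraded",
                "components": results,
            }).encode()
            self.send_response(status)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)

        def log_message(self, fmt, *args):
            pass  # keep probe polling noise out of the logs

    return ProbeHandler

# Usage: serve on an ephemeral port with hypothetical check functions.
# server = ThreadingHTTPServer(("127.0.0.1", 0),
#                              make_handler({"yugabytedb": lambda: True}))
```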
### Metrics
```bash
# Synchronization metrics
docker-compose exec table-sync-app python health_check.py metrics
# Provides:
# - Total tracked tables
# - Active bootstrap configurations
# - Running pipelines
# - Recent activity
```
### Table Details
```bash
# Detailed table information
docker-compose exec table-sync-app python health_check.py tables
# Shows per-table:
# - Configuration status
# - BigQuery targets
# - Pipeline status
# - Last update times
```
## Troubleshooting
### Common Issues
1. **Debezium Connector Fails**
- Check YugabyteDB publication exists
- Verify WAL level configuration
- Ensure proper permissions
2. **BigQuery Connection Issues**
- Validate service account permissions
- Check BigQuery API enabled
- Verify credentials file path
3. **Data Transfer Failures**
- Ensure temp bucket exists and is accessible
- Check Cloud Storage permissions
- Verify network connectivity
### Logs
```bash
# Application logs
docker-compose logs table-sync-app
# Kafka Connect logs
docker-compose logs kafka-connect
# YugabyteDB logs
docker-compose logs yugabytedb
```
## Production Deployment
### Kubernetes Deployment
The application is designed to run in Kubernetes environments:
1. **ConfigMaps**: Store configuration
2. **Secrets**: Store sensitive credentials
3. **Deployments**: Run the sync application
4. **Services**: Expose health check endpoints
5. **ServiceMonitors**: Prometheus monitoring integration
### Scaling Considerations
- **Single Instance**: Recommended to avoid conflicts
- **Database Pooling**: Configure appropriate connection limits
- **Resource Limits**: Set memory/CPU limits based on data volume
- **Storage**: Provision adequate storage for Kafka topics
### Security
- Use Google Cloud Workload Identity for GKE deployments
- Rotate service account keys regularly
- Enable audit logging for BigQuery operations
- Use network policies to restrict traffic
## Contributing
1. Fork the repository
2. Create a feature branch
3. Make your changes
4. Add tests if applicable
5. Submit a pull request
## License
This project is licensed under the MIT License - see the LICENSE file for details.
## Support
For issues and questions:
1. Check the troubleshooting section
2. Review application logs
3. Open an issue with detailed information