Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions .github/workflows/ci-cd.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
name: Credit Card Transactions CICD

# Trigger on pushes to either branch; the per-job `if:` guards below
# decide which job actually runs for which branch.
on:
  push:
    branches:
      - dev
      - main

jobs:
  # Job 1: run ONLY `test_transactions_processing.py` for the dev branch.
  run-tests:
    if: github.ref == 'refs/heads/dev'
    runs-on: ubuntu-latest

    steps:
      - name: Checkout Code
        uses: actions/checkout@v3

      - name: Set Up Python
        uses: actions/setup-python@v3
        with:
          # Quoted so YAML does not read the version as the float 3.11
          python-version: "3.11"

      - name: Install Dependencies
        run: |
          pip install -r requirements.txt

      - name: Run Pytest for `test_transactions_processing.py`
        run: pytest tests/test_transactions_processing.py

  # Job 2: upload the Airflow DAG & PySpark job to GCS (main branch only).
  deploy-to-prod:
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest

    steps:
      - name: Checkout Code
        uses: actions/checkout@v3

      - name: Authenticate to GCP
        uses: google-github-actions/auth@v1
        with:
          credentials_json: ${{ secrets.GCP_SA_KEY }}

      - name: Setup Google Cloud SDK
        uses: google-github-actions/setup-gcloud@v1
        with:
          project_id: ${{ secrets.GCP_PROJECT_ID }}

      # Upload the PySpark job to GCS (path referenced by the Airflow DAG).
      - name: Upload Spark Job to GCS
        run: |
          gsutil cp spark_job.py gs://credit-card-data-analysis/spark_job/

      # Upload the Airflow DAG into the Composer environment's DAGs folder.
      - name: Upload Airflow DAG to Composer
        run: |
          gcloud composer environments storage dags import \
            --environment airflow-cluster \
            --location us-central1 \
            --source airflow_job.py
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ Airflow DAG that:
- Moves processed files to the archive folder to prevent reprocessing
- test_transactions_processing.py: Unit tests for PySpark application logic, validating transformation and business logic using PyTest.

#### requirements.txt
A plain-text file listing the Python packages (with pinned versions) required by the project.
#### .github/workflows/ci-cd.yaml:
GitHub Actions workflow that:
- Runs tests using PyTest on dev branch
Expand Down
82 changes: 82 additions & 0 deletions airflow_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
from datetime import datetime, timedelta
import uuid
from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import DataprocCreateBatchOperator
from airflow.providers.google.cloud.sensors.gcs import GCSObjectsWithPrefixExistenceSensor
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
from airflow.utils.trigger_rule import TriggerRule

# DAG default arguments applied to every task in this DAG.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2025, 2, 7),
}

# Define the DAG: waits for new transaction files in GCS, processes them on
# Dataproc Serverless, then archives the inputs so they are not reprocessed.
with DAG(
    dag_id="credit_card_transactions_dataproc_dag",
    default_args=default_args,
    schedule_interval=None,  # triggered manually/externally; no fixed schedule
    catchup=False,
) as dag:

    # GCS bucket & object-prefix constants shared by the tasks below.
    gcs_bucket = "credit-card-data-analysis"
    file_pattern = "transactions/transactions_"
    source_prefix = "transactions/"
    archive_prefix = "archive/"

    # Task 1: GCS sensor — waits until at least one object matching the
    # transactions prefix exists in the bucket.
    file_sensor = GCSObjectsWithPrefixExistenceSensor(
        task_id="check_json_file_arrival",
        bucket=gcs_bucket,
        prefix=file_pattern,
        timeout=600,        # give up after 10 minutes
        poke_interval=30,   # re-check every 30 seconds
        mode="poke",
    )

    # Unique per-run Dataproc batch ID.
    # FIX: the previous `uuid.uuid4()` value was computed at DAG *parse* time,
    # so every run between two scheduler parses reused the same batch ID and
    # Dataproc rejects a duplicate batch_id. `batch_id` is a templated field
    # of DataprocCreateBatchOperator, so this Jinja expression is rendered
    # per task instance at *run* time instead (ds_nodash + try_number keeps
    # retries distinct; only [a-z0-9-] is valid in a Dataproc batch ID).
    batch_id = "credit-card-batch-{{ ds_nodash }}-{{ ti.try_number }}"

    # Task 2: submit the PySpark job to Dataproc Serverless.
    batch_details = {
        "pyspark_batch": {
            # Plain string — the original used an f-string with no placeholders.
            "main_python_file_uri": "gs://credit-card-data-analysis/spark_job/spark_job.py"
        },
        "runtime_config": {
            "version": "2.2",  # Dataproc Serverless runtime version
        },
        "environment_config": {
            "execution_config": {
                "service_account": "70622048644-compute@developer.gserviceaccount.com",
                "network_uri": "projects/psyched-service-442305-q1/global/networks/default",
                "subnetwork_uri": "projects/psyched-service-442305-q1/regions/us-central1/subnetworks/default",
            }
        },
    }

    pyspark_task = DataprocCreateBatchOperator(
        task_id="run_credit_card_processing_job",
        batch=batch_details,
        batch_id=batch_id,
        project_id="psyched-service-442305-q1",
        region="us-central1",
        gcp_conn_id="google_cloud_default",
    )

    # Task 3: move every object under the transactions/ prefix into archive/
    # so the same files are not picked up by the sensor again.
    move_files_to_archive = GCSToGCSOperator(
        task_id="move_files_to_archive",
        source_bucket=gcs_bucket,
        source_object=source_prefix,  # prefix: all matching objects are moved
        destination_bucket=gcs_bucket,
        destination_object=archive_prefix,
        move_object=True,  # copy then delete the source objects
        trigger_rule=TriggerRule.ALL_SUCCESS,  # archive only after a successful run
    )


    file_sensor >> pyspark_task >> move_files_to_archive
Loading