11 changes: 6 additions & 5 deletions src/main/scala/com/async2databricks/Main.scala
@@ -11,15 +11,16 @@ import com.async2databricks.utils.SafeFileOps.*
object Main extends IOApp with CatsLogger {

override def run(args: List[String]): IO[ExitCode] = {
logger.info("Application starting...")
val ioLogger = logger[IO]
ioLogger.info("Application starting...")

val program = for {
// Load configuration
config <- loadConfig[AppConfig]("application.conf")

_ <- logger.info("Configuration loaded successfully")
_ <- logger.info(s"Database: ${config.database.url}")
_ <- logger.info(s"S3 Bucket: ${config.s3.bucket}")
_ <- ioLogger.info("Configuration loaded successfully")
_ <- ioLogger.info(s"Database: ${config.database.url}")
_ <- ioLogger.info(s"S3 Bucket: ${config.s3.bucket}")

// Run ETL pipeline
pipeline = EtlPipeline[IO](config)
@@ -29,7 +30,7 @@ object Main extends IOApp with CatsLogger {

program
.handleErrorWith { error =>
logger
ioLogger
.error(s"Application failed with error: ${error.getMessage}")
.as(ExitCode.Error)
}
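Note on the Main.scala change: it replaces the side-effecting scalalogging logger with an effectful ioLogger = logger[IO] provided by the CatsLogger trait, so each log line becomes an IO value sequenced inside the program. CatsLogger itself is not shown in this diff; as a rough sketch of the shape such a helper could take (assuming log4cats over SLF4J, which may not match the repository's actual definition):

import cats.effect.Sync
import org.typelevel.log4cats.SelfAwareStructuredLogger
import org.typelevel.log4cats.slf4j.Slf4jLogger

// Hypothetical sketch only: the real CatsLogger in this repo may differ.
trait CatsLogger {
  // Returns an effectful logger: each call yields an F[Unit] that logs when run,
  // rather than logging immediately as a side effect.
  def logger[F[_]: Sync]: SelfAwareStructuredLogger[F] =
    Slf4jLogger.getLogger[F]
}

With a helper of that shape, logger[IO].info(...) returns an IO[Unit] that composes in the for-comprehension, which is exactly how the new ioLogger lines are used above.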
14 changes: 0 additions & 14 deletions src/main/scala/com/async2databricks/database/DataRepository.scala
@@ -5,9 +5,7 @@ import com.async2databricks.model.SampleData
import com.typesafe.scalalogging.LazyLogging
import doobie.*
import doobie.implicits.*
import doobie.postgres.implicits.*
import fs2.Stream
import java.time.LocalDateTime

trait DataRepository[F[_]] {

@@ -20,15 +18,6 @@ object DataRepository extends LazyLogging {

def apply[F[_]: Async](xa: Transactor[F]): DataRepository[F] =
new DataRepository[F] {

/** Implicit reader for SampleData - using tuple destructuring
*/
implicit val sampleDataRead: Read[SampleData] =
Read[(Long, String, Double, String, LocalDateTime)].map {
case (id, name, value, category, createdAt) =>
SampleData(id, name, value, category, createdAt)
}

override def streamData(
query: String,
batchSize: Int
@@ -42,9 +31,6 @@
.transact(xa)
.chunkN(batchSize)
.flatMap(chunk => Stream.chunk(chunk))
.evalTap(_ =>
Async[F].delay(logger.debug("Fetched record from database"))
)
}
}
}
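Note on the DataRepository change: it deletes the hand-written Read[SampleData] instance (and the per-record debug evalTap) from the repository; row decoding now comes from the derives Read, Write clause added to SampleData further down in this PR. Purely to illustrate how a derived Read is picked up by a doobie stream, here is a sketch under assumed table and column names, not the hidden body of streamData:

import cats.effect.IO
import doobie.*
import doobie.implicits.*
import fs2.Stream
import com.async2databricks.model.SampleData

// Sketch only: assumes a table named sample_data with these five columns.
def streamSample(xa: Transactor[IO], batchSize: Int): Stream[IO, SampleData] =
  sql"SELECT id, name, value, category, created_at FROM sample_data"
    .query[SampleData]      // resolved via the derived Read[SampleData]
    .stream
    .transact(xa)
    .chunkN(batchSize)      // group rows into chunks of batchSize
    .flatMap(Stream.chunk)  // flatten back into a stream of records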
77 changes: 35 additions & 42 deletions src/main/scala/com/async2databricks/etl/EtlPipeline.scala
@@ -1,60 +1,53 @@
package com.async2databricks.etl

import cats.effect.*
import cats.effect.Async
import cats.implicits.*
import com.async2databricks.config.AppConfig
import com.async2databricks.database.DataRepository
import com.async2databricks.database.DatabaseConnection
import com.async2databricks.s3.S3Writer
import com.typesafe.scalalogging.LazyLogging
import com.async2databricks.utils.CatsLogger

/** Main ETL Pipeline orchestrator. Streams data from PostgreSQL and writes to S3
 * as Parquet.
*/
class EtlPipeline[F[_]: Async](config: AppConfig) extends LazyLogging {
class EtlPipeline[F[_]: Async](config: AppConfig) extends CatsLogger {

/** Execute the ETL pipeline
*/
def run(): F[Unit] = {
logger.info("Starting ETL Pipeline")

val resources = for {
// Create database transactor
xa <- DatabaseConnection.createTransactor[F](config.database)

// Create S3 writer
s3Writer <- S3Writer[F](config.s3)

} yield (xa, s3Writer)

resources
.use { case (xa, s3Writer) =>
for {
_ <- Async[F].delay(
logger.info("Resources initialized, starting data extraction")
)

// Create repository
repo = DataRepository[F](xa)

// Stream data from database
dataStream = repo.streamData(config.etl.query, config.etl.batchSize)

// Generate output path
outputPath = S3Writer.generateOutputPath(config.s3.prefix)

// Write to S3
_ <- s3Writer.writeParquet(dataStream, outputPath)

_ <- Async[F].delay(
logger.info("ETL Pipeline completed successfully")
)
} yield ()
}
.handleErrorWith { error =>
Async[F].delay(logger.error("ETL Pipeline failed", error)) *>
Async[F].raiseError(error)
}
logger[F].info("Starting ETL Pipeline") >> {
val resources = for {
// Create database transactor
xa <- DatabaseConnection.createTransactor[F](config.database)
// Create S3 writer
s3Writer <- S3Writer[F](config.s3)
} yield (xa, s3Writer)

resources
.use { case (xa, s3Writer) =>
for {
_ <- logger[F].info(
"Resources initialized, starting data extraction"
)
// Create repository
repo = DataRepository[F](xa)
// Stream data from database
dataStream = repo.streamData(config.etl.query, config.etl.batchSize)
// Generate output path
outputPath <- Async[F].delay(
S3Writer.generateOutputPath(config.s3.prefix)
)
// Write to S3
_ <- s3Writer.writeParquet(dataStream, outputPath)
_ <- logger[F].info("ETL Pipeline completed successfully")
} yield ()
}
.handleErrorWith { error =>
logger[F].error(error)("ETL Pipeline failed during ETL run") *>
Async[F].raiseError(error)
}
}
}
}

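Note on the EtlPipeline change: the rewritten run() fixes a sequencing issue. The old logger.info("Starting ETL Pipeline") was a bare side effect that executed when run() was called, not when the resulting F[Unit] ran; the new version threads every log through logger[F], and suspends generateOutputPath in Async[F].delay since it presumably reads the clock. A minimal illustration of the difference (simplified, not the repository's code):

import cats.effect.IO

// Old shape: the println fires as soon as runOld() is invoked,
// even if the resulting IO is never executed.
def runOld(): IO[Unit] = {
  println("Starting ETL Pipeline")
  IO.unit
}

// New shape: logging is part of the effect and fires only when the IO runs.
def runNew(): IO[Unit] =
  IO.println("Starting ETL Pipeline") >> IO.unit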
6 changes: 5 additions & 1 deletion src/main/scala/com/async2databricks/model/SampleData.scala
@@ -1,5 +1,8 @@
package com.async2databricks.model

import doobie.postgres.implicits.JavaLocalDateTimeMeta
import doobie.util.Read
import doobie.util.Write
import java.time.LocalDateTime

/** Sample data model representing a row from the database. This is a generic
@@ -11,4 +14,5 @@ case class SampleData(
value: Double,
category: String,
createdAt: LocalDateTime
)
) derives Read,
Write
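Note on the SampleData change: the model now derives its doobie Read and Write instances with Scala 3 derives syntax, and imports JavaLocalDateTimeMeta so the createdAt: LocalDateTime column can be mapped. The derived Write is not exercised anywhere in this diff; a purely hypothetical usage, just to show what it enables (the table and column names below are assumptions):

import doobie.*
import com.async2databricks.model.SampleData

// Hypothetical example, not part of this PR: the derived Write[SampleData]
// fills all five parameters of the statement in declaration order.
def insertSample(row: SampleData): ConnectionIO[Int] =
  Update[SampleData](
    "INSERT INTO sample_data (id, name, value, category, created_at) VALUES (?, ?, ?, ?, ?)"
  ).run(row)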
167 changes: 91 additions & 76 deletions src/main/scala/com/async2databricks/s3/S3Writer.scala
@@ -4,9 +4,9 @@ import cats.effect.*
import cats.implicits.*
import com.async2databricks.config.S3Config
import com.async2databricks.model.SampleData
import com.async2databricks.utils.CatsLogger
import com.github.mjakubowski84.parquet4s.ParquetWriter
import com.github.mjakubowski84.parquet4s.Path as ParquetPath
import com.typesafe.scalalogging.LazyLogging
import fs2.Stream
import java.net.URI
import java.time.LocalDateTime
@@ -26,109 +26,124 @@ trait S3Writer[F[_]] {
def writeParquet(data: Stream[F, SampleData], outputPath: String): F[Unit]
}

object S3Writer extends LazyLogging {
object S3Writer extends CatsLogger {

/** Creates an S3 client configured for LocalStack or AWS
*/
def createS3Client(config: S3Config): Resource[IO, S3Client] = {
def createS3Client[F[_]: Async](config: S3Config): Resource[F, S3Client] = {
val log = logger[F]
Resource.make {
IO.delay {
val credentialsProvider = StaticCredentialsProvider.create(
AwsBasicCredentials.create(config.accessKey, config.secretKey)
)
for {
client <- Async[F].delay {
val credentialsProvider = StaticCredentialsProvider.create(
AwsBasicCredentials.create(config.accessKey, config.secretKey)
)

val builder = S3Client
.builder()
.credentialsProvider(credentialsProvider)
.region(Region.of(config.region))
val builder = S3Client
.builder()
.credentialsProvider(credentialsProvider)
.region(Region.of(config.region))

// Use custom endpoint for LocalStack
val client =
// Use custom endpoint for LocalStack
if (
config.endpoint.nonEmpty && config.endpoint != "https://s3.amazonaws.com"
) {
builder
.endpointOverride(URI.create(config.endpoint))
.build()
builder.endpointOverride(URI.create(config.endpoint)).build()
} else {
builder.build()
}

logger.info(s"S3 client created for endpoint: ${config.endpoint}")
client
}
}(client => IO.delay(client.close()))
}
_ <- log.info(s"S3 client created for endpoint: ${config.endpoint}")
} yield client
}(client => Async[F].delay(client.close()))
}

/** Ensures the S3 bucket exists, creates it if not
*/
def ensureBucket(s3Client: S3Client, bucketName: String): IO[Unit] = {
IO.delay {
try {
s3Client.headBucket(
HeadBucketRequest.builder().bucket(bucketName).build()
)
logger.info(s"Bucket $bucketName already exists")
} catch {
case _: NoSuchBucketException =>
logger.info(s"Creating bucket $bucketName")
s3Client.createBucket(
CreateBucketRequest.builder().bucket(bucketName).build()
def ensureBucket[F[_]: Async](
s3Client: S3Client,
bucketName: String
): F[Unit] = {
val log = logger[F]
Async[F]
.delay {
try {
s3Client.headBucket(
HeadBucketRequest.builder().bucket(bucketName).build()
)
logger.info(s"Bucket $bucketName created successfully")
Right(())
} catch {
case _: NoSuchBucketException =>
Left(())
}
}
.flatMap {
case Right(_) =>
log.info(s"Bucket $bucketName already exists")
case Left(_) =>
for {
_ <- log.info(s"Creating bucket $bucketName")
_ <- Async[F].delay {
s3Client.createBucket(
CreateBucketRequest.builder().bucket(bucketName).build()
)
}
_ <- log.info(s"Bucket $bucketName created successfully")
} yield ()
}
}
}

def apply[F[_]: Async](config: S3Config): Resource[F, S3Writer[F]] = {
Resource.eval(Async[F].delay(new S3Writer[F] {
override def writeParquet(
data: Stream[F, SampleData],
outputPath: String
): F[Unit] = {
// Configure Hadoop for S3 access
val hadoopConf = new org.apache.hadoop.conf.Configuration()
hadoopConf.set("fs.s3a.access.key", config.accessKey)
hadoopConf.set("fs.s3a.secret.key", config.secretKey)
hadoopConf.set("fs.s3a.endpoint", config.endpoint)
hadoopConf.set("fs.s3a.path.style.access", "true")
hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoopConf.set(
"fs.s3a.connection.ssl.enabled",
"false"
) // For LocalStack

Async[F]
.delay {
logger.info(
val log = logger[F]
Resource.eval(
new S3Writer[F] {
override def writeParquet(
data: Stream[F, SampleData],
outputPath: String
): F[Unit] = {
// Configure Hadoop for S3 access
val hadoopConf = new org.apache.hadoop.conf.Configuration()
hadoopConf.set("fs.s3a.access.key", config.accessKey)
hadoopConf.set("fs.s3a.secret.key", config.secretKey)
hadoopConf.set("fs.s3a.endpoint", config.endpoint)
hadoopConf.set("fs.s3a.path.style.access", "true")
hadoopConf.set(
"fs.s3a.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem"
)
hadoopConf.set(
"fs.s3a.connection.ssl.enabled",
"false"
) // For LocalStack
for {
_ <- log.info(
s"Writing parquet data to: s3://${config.bucket}/$outputPath"
)
logger.debug(
_ <- log.debug(
s"Hadoop configuration set for S3: endpoint=${config.endpoint}"
)
}
.flatMap { _ =>
// Convert stream to list and write
data.compile.toList.flatMap { records =>
Async[F].delay {
if (records.nonEmpty) {
val path = ParquetPath(s"s3a://${config.bucket}/$outputPath")
// Use builder API with Hadoop configuration
ParquetWriter
.of[SampleData]
.options(ParquetWriter.Options(hadoopConf = hadoopConf))
.writeAndClose(path, records)
logger.info(
records <- data.compile.toList
_ <-
if (records.nonEmpty) {
val path = ParquetPath(s"s3a://${config.bucket}/$outputPath")
for {
_ <- Async[F].blocking {
ParquetWriter
.of[SampleData]
.options(ParquetWriter.Options(hadoopConf = hadoopConf))
.writeAndClose(path, records)
}
_ <- log.info(
s"Successfully wrote ${records.size} records to $outputPath"
)
} else {
logger.warn("No records to write")
}
} yield ()
} else {
log.warn("No records to write")
}
}
}
}
}))
} yield ()
}
}.pure[F]
)
}

/** Generate a timestamped output path
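Note on the S3Writer change: two design choices stand out. ensureBucket now returns an Either from the delay block and branches in flatMap instead of logging inside a try/catch, keeping the log calls effectful, and ParquetWriter.writeAndClose moves to Async[F].blocking so the blocking Hadoop/S3 write runs on the blocking thread pool rather than a compute thread. The same ensureBucket behaviour could also be expressed with recoverWith; a sketch of that alternative, assuming the AWS SDK v2 classes already used in this file (this is not what the PR implements):

import cats.effect.Async
import cats.syntax.all.*
import software.amazon.awssdk.services.s3.S3Client
import software.amazon.awssdk.services.s3.model.{CreateBucketRequest, HeadBucketRequest, NoSuchBucketException}

// Alternative shape, not the PR's implementation: headBucket either succeeds
// (the bucket exists) or throws NoSuchBucketException, which is recovered from
// by creating the bucket. Both SDK calls block, hence Async[F].blocking here.
def ensureBucketAlt[F[_]: Async](s3: S3Client, bucket: String): F[Unit] =
  Async[F]
    .blocking(s3.headBucket(HeadBucketRequest.builder().bucket(bucket).build()))
    .void
    .recoverWith { case _: NoSuchBucketException =>
      Async[F]
        .blocking(s3.createBucket(CreateBucketRequest.builder().bucket(bucket).build()))
        .void
    }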