diff --git a/src/main/scala/com/async2databricks/Main.scala b/src/main/scala/com/async2databricks/Main.scala
index 8d34834..3889c3f 100644
--- a/src/main/scala/com/async2databricks/Main.scala
+++ b/src/main/scala/com/async2databricks/Main.scala
@@ -11,15 +11,16 @@ import com.async2databricks.utils.SafeFileOps.*
 
 object Main extends IOApp with CatsLogger {
   override def run(args: List[String]): IO[ExitCode] = {
-    logger.info("Application starting...")
+    val ioLogger = logger[IO]
 
     val program = for {
+      _ <- ioLogger.info("Application starting...")
       // Load configuration
       config <- loadConfig[AppConfig]("application.conf")
 
-      _ <- logger.info("Configuration loaded successfully")
-      _ <- logger.info(s"Database: ${config.database.url}")
-      _ <- logger.info(s"S3 Bucket: ${config.s3.bucket}")
+      _ <- ioLogger.info("Configuration loaded successfully")
+      _ <- ioLogger.info(s"Database: ${config.database.url}")
+      _ <- ioLogger.info(s"S3 Bucket: ${config.s3.bucket}")
 
       // Run ETL pipeline
       pipeline = EtlPipeline[IO](config)
@@ -29,7 +30,7 @@ object Main extends IOApp with CatsLogger {
 
     program
       .handleErrorWith { error =>
-        logger
+        ioLogger
           .error(s"Application failed with error: ${error.getMessage}")
           .as(ExitCode.Error)
       }
diff --git a/src/main/scala/com/async2databricks/database/DataRepository.scala b/src/main/scala/com/async2databricks/database/DataRepository.scala
index fe9941f..0e9de1a 100644
--- a/src/main/scala/com/async2databricks/database/DataRepository.scala
+++ b/src/main/scala/com/async2databricks/database/DataRepository.scala
@@ -5,9 +5,7 @@ import com.async2databricks.model.SampleData
 import com.typesafe.scalalogging.LazyLogging
 import doobie.*
 import doobie.implicits.*
-import doobie.postgres.implicits.*
 import fs2.Stream
-import java.time.LocalDateTime
 
 trait DataRepository[F[_]] {
 
@@ -20,15 +18,6 @@ object DataRepository extends LazyLogging {
 
   def apply[F[_]: Async](xa: Transactor[F]): DataRepository[F] =
     new DataRepository[F] {
-
-      /** Implicit reader for SampleData - using tuple destructuring
-        */
-      implicit val sampleDataRead: Read[SampleData] =
-        Read[(Long, String, Double, String, LocalDateTime)].map {
-          case (id, name, value, category, createdAt) =>
-            SampleData(id, name, value, category, createdAt)
-        }
-
       override def streamData(
           query: String,
          batchSize: Int
@@ -42,9 +31,6 @@ object DataRepository extends LazyLogging {
           .transact(xa)
           .chunkN(batchSize)
           .flatMap(chunk => Stream.chunk(chunk))
-          .evalTap(_ =>
-            Async[F].delay(logger.debug("Fetched record from database"))
-          )
       }
     }
 }
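With the hand-written `Read[SampleData]` and the per-record `evalTap` gone, `streamData` is now a plain doobie/fs2 stream whose row decoding comes from the `derives Read` added to `SampleData` further down. A minimal smoke-test sketch of how the repository can be exercised — the SQL text and the transactor wiring are assumptions for illustration, not part of this PR:

```scala
import cats.effect.IO
import com.async2databricks.database.DataRepository
import doobie.util.transactor.Transactor

// Hypothetical check (query text and transactor assumed): stream the rows in
// batches of 500 and count how many came back.
def countRows(xa: Transactor[IO]): IO[Long] =
  DataRepository[IO](xa)
    .streamData(
      "SELECT id, name, value, category, created_at FROM sample_data",
      batchSize = 500
    )
    .compile
    .count
```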
diff --git a/src/main/scala/com/async2databricks/etl/EtlPipeline.scala b/src/main/scala/com/async2databricks/etl/EtlPipeline.scala
index 3b3887d..c7b57fe 100644
--- a/src/main/scala/com/async2databricks/etl/EtlPipeline.scala
+++ b/src/main/scala/com/async2databricks/etl/EtlPipeline.scala
@@ -1,60 +1,53 @@
 package com.async2databricks.etl
 
-import cats.effect.*
+import cats.effect.Async
 import cats.implicits.*
 import com.async2databricks.config.AppConfig
 import com.async2databricks.database.DataRepository
 import com.async2databricks.database.DatabaseConnection
 import com.async2databricks.s3.S3Writer
-import com.typesafe.scalalogging.LazyLogging
+import com.async2databricks.utils.CatsLogger
 
 /** Main ETL Pipeline orchestrator Streams data from PostgreSQL and writes to S3
   * as Parquet
   */
-class EtlPipeline[F[_]: Async](config: AppConfig) extends LazyLogging {
+class EtlPipeline[F[_]: Async](config: AppConfig) extends CatsLogger {
 
   /** Execute the ETL pipeline
     */
   def run(): F[Unit] = {
-    logger.info("Starting ETL Pipeline")
-
-    val resources = for {
-      // Create database transactor
-      xa <- DatabaseConnection.createTransactor[F](config.database)
-
-      // Create S3 writer
-      s3Writer <- S3Writer[F](config.s3)
-
-    } yield (xa, s3Writer)
-
-    resources
-      .use { case (xa, s3Writer) =>
-        for {
-          _ <- Async[F].delay(
-            logger.info("Resources initialized, starting data extraction")
-          )
-
-          // Create repository
-          repo = DataRepository[F](xa)
-
-          // Stream data from database
-          dataStream = repo.streamData(config.etl.query, config.etl.batchSize)
-
-          // Generate output path
-          outputPath = S3Writer.generateOutputPath(config.s3.prefix)
-
-          // Write to S3
-          _ <- s3Writer.writeParquet(dataStream, outputPath)
-
-          _ <- Async[F].delay(
-            logger.info("ETL Pipeline completed successfully")
-          )
-        } yield ()
-      }
-      .handleErrorWith { error =>
-        Async[F].delay(logger.error("ETL Pipeline failed", error)) *>
-          Async[F].raiseError(error)
-      }
+    logger[F].info("Starting ETL Pipeline") >> {
+      val resources = for {
+        // Create database transactor
+        xa <- DatabaseConnection.createTransactor[F](config.database)
+        // Create S3 writer
+        s3Writer <- S3Writer[F](config.s3)
+      } yield (xa, s3Writer)
+
+      resources
+        .use { case (xa, s3Writer) =>
+          for {
+            _ <- logger[F].info(
+              "Resources initialized, starting data extraction"
+            )
+            // Create repository
+            repo = DataRepository[F](xa)
+            // Stream data from database
+            dataStream = repo.streamData(config.etl.query, config.etl.batchSize)
+            // Generate output path
+            outputPath <- Async[F].delay(
+              S3Writer.generateOutputPath(config.s3.prefix)
+            )
+            // Write to S3
+            _ <- s3Writer.writeParquet(dataStream, outputPath)
+            _ <- logger[F].info("ETL Pipeline completed successfully")
+          } yield ()
+        }
+        .handleErrorWith { error =>
+          logger[F].error(error)("ETL Pipeline failed during ETL run") *>
+            Async[F].raiseError(error)
+        }
+    }
   }
 }
diff --git a/src/main/scala/com/async2databricks/model/SampleData.scala b/src/main/scala/com/async2databricks/model/SampleData.scala
index 760e198..d167170 100644
--- a/src/main/scala/com/async2databricks/model/SampleData.scala
+++ b/src/main/scala/com/async2databricks/model/SampleData.scala
@@ -1,5 +1,8 @@
 package com.async2databricks.model
 
+import doobie.postgres.implicits.JavaLocalDateTimeMeta
+import doobie.util.Read
+import doobie.util.Write
 import java.time.LocalDateTime
 
 /** Sample data model representing a row from the database This is a generic
@@ -11,4 +14,5 @@ case class SampleData(
     value: Double,
     category: String,
     createdAt: LocalDateTime
-)
+) derives Read,
+  Write
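The `derives Read, Write` clause is what lets `DataRepository` drop its hand-rolled instance: doobie derives the row codec from the case class, with the imported `JavaLocalDateTimeMeta` supplying the `LocalDateTime` column mapping. A minimal sketch of a direct query against the derived instance — the table and column names here are assumptions used only for illustration:

```scala
import cats.effect.IO
import com.async2databricks.model.SampleData
import doobie.*
import doobie.implicits.*

// Hypothetical one-off query (table/column names assumed): the derived
// Read[SampleData] decodes each row as long as column order and types match
// the constructor (id, name, value, category, createdAt).
def latest(xa: Transactor[IO]): IO[List[SampleData]] =
  sql"SELECT id, name, value, category, created_at FROM sample_data LIMIT 10"
    .query[SampleData]
    .to[List]
    .transact(xa)
```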
diff --git a/src/main/scala/com/async2databricks/s3/S3Writer.scala b/src/main/scala/com/async2databricks/s3/S3Writer.scala
index c4bba7c..2cf3ed0 100644
--- a/src/main/scala/com/async2databricks/s3/S3Writer.scala
+++ b/src/main/scala/com/async2databricks/s3/S3Writer.scala
@@ -4,9 +4,9 @@ import cats.effect.*
 import cats.implicits.*
 import com.async2databricks.config.S3Config
 import com.async2databricks.model.SampleData
+import com.async2databricks.utils.CatsLogger
 import com.github.mjakubowski84.parquet4s.ParquetWriter
 import com.github.mjakubowski84.parquet4s.Path as ParquetPath
-import com.typesafe.scalalogging.LazyLogging
 import fs2.Stream
 import java.net.URI
 import java.time.LocalDateTime
@@ -26,109 +26,124 @@ trait S3Writer[F[_]] {
   def writeParquet(data: Stream[F, SampleData], outputPath: String): F[Unit]
 }
 
-object S3Writer extends LazyLogging {
+object S3Writer extends CatsLogger {
 
   /** Creates an S3 client configured for LocalStack or AWS
     */
-  def createS3Client(config: S3Config): Resource[IO, S3Client] = {
+  def createS3Client[F[_]: Async](config: S3Config): Resource[F, S3Client] = {
+    val log = logger[F]
     Resource.make {
-      IO.delay {
-        val credentialsProvider = StaticCredentialsProvider.create(
-          AwsBasicCredentials.create(config.accessKey, config.secretKey)
-        )
+      for {
+        client <- Async[F].delay {
+          val credentialsProvider = StaticCredentialsProvider.create(
+            AwsBasicCredentials.create(config.accessKey, config.secretKey)
+          )
 
-        val builder = S3Client
-          .builder()
-          .credentialsProvider(credentialsProvider)
-          .region(Region.of(config.region))
+          val builder = S3Client
+            .builder()
+            .credentialsProvider(credentialsProvider)
+            .region(Region.of(config.region))
 
-        // Use custom endpoint for LocalStack
-        val client =
+          // Use custom endpoint for LocalStack
           if (
             config.endpoint.nonEmpty && config.endpoint != "https://s3.amazonaws.com"
           ) {
-            builder
-              .endpointOverride(URI.create(config.endpoint))
-              .build()
+            builder.endpointOverride(URI.create(config.endpoint)).build()
           } else {
             builder.build()
           }
-
-        logger.info(s"S3 client created for endpoint: ${config.endpoint}")
-        client
-      }
-    }(client => IO.delay(client.close()))
+        }
+        _ <- log.info(s"S3 client created for endpoint: ${config.endpoint}")
+      } yield client
+    }(client => Async[F].delay(client.close()))
   }
 
   /** Ensures the S3 bucket exists, creates it if not
     */
-  def ensureBucket(s3Client: S3Client, bucketName: String): IO[Unit] = {
-    IO.delay {
-      try {
-        s3Client.headBucket(
-          HeadBucketRequest.builder().bucket(bucketName).build()
-        )
-        logger.info(s"Bucket $bucketName already exists")
-      } catch {
-        case _: NoSuchBucketException =>
-          logger.info(s"Creating bucket $bucketName")
-          s3Client.createBucket(
-            CreateBucketRequest.builder().bucket(bucketName).build()
+  def ensureBucket[F[_]: Async](
+      s3Client: S3Client,
+      bucketName: String
+  ): F[Unit] = {
+    val log = logger[F]
+    Async[F]
+      .delay {
+        try {
+          s3Client.headBucket(
+            HeadBucketRequest.builder().bucket(bucketName).build()
           )
-          logger.info(s"Bucket $bucketName created successfully")
+          Right(())
+        } catch {
+          case _: NoSuchBucketException =>
+            Left(())
+        }
+      }
+      .flatMap {
+        case Right(_) =>
+          log.info(s"Bucket $bucketName already exists")
+        case Left(_) =>
+          for {
+            _ <- log.info(s"Creating bucket $bucketName")
+            _ <- Async[F].delay {
+              s3Client.createBucket(
+                CreateBucketRequest.builder().bucket(bucketName).build()
+              )
+            }
+            _ <- log.info(s"Bucket $bucketName created successfully")
+          } yield ()
       }
-    }
   }
 
   def apply[F[_]: Async](config: S3Config): Resource[F, S3Writer[F]] = {
-    Resource.eval(Async[F].delay(new S3Writer[F] {
-      override def writeParquet(
-          data: Stream[F, SampleData],
-          outputPath: String
-      ): F[Unit] = {
-        // Configure Hadoop for S3 access
-        val hadoopConf = new org.apache.hadoop.conf.Configuration()
-        hadoopConf.set("fs.s3a.access.key", config.accessKey)
-        hadoopConf.set("fs.s3a.secret.key", config.secretKey)
-        hadoopConf.set("fs.s3a.endpoint", config.endpoint)
-        hadoopConf.set("fs.s3a.path.style.access", "true")
-        hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
-        hadoopConf.set(
-          "fs.s3a.connection.ssl.enabled",
-          "false"
-        ) // For LocalStack
-
-        Async[F]
-          .delay {
-            logger.info(
+    val log = logger[F]
+    Resource.eval(
+      new S3Writer[F] {
+        override def writeParquet(
+            data: Stream[F, SampleData],
+            outputPath: String
+        ): F[Unit] = {
+          // Configure Hadoop for S3 access
+          val hadoopConf = new org.apache.hadoop.conf.Configuration()
+          hadoopConf.set("fs.s3a.access.key", config.accessKey)
+          hadoopConf.set("fs.s3a.secret.key", config.secretKey)
+          hadoopConf.set("fs.s3a.endpoint", config.endpoint)
+          hadoopConf.set("fs.s3a.path.style.access", "true")
+          hadoopConf.set(
+            "fs.s3a.impl",
+            "org.apache.hadoop.fs.s3a.S3AFileSystem"
+          )
+          hadoopConf.set(
+            "fs.s3a.connection.ssl.enabled",
+            "false"
+          ) // For LocalStack
+          for {
+            _ <- log.info(
               s"Writing parquet data to: s3://${config.bucket}/$outputPath"
             )
-            logger.debug(
+            _ <- log.debug(
              s"Hadoop configuration set for S3: endpoint=${config.endpoint}"
             )
-          }
-          .flatMap { _ =>
-            // Convert stream to list and write
-            data.compile.toList.flatMap { records =>
-              Async[F].delay {
-                if (records.nonEmpty) {
-                  val path = ParquetPath(s"s3a://${config.bucket}/$outputPath")
-                  // Use builder API with Hadoop configuration
-                  ParquetWriter
-                    .of[SampleData]
-                    .options(ParquetWriter.Options(hadoopConf = hadoopConf))
-                    .writeAndClose(path, records)
-                  logger.info(
+            records <- data.compile.toList
+            _ <-
+              if (records.nonEmpty) {
+                val path = ParquetPath(s"s3a://${config.bucket}/$outputPath")
+                for {
+                  _ <- Async[F].blocking {
+                    ParquetWriter
+                      .of[SampleData]
+                      .options(ParquetWriter.Options(hadoopConf = hadoopConf))
+                      .writeAndClose(path, records)
+                  }
+                  _ <- log.info(
                     s"Successfully wrote ${records.size} records to $outputPath"
                   )
-                } else {
-                  logger.warn("No records to write")
-                }
+                } yield ()
+              } else {
+                log.warn("No records to write")
               }
-            }
-          }
-      }
-    }))
+          } yield ()
+        }
+      }.pure[F]
+    )
   }
 
   /** Generate a timestamped output path
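Both helpers are now polymorphic in `F`, so they compose with any `Async` effect. A hypothetical pre-flight sketch (the call site is an assumption; the PR itself does not show where `ensureBucket` is invoked):

```scala
import cats.effect.IO
import com.async2databricks.config.S3Config
import com.async2databricks.s3.S3Writer

// Hypothetical setup step: acquire an S3 client as a Resource and make sure
// the target bucket exists before the pipeline writes to it.
def prepareBucket(config: S3Config): IO[Unit] =
  S3Writer.createS3Client[IO](config).use { client =>
    S3Writer.ensureBucket[IO](client, config.bucket)
  }
```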
hadoopConf.set("fs.s3a.endpoint", config.endpoint) + hadoopConf.set("fs.s3a.path.style.access", "true") + hadoopConf.set( + "fs.s3a.impl", + "org.apache.hadoop.fs.s3a.S3AFileSystem" + ) + hadoopConf.set( + "fs.s3a.connection.ssl.enabled", + "false" + ) // For LocalStack + for { + _ <- log.info( s"Writing parquet data to: s3://${config.bucket}/$outputPath" ) - logger.debug( + _ <- log.debug( s"Hadoop configuration set for S3: endpoint=${config.endpoint}" ) - } - .flatMap { _ => - // Convert stream to list and write - data.compile.toList.flatMap { records => - Async[F].delay { - if (records.nonEmpty) { - val path = ParquetPath(s"s3a://${config.bucket}/$outputPath") - // Use builder API with Hadoop configuration - ParquetWriter - .of[SampleData] - .options(ParquetWriter.Options(hadoopConf = hadoopConf)) - .writeAndClose(path, records) - logger.info( + records <- data.compile.toList + _ <- + if (records.nonEmpty) { + val path = ParquetPath(s"s3a://${config.bucket}/$outputPath") + for { + _ <- Async[F].blocking { + ParquetWriter + .of[SampleData] + .options(ParquetWriter.Options(hadoopConf = hadoopConf)) + .writeAndClose(path, records) + } + _ <- log.info( s"Successfully wrote ${records.size} records to $outputPath" ) - } else { - logger.warn("No records to write") - } + } yield () + } else { + log.warn("No records to write") } - } - } - } - })) + } yield () + } + }.pure[F] + ) } /** Generate a timestamped output path diff --git a/src/main/scala/com/async2databricks/utils/CatsLogger.scala b/src/main/scala/com/async2databricks/utils/CatsLogger.scala index 653c67c..34db5a9 100644 --- a/src/main/scala/com/async2databricks/utils/CatsLogger.scala +++ b/src/main/scala/com/async2databricks/utils/CatsLogger.scala @@ -1,12 +1,15 @@ package com.async2databricks.utils -import cats.effect.IO +import cats.effect.Sync import org.typelevel.log4cats.SelfAwareStructuredLogger import org.typelevel.log4cats.slf4j.Slf4jFactory /** Mixin trait to provide a Cats Effect logger. Helpful to add asynchronous * logging capabilities. + * @see + * https://typelevel.org/log4cats/#logging-using-capabilities */ trait CatsLogger { - val logger: SelfAwareStructuredLogger[IO] = Slf4jFactory.create[IO].getLogger + def logger[F[_]: Sync]: SelfAwareStructuredLogger[F] = + Slf4jFactory.create[F].getLogger }