From afbbbd69f79ddae436ff7476540456d89705dc1e Mon Sep 17 00:00:00 2001 From: akreit Date: Mon, 29 Dec 2025 12:56:20 +0100 Subject: [PATCH 1/9] move to cats logger --- .../scala/com/async2databricks/Main.scala | 11 +- .../database/DataRepository.scala | 39 ++---- .../async2databricks/etl/EtlPipeline.scala | 76 +++++------ .../async2databricks/model/SampleData.scala | 6 +- .../com/async2databricks/s3/S3Writer.scala | 120 ++++++++++-------- .../async2databricks/utils/CatsLogger.scala | 7 +- 6 files changed, 125 insertions(+), 134 deletions(-) diff --git a/src/main/scala/com/async2databricks/Main.scala b/src/main/scala/com/async2databricks/Main.scala index 8d34834..3889c3f 100644 --- a/src/main/scala/com/async2databricks/Main.scala +++ b/src/main/scala/com/async2databricks/Main.scala @@ -11,15 +11,16 @@ import com.async2databricks.utils.SafeFileOps.* object Main extends IOApp with CatsLogger { override def run(args: List[String]): IO[ExitCode] = { - logger.info("Application starting...") + val ioLogger = logger[IO] + ioLogger.info("Application starting...") val program = for { // Load configuration config <- loadConfig[AppConfig]("application.conf") - _ <- logger.info("Configuration loaded successfully") - _ <- logger.info(s"Database: ${config.database.url}") - _ <- logger.info(s"S3 Bucket: ${config.s3.bucket}") + _ <- ioLogger.info("Configuration loaded successfully") + _ <- ioLogger.info(s"Database: ${config.database.url}") + _ <- ioLogger.info(s"S3 Bucket: ${config.s3.bucket}") // Run ETL pipeline pipeline = EtlPipeline[IO](config) @@ -29,7 +30,7 @@ object Main extends IOApp with CatsLogger { program .handleErrorWith { error => - logger + ioLogger .error(s"Application failed with error: ${error.getMessage}") .as(ExitCode.Error) } diff --git a/src/main/scala/com/async2databricks/database/DataRepository.scala b/src/main/scala/com/async2databricks/database/DataRepository.scala index fe9941f..10c5714 100644 --- a/src/main/scala/com/async2databricks/database/DataRepository.scala +++ b/src/main/scala/com/async2databricks/database/DataRepository.scala @@ -5,9 +5,7 @@ import com.async2databricks.model.SampleData import com.typesafe.scalalogging.LazyLogging import doobie.* import doobie.implicits.* -import doobie.postgres.implicits.* import fs2.Stream -import java.time.LocalDateTime trait DataRepository[F[_]] { @@ -19,32 +17,15 @@ trait DataRepository[F[_]] { object DataRepository extends LazyLogging { def apply[F[_]: Async](xa: Transactor[F]): DataRepository[F] = - new DataRepository[F] { - - /** Implicit reader for SampleData - using tuple destructuring - */ - implicit val sampleDataRead: Read[SampleData] = - Read[(Long, String, Double, String, LocalDateTime)].map { - case (id, name, value, category, createdAt) => - SampleData(id, name, value, category, createdAt) - } - - override def streamData( - query: String, - batchSize: Int - ): Stream[F, SampleData] = { - logger.info(s"Starting to stream data with query: $query") - - Fragment - .const(query) - .query[SampleData] - .stream - .transact(xa) - .chunkN(batchSize) - .flatMap(chunk => Stream.chunk(chunk)) - .evalTap(_ => - Async[F].delay(logger.debug("Fetched record from database")) - ) - } + (query: String, batchSize: Int) => { + logger.info(s"Starting to stream data with query: $query") + + Fragment + .const(query) + .query[SampleData] + .stream + .transact(xa) + .chunkN(batchSize) + .flatMap(chunk => Stream.chunk(chunk)) } } diff --git a/src/main/scala/com/async2databricks/etl/EtlPipeline.scala 
b/src/main/scala/com/async2databricks/etl/EtlPipeline.scala index 3b3887d..22ec6b5 100644 --- a/src/main/scala/com/async2databricks/etl/EtlPipeline.scala +++ b/src/main/scala/com/async2databricks/etl/EtlPipeline.scala @@ -1,60 +1,52 @@ package com.async2databricks.etl -import cats.effect.* +import cats.effect.Async import cats.implicits.* import com.async2databricks.config.AppConfig import com.async2databricks.database.DataRepository import com.async2databricks.database.DatabaseConnection import com.async2databricks.s3.S3Writer -import com.typesafe.scalalogging.LazyLogging +import com.async2databricks.utils.CatsLogger /** Main ETL Pipeline orchestrator Streams data from PostgreSQL and writes to S3 * as Parquet */ -class EtlPipeline[F[_]: Async](config: AppConfig) extends LazyLogging { +class EtlPipeline[F[_]: Async](config: AppConfig) extends CatsLogger { /** Execute the ETL pipeline */ def run(): F[Unit] = { - logger.info("Starting ETL Pipeline") - - val resources = for { - // Create database transactor - xa <- DatabaseConnection.createTransactor[F](config.database) - - // Create S3 writer - s3Writer <- S3Writer[F](config.s3) - - } yield (xa, s3Writer) - - resources - .use { case (xa, s3Writer) => - for { - _ <- Async[F].delay( - logger.info("Resources initialized, starting data extraction") - ) - - // Create repository - repo = DataRepository[F](xa) - - // Stream data from database - dataStream = repo.streamData(config.etl.query, config.etl.batchSize) - - // Generate output path - outputPath = S3Writer.generateOutputPath(config.s3.prefix) - - // Write to S3 - _ <- s3Writer.writeParquet(dataStream, outputPath) - - _ <- Async[F].delay( - logger.info("ETL Pipeline completed successfully") - ) - } yield () - } - .handleErrorWith { error => - Async[F].delay(logger.error("ETL Pipeline failed", error)) *> - Async[F].raiseError(error) - } + logger[F].info("Starting ETL Pipeline") >> { + val resources = for { + // Create database transactor + xa <- DatabaseConnection.createTransactor[F](config.database) + // Create S3 writer + s3Writer <- S3Writer[F](config.s3) + } yield (xa, s3Writer) + + resources + .use { case (xa, s3Writer) => + for { + _ <- logger[F].info( + "Resources initialized, starting data extraction" + ) + // Create repository + repo = DataRepository[F](xa) + // Stream data from database + dataStream = repo.streamData(config.etl.query, config.etl.batchSize) + // Generate output path + outputPath <- Async[F].delay( + S3Writer.generateOutputPath(config.s3.prefix) + ) + // Write to S3 + _ <- s3Writer.writeParquet(dataStream, outputPath) + } yield () + } + .handleErrorWith { error => + logger[F].error(s"ETL Pipeline failed due to: $error") *> + Async[F].raiseError(error) + } + } } } diff --git a/src/main/scala/com/async2databricks/model/SampleData.scala b/src/main/scala/com/async2databricks/model/SampleData.scala index 760e198..d167170 100644 --- a/src/main/scala/com/async2databricks/model/SampleData.scala +++ b/src/main/scala/com/async2databricks/model/SampleData.scala @@ -1,5 +1,8 @@ package com.async2databricks.model +import doobie.postgres.implicits.JavaLocalDateTimeMeta +import doobie.util.Read +import doobie.util.Write import java.time.LocalDateTime /** Sample data model representing a row from the database This is a generic @@ -11,4 +14,5 @@ case class SampleData( value: Double, category: String, createdAt: LocalDateTime -) +) derives Read, + Write diff --git a/src/main/scala/com/async2databricks/s3/S3Writer.scala b/src/main/scala/com/async2databricks/s3/S3Writer.scala index 
c4bba7c..20ef778 100644 --- a/src/main/scala/com/async2databricks/s3/S3Writer.scala +++ b/src/main/scala/com/async2databricks/s3/S3Writer.scala @@ -4,9 +4,9 @@ import cats.effect.* import cats.implicits.* import com.async2databricks.config.S3Config import com.async2databricks.model.SampleData +import com.async2databricks.utils.CatsLogger import com.github.mjakubowski84.parquet4s.ParquetWriter import com.github.mjakubowski84.parquet4s.Path as ParquetPath -import com.typesafe.scalalogging.LazyLogging import fs2.Stream import java.net.URI import java.time.LocalDateTime @@ -26,13 +26,14 @@ trait S3Writer[F[_]] { def writeParquet(data: Stream[F, SampleData], outputPath: String): F[Unit] } -object S3Writer extends LazyLogging { +object S3Writer extends CatsLogger { /** Creates an S3 client configured for LocalStack or AWS */ - def createS3Client(config: S3Config): Resource[IO, S3Client] = { + def createS3Client[F[_]: Async](config: S3Config): Resource[F, S3Client] = { + val log = logger[F] Resource.make { - IO.delay { + Async[F].delay { val credentialsProvider = StaticCredentialsProvider.create( AwsBasicCredentials.create(config.accessKey, config.secretKey) ) @@ -47,88 +48,97 @@ object S3Writer extends LazyLogging { if ( config.endpoint.nonEmpty && config.endpoint != "https://s3.amazonaws.com" ) { - builder - .endpointOverride(URI.create(config.endpoint)) - .build() + builder.endpointOverride(URI.create(config.endpoint)).build() } else { builder.build() } - logger.info(s"S3 client created for endpoint: ${config.endpoint}") + // Log effectfully + log.info( + s"S3 client created for endpoint: ${config.endpoint}" + ) // Only for side-effect, not recommended in prod client } - }(client => IO.delay(client.close())) + }(client => Async[F].delay(client.close())) } /** Ensures the S3 bucket exists, creates it if not */ - def ensureBucket(s3Client: S3Client, bucketName: String): IO[Unit] = { - IO.delay { + def ensureBucket[F[_]: Async]( + s3Client: S3Client, + bucketName: String + ): F[Unit] = { + val log = logger[F] + Async[F].delay { try { s3Client.headBucket( HeadBucketRequest.builder().bucket(bucketName).build() ) - logger.info(s"Bucket $bucketName already exists") + log.info( + s"Bucket $bucketName already exists" + ) // Only for side-effect, not recommended in prod } catch { case _: NoSuchBucketException => - logger.info(s"Creating bucket $bucketName") + log.info(s"Creating bucket $bucketName") s3Client.createBucket( CreateBucketRequest.builder().bucket(bucketName).build() ) - logger.info(s"Bucket $bucketName created successfully") + log.info(s"Bucket $bucketName created successfully") } } } def apply[F[_]: Async](config: S3Config): Resource[F, S3Writer[F]] = { - Resource.eval(Async[F].delay(new S3Writer[F] { - override def writeParquet( - data: Stream[F, SampleData], - outputPath: String - ): F[Unit] = { - // Configure Hadoop for S3 access - val hadoopConf = new org.apache.hadoop.conf.Configuration() - hadoopConf.set("fs.s3a.access.key", config.accessKey) - hadoopConf.set("fs.s3a.secret.key", config.secretKey) - hadoopConf.set("fs.s3a.endpoint", config.endpoint) - hadoopConf.set("fs.s3a.path.style.access", "true") - hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") - hadoopConf.set( - "fs.s3a.connection.ssl.enabled", - "false" - ) // For LocalStack - - Async[F] - .delay { - logger.info( + val log = logger[F] + Resource.eval( + new S3Writer[F] { + override def writeParquet( + data: Stream[F, SampleData], + outputPath: String + ): F[Unit] = { + // Configure Hadoop for S3 access 
+ val hadoopConf = new org.apache.hadoop.conf.Configuration() + hadoopConf.set("fs.s3a.access.key", config.accessKey) + hadoopConf.set("fs.s3a.secret.key", config.secretKey) + hadoopConf.set("fs.s3a.endpoint", config.endpoint) + hadoopConf.set("fs.s3a.path.style.access", "true") + hadoopConf.set( + "fs.s3a.impl", + "org.apache.hadoop.fs.s3a.S3AFileSystem" + ) + hadoopConf.set( + "fs.s3a.connection.ssl.enabled", + "false" + ) // For LocalStack + for { + _ <- log.info( s"Writing parquet data to: s3://${config.bucket}/$outputPath" ) - logger.debug( + _ <- log.debug( s"Hadoop configuration set for S3: endpoint=${config.endpoint}" ) - } - .flatMap { _ => - // Convert stream to list and write - data.compile.toList.flatMap { records => - Async[F].delay { - if (records.nonEmpty) { - val path = ParquetPath(s"s3a://${config.bucket}/$outputPath") - // Use builder API with Hadoop configuration - ParquetWriter - .of[SampleData] - .options(ParquetWriter.Options(hadoopConf = hadoopConf)) - .writeAndClose(path, records) - logger.info( + records <- data.compile.toList + _ <- + if (records.nonEmpty) { + val path = ParquetPath(s"s3a://${config.bucket}/$outputPath") + for { + _ <- Async[F].blocking { + ParquetWriter + .of[SampleData] + .options(ParquetWriter.Options(hadoopConf = hadoopConf)) + .writeAndClose(path, records) + } + _ <- log.info( s"Successfully wrote ${records.size} records to $outputPath" ) - } else { - logger.warn("No records to write") - } + } yield () + } else { + log.warn("No records to write") } - } - } - } - })) + } yield () + } + }.pure[F] + ) } /** Generate a timestamped output path diff --git a/src/main/scala/com/async2databricks/utils/CatsLogger.scala b/src/main/scala/com/async2databricks/utils/CatsLogger.scala index 653c67c..34db5a9 100644 --- a/src/main/scala/com/async2databricks/utils/CatsLogger.scala +++ b/src/main/scala/com/async2databricks/utils/CatsLogger.scala @@ -1,12 +1,15 @@ package com.async2databricks.utils -import cats.effect.IO +import cats.effect.Sync import org.typelevel.log4cats.SelfAwareStructuredLogger import org.typelevel.log4cats.slf4j.Slf4jFactory /** Mixin trait to provide a Cats Effect logger. Helpful to add asynchronous * logging capabilities. 
+ * @see + * https://typelevel.org/log4cats/#logging-using-capabilities */ trait CatsLogger { - val logger: SelfAwareStructuredLogger[IO] = Slf4jFactory.create[IO].getLogger + def logger[F[_]: Sync]: SelfAwareStructuredLogger[F] = + Slf4jFactory.create[F].getLogger } From 169d59d44ef0f135cdf3128d830c73120c290167 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 29 Dec 2025 12:15:14 +0000 Subject: [PATCH 2/9] Initial plan From 9103f1ea02d6b122d8b1d94354dc1d472f624243 Mon Sep 17 00:00:00 2001 From: AndreasK <64101884+akreit@users.noreply.github.com> Date: Mon, 29 Dec 2025 13:15:30 +0100 Subject: [PATCH 3/9] Update src/main/scala/com/async2databricks/etl/EtlPipeline.scala Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/main/scala/com/async2databricks/etl/EtlPipeline.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/com/async2databricks/etl/EtlPipeline.scala b/src/main/scala/com/async2databricks/etl/EtlPipeline.scala index 22ec6b5..c93f2c3 100644 --- a/src/main/scala/com/async2databricks/etl/EtlPipeline.scala +++ b/src/main/scala/com/async2databricks/etl/EtlPipeline.scala @@ -43,7 +43,7 @@ class EtlPipeline[F[_]: Async](config: AppConfig) extends CatsLogger { } yield () } .handleErrorWith { error => - logger[F].error(s"ETL Pipeline failed due to: $error") *> + logger[F].error(error)("ETL Pipeline failed during ETL run") *> Async[F].raiseError(error) } } From 54a18a58c13ecf255f313889b52c97f30eb2dd8f Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 29 Dec 2025 12:16:00 +0000 Subject: [PATCH 4/9] Initial plan From 3acb23e548c73df74c28787a1b47e7bc1258160a Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 29 Dec 2025 12:19:36 +0000 Subject: [PATCH 5/9] Fix logging side-effects to be properly sequenced in effect system Co-authored-by: akreit <64101884+akreit@users.noreply.github.com> --- .../com/async2databricks/s3/S3Writer.scala | 69 ++++++++++--------- 1 file changed, 37 insertions(+), 32 deletions(-) diff --git a/src/main/scala/com/async2databricks/s3/S3Writer.scala b/src/main/scala/com/async2databricks/s3/S3Writer.scala index 20ef778..2cf3ed0 100644 --- a/src/main/scala/com/async2databricks/s3/S3Writer.scala +++ b/src/main/scala/com/async2databricks/s3/S3Writer.scala @@ -33,18 +33,18 @@ object S3Writer extends CatsLogger { def createS3Client[F[_]: Async](config: S3Config): Resource[F, S3Client] = { val log = logger[F] Resource.make { - Async[F].delay { - val credentialsProvider = StaticCredentialsProvider.create( - AwsBasicCredentials.create(config.accessKey, config.secretKey) - ) + for { + client <- Async[F].delay { + val credentialsProvider = StaticCredentialsProvider.create( + AwsBasicCredentials.create(config.accessKey, config.secretKey) + ) - val builder = S3Client - .builder() - .credentialsProvider(credentialsProvider) - .region(Region.of(config.region)) + val builder = S3Client + .builder() + .credentialsProvider(credentialsProvider) + .region(Region.of(config.region)) - // Use custom endpoint for LocalStack - val client = + // Use custom endpoint for LocalStack if ( config.endpoint.nonEmpty && config.endpoint != "https://s3.amazonaws.com" ) { @@ -52,13 +52,9 @@ object S3Writer extends CatsLogger { } else { builder.build() } - - // Log effectfully - log.info( - s"S3 client created for endpoint: ${config.endpoint}" - ) // Only for 
side-effect, not recommended in prod - client - } + } + _ <- log.info(s"S3 client created for endpoint: ${config.endpoint}") + } yield client }(client => Async[F].delay(client.close())) } @@ -69,23 +65,32 @@ object S3Writer extends CatsLogger { bucketName: String ): F[Unit] = { val log = logger[F] - Async[F].delay { - try { - s3Client.headBucket( - HeadBucketRequest.builder().bucket(bucketName).build() - ) - log.info( - s"Bucket $bucketName already exists" - ) // Only for side-effect, not recommended in prod - } catch { - case _: NoSuchBucketException => - log.info(s"Creating bucket $bucketName") - s3Client.createBucket( - CreateBucketRequest.builder().bucket(bucketName).build() + Async[F] + .delay { + try { + s3Client.headBucket( + HeadBucketRequest.builder().bucket(bucketName).build() ) - log.info(s"Bucket $bucketName created successfully") + Right(()) + } catch { + case _: NoSuchBucketException => + Left(()) + } + } + .flatMap { + case Right(_) => + log.info(s"Bucket $bucketName already exists") + case Left(_) => + for { + _ <- log.info(s"Creating bucket $bucketName") + _ <- Async[F].delay { + s3Client.createBucket( + CreateBucketRequest.builder().bucket(bucketName).build() + ) + } + _ <- log.info(s"Bucket $bucketName created successfully") + } yield () } - } } def apply[F[_]: Async](config: S3Config): Resource[F, S3Writer[F]] = { From e14e757c0340b8204619a7f73b3c683e0356e4e6 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 29 Dec 2025 12:20:01 +0000 Subject: [PATCH 6/9] Fix DataRepository to return proper trait implementation instead of lambda Co-authored-by: akreit <64101884+akreit@users.noreply.github.com> --- .../database/DataRepository.scala | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/main/scala/com/async2databricks/database/DataRepository.scala b/src/main/scala/com/async2databricks/database/DataRepository.scala index 10c5714..c1100c5 100644 --- a/src/main/scala/com/async2databricks/database/DataRepository.scala +++ b/src/main/scala/com/async2databricks/database/DataRepository.scala @@ -17,15 +17,17 @@ trait DataRepository[F[_]] { object DataRepository extends LazyLogging { def apply[F[_]: Async](xa: Transactor[F]): DataRepository[F] = - (query: String, batchSize: Int) => { - logger.info(s"Starting to stream data with query: $query") + new DataRepository[F] { + override def streamData(query: String, batchSize: Int): Stream[F, SampleData] = { + logger.info(s"Starting to stream data with query: $query") - Fragment - .const(query) - .query[SampleData] - .stream - .transact(xa) - .chunkN(batchSize) - .flatMap(chunk => Stream.chunk(chunk)) + Fragment + .const(query) + .query[SampleData] + .stream + .transact(xa) + .chunkN(batchSize) + .flatMap(chunk => Stream.chunk(chunk)) + } } } From 07fed58dc260bf4bad803543c7c3cacad0a567b2 Mon Sep 17 00:00:00 2001 From: akreit Date: Mon, 29 Dec 2025 13:38:48 +0100 Subject: [PATCH 7/9] apply scalafmt --- .../scala/com/async2databricks/database/DataRepository.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/main/scala/com/async2databricks/database/DataRepository.scala b/src/main/scala/com/async2databricks/database/DataRepository.scala index c1100c5..0e9de1a 100644 --- a/src/main/scala/com/async2databricks/database/DataRepository.scala +++ b/src/main/scala/com/async2databricks/database/DataRepository.scala @@ -18,7 +18,10 @@ object DataRepository extends LazyLogging { def apply[F[_]: Async](xa: Transactor[F]): 
DataRepository[F] = new DataRepository[F] { - override def streamData(query: String, batchSize: Int): Stream[F, SampleData] = { + override def streamData( + query: String, + batchSize: Int + ): Stream[F, SampleData] = { logger.info(s"Starting to stream data with query: $query") Fragment From 4e7970de00361ebc8249ab2f194f059ebe4f1f18 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 29 Dec 2025 12:44:51 +0000 Subject: [PATCH 8/9] Initial plan From 2acab0dd80aa1f973f3af6bcafa4aa6cc3fd828f Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 29 Dec 2025 13:03:32 +0000 Subject: [PATCH 9/9] Restore 'ETL Pipeline completed successfully' log message Co-authored-by: akreit <64101884+akreit@users.noreply.github.com> --- src/main/scala/com/async2databricks/etl/EtlPipeline.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/scala/com/async2databricks/etl/EtlPipeline.scala b/src/main/scala/com/async2databricks/etl/EtlPipeline.scala index c93f2c3..c7b57fe 100644 --- a/src/main/scala/com/async2databricks/etl/EtlPipeline.scala +++ b/src/main/scala/com/async2databricks/etl/EtlPipeline.scala @@ -40,6 +40,7 @@ class EtlPipeline[F[_]: Async](config: AppConfig) extends CatsLogger { ) // Write to S3 _ <- s3Writer.writeParquet(dataStream, outputPath) + _ <- logger[F].info("ETL Pipeline completed successfully") } yield () } .handleErrorWith { error =>
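
For reference, the shape this series converges on can be exercised end to end with the minimal sketch below, assuming cats-effect 3 and log4cats-slf4j (plus an SLF4J backend) on the classpath; ExampleMain and doWork are illustrative names, not part of the patches. The essential point, which the "Fix logging side-effects" patch above enforces in S3Writer, is that a logger[F].info(...) value only runs once it is composed into the program, for example by binding it inside a for-comprehension or chaining it with >>.

import cats.effect.{ExitCode, IO, IOApp, Sync}
import org.typelevel.log4cats.SelfAwareStructuredLogger
import org.typelevel.log4cats.slf4j.Slf4jFactory

// The mixin as it stands at the end of this series: a logger derived per effect type F.
trait CatsLogger {
  def logger[F[_]: Sync]: SelfAwareStructuredLogger[F] =
    Slf4jFactory.create[F].getLogger
}

object ExampleMain extends IOApp with CatsLogger {

  // Placeholder for the real work (e.g. running the ETL pipeline).
  private def doWork: IO[Unit] = IO.unit

  override def run(args: List[String]): IO[ExitCode] = {
    val program = for {
      // Each log effect is bound into the for-comprehension, so it actually runs.
      _ <- logger[IO].info("Application starting...")
      _ <- doWork
      _ <- logger[IO].info("Application finished")
    } yield ExitCode.Success

    program.handleErrorWith { error =>
      // error(t)(msg) attaches the throwable, as in the EtlPipeline error handler above.
      logger[IO].error(error)("Application failed").as(ExitCode.Error)
    }
  }
}

Because logger is a def, every call re-derives a logger from the SLF4J factory; code that logs on a hot path can bind it once (val log = logger[F]), as S3Writer does in these patches.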