diff --git a/ingest/src/main/scala/hydra.ingest/app/AppConfig.scala b/ingest/src/main/scala/hydra.ingest/app/AppConfig.scala
index 82b78153f..79ef5c5a3 100644
--- a/ingest/src/main/scala/hydra.ingest/app/AppConfig.scala
+++ b/ingest/src/main/scala/hydra.ingest/app/AppConfig.scala
@@ -209,14 +209,16 @@ object AppConfig {
   final case class ConsumerGroupsAlgebraConfig(
       kafkaInternalConsumerGroupsTopic: String,
       commonConsumerGroup: ConsumerGroup,
-      consumerGroupsConsumerEnabled: Boolean
+      consumerGroupsConsumerEnabled: Boolean,
+      lagPublishInterval: FiniteDuration
   )

   private val consumerGroupAlgebraConfig: ConfigValue[ConsumerGroupsAlgebraConfig] = (
     env("KAFKA_CONSUMER_GROUPS_INTERNAL_TOPIC_NAME").as[String].default("__consumer_offsets"),
-    env("HYDRA_CONSUMER_GROUPS_COMMON_CONSUMER_GROUP").as[ConsumerGroup].default("kafkaInternalConsumerGroupsTopic-ConsumerGroupName"),
-    env("CONSUMER_GROUPS_CONSUMER_ENABLED").as[Boolean].default(true)
+    env("HYDRA_CONSUMER_GROUPS_COMMON_CONSUMER_GROUP").as[ConsumerGroup].default("kafkaInternalConsumerGroupsTopic-ConsumerGroupName"),
+    env("CONSUMER_GROUPS_CONSUMER_ENABLED").as[Boolean].default(true),
+    env("CONSUMER_GROUPS_INTERNAL_TOPIC_LAG_PUBLISH_INTERVAL").as[FiniteDuration].default(1.minutes)
   ).parMapN(ConsumerGroupsAlgebraConfig)

   final case class IngestConfig(
diff --git a/ingest/src/main/scala/hydra.ingest/modules/Algebras.scala b/ingest/src/main/scala/hydra.ingest/modules/Algebras.scala
index 8ee343e41..c5e81b4af 100644
--- a/ingest/src/main/scala/hydra.ingest/modules/Algebras.scala
+++ b/ingest/src/main/scala/hydra.ingest/modules/Algebras.scala
@@ -64,7 +64,8 @@ object Algebras {
         kafkaClientAlgebra = kafkaClient,
         kAA = kafkaAdmin,
         sra = schemaRegistry,
-        config.kafkaClientSecurityConfig
+        config.kafkaClientSecurityConfig,
+        config.consumerGroupsAlgebraConfig.lagPublishInterval
       )
       awsIamClient <- AwsIamClient.make
       awsStsClient <- AwsStsClient.make
diff --git a/ingest/src/test/scala/hydra/ingest/http/TopicDeletionEndpointSpec.scala b/ingest/src/test/scala/hydra/ingest/http/TopicDeletionEndpointSpec.scala
index f8c4fcecc..b6d29137a 100644
--- a/ingest/src/test/scala/hydra/ingest/http/TopicDeletionEndpointSpec.scala
+++ b/ingest/src/test/scala/hydra/ingest/http/TopicDeletionEndpointSpec.scala
@@ -33,6 +33,7 @@ import scalacache.Cache
 import scalacache.guava.GuavaCache

 import scala.concurrent.ExecutionContext
+import scala.concurrent.duration.DurationInt

@@ -147,7 +148,7 @@ class TopicDeletionEndpointSpec extends Matchers with AnyWordSpecLike with Scala
     implicit val notificationSenderMock: InternalNotificationSender[IO] = getInternalNotificationSenderMock[IO]
     ConsumerGroupsAlgebra.make("", Subject.createValidated("dvs.blah.blah").get, Subject.createValidated("dvs.heyo.blah").get, "", "", "",
-      kafkaClientAlgebra, kafkaAdminAlgebra, schemaAlgebra, kafkaSecurityEmptyConfig)
+      kafkaClientAlgebra, kafkaAdminAlgebra, schemaAlgebra, kafkaSecurityEmptyConfig, 1.minutes)
   }

   "The deletionEndpoint path" should {
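Reviewer note on the new setting: Ciris ships a stock `FiniteDuration` decoder, so `CONSUMER_GROUPS_INTERNAL_TOPIC_LAG_PUBLISH_INTERVAL` accepts any value Scala's `Duration` parser understands (for example `30 seconds` or `5 minutes`). A minimal sketch of the decoding in isolation; the standalone `lagPublishInterval` val here is illustrative, not part of the change:

```scala
import ciris.{env, ConfigValue}
import scala.concurrent.duration._

// Illustrative standalone read of the new environment variable,
// defaulting to one minute exactly as AppConfig does above.
val lagPublishInterval: ConfigValue[FiniteDuration] =
  env("CONSUMER_GROUPS_INTERNAL_TOPIC_LAG_PUBLISH_INTERVAL")
    .as[FiniteDuration]
    .default(1.minutes)
```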
diff --git a/ingestors/kafka/src/it/scala/hydra/kafka/algebras/ConsumerGroupsAlgebraSpec.scala b/ingestors/kafka/src/it/scala/hydra/kafka/algebras/ConsumerGroupsAlgebraSpec.scala
index b7f789293..858502821 100644
--- a/ingestors/kafka/src/it/scala/hydra/kafka/algebras/ConsumerGroupsAlgebraSpec.scala
+++ b/ingestors/kafka/src/it/scala/hydra/kafka/algebras/ConsumerGroupsAlgebraSpec.scala
@@ -10,7 +10,7 @@ import hydra.common.NotificationsTestSuite
 import hydra.common.alerting.sender.{InternalNotificationSender, NotificationSender}
 import hydra.common.config.KafkaConfigUtils.{KafkaClientSecurityConfig, SchemaRegistrySecurityConfig, kafkaSecurityEmptyConfig}
 import hydra.common.alerting.sender.InternalNotificationSender
-import hydra.kafka.algebras.ConsumerGroupsAlgebra.PartitionOffsetMap
+import hydra.kafka.algebras.ConsumerGroupsAlgebra.{PartitionOffsetMap, PartitionOffsetsWithTotalLag}
 import hydra.kafka.algebras.KafkaClientAlgebra.{OffsetInfo, Record}
 import hydra.kafka.model.TopicConsumer.{TopicConsumerKey, TopicConsumerValue}
 import hydra.kafka.model.TopicConsumerOffset.{TopicConsumerOffsetKey, TopicConsumerOffsetValue}
@@ -72,7 +72,7 @@ class ConsumerGroupsAlgebraSpec extends AnyWordSpecLike with Matchers with ForAl
       kafkaClient <- KafkaClientAlgebra.live[IO](container.bootstrapServers, "https://schema-registry", schemaRegistry, kafkaSecurityEmptyConfig)
       consumerGroupAlgebra <- ConsumerGroupsAlgebra.make(internalKafkaConsumerTopic, dvsConsumerTopic, dvsInternalKafkaOffsetsTopic,
-        container.bootstrapServers, consumerGroup, consumerGroup, kafkaClient, kafkaAdmin, schemaRegistry, kafkaSecurityEmptyConfig)
+        container.bootstrapServers, consumerGroup, consumerGroup, kafkaClient, kafkaAdmin, schemaRegistry, kafkaSecurityEmptyConfig, 1.minutes)
       _ <- consumerGroupAlgebra.startConsumer
     } yield {
       runTests(consumerGroupAlgebra, schemaRegistry, kafkaClient, kafkaAdmin)
@@ -185,6 +185,31 @@ class ConsumerGroupsAlgebraSpec extends AnyWordSpecLike with Matchers with ForAl
         case Right(_) => succeed
       }.unsafeRunSync()
     }
+
+    "getLagOnDvsInternalCGTopic reports no lag when the only published record has been consumed (commitOffsets = false)" in {
+      val (key1, value1) = getGenericRecords(dvsConsumerTopic.value, "abc", "123")
+
+      kafkaClient.publishMessage((key1, Some(value1), None), dvsConsumerTopic.value).unsafeRunSync()
+
+      kafkaClient.consumeMessages(dvsConsumerTopic.value, consumer1, commitOffsets = false)
+        .take(1).compile.last.unsafeRunSync() shouldBe (key1, value1.some, None).some
+
+      val lagInfo = cga.getLagOnDvsInternalCGTopic.unsafeRunSync()
+      (lagInfo.totalLargestOffset, lagInfo.totalGroupOffset, lagInfo.totalLag, lagInfo.lagPercentage) shouldBe ((1L, 1L, 0L, 0.0))
+    }
+
+    "getLagOnDvsInternalCGTopic reports 50% lag when one of two published records has been consumed (commitOffsets = false)" in {
+      val (key1, value1) = getGenericRecords(dvsConsumerTopic.value, "abc", "123")
+      val (key2, value2) = getGenericRecords(dvsConsumerTopic.value, "abcd", "1234")
+
+      kafkaClient.publishMessage((key1, Some(value1), None), dvsConsumerTopic.value).unsafeRunSync()
+      kafkaClient.publishMessage((key2, Some(value2), None), dvsConsumerTopic.value).unsafeRunSync()
+
+      kafkaClient.consumeMessages(dvsConsumerTopic.value, consumer1, commitOffsets = false)
+        .take(1).compile.last.unsafeRunSync() shouldBe (key1, value1.some, None).some
+
+      val lagInfo = cga.getLagOnDvsInternalCGTopic.unsafeRunSync()
+      (lagInfo.totalLargestOffset, lagInfo.totalGroupOffset, lagInfo.totalLag, lagInfo.lagPercentage) shouldBe ((2L, 1L, 1L, 50.0))
+    }
   }
 }
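The expected values in the second test fall straight out of the lag arithmetic. A worked example under the test's setup (two records published, the algebra's internal consumer has seen one):

```scala
// Two records on the topic => largest offset 2; one consumed => group offset 1.
val totalLargestOffset = 2L
val totalGroupOffset   = 1L
val totalLag           = totalLargestOffset - totalGroupOffset        // 1
val lagPercentage      = totalLag.toDouble / totalLargestOffset * 100 // 50.0
```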
diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala b/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala
index 8b49cfd14..146de1e26 100644
--- a/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala
+++ b/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala
@@ -1,21 +1,13 @@
 package hydra.kafka.algebras

-import cats.ApplicativeError
-
-import java.time.Instant
 import cats.effect.concurrent.Ref
 import cats.effect.{Concurrent, ConcurrentEffect, ContextShift, IO, Timer}
 import cats.implicits._
-import fs2.kafka._
 import hydra.avro.registry.SchemaRegistry
-import hydra.common.alerting.AlertProtocol.NotificationMessage
-import hydra.common.alerting.NotificationLevel
-import hydra.common.alerting.sender.{InternalNotificationSender, NotificationSender}
+import hydra.common.alerting.sender.InternalNotificationSender
 import hydra.common.config.KafkaConfigUtils.KafkaClientSecurityConfig
-import hydra.kafka.algebras.ConsumerGroupsAlgebra.{Consumer, ConsumerTopics, DetailedConsumerGroup, DetailedTopicConsumers, PartitionOffset, TopicConsumers}
 import hydra.kafka.algebras.ConsumerGroupsAlgebra._
-import hydra.kafka.algebras.HydraTag.StringJsonFormat
-import hydra.kafka.algebras.KafkaClientAlgebra.Record
+import hydra.kafka.algebras.KafkaClientAlgebra.{Offset, Partition, Record}
 import hydra.kafka.algebras.RetryableFs2Stream.RetryPolicy.Infinite
 import hydra.kafka.algebras.RetryableFs2Stream._
 import hydra.kafka.model.TopicConsumer
@@ -23,9 +15,11 @@ import hydra.kafka.model.TopicConsumer.{TopicConsumerKey, TopicConsumerValue}
 import hydra.kafka.model.TopicMetadataV2Request.Subject
 import hydra.kafka.serializers.TopicMetadataV2Parser.IntentionallyUnimplemented
 import hydra.kafka.util.ConsumerGroupsOffsetConsumer
-import org.apache.avro.generic.GenericRecord
 import org.typelevel.log4cats.Logger

+import java.time.Instant
+import scala.concurrent.duration.{DurationInt, FiniteDuration}
+
 trait ConsumerGroupsAlgebra[F[_]] {

   def getConsumersForTopic(topicName: String): F[TopicConsumers]
@@ -44,6 +38,8 @@ trait ConsumerGroupsAlgebra[F[_]] {
   def consumerGroupIsActive(str: String): F[(Boolean, String)]

   def getUniquePerNodeConsumerGroup: String
+
+  def getLagOnDvsInternalCGTopic: F[PartitionOffsetsWithTotalLag]
 }

 final case class TestConsumerGroupsAlgebra(consumerGroupMap: Map[TopicConsumerKey, (TopicConsumerValue, String)]) extends ConsumerGroupsAlgebra[IO] {
@@ -96,6 +92,11 @@ final case class TestConsumerGroupsAlgebra(consumerGroupMap: Map[TopicConsumerKe

   override def getUniquePerNodeConsumerGroup: String = "uniquePerNodeConsumerGroup"

+  override def getLagOnDvsInternalCGTopic: IO[PartitionOffsetsWithTotalLag] = {
+    IO.pure(PartitionOffsetsWithTotalLag(60, 30, 30, 50,
+      List(PartitionOffset(1, 10, 20, 10), PartitionOffset(2, 10, 20, 10), PartitionOffset(3, 10, 20, 10))
+    ))
+  }
 }

 object TestConsumerGroupsAlgebra {
@@ -106,6 +107,9 @@ object ConsumerGroupsAlgebra {

   type PartitionOffsetMap = Map[Int, Long]

+  final case class PartitionOffsetsWithTotalLag(totalLargestOffset: Long, totalGroupOffset: Long, totalLag: Long,
+                                                lagPercentage: Double, partitionOffsets: List[PartitionOffset])
+
   final case class PartitionOffset(partition: Int, groupOffset: Long, largestOffset: Long, partitionLag: Long)

   final case class TopicConsumers(topicName: String, consumers: List[Consumer])
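Because `TestConsumerGroupsAlgebra` now stubs `getLagOnDvsInternalCGTopic`, downstream unit tests can exercise lag-dependent code without a broker. A minimal sketch; the `checkStub` val and its assertions are illustrative only:

```scala
import cats.effect.IO

// The stubbed numbers are internally consistent (60 = 30 + 30, and the
// per-partition lags sum to the total), so derived fields can be
// sanity-checked directly against the stub.
val checkStub: IO[Unit] =
  TestConsumerGroupsAlgebra(Map.empty).getLagOnDvsInternalCGTopic.map { lag =>
    assert(lag.totalLag == lag.totalLargestOffset - lag.totalGroupOffset)
    assert(lag.partitionOffsets.map(_.partitionLag).sum == lag.totalLag)
  }
```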
@@ -129,22 +133,56 @@
       kafkaClientAlgebra: KafkaClientAlgebra[F],
       kAA: KafkaAdminAlgebra[F],
       sra: SchemaRegistry[F],
-      kafkaClientSecurityConfig: KafkaClientSecurityConfig
+      kafkaClientSecurityConfig: KafkaClientSecurityConfig,
+      lagPublishInterval: FiniteDuration
   )
   (implicit notificationsService: InternalNotificationSender[F]): F[ConsumerGroupsAlgebra[F]] = {
-    val dvsConsumersStream: fs2.Stream[F, Record] = {
-      kafkaClientAlgebra.consumeSafelyMessages(dvsConsumersTopic.value, uniquePerNodeConsumerGroup, commitOffsets = false)
+
+    val dvsConsumersStream: fs2.Stream[F, (Record, (Partition, Offset))] = {
+      kafkaClientAlgebra.consumeSafelyWithOffsetInfo(dvsConsumersTopic.value, uniquePerNodeConsumerGroup, commitOffsets = false)
         //Ignore records with errors
         .collect { case Right(value) => value }
     }

     for {
       consumerGroupsStorageFacade <- Ref[F].of(ConsumerGroupsStorageFacade.empty)
+      consumerGroupsOffsetFacade <- Ref[F].of(ConsumerGroupsOffsetFacade.empty)
     } yield new ConsumerGroupsAlgebra[F] {
       override def getConsumersForTopic(topicName: String): F[TopicConsumers] =
         consumerGroupsStorageFacade.get.flatMap(a => addStateToTopicConsumers(a.getConsumersForTopicName(topicName)))

+      override def getLagOnDvsInternalCGTopic: F[PartitionOffsetsWithTotalLag] = {
+
+        def getValueFromOffsetMap(partition: Int, offsetMap: Map[Partition, Offset]): Long =
+          offsetMap.get(partition) match {
+            case Some(value) => value + 1L // Add 1 so the stored offset (last record seen) matches KafkaAdminAlgebra#getLatestOffsets, which reports the next offset to be produced.
+            case _ => 0
+          }
+
+        for {
+          groupOffsetMap <- consumerGroupsOffsetFacade.get.map(_.getOffsets)
+
+          partitionOffsetsWithLag <- kAA.getLatestOffsets(dvsConsumersTopic.value)
+            .map(_.toList
+              .filter(_._2.value > 0L)
+              .map(latestOffset => PartitionOffset(
+                latestOffset._1.partition,
+                getValueFromOffsetMap(latestOffset._1.partition, groupOffsetMap),
+                latestOffset._2.value,
+                latestOffset._2.value - getValueFromOffsetMap(latestOffset._1.partition, groupOffsetMap)
+              )))
+
+          (totalLargestOffset, totalGroupOffset) =
+            (partitionOffsetsWithLag.map(_.largestOffset).sum, partitionOffsetsWithLag.map(_.groupOffset).sum)
+
+          totalLag = totalLargestOffset - totalGroupOffset
+
+          // Guard against division by zero (NaN) when the topic is empty.
+          lagPercentage: Double = if (totalLargestOffset == 0) 0.0 else (totalLag.toDouble / totalLargestOffset) * 100
+
+        } yield PartitionOffsetsWithTotalLag(totalLargestOffset, totalGroupOffset, totalLag, lagPercentage, partitionOffsetsWithLag)
+      }
+
       private def addStateToTopicConsumers(topicConsumers: TopicConsumers): F[TopicConsumers] = {
         val detailedF: F[List[Consumer]] = topicConsumers.consumers.traverse { consumer =>
           val fState = getConsumerActiveState(consumer.consumerGroupName)
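On the `+ 1` in `getValueFromOffsetMap`: the offset facade stores the offset of the last record the internal consumer saw, while `getLatestOffsets` reports the next offset to be produced, so the increment puts both on the same scale. A worked single-partition example with illustrative numbers:

```scala
// Partition with 5 records (offsets 0..4), fully consumed:
val lastSeenOffset = 4L                          // stored by the facade
val groupOffset    = lastSeenOffset + 1          // 5, rescaled
val largestOffset  = 5L                          // from getLatestOffsets
val partitionLag   = largestOffset - groupOffset // 0, i.e. caught up
```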
@@ -166,11 +204,19 @@
       override def startConsumer: F[Unit] = {
         for {
-          _ <- Concurrent[F].start(consumeDVSConsumersTopicIntoCache(dvsConsumersStream, consumerGroupsStorageFacade))
+          _ <- Concurrent[F].start(consumeDVSConsumersTopicIntoCache(dvsConsumersStream, consumerGroupsStorageFacade, consumerGroupsOffsetFacade))
           _ <- Concurrent[F].start {
             ConsumerGroupsOffsetConsumer.start(kafkaClientAlgebra, kAA, sra, uniquePerNodeConsumerGroup,
               consumerOffsetsOffsetsTopicConfig, kafkaInternalTopic, dvsConsumersTopic, bootstrapServers, commonConsumerGroup, kafkaClientSecurityConfig)
           }
+          // Fork the periodic lag logger so startConsumer does not block on the infinite stream.
+          _ <- Concurrent[F].start {
+            fs2.Stream.awakeEvery[F](lagPublishInterval).evalMap(_ => getLagOnDvsInternalCGTopic.flatMap(
+              lagInfo => Logger[F].info(
+                s"Lag details on ${dvsConsumersTopic}. Total_Offset_Lag = ${lagInfo.totalLag}, " +
+                  f"Lag_percentage = ${lagInfo.lagPercentage}%2.4f, " +
+                  s"Total_Group_Offset = ${lagInfo.totalGroupOffset}, " +
+                  s"Total_Largest_Offset = ${lagInfo.totalLargestOffset}, " +
+                  s"Total_active_partitions = ${lagInfo.partitionOffsets.size}"
+              ))).compile.drain
+          }
         } yield ()
       }
@@ -206,17 +252,19 @@

   private def consumeDVSConsumersTopicIntoCache[F[_] : ContextShift : ConcurrentEffect : Timer : Logger](
-      dvsConsumersStream: fs2.Stream[F, Record],
-      consumerGroupsStorageFacade: Ref[F, ConsumerGroupsStorageFacade]
+      dvsConsumersStream: fs2.Stream[F, (Record, (Partition, Offset))],
+      consumerGroupsStorageFacade: Ref[F, ConsumerGroupsStorageFacade],
+      consumerGroupsOffsetFacade: Ref[F, ConsumerGroupsOffsetFacade]
   )(implicit notificationsService: InternalNotificationSender[F]): F[Unit] = {
-    dvsConsumersStream.evalTap { case (key, value, _) =>
-      TopicConsumer.decode[F](key, value).flatMap {
+    dvsConsumersStream.evalTap {
+      case ((key, value, _), (partition, offset)) =>
+        TopicConsumer.decode[F](key, value).flatMap {
         case (topicKey, topicValue) =>
           topicValue match {
             case Some(tV) =>
-              consumerGroupsStorageFacade.update(_.addConsumerGroup(topicKey, tV))
+              consumerGroupsStorageFacade.update(_.addConsumerGroup(topicKey, tV)) *> consumerGroupsOffsetFacade.update(_.addOffset(partition, offset))
             case None =>
-              consumerGroupsStorageFacade.update(_.removeConsumerGroup(topicKey))
+              consumerGroupsStorageFacade.update(_.removeConsumerGroup(topicKey)) *> consumerGroupsOffsetFacade.update(_.addOffset(partition, offset))
           }
       }.recoverWith {
         case e => Logger[F].error(e)("Error in ConsumergroupsAlgebra consumer")
@@ -258,3 +306,20 @@
 private object ConsumerGroupsStorageFacade {
   def empty: ConsumerGroupsStorageFacade = ConsumerGroupsStorageFacade(Map.empty)
 }
+
+private case class ConsumerGroupsOffsetFacade(offsetMap: Map[Partition, Offset]) {
+
+  def addOffset(key: Partition, value: Offset): ConsumerGroupsOffsetFacade =
+    this.copy(this.offsetMap + (key -> value))
+
+  def getOffsets: Map[Partition, Offset] =
+    this.offsetMap
+
+  def removeOffset(key: Partition): ConsumerGroupsOffsetFacade =
+    this.copy(this.offsetMap - key)
+}
+
+private object ConsumerGroupsOffsetFacade {
+  def empty: ConsumerGroupsOffsetFacade = ConsumerGroupsOffsetFacade(Map.empty)
+}
diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/algebras/KafkaClientAlgebra.scala b/ingestors/kafka/src/main/scala/hydra/kafka/algebras/KafkaClientAlgebra.scala
index 2392a8837..4e66f0612 100644
--- a/ingestors/kafka/src/main/scala/hydra/kafka/algebras/KafkaClientAlgebra.scala
+++ b/ingestors/kafka/src/main/scala/hydra/kafka/algebras/KafkaClientAlgebra.scala
@@ -531,7 +531,7 @@ object KafkaClientAlgebra {
       : fs2.Stream[F, Either[Throwable, (StringRecord, (Partition, Offset))]] = ???

     override def consumeSafelyWithOffsetInfo(topicName: TopicName, consumerGroup: ConsumerGroup, commitOffsets: Boolean)
-      : fs2.Stream[F, Either[Throwable, ((GenericRecord, Option[GenericRecord], Option[Headers]), (Partition, Offset))]] = ???
+      : fs2.Stream[F, Either[Throwable, ((GenericRecord, Option[GenericRecord], Option[Headers]), (Partition, Offset))]] = fs2.Stream.empty

     override def withProducerRecordSizeLimit(sizeLimitBytes: Long): F[KafkaClientAlgebra[F]] = Sync[F].delay {
       getTestInstance(cache, schemaRegistryUrl, schemaRegistry, sizeLimitBytes.some)
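The periodic lag logger is a standard fs2 scheduling pattern: an `awakeEvery` stream runs one effect per tick and is forked so the caller is not blocked. A self-contained sketch of that pattern under the same cats-effect 2 constraints used in this file; `schedulePeriodic` is an illustrative helper, not part of the change:

```scala
import cats.effect.{Concurrent, Fiber, Timer}
import scala.concurrent.duration.FiniteDuration

// Runs `task` every `interval` on a background fiber; cancel the
// returned Fiber to stop the schedule.
def schedulePeriodic[F[_]: Concurrent: Timer](interval: FiniteDuration)(task: F[Unit]): F[Fiber[F, Unit]] =
  Concurrent[F].start(
    fs2.Stream.awakeEvery[F](interval).evalMap(_ => task).compile.drain
  )
```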