From d1794778513add6fa839c20ad6e92a362a675370 Mon Sep 17 00:00:00 2001 From: Abhishek Date: Sun, 21 Apr 2024 16:06:27 +0530 Subject: [PATCH 01/18] Added an offset stream which will be having all committed offset info aand also added ConsumerGroupsOffsetFacade for holding all offset info --- .../algebras/ConsumerGroupsAlgebra.scala | 28 +++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala b/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala index 8b49cfd14..5038cb20b 100644 --- a/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala +++ b/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala @@ -15,7 +15,7 @@ import hydra.common.config.KafkaConfigUtils.KafkaClientSecurityConfig import hydra.kafka.algebras.ConsumerGroupsAlgebra.{Consumer, ConsumerTopics, DetailedConsumerGroup, DetailedTopicConsumers, PartitionOffset, TopicConsumers} import hydra.kafka.algebras.ConsumerGroupsAlgebra._ import hydra.kafka.algebras.HydraTag.StringJsonFormat -import hydra.kafka.algebras.KafkaClientAlgebra.Record +import hydra.kafka.algebras.KafkaClientAlgebra.{Offset, Partition, Record} import hydra.kafka.algebras.RetryableFs2Stream.RetryPolicy.Infinite import hydra.kafka.algebras.RetryableFs2Stream._ import hydra.kafka.model.TopicConsumer @@ -132,8 +132,16 @@ object ConsumerGroupsAlgebra { kafkaClientSecurityConfig: KafkaClientSecurityConfig ) (implicit notificationsService: InternalNotificationSender[F]): F[ConsumerGroupsAlgebra[F]] = { + + val parentStream = kafkaClientAlgebra.consumeSafelyWithOffsetInfo(dvsConsumersTopic.value, uniquePerNodeConsumerGroup, commitOffsets = false) + + val offsetStream: fs2.Stream[F, Either[Throwable, (Partition, Offset)]] = parentStream.map(x => { + x.map(_._2) + }) + val dvsConsumersStream: fs2.Stream[F, Record] = { - kafkaClientAlgebra.consumeSafelyMessages(dvsConsumersTopic.value, uniquePerNodeConsumerGroup, commitOffsets = false) + parentStream + .map(_.map(_._1)) //Ignore records with errors .collect { case Right(value) => value } } @@ -258,3 +266,19 @@ private object ConsumerGroupsStorageFacade { def empty: ConsumerGroupsStorageFacade = ConsumerGroupsStorageFacade(Map.empty) } +private case class ConsumerGroupsOffsetFacade(offsetMap: Map[Partition, Offset]) { + + def addOffset(key: Partition, value: Offset): ConsumerGroupsOffsetFacade = { + val res = this.copy(this.offsetMap + (key -> value)) + println(this.offsetMap) + res + } + + def removeOffset(key: Partition): ConsumerGroupsOffsetFacade = + this.copy(this.offsetMap - key) + } + + private object ConsumerGroupsOffsetFacade { + def empty: ConsumerGroupsOffsetFacade = ConsumerGroupsOffsetFacade(Map.empty) + } + From 9e43eed8b169fb92e3757efa01f80e2eed7a02eb Mon Sep 17 00:00:00 2001 From: Abhishek Date: Sun, 21 Apr 2024 16:43:48 +0530 Subject: [PATCH 02/18] returning empty fs2.stream instead of no implementation --- .../main/scala/hydra/kafka/algebras/KafkaClientAlgebra.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/algebras/KafkaClientAlgebra.scala b/ingestors/kafka/src/main/scala/hydra/kafka/algebras/KafkaClientAlgebra.scala index 2392a8837..4e66f0612 100644 --- a/ingestors/kafka/src/main/scala/hydra/kafka/algebras/KafkaClientAlgebra.scala +++ b/ingestors/kafka/src/main/scala/hydra/kafka/algebras/KafkaClientAlgebra.scala @@ -531,7 +531,7 @@ object KafkaClientAlgebra { : fs2.Stream[F, Either[Throwable, (StringRecord, (Partition, Offset))]] = ??? override def consumeSafelyWithOffsetInfo(topicName: TopicName, consumerGroup: ConsumerGroup, commitOffsets: Boolean) - : fs2.Stream[F, Either[Throwable, ((GenericRecord, Option[GenericRecord], Option[Headers]), (Partition, Offset))]] = ??? + : fs2.Stream[F, Either[Throwable, ((GenericRecord, Option[GenericRecord], Option[Headers]), (Partition, Offset))]] = fs2.Stream.empty override def withProducerRecordSizeLimit(sizeLimitBytes: Long): F[KafkaClientAlgebra[F]] = Sync[F].delay { getTestInstance(cache, schemaRegistryUrl, schemaRegistry, sizeLimitBytes.some) From 7f16c3b964837916e230a78c38ca04ce13ae750e Mon Sep 17 00:00:00 2001 From: Abhishek Date: Wed, 8 May 2024 11:15:50 +0530 Subject: [PATCH 03/18] Added chnages for creating a new endpoint for returing hydra internal consumer topic partition offset map | Added changes for using offset stream and updating the consumer group offset facade --- .../algebras/ConsumerGroupsAlgebra.scala | 47 +++++++++++++++++++ .../endpoints/ConsumerGroupsEndpoint.scala | 12 +++++ 2 files changed, 59 insertions(+) diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala b/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala index 5038cb20b..059b134e3 100644 --- a/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala +++ b/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala @@ -33,6 +33,8 @@ trait ConsumerGroupsAlgebra[F[_]] { def getAllConsumers: F[List[ConsumerTopics]] + def getOffsetsForInternalConsumerGroup: F[List[PartitionOffset]] + def getAllConsumersByTopic: F[List[TopicConsumers]] def startConsumer: F[Unit] @@ -96,6 +98,7 @@ final case class TestConsumerGroupsAlgebra(consumerGroupMap: Map[TopicConsumerKe override def getUniquePerNodeConsumerGroup: String = "uniquePerNodeConsumerGroup" + override def getOffsetsForInternalConsumerGroup: IO[List[PartitionOffset]] = ??? } object TestConsumerGroupsAlgebra { @@ -148,11 +151,39 @@ object ConsumerGroupsAlgebra { for { consumerGroupsStorageFacade <- Ref[F].of(ConsumerGroupsStorageFacade.empty) + consumerGroupsOffsetFacade <- Ref[F].of(ConsumerGroupsOffsetFacade.empty) } yield new ConsumerGroupsAlgebra[F] { override def getConsumersForTopic(topicName: String): F[TopicConsumers] = consumerGroupsStorageFacade.get.flatMap(a => addStateToTopicConsumers(a.getConsumersForTopicName(topicName))) + override def getOffsetsForInternalConsumerGroup: F[List[PartitionOffset]] = { + + for { + groupOffsetsFromOffsetStream <- consumerGroupsOffsetFacade.get.map(_.getAllPartitionOffset()) + + largestOffsets <- kAA.getLatestOffsets(dvsConsumersTopic.value) + .map(_.map(k => PartitionOffset + ( + k._1.partition, + groupOffsetsFromOffsetStream.getOrElse(k._1.partition, 0), + k._2.value, + -1 + )).toList) + + offsetsWithLag = largestOffsets + .map( + k => PartitionOffset + ( + k.partition, + k.groupOffset, + k.largestOffset, + k.largestOffset - k.groupOffset + ) + ) + }yield offsetsWithLag + } + private def addStateToTopicConsumers(topicConsumers: TopicConsumers): F[TopicConsumers] = { val detailedF: F[List[Consumer]] = topicConsumers.consumers.traverse { consumer => val fState = getConsumerActiveState(consumer.consumerGroupName) @@ -179,6 +210,7 @@ object ConsumerGroupsAlgebra { ConsumerGroupsOffsetConsumer.start(kafkaClientAlgebra, kAA, sra, uniquePerNodeConsumerGroup, consumerOffsetsOffsetsTopicConfig, kafkaInternalTopic, dvsConsumersTopic, bootstrapServers, commonConsumerGroup, kafkaClientSecurityConfig) } + _ <- Concurrent[F].start(consumeOffsetStreamIntoCache(offsetStream, consumerGroupsOffsetFacade)) } yield () } @@ -233,6 +265,17 @@ object ConsumerGroupsAlgebra { .makeRetryableWithNotification(Infinite, "ConsumerGroupsAlgebra") .compile.drain } + + private def consumeOffsetStreamIntoCache[F[_] : ContextShift : ConcurrentEffect : Timer : Logger]( + offsetStream: fs2.Stream[F, Either[Throwable, (Partition, Offset)]], + consumerGroupsOffsetFacade: Ref[F, ConsumerGroupsOffsetFacade] + )(implicit notificationsService: InternalNotificationSender[F]): F[Unit] = { + + offsetStream.evalTap { + case Right((partition, offset)) => consumerGroupsOffsetFacade.update(_.addOffset(partition, offset)) + case _ => Logger[F].error("Error in consumeOffsetStreamIntoCache") + }.compile.drain + } } private case class ConsumerGroupsStorageFacade(consumerMap: Map[TopicConsumerKey, TopicConsumerValue]) { @@ -274,6 +317,10 @@ private case class ConsumerGroupsOffsetFacade(offsetMap: Map[Partition, Offset]) res } + def getAllPartitionOffset(): Map[Partition, Offset] = { + this.offsetMap + } + def removeOffset(key: Partition): ConsumerGroupsOffsetFacade = this.copy(this.offsetMap - key) } diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/endpoints/ConsumerGroupsEndpoint.scala b/ingestors/kafka/src/main/scala/hydra/kafka/endpoints/ConsumerGroupsEndpoint.scala index aa7fb47e8..fb23c8f61 100644 --- a/ingestors/kafka/src/main/scala/hydra/kafka/endpoints/ConsumerGroupsEndpoint.scala +++ b/ingestors/kafka/src/main/scala/hydra/kafka/endpoints/ConsumerGroupsEndpoint.scala @@ -34,6 +34,18 @@ class ConsumerGroupsEndpoint[F[_]: Futurable](consumerGroupsAlgebra: ConsumerGro addHttpMetric("", StatusCodes.InternalServerError, "/v2/consumer-groups", startTime, method.value, error = Some(exception.getMessage)) complete(StatusCodes.InternalServerError, exception.getMessage) } + } ~ pathPrefix("hydra-internal-topic") { + val startTime = Instant.now + pathEndOrSingleSlash { + onComplete(Futurable[F].unsafeToFuture(consumerGroupsAlgebra.getOffsetsForInternalConsumerGroup)) { + case Success(detailedConsumer) => + addHttpMetric("hydra-internal-topic", StatusCodes.OK, "/v2/consumer-groups/hydra-internal-topic...", startTime, method.value) + complete(StatusCodes.OK, detailedConsumer) + case Failure(exception) => + addHttpMetric("hydra-internal-topic", StatusCodes.InternalServerError, "/v2/consumer-groups/hydra-internal-topic...", startTime, method.value, error = Some(exception.getMessage)) + complete(StatusCodes.InternalServerError, exception.getMessage) + } + } } ~ pathPrefix(Segment) { consumerGroupName => val startTime = Instant.now pathEndOrSingleSlash { From d9a0390e5ccde901f8b19cab4a3a824358899dba Mon Sep 17 00:00:00 2001 From: Abhishek Date: Wed, 8 May 2024 12:14:01 +0530 Subject: [PATCH 04/18] Added changes for ingestors/kafka/src/main/scala/hydra/kafka/endpoints/ConsumerGroupsEndpoint.scala --- .../scala/hydra/kafka/endpoints/ConsumerGroupsEndpoint.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/endpoints/ConsumerGroupsEndpoint.scala b/ingestors/kafka/src/main/scala/hydra/kafka/endpoints/ConsumerGroupsEndpoint.scala index fb23c8f61..b67beca05 100644 --- a/ingestors/kafka/src/main/scala/hydra/kafka/endpoints/ConsumerGroupsEndpoint.scala +++ b/ingestors/kafka/src/main/scala/hydra/kafka/endpoints/ConsumerGroupsEndpoint.scala @@ -39,10 +39,10 @@ class ConsumerGroupsEndpoint[F[_]: Futurable](consumerGroupsAlgebra: ConsumerGro pathEndOrSingleSlash { onComplete(Futurable[F].unsafeToFuture(consumerGroupsAlgebra.getOffsetsForInternalConsumerGroup)) { case Success(detailedConsumer) => - addHttpMetric("hydra-internal-topic", StatusCodes.OK, "/v2/consumer-groups/hydra-internal-topic...", startTime, method.value) + addHttpMetric("hydra-internal-topic", StatusCodes.OK, "/v2/consumer-groups/hydra-internal-topic", startTime, method.value) complete(StatusCodes.OK, detailedConsumer) case Failure(exception) => - addHttpMetric("hydra-internal-topic", StatusCodes.InternalServerError, "/v2/consumer-groups/hydra-internal-topic...", startTime, method.value, error = Some(exception.getMessage)) + addHttpMetric("hydra-internal-topic", StatusCodes.InternalServerError, "/v2/consumer-groups/hydra-internal-topic", startTime, method.value, error = Some(exception.getMessage)) complete(StatusCodes.InternalServerError, exception.getMessage) } } From 5052eda31b7c2c2b268f7264c1e724394e045228 Mon Sep 17 00:00:00 2001 From: Abhishek Date: Wed, 8 May 2024 12:20:37 +0530 Subject: [PATCH 05/18] Added changes to ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala --- .../kafka/algebras/ConsumerGroupsAlgebra.scala | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala b/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala index 059b134e3..13ffb680c 100644 --- a/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala +++ b/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala @@ -157,11 +157,13 @@ object ConsumerGroupsAlgebra { override def getConsumersForTopic(topicName: String): F[TopicConsumers] = consumerGroupsStorageFacade.get.flatMap(a => addStateToTopicConsumers(a.getConsumersForTopicName(topicName))) + override def getOffsetsForInternalConsumerGroup: F[List[PartitionOffset]] = { for { groupOffsetsFromOffsetStream <- consumerGroupsOffsetFacade.get.map(_.getAllPartitionOffset()) + // TODO: To be optimized largestOffsets <- kAA.getLatestOffsets(dvsConsumersTopic.value) .map(_.map(k => PartitionOffset ( @@ -311,18 +313,15 @@ private object ConsumerGroupsStorageFacade { private case class ConsumerGroupsOffsetFacade(offsetMap: Map[Partition, Offset]) { - def addOffset(key: Partition, value: Offset): ConsumerGroupsOffsetFacade = { - val res = this.copy(this.offsetMap + (key -> value)) - println(this.offsetMap) - res - } + def addOffset(key: Partition, value: Offset): ConsumerGroupsOffsetFacade = + this.copy(this.offsetMap + (key -> value)) - def getAllPartitionOffset(): Map[Partition, Offset] = { + def getAllPartitionOffset(): Map[Partition, Offset] = this.offsetMap - } def removeOffset(key: Partition): ConsumerGroupsOffsetFacade = - this.copy(this.offsetMap - key) + this.copy(this.offsetMap - key) + } private object ConsumerGroupsOffsetFacade { From d67bd939bb394c61ed81f4bb08ae08e99eba8252 Mon Sep 17 00:00:00 2001 From: Abhishek Date: Fri, 10 May 2024 15:48:46 +0530 Subject: [PATCH 06/18] Refactored the getOffsetsForInternalCGTopic which is getting offset data from cache | optimized the method | Added code to remove the partitions having 0 latest offset | partition level lag is calucalted properly --- .../algebras/ConsumerGroupsAlgebra.scala | 47 +++++++++---------- .../endpoints/ConsumerGroupsEndpoint.scala | 2 +- 2 files changed, 22 insertions(+), 27 deletions(-) diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala b/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala index 13ffb680c..eb4bab24d 100644 --- a/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala +++ b/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala @@ -33,7 +33,7 @@ trait ConsumerGroupsAlgebra[F[_]] { def getAllConsumers: F[List[ConsumerTopics]] - def getOffsetsForInternalConsumerGroup: F[List[PartitionOffset]] + def getOffsetsForInternalCGTopic: F[List[PartitionOffset]] def getAllConsumersByTopic: F[List[TopicConsumers]] @@ -98,7 +98,7 @@ final case class TestConsumerGroupsAlgebra(consumerGroupMap: Map[TopicConsumerKe override def getUniquePerNodeConsumerGroup: String = "uniquePerNodeConsumerGroup" - override def getOffsetsForInternalConsumerGroup: IO[List[PartitionOffset]] = ??? + override def getOffsetsForInternalCGTopic: IO[List[PartitionOffset]] = ??? } object TestConsumerGroupsAlgebra { @@ -157,33 +157,28 @@ object ConsumerGroupsAlgebra { override def getConsumersForTopic(topicName: String): F[TopicConsumers] = consumerGroupsStorageFacade.get.flatMap(a => addStateToTopicConsumers(a.getConsumersForTopicName(topicName))) + override def getOffsetsForInternalCGTopic: F[List[PartitionOffset]] = { - override def getOffsetsForInternalConsumerGroup: F[List[PartitionOffset]] = { + def getValueFromOffsetMap(partition: Int, offsetMap: Map[Partition, Offset]): Long = + offsetMap.get(partition) match { + case Some(value) => value + case _ => -1 + } for { - groupOffsetsFromOffsetStream <- consumerGroupsOffsetFacade.get.map(_.getAllPartitionOffset()) - - // TODO: To be optimized - largestOffsets <- kAA.getLatestOffsets(dvsConsumersTopic.value) - .map(_.map(k => PartitionOffset - ( - k._1.partition, - groupOffsetsFromOffsetStream.getOrElse(k._1.partition, 0), - k._2.value, - -1 - )).toList) - - offsetsWithLag = largestOffsets - .map( - k => PartitionOffset - ( - k.partition, - k.groupOffset, - k.largestOffset, - k.largestOffset - k.groupOffset - ) - ) - }yield offsetsWithLag + groupOffsetMap <- consumerGroupsOffsetFacade.get.map(_.getAllPartitionOffset()) + partitionOffsetMapWithLag <- kAA.getLatestOffsets(dvsConsumersTopic.value) + .map( _.toList + .filter( _._2.value > 0.toLong) + .map( latestOffset => PartitionOffset( + latestOffset._1.partition, + getValueFromOffsetMap(latestOffset._1.partition, groupOffsetMap), + latestOffset._2.value - 1.toLong, + latestOffset._2.value - getValueFromOffsetMap(latestOffset._1.partition, groupOffsetMap) - 1.toLong + )).toList) + + } yield partitionOffsetMapWithLag + } private def addStateToTopicConsumers(topicConsumers: TopicConsumers): F[TopicConsumers] = { diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/endpoints/ConsumerGroupsEndpoint.scala b/ingestors/kafka/src/main/scala/hydra/kafka/endpoints/ConsumerGroupsEndpoint.scala index b67beca05..c69d6360b 100644 --- a/ingestors/kafka/src/main/scala/hydra/kafka/endpoints/ConsumerGroupsEndpoint.scala +++ b/ingestors/kafka/src/main/scala/hydra/kafka/endpoints/ConsumerGroupsEndpoint.scala @@ -37,7 +37,7 @@ class ConsumerGroupsEndpoint[F[_]: Futurable](consumerGroupsAlgebra: ConsumerGro } ~ pathPrefix("hydra-internal-topic") { val startTime = Instant.now pathEndOrSingleSlash { - onComplete(Futurable[F].unsafeToFuture(consumerGroupsAlgebra.getOffsetsForInternalConsumerGroup)) { + onComplete(Futurable[F].unsafeToFuture(consumerGroupsAlgebra.getOffsetsForInternalCGTopic)) { case Success(detailedConsumer) => addHttpMetric("hydra-internal-topic", StatusCodes.OK, "/v2/consumer-groups/hydra-internal-topic", startTime, method.value) complete(StatusCodes.OK, detailedConsumer) From dba86d218fe21b16fff0a3ae4f887088e5ec614d Mon Sep 17 00:00:00 2001 From: Abhishek Date: Mon, 13 May 2024 13:14:01 +0530 Subject: [PATCH 07/18] Created a new response having consolidated lags and lag percentage | Added marshalling logic for new case object in ingestors/kafka/src/main/scala/hydra/kafka/marshallers/ConsumerGroupMarshallers.scala --- .../algebras/ConsumerGroupsAlgebra.scala | 24 +++++++++++++------ .../ConsumerGroupMarshallers.scala | 14 ++++++++++- 2 files changed, 30 insertions(+), 8 deletions(-) diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala b/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala index eb4bab24d..b0751beed 100644 --- a/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala +++ b/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala @@ -33,7 +33,7 @@ trait ConsumerGroupsAlgebra[F[_]] { def getAllConsumers: F[List[ConsumerTopics]] - def getOffsetsForInternalCGTopic: F[List[PartitionOffset]] + def getOffsetsForInternalCGTopic: F[TotalOffsetsWithLag] def getAllConsumersByTopic: F[List[TopicConsumers]] @@ -98,7 +98,7 @@ final case class TestConsumerGroupsAlgebra(consumerGroupMap: Map[TopicConsumerKe override def getUniquePerNodeConsumerGroup: String = "uniquePerNodeConsumerGroup" - override def getOffsetsForInternalCGTopic: IO[List[PartitionOffset]] = ??? + override def getOffsetsForInternalCGTopic: IO[TotalOffsetsWithLag] = ??? } object TestConsumerGroupsAlgebra { @@ -109,6 +109,8 @@ object ConsumerGroupsAlgebra { type PartitionOffsetMap = Map[Int, Long] + final case class TotalOffsetsWithLag(largestOffset: Long, groupOffset: Long, lag: Long, lagPercentage: Double, offsetMap: List[PartitionOffset]) + final case class PartitionOffset(partition: Int, groupOffset: Long, largestOffset: Long, partitionLag: Long) final case class TopicConsumers(topicName: String, consumers: List[Consumer]) @@ -157,7 +159,7 @@ object ConsumerGroupsAlgebra { override def getConsumersForTopic(topicName: String): F[TopicConsumers] = consumerGroupsStorageFacade.get.flatMap(a => addStateToTopicConsumers(a.getConsumersForTopicName(topicName))) - override def getOffsetsForInternalCGTopic: F[List[PartitionOffset]] = { + override def getOffsetsForInternalCGTopic: F[TotalOffsetsWithLag] = { def getValueFromOffsetMap(partition: Int, offsetMap: Map[Partition, Offset]): Long = offsetMap.get(partition) match { @@ -167,17 +169,25 @@ object ConsumerGroupsAlgebra { for { groupOffsetMap <- consumerGroupsOffsetFacade.get.map(_.getAllPartitionOffset()) + partitionOffsetMapWithLag <- kAA.getLatestOffsets(dvsConsumersTopic.value) - .map( _.toList - .filter( _._2.value > 0.toLong) - .map( latestOffset => PartitionOffset( + .map(_.toList + .filter(_._2.value > 0.toLong) + .map(latestOffset => PartitionOffset( latestOffset._1.partition, getValueFromOffsetMap(latestOffset._1.partition, groupOffsetMap), latestOffset._2.value - 1.toLong, latestOffset._2.value - getValueFromOffsetMap(latestOffset._1.partition, groupOffsetMap) - 1.toLong )).toList) - } yield partitionOffsetMapWithLag + (totalLargestOffset, totalGroupOffset) = + (partitionOffsetMapWithLag.map(_.largestOffset).sum, partitionOffsetMapWithLag.map(_.groupOffset).sum) + + totalLag = totalLargestOffset - totalGroupOffset + + lagPercentage: Double = (totalLag.toDouble / totalLargestOffset.toDouble) * 100 + + } yield TotalOffsetsWithLag(totalLargestOffset, totalGroupOffset, totalLag, lagPercentage, partitionOffsetMapWithLag) } diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/marshallers/ConsumerGroupMarshallers.scala b/ingestors/kafka/src/main/scala/hydra/kafka/marshallers/ConsumerGroupMarshallers.scala index 9527d4fcf..ef7a32f98 100644 --- a/ingestors/kafka/src/main/scala/hydra/kafka/marshallers/ConsumerGroupMarshallers.scala +++ b/ingestors/kafka/src/main/scala/hydra/kafka/marshallers/ConsumerGroupMarshallers.scala @@ -5,7 +5,7 @@ import spray.json.{RootJsonFormat, _} import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport import hydra.core.transport.AckStrategy import hydra.kafka.algebras.{ConsumerGroupsAlgebra, KafkaAdminAlgebra} -import hydra.kafka.algebras.ConsumerGroupsAlgebra.{ConsumerTopics, DetailedConsumerGroup, DetailedTopicConsumers, PartitionOffset, TopicConsumers} +import hydra.kafka.algebras.ConsumerGroupsAlgebra.{ConsumerTopics, DetailedConsumerGroup, DetailedTopicConsumers, PartitionOffset, TopicConsumers, TotalOffsetsWithLag} import hydra.kafka.algebras.KafkaAdminAlgebra.{LagOffsets, Offset, TopicAndPartition} import hydra.kafka.serializers.TopicMetadataV2Parser.IntentionallyUnimplemented @@ -56,6 +56,18 @@ trait ConsumerGroupMarshallers extends DefaultJsonProtocol with SprayJsonSupport override def read(json: JsValue): ConsumerGroupsAlgebra.Consumer = throw IntentionallyUnimplemented } + implicit object totalOffsetsWithLag extends RootJsonFormat[TotalOffsetsWithLag] { + override def write(totalOffsetsWithLag: TotalOffsetsWithLag): JsValue = JsObject(List( + Some("largestOffset" -> JsNumber(totalOffsetsWithLag.largestOffset)), + Some("groupOffset" -> JsNumber(totalOffsetsWithLag.groupOffset)), + Some("lag" -> JsNumber(totalOffsetsWithLag.lag)), + Some("lagPercentage" -> DoubleJsonFormat.write(totalOffsetsWithLag.lagPercentage)), + Some("offsetInformation" -> JsArray(totalOffsetsWithLag.offsetMap.sortBy(_.partition).map(partitionOffset.write).toVector)) + ).flatten.toMap) + + override def read(json: JsValue): TotalOffsetsWithLag = throw IntentionallyUnimplemented + } + implicit val consumerTopicsFormat: RootJsonFormat[ConsumerTopics] = jsonFormat2(ConsumerTopics) implicit val topicConsumersFormat: RootJsonFormat[TopicConsumers] = jsonFormat2(TopicConsumers) From ad8a210ab8cef115e102a3b611140484f543a553 Mon Sep 17 00:00:00 2001 From: Abhishek Date: Tue, 14 May 2024 11:43:57 +0530 Subject: [PATCH 08/18] Added offset change code for starting with 0 | Add code to marshaller for new case object --- .../algebras/ConsumerGroupsAlgebra.scala | 19 ++++++++-------- .../ConsumerGroupMarshallers.scala | 22 ++++++++++--------- 2 files changed, 22 insertions(+), 19 deletions(-) diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala b/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala index b0751beed..10c7e97be 100644 --- a/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala +++ b/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala @@ -33,7 +33,7 @@ trait ConsumerGroupsAlgebra[F[_]] { def getAllConsumers: F[List[ConsumerTopics]] - def getOffsetsForInternalCGTopic: F[TotalOffsetsWithLag] + def getOffsetsForInternalCGTopic: F[PartitionOffsetsWithTotalLag] def getAllConsumersByTopic: F[List[TopicConsumers]] @@ -98,7 +98,7 @@ final case class TestConsumerGroupsAlgebra(consumerGroupMap: Map[TopicConsumerKe override def getUniquePerNodeConsumerGroup: String = "uniquePerNodeConsumerGroup" - override def getOffsetsForInternalCGTopic: IO[TotalOffsetsWithLag] = ??? + override def getOffsetsForInternalCGTopic: IO[PartitionOffsetsWithTotalLag] = ??? } object TestConsumerGroupsAlgebra { @@ -109,7 +109,8 @@ object ConsumerGroupsAlgebra { type PartitionOffsetMap = Map[Int, Long] - final case class TotalOffsetsWithLag(largestOffset: Long, groupOffset: Long, lag: Long, lagPercentage: Double, offsetMap: List[PartitionOffset]) + final case class PartitionOffsetsWithTotalLag(totalLargestOffset: Long, totalGroupOffset: Long, totalLag: Long, + lagPercentage: Double, partitionOffsets: List[PartitionOffset]) final case class PartitionOffset(partition: Int, groupOffset: Long, largestOffset: Long, partitionLag: Long) @@ -159,12 +160,12 @@ object ConsumerGroupsAlgebra { override def getConsumersForTopic(topicName: String): F[TopicConsumers] = consumerGroupsStorageFacade.get.flatMap(a => addStateToTopicConsumers(a.getConsumersForTopicName(topicName))) - override def getOffsetsForInternalCGTopic: F[TotalOffsetsWithLag] = { + override def getOffsetsForInternalCGTopic: F[PartitionOffsetsWithTotalLag] = { def getValueFromOffsetMap(partition: Int, offsetMap: Map[Partition, Offset]): Long = offsetMap.get(partition) match { - case Some(value) => value - case _ => -1 + case Some(value) => value + 1.toLong + case _ => 0 } for { @@ -176,8 +177,8 @@ object ConsumerGroupsAlgebra { .map(latestOffset => PartitionOffset( latestOffset._1.partition, getValueFromOffsetMap(latestOffset._1.partition, groupOffsetMap), - latestOffset._2.value - 1.toLong, - latestOffset._2.value - getValueFromOffsetMap(latestOffset._1.partition, groupOffsetMap) - 1.toLong + latestOffset._2.value, + latestOffset._2.value - getValueFromOffsetMap(latestOffset._1.partition, groupOffsetMap) )).toList) (totalLargestOffset, totalGroupOffset) = @@ -187,7 +188,7 @@ object ConsumerGroupsAlgebra { lagPercentage: Double = (totalLag.toDouble / totalLargestOffset.toDouble) * 100 - } yield TotalOffsetsWithLag(totalLargestOffset, totalGroupOffset, totalLag, lagPercentage, partitionOffsetMapWithLag) + } yield PartitionOffsetsWithTotalLag(totalLargestOffset, totalGroupOffset, totalLag, lagPercentage, partitionOffsetMapWithLag) } diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/marshallers/ConsumerGroupMarshallers.scala b/ingestors/kafka/src/main/scala/hydra/kafka/marshallers/ConsumerGroupMarshallers.scala index ef7a32f98..4ea6d960b 100644 --- a/ingestors/kafka/src/main/scala/hydra/kafka/marshallers/ConsumerGroupMarshallers.scala +++ b/ingestors/kafka/src/main/scala/hydra/kafka/marshallers/ConsumerGroupMarshallers.scala @@ -5,7 +5,7 @@ import spray.json.{RootJsonFormat, _} import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport import hydra.core.transport.AckStrategy import hydra.kafka.algebras.{ConsumerGroupsAlgebra, KafkaAdminAlgebra} -import hydra.kafka.algebras.ConsumerGroupsAlgebra.{ConsumerTopics, DetailedConsumerGroup, DetailedTopicConsumers, PartitionOffset, TopicConsumers, TotalOffsetsWithLag} +import hydra.kafka.algebras.ConsumerGroupsAlgebra.{ConsumerTopics, DetailedConsumerGroup, DetailedTopicConsumers, PartitionOffset, PartitionOffsetsWithTotalLag, TopicConsumers} import hydra.kafka.algebras.KafkaAdminAlgebra.{LagOffsets, Offset, TopicAndPartition} import hydra.kafka.serializers.TopicMetadataV2Parser.IntentionallyUnimplemented @@ -56,16 +56,18 @@ trait ConsumerGroupMarshallers extends DefaultJsonProtocol with SprayJsonSupport override def read(json: JsValue): ConsumerGroupsAlgebra.Consumer = throw IntentionallyUnimplemented } - implicit object totalOffsetsWithLag extends RootJsonFormat[TotalOffsetsWithLag] { - override def write(totalOffsetsWithLag: TotalOffsetsWithLag): JsValue = JsObject(List( - Some("largestOffset" -> JsNumber(totalOffsetsWithLag.largestOffset)), - Some("groupOffset" -> JsNumber(totalOffsetsWithLag.groupOffset)), - Some("lag" -> JsNumber(totalOffsetsWithLag.lag)), - Some("lagPercentage" -> DoubleJsonFormat.write(totalOffsetsWithLag.lagPercentage)), - Some("offsetInformation" -> JsArray(totalOffsetsWithLag.offsetMap.sortBy(_.partition).map(partitionOffset.write).toVector)) - ).flatten.toMap) + implicit object totalOffsetsWithLag extends RootJsonFormat[PartitionOffsetsWithTotalLag] { + override def write(partitionOffsetsWithTotalLag: PartitionOffsetsWithTotalLag): JsValue = JsObject(List( + Some("totalGroupOffset" -> JsNumber(partitionOffsetsWithTotalLag.totalGroupOffset)), + Some("totalLargestOffset" -> JsNumber(partitionOffsetsWithTotalLag.totalLargestOffset)), + Some("totalLag" -> JsNumber(partitionOffsetsWithTotalLag.totalLag)), + Some("lagPercentage" -> DoubleJsonFormat.write(partitionOffsetsWithTotalLag.lagPercentage)), + if (partitionOffsetsWithTotalLag.partitionOffsets.isEmpty) None + else Some( + "partitionOffsets" -> JsArray(partitionOffsetsWithTotalLag.partitionOffsets.sortBy(_.partition).map(partitionOffset.write).toVector) + )).flatten.toMap) - override def read(json: JsValue): TotalOffsetsWithLag = throw IntentionallyUnimplemented + override def read(json: JsValue): PartitionOffsetsWithTotalLag = throw IntentionallyUnimplemented } implicit val consumerTopicsFormat: RootJsonFormat[ConsumerTopics] = jsonFormat2(ConsumerTopics) From 07bd28917c7c3411862c7dde9c39a0bd5daa50c4 Mon Sep 17 00:00:00 2001 From: Abhishek Date: Tue, 21 May 2024 22:56:20 +0530 Subject: [PATCH 09/18] Added code for sending the slack notifictaion having all offset info for _hydra.consumer_groups internal topic --- .../algebras/ConsumerGroupsAlgebra.scala | 25 +++++++++++++------ 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala b/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala index 10c7e97be..73e2a334d 100644 --- a/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala +++ b/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala @@ -8,8 +8,8 @@ import cats.effect.{Concurrent, ConcurrentEffect, ContextShift, IO, Timer} import cats.implicits._ import fs2.kafka._ import hydra.avro.registry.SchemaRegistry -import hydra.common.alerting.AlertProtocol.NotificationMessage -import hydra.common.alerting.NotificationLevel +import hydra.common.alerting.AlertProtocol.{NotificationMessage, NotificationScope} +import hydra.common.alerting.{NotificationLevel, NotificationType} import hydra.common.alerting.sender.{InternalNotificationSender, NotificationSender} import hydra.common.config.KafkaConfigUtils.KafkaClientSecurityConfig import hydra.kafka.algebras.ConsumerGroupsAlgebra.{Consumer, ConsumerTopics, DetailedConsumerGroup, DetailedTopicConsumers, PartitionOffset, TopicConsumers} @@ -188,8 +188,15 @@ object ConsumerGroupsAlgebra { lagPercentage: Double = (totalLag.toDouble / totalLargestOffset.toDouble) * 100 - } yield PartitionOffsetsWithTotalLag(totalLargestOffset, totalGroupOffset, totalLag, lagPercentage, partitionOffsetMapWithLag) - + _ <- notificationsService.send(NotificationScope(NotificationLevel.Warn), + NotificationMessage( + s"""Total Offset Lag on ${dvsConsumersTopic} is ${totalLag.toString} , + | Lag percentage is ${lagPercentage.toString} , + | Total_Group_Offset is ${totalGroupOffset} , + | Total_Largest_Offset is ${totalLargestOffset}""".stripMargin) + ) + } yield + PartitionOffsetsWithTotalLag(totalLargestOffset, totalGroupOffset, totalLag, lagPercentage, partitionOffsetMapWithLag) } private def addStateToTopicConsumers(topicConsumers: TopicConsumers): F[TopicConsumers] = { @@ -279,10 +286,12 @@ object ConsumerGroupsAlgebra { consumerGroupsOffsetFacade: Ref[F, ConsumerGroupsOffsetFacade] )(implicit notificationsService: InternalNotificationSender[F]): F[Unit] = { - offsetStream.evalTap { - case Right((partition, offset)) => consumerGroupsOffsetFacade.update(_.addOffset(partition, offset)) - case _ => Logger[F].error("Error in consumeOffsetStreamIntoCache") - }.compile.drain + offsetStream.evalTap { + case Right((partition, offset)) => consumerGroupsOffsetFacade.update(_.addOffset(partition, offset)) + case _ => Logger[F].error("Error in consumeOffsetStreamIntoCache") + } + .makeRetryableWithNotification(Infinite, "offsetStream") + .compile.drain } } From 4319877e6ac80f6648fde863feb22de499c61ee5 Mon Sep 17 00:00:00 2001 From: Abhishek Date: Wed, 22 May 2024 00:17:55 +0530 Subject: [PATCH 10/18] Added test cases for checking the lag --- .../algebras/ConsumerGroupsAlgebraSpec.scala | 27 ++++++++++++++++++- .../algebras/ConsumerGroupsAlgebra.scala | 5 ++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/ingestors/kafka/src/it/scala/hydra/kafka/algebras/ConsumerGroupsAlgebraSpec.scala b/ingestors/kafka/src/it/scala/hydra/kafka/algebras/ConsumerGroupsAlgebraSpec.scala index b7f789293..7bbb0d787 100644 --- a/ingestors/kafka/src/it/scala/hydra/kafka/algebras/ConsumerGroupsAlgebraSpec.scala +++ b/ingestors/kafka/src/it/scala/hydra/kafka/algebras/ConsumerGroupsAlgebraSpec.scala @@ -10,7 +10,7 @@ import hydra.common.NotificationsTestSuite import hydra.common.alerting.sender.{InternalNotificationSender, NotificationSender} import hydra.common.config.KafkaConfigUtils.{KafkaClientSecurityConfig, SchemaRegistrySecurityConfig, kafkaSecurityEmptyConfig} import hydra.common.alerting.sender.InternalNotificationSender -import hydra.kafka.algebras.ConsumerGroupsAlgebra.PartitionOffsetMap +import hydra.kafka.algebras.ConsumerGroupsAlgebra.{PartitionOffsetMap, PartitionOffsetsWithTotalLag} import hydra.kafka.algebras.KafkaClientAlgebra.{OffsetInfo, Record} import hydra.kafka.model.TopicConsumer.{TopicConsumerKey, TopicConsumerValue} import hydra.kafka.model.TopicConsumerOffset.{TopicConsumerOffsetKey, TopicConsumerOffsetValue} @@ -185,6 +185,31 @@ class ConsumerGroupsAlgebraSpec extends AnyWordSpecLike with Matchers with ForAl case Right(_) => succeed }.unsafeRunSync() } + + "getOffsetsForInternalCGTopic test to verify no lag with commit offsets as false" in { + val (key1, value1) = getGenericRecords(dvsConsumerTopic.value, "abc", "123") + + kafkaClient.publishMessage((key1, Some(value1), None), dvsConsumerTopic.value).unsafeRunSync() + + kafkaClient.consumeMessages(dvsConsumerTopic.value, consumer1, commitOffsets = false) + .take(1).compile.last.unsafeRunSync() shouldBe (key1, value1.some, None).some + + cga.getOffsetsForInternalCGTopic shouldBe ( PartitionOffsetsWithTotalLag(1,1,0,0,_)) + } + + "getOffsetsForInternalCGTopic test to verify some lag with commit offsets as false" in { + val (key1, value1) = getGenericRecords(dvsConsumerTopic.value, "abc", "123") + val (key2, value2) = getGenericRecords(dvsConsumerTopic.value, "abcd", "1234") + + kafkaClient.publishMessage((key1, Some(value1), None), dvsConsumerTopic.value).unsafeRunSync() + kafkaClient.publishMessage((key2, Some(value2), None), dvsConsumerTopic.value).unsafeRunSync() + + kafkaClient.consumeMessages(dvsConsumerTopic.value, consumer1, commitOffsets = false) + .take(1).compile.last.unsafeRunSync() shouldBe (key1, value1.some, None).some + + cga.getOffsetsForInternalCGTopic shouldBe (PartitionOffsetsWithTotalLag(2, 1, 1, 50, _)) + } + } } diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala b/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala index 73e2a334d..533c9cef3 100644 --- a/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala +++ b/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala @@ -99,6 +99,11 @@ final case class TestConsumerGroupsAlgebra(consumerGroupMap: Map[TopicConsumerKe override def getUniquePerNodeConsumerGroup: String = "uniquePerNodeConsumerGroup" override def getOffsetsForInternalCGTopic: IO[PartitionOffsetsWithTotalLag] = ??? +// { +// IO.pure(PartitionOffsetsWithTotalLag(60, 30, 30, 50, +// List(PartitionOffset(1,10,20,10), PartitionOffset(2,10,20,10), PartitionOffset(3,10,20,10)) +// )) +// } } object TestConsumerGroupsAlgebra { From 593acab24527d5ca15a603ba89a8a7c20261518d Mon Sep 17 00:00:00 2001 From: Abhishek Date: Wed, 22 May 2024 00:26:35 +0530 Subject: [PATCH 11/18] Added impl to test TestConsumerGroupsAlgebra --- .../hydra/kafka/algebras/ConsumerGroupsAlgebra.scala | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala b/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala index 533c9cef3..dcd3f6e27 100644 --- a/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala +++ b/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala @@ -98,12 +98,11 @@ final case class TestConsumerGroupsAlgebra(consumerGroupMap: Map[TopicConsumerKe override def getUniquePerNodeConsumerGroup: String = "uniquePerNodeConsumerGroup" - override def getOffsetsForInternalCGTopic: IO[PartitionOffsetsWithTotalLag] = ??? -// { -// IO.pure(PartitionOffsetsWithTotalLag(60, 30, 30, 50, -// List(PartitionOffset(1,10,20,10), PartitionOffset(2,10,20,10), PartitionOffset(3,10,20,10)) -// )) -// } + override def getOffsetsForInternalCGTopic: IO[PartitionOffsetsWithTotalLag] = { + IO.pure(PartitionOffsetsWithTotalLag(60, 30, 30, 50, + List(PartitionOffset(1,10,20,10), PartitionOffset(2,10,20,10), PartitionOffset(3,10,20,10)) + )) + } } object TestConsumerGroupsAlgebra { From 02efd859e0481029e55c435d4cfa68d90dca5058 Mon Sep 17 00:00:00 2001 From: Abhishek Date: Wed, 29 May 2024 13:53:36 +0530 Subject: [PATCH 12/18] Adding changes --- .../hydra/kafka/algebras/ConsumerGroupsAlgebra.scala | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala b/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala index dcd3f6e27..556eaf8c6 100644 --- a/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala +++ b/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala @@ -25,6 +25,9 @@ import hydra.kafka.serializers.TopicMetadataV2Parser.IntentionallyUnimplemented import hydra.kafka.util.ConsumerGroupsOffsetConsumer import org.apache.avro.generic.GenericRecord import org.typelevel.log4cats.Logger +import org.typelevel.log4cats.extras.Translate.logger + +import scala.concurrent.duration.{DurationInt, FiniteDuration} trait ConsumerGroupsAlgebra[F[_]] { def getConsumersForTopic(topicName: String): F[TopicConsumers] @@ -161,6 +164,11 @@ object ConsumerGroupsAlgebra { consumerGroupsOffsetFacade <- Ref[F].of(ConsumerGroupsOffsetFacade.empty) } yield new ConsumerGroupsAlgebra[F] { + +// fs2.Stream +// .awakeEvery[F](1.minutes) +// .evalMap(_ => getOffsetsForInternalCGTopic).compile.drain + override def getConsumersForTopic(topicName: String): F[TopicConsumers] = consumerGroupsStorageFacade.get.flatMap(a => addStateToTopicConsumers(a.getConsumersForTopicName(topicName))) @@ -194,7 +202,7 @@ object ConsumerGroupsAlgebra { _ <- notificationsService.send(NotificationScope(NotificationLevel.Warn), NotificationMessage( - s"""Total Offset Lag on ${dvsConsumersTopic} is ${totalLag.toString} , + s"""For Prod - Total Offset Lag on ${dvsConsumersTopic} is ${totalLag.toString} , | Lag percentage is ${lagPercentage.toString} , | Total_Group_Offset is ${totalGroupOffset} , | Total_Largest_Offset is ${totalLargestOffset}""".stripMargin) From bc7c6f730c19c0b17effefcea47010e78344cde4 Mon Sep 17 00:00:00 2001 From: Abhishek Date: Wed, 5 Jun 2024 21:02:22 +0530 Subject: [PATCH 13/18] Refactored code to have a single stream and adjusted updation of cache one after the other. First Consumer group storage facade is getting updated then just after that consumer group offset facade. --- .../algebras/ConsumerGroupsAlgebraSpec.scala | 8 +- .../algebras/ConsumerGroupsAlgebra.scala | 77 +++++++------------ .../endpoints/ConsumerGroupsEndpoint.scala | 12 --- 3 files changed, 30 insertions(+), 67 deletions(-) diff --git a/ingestors/kafka/src/it/scala/hydra/kafka/algebras/ConsumerGroupsAlgebraSpec.scala b/ingestors/kafka/src/it/scala/hydra/kafka/algebras/ConsumerGroupsAlgebraSpec.scala index 7bbb0d787..647799784 100644 --- a/ingestors/kafka/src/it/scala/hydra/kafka/algebras/ConsumerGroupsAlgebraSpec.scala +++ b/ingestors/kafka/src/it/scala/hydra/kafka/algebras/ConsumerGroupsAlgebraSpec.scala @@ -186,7 +186,7 @@ class ConsumerGroupsAlgebraSpec extends AnyWordSpecLike with Matchers with ForAl }.unsafeRunSync() } - "getOffsetsForInternalCGTopic test to verify no lag with commit offsets as false" in { + "test getLagOnDVSConsumerTopic to verify no lag with commit offsets as false" in { val (key1, value1) = getGenericRecords(dvsConsumerTopic.value, "abc", "123") kafkaClient.publishMessage((key1, Some(value1), None), dvsConsumerTopic.value).unsafeRunSync() @@ -194,10 +194,10 @@ class ConsumerGroupsAlgebraSpec extends AnyWordSpecLike with Matchers with ForAl kafkaClient.consumeMessages(dvsConsumerTopic.value, consumer1, commitOffsets = false) .take(1).compile.last.unsafeRunSync() shouldBe (key1, value1.some, None).some - cga.getOffsetsForInternalCGTopic shouldBe ( PartitionOffsetsWithTotalLag(1,1,0,0,_)) + cga.getLagOnDVSConsumerTopic shouldBe ( PartitionOffsetsWithTotalLag(1,1,0,0,_)) } - "getOffsetsForInternalCGTopic test to verify some lag with commit offsets as false" in { + "test getLagOnDVSConsumerTopic to verify some lag with commit offsets as false" in { val (key1, value1) = getGenericRecords(dvsConsumerTopic.value, "abc", "123") val (key2, value2) = getGenericRecords(dvsConsumerTopic.value, "abcd", "1234") @@ -207,7 +207,7 @@ class ConsumerGroupsAlgebraSpec extends AnyWordSpecLike with Matchers with ForAl kafkaClient.consumeMessages(dvsConsumerTopic.value, consumer1, commitOffsets = false) .take(1).compile.last.unsafeRunSync() shouldBe (key1, value1.some, None).some - cga.getOffsetsForInternalCGTopic shouldBe (PartitionOffsetsWithTotalLag(2, 1, 1, 50, _)) + cga.getLagOnDVSConsumerTopic shouldBe (PartitionOffsetsWithTotalLag(2, 1, 1, 50, _)) } } diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala b/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala index 556eaf8c6..799467a4b 100644 --- a/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala +++ b/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala @@ -36,8 +36,6 @@ trait ConsumerGroupsAlgebra[F[_]] { def getAllConsumers: F[List[ConsumerTopics]] - def getOffsetsForInternalCGTopic: F[PartitionOffsetsWithTotalLag] - def getAllConsumersByTopic: F[List[TopicConsumers]] def startConsumer: F[Unit] @@ -49,6 +47,8 @@ trait ConsumerGroupsAlgebra[F[_]] { def consumerGroupIsActive(str: String): F[(Boolean, String)] def getUniquePerNodeConsumerGroup: String + + def getLagOnDVSConsumerTopic: F[PartitionOffsetsWithTotalLag] } final case class TestConsumerGroupsAlgebra(consumerGroupMap: Map[TopicConsumerKey, (TopicConsumerValue, String)]) extends ConsumerGroupsAlgebra[IO] { @@ -101,9 +101,9 @@ final case class TestConsumerGroupsAlgebra(consumerGroupMap: Map[TopicConsumerKe override def getUniquePerNodeConsumerGroup: String = "uniquePerNodeConsumerGroup" - override def getOffsetsForInternalCGTopic: IO[PartitionOffsetsWithTotalLag] = { + override def getLagOnDVSConsumerTopic: IO[PartitionOffsetsWithTotalLag] = { IO.pure(PartitionOffsetsWithTotalLag(60, 30, 30, 50, - List(PartitionOffset(1,10,20,10), PartitionOffset(2,10,20,10), PartitionOffset(3,10,20,10)) + List(PartitionOffset(1, 10, 20, 10), PartitionOffset(2, 10, 20, 10), PartitionOffset(3, 10, 20, 10)) )) } } @@ -146,15 +146,8 @@ object ConsumerGroupsAlgebra { ) (implicit notificationsService: InternalNotificationSender[F]): F[ConsumerGroupsAlgebra[F]] = { - val parentStream = kafkaClientAlgebra.consumeSafelyWithOffsetInfo(dvsConsumersTopic.value, uniquePerNodeConsumerGroup, commitOffsets = false) - - val offsetStream: fs2.Stream[F, Either[Throwable, (Partition, Offset)]] = parentStream.map(x => { - x.map(_._2) - }) - - val dvsConsumersStream: fs2.Stream[F, Record] = { - parentStream - .map(_.map(_._1)) + val dvsConsumersStream: fs2.Stream[F, (Record, (Partition, Offset))] = { + kafkaClientAlgebra.consumeSafelyWithOffsetInfo(dvsConsumersTopic.value, uniquePerNodeConsumerGroup, commitOffsets = false) //Ignore records with errors .collect { case Right(value) => value } } @@ -164,15 +157,10 @@ object ConsumerGroupsAlgebra { consumerGroupsOffsetFacade <- Ref[F].of(ConsumerGroupsOffsetFacade.empty) } yield new ConsumerGroupsAlgebra[F] { - -// fs2.Stream -// .awakeEvery[F](1.minutes) -// .evalMap(_ => getOffsetsForInternalCGTopic).compile.drain - override def getConsumersForTopic(topicName: String): F[TopicConsumers] = consumerGroupsStorageFacade.get.flatMap(a => addStateToTopicConsumers(a.getConsumersForTopicName(topicName))) - override def getOffsetsForInternalCGTopic: F[PartitionOffsetsWithTotalLag] = { + override def getLagOnDVSConsumerTopic: F[PartitionOffsetsWithTotalLag] = { def getValueFromOffsetMap(partition: Int, offsetMap: Map[Partition, Offset]): Long = offsetMap.get(partition) match { @@ -181,7 +169,7 @@ object ConsumerGroupsAlgebra { } for { - groupOffsetMap <- consumerGroupsOffsetFacade.get.map(_.getAllPartitionOffset()) + groupOffsetMap <- consumerGroupsOffsetFacade.get.map(_.getOffsets()) partitionOffsetMapWithLag <- kAA.getLatestOffsets(dvsConsumersTopic.value) .map(_.toList @@ -200,15 +188,7 @@ object ConsumerGroupsAlgebra { lagPercentage: Double = (totalLag.toDouble / totalLargestOffset.toDouble) * 100 - _ <- notificationsService.send(NotificationScope(NotificationLevel.Warn), - NotificationMessage( - s"""For Prod - Total Offset Lag on ${dvsConsumersTopic} is ${totalLag.toString} , - | Lag percentage is ${lagPercentage.toString} , - | Total_Group_Offset is ${totalGroupOffset} , - | Total_Largest_Offset is ${totalLargestOffset}""".stripMargin) - ) - } yield - PartitionOffsetsWithTotalLag(totalLargestOffset, totalGroupOffset, totalLag, lagPercentage, partitionOffsetMapWithLag) + } yield PartitionOffsetsWithTotalLag(totalLargestOffset, totalGroupOffset, totalLag, lagPercentage, partitionOffsetMapWithLag) } private def addStateToTopicConsumers(topicConsumers: TopicConsumers): F[TopicConsumers] = { @@ -232,12 +212,18 @@ object ConsumerGroupsAlgebra { override def startConsumer: F[Unit] = { for { - _ <- Concurrent[F].start(consumeDVSConsumersTopicIntoCache(dvsConsumersStream, consumerGroupsStorageFacade)) + _ <- Concurrent[F].start(consumeDVSConsumersTopicIntoCache(dvsConsumersStream, consumerGroupsStorageFacade, consumerGroupsOffsetFacade)) _ <- Concurrent[F].start { ConsumerGroupsOffsetConsumer.start(kafkaClientAlgebra, kAA, sra, uniquePerNodeConsumerGroup, consumerOffsetsOffsetsTopicConfig, kafkaInternalTopic, dvsConsumersTopic, bootstrapServers, commonConsumerGroup, kafkaClientSecurityConfig) } - _ <- Concurrent[F].start(consumeOffsetStreamIntoCache(offsetStream, consumerGroupsOffsetFacade)) + _ <- fs2.Stream.awakeEvery[F](1.minutes).evalMap(_ => getLagOnDVSConsumerTopic.flatMap( + lagInfo => Logger[F].info( + s"""Total Offset Lag on ${dvsConsumersTopic} = ${lagInfo.totalLag.toString} , + Lag percentage = ${lagInfo.lagPercentage.toString} , + Total_Group_Offset = ${lagInfo.totalGroupOffset} , + Total_Largest_Offset = ${lagInfo.totalLargestOffset}""" + ))).compile.drain } yield () } @@ -273,17 +259,19 @@ object ConsumerGroupsAlgebra { } private def consumeDVSConsumersTopicIntoCache[F[_] : ContextShift : ConcurrentEffect : Timer : Logger]( - dvsConsumersStream: fs2.Stream[F, Record], - consumerGroupsStorageFacade: Ref[F, ConsumerGroupsStorageFacade] + dvsConsumersStream: fs2.Stream[F, (Record, (Partition, Offset))], + consumerGroupsStorageFacade: Ref[F, ConsumerGroupsStorageFacade], + consumerGroupsOffsetFacade: Ref[F, ConsumerGroupsOffsetFacade], )(implicit notificationsService: InternalNotificationSender[F]): F[Unit] = { - dvsConsumersStream.evalTap { case (key, value, _) => - TopicConsumer.decode[F](key, value).flatMap { + dvsConsumersStream.evalTap { + case ((key, value, _),(partition, offset)) => + TopicConsumer.decode[F](key, value).flatMap { case (topicKey, topicValue) => topicValue match { case Some(tV) => - consumerGroupsStorageFacade.update(_.addConsumerGroup(topicKey, tV)) + consumerGroupsStorageFacade.update(_.addConsumerGroup(topicKey, tV)) *> consumerGroupsOffsetFacade.update(_.addOffset(partition, offset)) case None => - consumerGroupsStorageFacade.update(_.removeConsumerGroup(topicKey)) + consumerGroupsStorageFacade.update(_.removeConsumerGroup(topicKey)) *> consumerGroupsOffsetFacade.update(_.addOffset(partition, offset)) } }.recoverWith { case e => Logger[F].error(e)("Error in ConsumergroupsAlgebra consumer") @@ -292,19 +280,6 @@ object ConsumerGroupsAlgebra { .makeRetryableWithNotification(Infinite, "ConsumerGroupsAlgebra") .compile.drain } - - private def consumeOffsetStreamIntoCache[F[_] : ContextShift : ConcurrentEffect : Timer : Logger]( - offsetStream: fs2.Stream[F, Either[Throwable, (Partition, Offset)]], - consumerGroupsOffsetFacade: Ref[F, ConsumerGroupsOffsetFacade] - )(implicit notificationsService: InternalNotificationSender[F]): F[Unit] = { - - offsetStream.evalTap { - case Right((partition, offset)) => consumerGroupsOffsetFacade.update(_.addOffset(partition, offset)) - case _ => Logger[F].error("Error in consumeOffsetStreamIntoCache") - } - .makeRetryableWithNotification(Infinite, "offsetStream") - .compile.drain - } } private case class ConsumerGroupsStorageFacade(consumerMap: Map[TopicConsumerKey, TopicConsumerValue]) { @@ -343,7 +318,7 @@ private case class ConsumerGroupsOffsetFacade(offsetMap: Map[Partition, Offset]) def addOffset(key: Partition, value: Offset): ConsumerGroupsOffsetFacade = this.copy(this.offsetMap + (key -> value)) - def getAllPartitionOffset(): Map[Partition, Offset] = + def getOffsets(): Map[Partition, Offset] = this.offsetMap def removeOffset(key: Partition): ConsumerGroupsOffsetFacade = diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/endpoints/ConsumerGroupsEndpoint.scala b/ingestors/kafka/src/main/scala/hydra/kafka/endpoints/ConsumerGroupsEndpoint.scala index c69d6360b..aa7fb47e8 100644 --- a/ingestors/kafka/src/main/scala/hydra/kafka/endpoints/ConsumerGroupsEndpoint.scala +++ b/ingestors/kafka/src/main/scala/hydra/kafka/endpoints/ConsumerGroupsEndpoint.scala @@ -34,18 +34,6 @@ class ConsumerGroupsEndpoint[F[_]: Futurable](consumerGroupsAlgebra: ConsumerGro addHttpMetric("", StatusCodes.InternalServerError, "/v2/consumer-groups", startTime, method.value, error = Some(exception.getMessage)) complete(StatusCodes.InternalServerError, exception.getMessage) } - } ~ pathPrefix("hydra-internal-topic") { - val startTime = Instant.now - pathEndOrSingleSlash { - onComplete(Futurable[F].unsafeToFuture(consumerGroupsAlgebra.getOffsetsForInternalCGTopic)) { - case Success(detailedConsumer) => - addHttpMetric("hydra-internal-topic", StatusCodes.OK, "/v2/consumer-groups/hydra-internal-topic", startTime, method.value) - complete(StatusCodes.OK, detailedConsumer) - case Failure(exception) => - addHttpMetric("hydra-internal-topic", StatusCodes.InternalServerError, "/v2/consumer-groups/hydra-internal-topic", startTime, method.value, error = Some(exception.getMessage)) - complete(StatusCodes.InternalServerError, exception.getMessage) - } - } } ~ pathPrefix(Segment) { consumerGroupName => val startTime = Instant.now pathEndOrSingleSlash { From cd03d7398a19755db0192566f9eb3d4fac620d69 Mon Sep 17 00:00:00 2001 From: Abhishek Date: Thu, 6 Jun 2024 13:15:35 +0530 Subject: [PATCH 14/18] Added code in app config in order to pick lagPublishInterval and set to 1.minutes as deafult | Added cascading changes to spec files --- .../scala/hydra.ingest/app/AppConfig.scala | 8 ++++--- .../scala/hydra.ingest/modules/Algebras.scala | 3 ++- .../http/TopicDeletionEndpointSpec.scala | 3 ++- .../algebras/ConsumerGroupsAlgebraSpec.scala | 10 ++++---- .../algebras/ConsumerGroupsAlgebra.scala | 24 +++++++------------ 5 files changed, 22 insertions(+), 26 deletions(-) diff --git a/ingest/src/main/scala/hydra.ingest/app/AppConfig.scala b/ingest/src/main/scala/hydra.ingest/app/AppConfig.scala index 82b78153f..79ef5c5a3 100644 --- a/ingest/src/main/scala/hydra.ingest/app/AppConfig.scala +++ b/ingest/src/main/scala/hydra.ingest/app/AppConfig.scala @@ -209,14 +209,16 @@ object AppConfig { final case class ConsumerGroupsAlgebraConfig( kafkaInternalConsumerGroupsTopic: String, commonConsumerGroup: ConsumerGroup, - consumerGroupsConsumerEnabled: Boolean + consumerGroupsConsumerEnabled: Boolean, + lagPublishInterval: FiniteDuration ) private val consumerGroupAlgebraConfig: ConfigValue[ConsumerGroupsAlgebraConfig] = ( env("KAFKA_CONSUMER_GROUPS_INTERNAL_TOPIC_NAME").as[String].default("__consumer_offsets"), - env("HYDRA_CONSUMER_GROUPS_COMMON_CONSUMER_GROUP").as[ConsumerGroup].default("kafkaInternalConsumerGroupsTopic-ConsumerGroupName"), - env("CONSUMER_GROUPS_CONSUMER_ENABLED").as[Boolean].default(true) + env("HYDRA_CONSUMER_GROUPS_COMMON_CONSUMER_GROUP").as[ConsumerGroup].default("kafkaInternalConsumerGroupsTopic-ConsumerGroupName"), + env("CONSUMER_GROUPS_CONSUMER_ENABLED").as[Boolean].default(true), + env("CONSUMER_GROUPS_INTERNAL_TOPIC_LAG_PUBLISH_INTERVAL").as[FiniteDuration].default(1.minutes) ).parMapN(ConsumerGroupsAlgebraConfig) final case class IngestConfig( diff --git a/ingest/src/main/scala/hydra.ingest/modules/Algebras.scala b/ingest/src/main/scala/hydra.ingest/modules/Algebras.scala index 8ee343e41..c5e81b4af 100644 --- a/ingest/src/main/scala/hydra.ingest/modules/Algebras.scala +++ b/ingest/src/main/scala/hydra.ingest/modules/Algebras.scala @@ -64,7 +64,8 @@ object Algebras { kafkaClientAlgebra = kafkaClient, kAA = kafkaAdmin, sra = schemaRegistry, - config.kafkaClientSecurityConfig + config.kafkaClientSecurityConfig, + config.consumerGroupsAlgebraConfig.lagPublishInterval ) awsIamClient <- AwsIamClient.make awsStsClient <- AwsStsClient.make diff --git a/ingest/src/test/scala/hydra/ingest/http/TopicDeletionEndpointSpec.scala b/ingest/src/test/scala/hydra/ingest/http/TopicDeletionEndpointSpec.scala index f8c4fcecc..b6d29137a 100644 --- a/ingest/src/test/scala/hydra/ingest/http/TopicDeletionEndpointSpec.scala +++ b/ingest/src/test/scala/hydra/ingest/http/TopicDeletionEndpointSpec.scala @@ -33,6 +33,7 @@ import scalacache.Cache import scalacache.guava.GuavaCache import scala.concurrent.ExecutionContext +import scala.concurrent.duration.DurationInt @@ -147,7 +148,7 @@ class TopicDeletionEndpointSpec extends Matchers with AnyWordSpecLike with Scala implicit val notificationSenderMock: InternalNotificationSender[IO] = getInternalNotificationSenderMock[IO] ConsumerGroupsAlgebra.make("",Subject.createValidated("dvs.blah.blah").get, Subject.createValidated("dvs.heyo.blah").get,"","","", - kafkaClientAlgebra,kafkaAdminAlgebra,schemaAlgebra, kafkaSecurityEmptyConfig) + kafkaClientAlgebra,kafkaAdminAlgebra,schemaAlgebra, kafkaSecurityEmptyConfig, 1.minutes) } "The deletionEndpoint path" should { diff --git a/ingestors/kafka/src/it/scala/hydra/kafka/algebras/ConsumerGroupsAlgebraSpec.scala b/ingestors/kafka/src/it/scala/hydra/kafka/algebras/ConsumerGroupsAlgebraSpec.scala index 647799784..858502821 100644 --- a/ingestors/kafka/src/it/scala/hydra/kafka/algebras/ConsumerGroupsAlgebraSpec.scala +++ b/ingestors/kafka/src/it/scala/hydra/kafka/algebras/ConsumerGroupsAlgebraSpec.scala @@ -72,7 +72,7 @@ class ConsumerGroupsAlgebraSpec extends AnyWordSpecLike with Matchers with ForAl kafkaClient <- KafkaClientAlgebra.live[IO](container.bootstrapServers, "https://schema-registry", schemaRegistry , kafkaSecurityEmptyConfig) consumerGroupAlgebra <- ConsumerGroupsAlgebra.make(internalKafkaConsumerTopic, dvsConsumerTopic, dvsInternalKafkaOffsetsTopic, - container.bootstrapServers, consumerGroup, consumerGroup, kafkaClient, kafkaAdmin, schemaRegistry, kafkaSecurityEmptyConfig) + container.bootstrapServers, consumerGroup, consumerGroup, kafkaClient, kafkaAdmin, schemaRegistry, kafkaSecurityEmptyConfig, 1.minutes) _ <- consumerGroupAlgebra.startConsumer } yield { runTests(consumerGroupAlgebra, schemaRegistry, kafkaClient, kafkaAdmin) @@ -186,7 +186,7 @@ class ConsumerGroupsAlgebraSpec extends AnyWordSpecLike with Matchers with ForAl }.unsafeRunSync() } - "test getLagOnDVSConsumerTopic to verify no lag with commit offsets as false" in { + "test getLagOnInternalConsumerTopic to verify no lag with commit offsets as false" in { val (key1, value1) = getGenericRecords(dvsConsumerTopic.value, "abc", "123") kafkaClient.publishMessage((key1, Some(value1), None), dvsConsumerTopic.value).unsafeRunSync() @@ -194,10 +194,10 @@ class ConsumerGroupsAlgebraSpec extends AnyWordSpecLike with Matchers with ForAl kafkaClient.consumeMessages(dvsConsumerTopic.value, consumer1, commitOffsets = false) .take(1).compile.last.unsafeRunSync() shouldBe (key1, value1.some, None).some - cga.getLagOnDVSConsumerTopic shouldBe ( PartitionOffsetsWithTotalLag(1,1,0,0,_)) + cga.getLagOnDvsInternalCGTopic shouldBe ( PartitionOffsetsWithTotalLag(1,1,0,0,_)) } - "test getLagOnDVSConsumerTopic to verify some lag with commit offsets as false" in { + "test getLagOnInternalConsumerTopic to verify some lag with commit offsets as false" in { val (key1, value1) = getGenericRecords(dvsConsumerTopic.value, "abc", "123") val (key2, value2) = getGenericRecords(dvsConsumerTopic.value, "abcd", "1234") @@ -207,7 +207,7 @@ class ConsumerGroupsAlgebraSpec extends AnyWordSpecLike with Matchers with ForAl kafkaClient.consumeMessages(dvsConsumerTopic.value, consumer1, commitOffsets = false) .take(1).compile.last.unsafeRunSync() shouldBe (key1, value1.some, None).some - cga.getLagOnDVSConsumerTopic shouldBe (PartitionOffsetsWithTotalLag(2, 1, 1, 50, _)) + cga.getLagOnDvsInternalCGTopic shouldBe (PartitionOffsetsWithTotalLag(2, 1, 1, 50, _)) } } diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala b/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala index 799467a4b..520ac34c3 100644 --- a/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala +++ b/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala @@ -1,20 +1,12 @@ package hydra.kafka.algebras -import cats.ApplicativeError - -import java.time.Instant import cats.effect.concurrent.Ref import cats.effect.{Concurrent, ConcurrentEffect, ContextShift, IO, Timer} import cats.implicits._ -import fs2.kafka._ import hydra.avro.registry.SchemaRegistry -import hydra.common.alerting.AlertProtocol.{NotificationMessage, NotificationScope} -import hydra.common.alerting.{NotificationLevel, NotificationType} -import hydra.common.alerting.sender.{InternalNotificationSender, NotificationSender} +import hydra.common.alerting.sender.InternalNotificationSender import hydra.common.config.KafkaConfigUtils.KafkaClientSecurityConfig -import hydra.kafka.algebras.ConsumerGroupsAlgebra.{Consumer, ConsumerTopics, DetailedConsumerGroup, DetailedTopicConsumers, PartitionOffset, TopicConsumers} import hydra.kafka.algebras.ConsumerGroupsAlgebra._ -import hydra.kafka.algebras.HydraTag.StringJsonFormat import hydra.kafka.algebras.KafkaClientAlgebra.{Offset, Partition, Record} import hydra.kafka.algebras.RetryableFs2Stream.RetryPolicy.Infinite import hydra.kafka.algebras.RetryableFs2Stream._ @@ -23,10 +15,9 @@ import hydra.kafka.model.TopicConsumer.{TopicConsumerKey, TopicConsumerValue} import hydra.kafka.model.TopicMetadataV2Request.Subject import hydra.kafka.serializers.TopicMetadataV2Parser.IntentionallyUnimplemented import hydra.kafka.util.ConsumerGroupsOffsetConsumer -import org.apache.avro.generic.GenericRecord import org.typelevel.log4cats.Logger -import org.typelevel.log4cats.extras.Translate.logger +import java.time.Instant import scala.concurrent.duration.{DurationInt, FiniteDuration} trait ConsumerGroupsAlgebra[F[_]] { @@ -48,7 +39,7 @@ trait ConsumerGroupsAlgebra[F[_]] { def getUniquePerNodeConsumerGroup: String - def getLagOnDVSConsumerTopic: F[PartitionOffsetsWithTotalLag] + def getLagOnDvsInternalCGTopic: F[PartitionOffsetsWithTotalLag] } final case class TestConsumerGroupsAlgebra(consumerGroupMap: Map[TopicConsumerKey, (TopicConsumerValue, String)]) extends ConsumerGroupsAlgebra[IO] { @@ -101,7 +92,7 @@ final case class TestConsumerGroupsAlgebra(consumerGroupMap: Map[TopicConsumerKe override def getUniquePerNodeConsumerGroup: String = "uniquePerNodeConsumerGroup" - override def getLagOnDVSConsumerTopic: IO[PartitionOffsetsWithTotalLag] = { + override def getLagOnDvsInternalCGTopic: IO[PartitionOffsetsWithTotalLag] = { IO.pure(PartitionOffsetsWithTotalLag(60, 30, 30, 50, List(PartitionOffset(1, 10, 20, 10), PartitionOffset(2, 10, 20, 10), PartitionOffset(3, 10, 20, 10)) )) @@ -142,7 +133,8 @@ object ConsumerGroupsAlgebra { kafkaClientAlgebra: KafkaClientAlgebra[F], kAA: KafkaAdminAlgebra[F], sra: SchemaRegistry[F], - kafkaClientSecurityConfig: KafkaClientSecurityConfig + kafkaClientSecurityConfig: KafkaClientSecurityConfig, + lagPublishInterval: FiniteDuration ) (implicit notificationsService: InternalNotificationSender[F]): F[ConsumerGroupsAlgebra[F]] = { @@ -160,7 +152,7 @@ object ConsumerGroupsAlgebra { override def getConsumersForTopic(topicName: String): F[TopicConsumers] = consumerGroupsStorageFacade.get.flatMap(a => addStateToTopicConsumers(a.getConsumersForTopicName(topicName))) - override def getLagOnDVSConsumerTopic: F[PartitionOffsetsWithTotalLag] = { + override def getLagOnDvsInternalCGTopic: F[PartitionOffsetsWithTotalLag] = { def getValueFromOffsetMap(partition: Int, offsetMap: Map[Partition, Offset]): Long = offsetMap.get(partition) match { @@ -217,7 +209,7 @@ object ConsumerGroupsAlgebra { ConsumerGroupsOffsetConsumer.start(kafkaClientAlgebra, kAA, sra, uniquePerNodeConsumerGroup, consumerOffsetsOffsetsTopicConfig, kafkaInternalTopic, dvsConsumersTopic, bootstrapServers, commonConsumerGroup, kafkaClientSecurityConfig) } - _ <- fs2.Stream.awakeEvery[F](1.minutes).evalMap(_ => getLagOnDVSConsumerTopic.flatMap( + _ <- fs2.Stream.awakeEvery[F](lagPublishInterval).evalMap(_ => getLagOnDvsInternalCGTopic.flatMap( lagInfo => Logger[F].info( s"""Total Offset Lag on ${dvsConsumersTopic} = ${lagInfo.totalLag.toString} , Lag percentage = ${lagInfo.lagPercentage.toString} , From e1837711fdea62a2a67bf732b9d7e7155ece068d Mon Sep 17 00:00:00 2001 From: Abhishek Date: Thu, 6 Jun 2024 13:32:04 +0530 Subject: [PATCH 15/18] Removed code from ingestors/kafka/src/main/scala/hydra/kafka/marshallers/ConsumerGroupMarshallers.scala --- .../marshallers/ConsumerGroupMarshallers.scala | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/marshallers/ConsumerGroupMarshallers.scala b/ingestors/kafka/src/main/scala/hydra/kafka/marshallers/ConsumerGroupMarshallers.scala index 4ea6d960b..99e074791 100644 --- a/ingestors/kafka/src/main/scala/hydra/kafka/marshallers/ConsumerGroupMarshallers.scala +++ b/ingestors/kafka/src/main/scala/hydra/kafka/marshallers/ConsumerGroupMarshallers.scala @@ -56,19 +56,6 @@ trait ConsumerGroupMarshallers extends DefaultJsonProtocol with SprayJsonSupport override def read(json: JsValue): ConsumerGroupsAlgebra.Consumer = throw IntentionallyUnimplemented } - implicit object totalOffsetsWithLag extends RootJsonFormat[PartitionOffsetsWithTotalLag] { - override def write(partitionOffsetsWithTotalLag: PartitionOffsetsWithTotalLag): JsValue = JsObject(List( - Some("totalGroupOffset" -> JsNumber(partitionOffsetsWithTotalLag.totalGroupOffset)), - Some("totalLargestOffset" -> JsNumber(partitionOffsetsWithTotalLag.totalLargestOffset)), - Some("totalLag" -> JsNumber(partitionOffsetsWithTotalLag.totalLag)), - Some("lagPercentage" -> DoubleJsonFormat.write(partitionOffsetsWithTotalLag.lagPercentage)), - if (partitionOffsetsWithTotalLag.partitionOffsets.isEmpty) None - else Some( - "partitionOffsets" -> JsArray(partitionOffsetsWithTotalLag.partitionOffsets.sortBy(_.partition).map(partitionOffset.write).toVector) - )).flatten.toMap) - - override def read(json: JsValue): PartitionOffsetsWithTotalLag = throw IntentionallyUnimplemented - } implicit val consumerTopicsFormat: RootJsonFormat[ConsumerTopics] = jsonFormat2(ConsumerTopics) implicit val topicConsumersFormat: RootJsonFormat[TopicConsumers] = jsonFormat2(TopicConsumers) From 8212ae245a4544ba59cc8b9e68410b990ba57bda Mon Sep 17 00:00:00 2001 From: Abhishek Date: Thu, 6 Jun 2024 13:34:41 +0530 Subject: [PATCH 16/18] removed earlier change --- .../hydra/kafka/marshallers/ConsumerGroupMarshallers.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/marshallers/ConsumerGroupMarshallers.scala b/ingestors/kafka/src/main/scala/hydra/kafka/marshallers/ConsumerGroupMarshallers.scala index 99e074791..9527d4fcf 100644 --- a/ingestors/kafka/src/main/scala/hydra/kafka/marshallers/ConsumerGroupMarshallers.scala +++ b/ingestors/kafka/src/main/scala/hydra/kafka/marshallers/ConsumerGroupMarshallers.scala @@ -5,7 +5,7 @@ import spray.json.{RootJsonFormat, _} import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport import hydra.core.transport.AckStrategy import hydra.kafka.algebras.{ConsumerGroupsAlgebra, KafkaAdminAlgebra} -import hydra.kafka.algebras.ConsumerGroupsAlgebra.{ConsumerTopics, DetailedConsumerGroup, DetailedTopicConsumers, PartitionOffset, PartitionOffsetsWithTotalLag, TopicConsumers} +import hydra.kafka.algebras.ConsumerGroupsAlgebra.{ConsumerTopics, DetailedConsumerGroup, DetailedTopicConsumers, PartitionOffset, TopicConsumers} import hydra.kafka.algebras.KafkaAdminAlgebra.{LagOffsets, Offset, TopicAndPartition} import hydra.kafka.serializers.TopicMetadataV2Parser.IntentionallyUnimplemented @@ -56,7 +56,6 @@ trait ConsumerGroupMarshallers extends DefaultJsonProtocol with SprayJsonSupport override def read(json: JsValue): ConsumerGroupsAlgebra.Consumer = throw IntentionallyUnimplemented } - implicit val consumerTopicsFormat: RootJsonFormat[ConsumerTopics] = jsonFormat2(ConsumerTopics) implicit val topicConsumersFormat: RootJsonFormat[TopicConsumers] = jsonFormat2(TopicConsumers) From 74f962d5dcdc1d5d4a68e5dff892490e735d0204 Mon Sep 17 00:00:00 2001 From: Abhishek Date: Mon, 10 Jun 2024 16:27:06 +0530 Subject: [PATCH 17/18] Added a comment and Added active partition count in the logger --- .../hydra/kafka/algebras/ConsumerGroupsAlgebra.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala b/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala index 520ac34c3..b9f5aeeaf 100644 --- a/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala +++ b/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala @@ -156,7 +156,7 @@ object ConsumerGroupsAlgebra { def getValueFromOffsetMap(partition: Int, offsetMap: Map[Partition, Offset]): Long = offsetMap.get(partition) match { - case Some(value) => value + 1.toLong + case Some(value) => value + 1.toLong //Adding offsets by 1 as Kafka Admin algebra's getLatestOffsets method is having the same behaviour. case _ => 0 } @@ -211,10 +211,11 @@ object ConsumerGroupsAlgebra { } _ <- fs2.Stream.awakeEvery[F](lagPublishInterval).evalMap(_ => getLagOnDvsInternalCGTopic.flatMap( lagInfo => Logger[F].info( - s"""Total Offset Lag on ${dvsConsumersTopic} = ${lagInfo.totalLag.toString} , - Lag percentage = ${lagInfo.lagPercentage.toString} , - Total_Group_Offset = ${lagInfo.totalGroupOffset} , - Total_Largest_Offset = ${lagInfo.totalLargestOffset}""" + s"Lag details on ${dvsConsumersTopic}. Total_Offset_Lag = ${lagInfo.totalLag.toString}, " + + s"Lag_percentage = ${lagInfo.lagPercentage.toString}, " + + s"Total_Group_Offset = ${lagInfo.totalGroupOffset}, " + + s"Total_Largest_Offset = ${lagInfo.totalLargestOffset}, " + + s"Total_active_partitions = ${Option(lagInfo.partitionOffsets).map(_.size).getOrElse(0)}" ))).compile.drain } yield () } From 35eb28d0a61d3b280f2b7609a2ab75c22c2bfa53 Mon Sep 17 00:00:00 2001 From: Abhishek Date: Tue, 11 Jun 2024 11:57:16 +0530 Subject: [PATCH 18/18] limitting decimal places in the lag logger --- .../main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala b/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala index b9f5aeeaf..146de1e26 100644 --- a/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala +++ b/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala @@ -212,7 +212,7 @@ object ConsumerGroupsAlgebra { _ <- fs2.Stream.awakeEvery[F](lagPublishInterval).evalMap(_ => getLagOnDvsInternalCGTopic.flatMap( lagInfo => Logger[F].info( s"Lag details on ${dvsConsumersTopic}. Total_Offset_Lag = ${lagInfo.totalLag.toString}, " + - s"Lag_percentage = ${lagInfo.lagPercentage.toString}, " + + f"Lag_percentage = ${lagInfo.lagPercentage}%2.4f, " + s"Total_Group_Offset = ${lagInfo.totalGroupOffset}, " + s"Total_Largest_Offset = ${lagInfo.totalLargestOffset}, " + s"Total_active_partitions = ${Option(lagInfo.partitionOffsets).map(_.size).getOrElse(0)}"