Skip to content
This repository was archived by the owner on Nov 15, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
d179477
Added an offset stream which will be having all committed offset info…
abhivermaaa Apr 21, 2024
9e43eed
returning empty fs2.stream instead of no implementation
abhivermaaa Apr 21, 2024
7f16c3b
Added chnages for creating a new endpoint for returing hydra internal…
abhivermaaa May 8, 2024
4d6d7d3
Merge pull request #22 from ps-dev/adapt1-1526
abhivermaaa May 8, 2024
d9a0390
Added changes for ingestors/kafka/src/main/scala/hydra/kafka/endpoint…
abhivermaaa May 8, 2024
5052eda
Added changes to ingestors/kafka/src/main/scala/hydra/kafka/algebras/…
abhivermaaa May 8, 2024
ef340e1
Merge pull request #24 from ps-dev/adapt1-1533
abhivermaaa May 8, 2024
d67bd93
Refactored the getOffsetsForInternalCGTopic which is getting offset d…
abhivermaaa May 10, 2024
dba86d2
Created a new response having consolidated lags and lag percentage | …
abhivermaaa May 13, 2024
ad8a210
Added offset change code for starting with 0 | Add code to marshaller…
abhivermaaa May 14, 2024
07bd289
Added code for sending the slack notifictaion having all offset info …
abhivermaaa May 21, 2024
4319877
Added test cases for checking the lag
abhivermaaa May 21, 2024
593acab
Added impl to test TestConsumerGroupsAlgebra
abhivermaaa May 21, 2024
02efd85
Adding changes
abhivermaaa May 29, 2024
bc7c6f7
Refactored code to have a single stream and adjusted updation of cach…
abhivermaaa Jun 5, 2024
cd03d73
Added code in app config in order to pick lagPublishInterval and set …
abhivermaaa Jun 6, 2024
e183771
Removed code from ingestors/kafka/src/main/scala/hydra/kafka/marshall…
abhivermaaa Jun 6, 2024
8212ae2
removed earlier change
abhivermaaa Jun 6, 2024
74f962d
Added a comment and Added active partition count in the logger
abhivermaaa Jun 10, 2024
35eb28d
limitting decimal places in the lag logger
abhivermaaa Jun 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions ingest/src/main/scala/hydra.ingest/app/AppConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -209,14 +209,16 @@ object AppConfig {
/**
 * Settings consumed when wiring up the consumer-groups algebra.
 *
 * @param lagPublishInterval handed to `ConsumerGroupsAlgebra.make`, where it is the period
 *                           of the `awakeEvery` stream that logs lag details
 */
final case class ConsumerGroupsAlgebraConfig(
kafkaInternalConsumerGroupsTopic: String,
commonConsumerGroup: ConsumerGroup,
consumerGroupsConsumerEnabled: Boolean
consumerGroupsConsumerEnabled: Boolean,
lagPublishInterval: FiniteDuration
)

// Builds ConsumerGroupsAlgebraConfig from environment variables; every variable has a default,
// and the values are mapped positionally onto the case class by parMapN.
// The lag publish interval defaults to one minute.
private val consumerGroupAlgebraConfig: ConfigValue[ConsumerGroupsAlgebraConfig] =
(
env("KAFKA_CONSUMER_GROUPS_INTERNAL_TOPIC_NAME").as[String].default("__consumer_offsets"),
env("HYDRA_CONSUMER_GROUPS_COMMON_CONSUMER_GROUP").as[ConsumerGroup].default("kafkaInternalConsumerGroupsTopic-ConsumerGroupName"),
env("CONSUMER_GROUPS_CONSUMER_ENABLED").as[Boolean].default(true)
env("HYDRA_CONSUMER_GROUPS_COMMON_CONSUMER_GROUP").as[ConsumerGroup].default("kafkaInternalConsumerGroupsTopic-ConsumerGroupName"),
env("CONSUMER_GROUPS_CONSUMER_ENABLED").as[Boolean].default(true),
env("CONSUMER_GROUPS_INTERNAL_TOPIC_LAG_PUBLISH_INTERVAL").as[FiniteDuration].default(1.minutes)
).parMapN(ConsumerGroupsAlgebraConfig)

final case class IngestConfig(
Expand Down
3 changes: 2 additions & 1 deletion ingest/src/main/scala/hydra.ingest/modules/Algebras.scala
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ object Algebras {
kafkaClientAlgebra = kafkaClient,
kAA = kafkaAdmin,
sra = schemaRegistry,
config.kafkaClientSecurityConfig
config.kafkaClientSecurityConfig,
config.consumerGroupsAlgebraConfig.lagPublishInterval
)
awsIamClient <- AwsIamClient.make
awsStsClient <- AwsStsClient.make
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import scalacache.Cache
import scalacache.guava.GuavaCache

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.DurationInt



Expand Down Expand Up @@ -147,7 +148,7 @@ class TopicDeletionEndpointSpec extends Matchers with AnyWordSpecLike with Scala
implicit val notificationSenderMock: InternalNotificationSender[IO] = getInternalNotificationSenderMock[IO]
ConsumerGroupsAlgebra.make("",Subject.createValidated("dvs.blah.blah").get,
Subject.createValidated("dvs.heyo.blah").get,"","","",
kafkaClientAlgebra,kafkaAdminAlgebra,schemaAlgebra, kafkaSecurityEmptyConfig)
kafkaClientAlgebra,kafkaAdminAlgebra,schemaAlgebra, kafkaSecurityEmptyConfig, 1.minutes)
}

"The deletionEndpoint path" should {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import hydra.common.NotificationsTestSuite
import hydra.common.alerting.sender.{InternalNotificationSender, NotificationSender}
import hydra.common.config.KafkaConfigUtils.{KafkaClientSecurityConfig, SchemaRegistrySecurityConfig, kafkaSecurityEmptyConfig}
import hydra.common.alerting.sender.InternalNotificationSender
import hydra.kafka.algebras.ConsumerGroupsAlgebra.PartitionOffsetMap
import hydra.kafka.algebras.ConsumerGroupsAlgebra.{PartitionOffsetMap, PartitionOffsetsWithTotalLag}
import hydra.kafka.algebras.KafkaClientAlgebra.{OffsetInfo, Record}
import hydra.kafka.model.TopicConsumer.{TopicConsumerKey, TopicConsumerValue}
import hydra.kafka.model.TopicConsumerOffset.{TopicConsumerOffsetKey, TopicConsumerOffsetValue}
Expand Down Expand Up @@ -72,7 +72,7 @@ class ConsumerGroupsAlgebraSpec extends AnyWordSpecLike with Matchers with ForAl
kafkaClient <- KafkaClientAlgebra.live[IO](container.bootstrapServers, "https://schema-registry", schemaRegistry , kafkaSecurityEmptyConfig)

consumerGroupAlgebra <- ConsumerGroupsAlgebra.make(internalKafkaConsumerTopic, dvsConsumerTopic, dvsInternalKafkaOffsetsTopic,
container.bootstrapServers, consumerGroup, consumerGroup, kafkaClient, kafkaAdmin, schemaRegistry, kafkaSecurityEmptyConfig)
container.bootstrapServers, consumerGroup, consumerGroup, kafkaClient, kafkaAdmin, schemaRegistry, kafkaSecurityEmptyConfig, 1.minutes)
_ <- consumerGroupAlgebra.startConsumer
} yield {
runTests(consumerGroupAlgebra, schemaRegistry, kafkaClient, kafkaAdmin)
Expand Down Expand Up @@ -185,6 +185,31 @@ class ConsumerGroupsAlgebraSpec extends AnyWordSpecLike with Matchers with ForAl
case Right(_) => succeed
}.unsafeRunSync()
}

// Publishes and reads back one record, then checks the lag summary reported by the algebra.
"test getLagOnInternalConsumerTopic to verify no lag with commit offsets as false" in {
  val (key1, value1) = getGenericRecords(dvsConsumerTopic.value, "abc", "123")

  kafkaClient.publishMessage((key1, Some(value1), None), dvsConsumerTopic.value).unsafeRunSync()

  kafkaClient.consumeMessages(dvsConsumerTopic.value, consumer1, commitOffsets = false)
    .take(1).compile.last.unsafeRunSync() shouldBe (key1, value1.some, None).some

  // getLagOnDvsInternalCGTopic returns an effect, so it must be run before its result can be
  // inspected. The previous form compared the unevaluated effect with a partially applied
  // constructor (a function), which can never be equal. The per-partition list is deliberately
  // left unconstrained; the expected totals are the ones from the original assertion.
  cga.getLagOnDvsInternalCGTopic.unsafeRunSync() should matchPattern {
    case PartitionOffsetsWithTotalLag(1L, 1L, 0L, 0.0, _) =>
  }
}

// Publishes two records but reads back only one, then checks the lag summary reported by the algebra.
"test getLagOnInternalConsumerTopic to verify some lag with commit offsets as false" in {
  val (key1, value1) = getGenericRecords(dvsConsumerTopic.value, "abc", "123")
  val (key2, value2) = getGenericRecords(dvsConsumerTopic.value, "abcd", "1234")

  kafkaClient.publishMessage((key1, Some(value1), None), dvsConsumerTopic.value).unsafeRunSync()
  kafkaClient.publishMessage((key2, Some(value2), None), dvsConsumerTopic.value).unsafeRunSync()

  kafkaClient.consumeMessages(dvsConsumerTopic.value, consumer1, commitOffsets = false)
    .take(1).compile.last.unsafeRunSync() shouldBe (key1, value1.some, None).some

  // getLagOnDvsInternalCGTopic returns an effect, so it must be run before its result can be
  // inspected. The previous form compared the unevaluated effect with a partially applied
  // constructor (a function), which can never be equal. The per-partition list is deliberately
  // left unconstrained; the expected totals are the ones from the original assertion.
  cga.getLagOnDvsInternalCGTopic.unsafeRunSync() should matchPattern {
    case PartitionOffsetsWithTotalLag(2L, 1L, 1L, 50.0, _) =>
  }
}

}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,31 +1,25 @@
package hydra.kafka.algebras

import cats.ApplicativeError

import java.time.Instant
import cats.effect.concurrent.Ref
import cats.effect.{Concurrent, ConcurrentEffect, ContextShift, IO, Timer}
import cats.implicits._
import fs2.kafka._
import hydra.avro.registry.SchemaRegistry
import hydra.common.alerting.AlertProtocol.NotificationMessage
import hydra.common.alerting.NotificationLevel
import hydra.common.alerting.sender.{InternalNotificationSender, NotificationSender}
import hydra.common.alerting.sender.InternalNotificationSender
import hydra.common.config.KafkaConfigUtils.KafkaClientSecurityConfig
import hydra.kafka.algebras.ConsumerGroupsAlgebra.{Consumer, ConsumerTopics, DetailedConsumerGroup, DetailedTopicConsumers, PartitionOffset, TopicConsumers}
import hydra.kafka.algebras.ConsumerGroupsAlgebra._
import hydra.kafka.algebras.HydraTag.StringJsonFormat
import hydra.kafka.algebras.KafkaClientAlgebra.Record
import hydra.kafka.algebras.KafkaClientAlgebra.{Offset, Partition, Record}
import hydra.kafka.algebras.RetryableFs2Stream.RetryPolicy.Infinite
import hydra.kafka.algebras.RetryableFs2Stream._
import hydra.kafka.model.TopicConsumer
import hydra.kafka.model.TopicConsumer.{TopicConsumerKey, TopicConsumerValue}
import hydra.kafka.model.TopicMetadataV2Request.Subject
import hydra.kafka.serializers.TopicMetadataV2Parser.IntentionallyUnimplemented
import hydra.kafka.util.ConsumerGroupsOffsetConsumer
import org.apache.avro.generic.GenericRecord
import org.typelevel.log4cats.Logger

import java.time.Instant
import scala.concurrent.duration.{DurationInt, FiniteDuration}

trait ConsumerGroupsAlgebra[F[_]] {
def getConsumersForTopic(topicName: String): F[TopicConsumers]

Expand All @@ -44,6 +38,8 @@ trait ConsumerGroupsAlgebra[F[_]] {
def consumerGroupIsActive(str: String): F[(Boolean, String)]

def getUniquePerNodeConsumerGroup: String

/**
 * Produces a [[PartitionOffsetsWithTotalLag]] summary: per-partition offsets plus totals
 * and a lag percentage.
 * NOTE(review): going by the name, the summary concerns the DVS internal consumer-groups
 * topic — confirm against the implementation in use.
 */
def getLagOnDvsInternalCGTopic: F[PartitionOffsetsWithTotalLag]
}

final case class TestConsumerGroupsAlgebra(consumerGroupMap: Map[TopicConsumerKey, (TopicConsumerValue, String)]) extends ConsumerGroupsAlgebra[IO] {
Expand Down Expand Up @@ -96,6 +92,11 @@ final case class TestConsumerGroupsAlgebra(consumerGroupMap: Map[TopicConsumerKe

override def getUniquePerNodeConsumerGroup: String = "uniquePerNodeConsumerGroup"

// Canned lag summary for tests: partitions 1 to 3, each with group offset 10,
// largest offset 20 and lag 10, giving totals of 60 / 30 / 30 and a lag of 50 percent.
override def getLagOnDvsInternalCGTopic: IO[PartitionOffsetsWithTotalLag] = {
  val perPartition = List(1, 2, 3).map(partition => PartitionOffset(partition, 10, 20, 10))
  IO.pure(PartitionOffsetsWithTotalLag(60, 30, 30, 50, perPartition))
}
}

object TestConsumerGroupsAlgebra {
Expand All @@ -106,6 +107,9 @@ object ConsumerGroupsAlgebra {

type PartitionOffsetMap = Map[Int, Long]

/**
 * Lag summary across partitions.
 *
 * @param totalLargestOffset sum of the largest offsets of the listed partitions
 * @param totalGroupOffset   sum of the group offsets of the listed partitions
 * @param totalLag           overall lag across the listed partitions
 * @param lagPercentage      total lag expressed as a percentage of the total largest offset
 * @param partitionOffsets   the per-partition figures the totals are derived from
 */
final case class PartitionOffsetsWithTotalLag(
  totalLargestOffset: Long,
  totalGroupOffset: Long,
  totalLag: Long,
  lagPercentage: Double,
  partitionOffsets: List[PartitionOffset]
)

/**
 * Offsets and lag for a single partition.
 *
 * @param partition     partition number
 * @param groupOffset   offset reached by the consumer group on this partition
 * @param largestOffset latest offset available on this partition
 * @param partitionLag  lag on this partition
 */
final case class PartitionOffset(
  partition: Int,
  groupOffset: Long,
  largestOffset: Long,
  partitionLag: Long
)

final case class TopicConsumers(topicName: String, consumers: List[Consumer])
Expand All @@ -129,22 +133,56 @@ object ConsumerGroupsAlgebra {
kafkaClientAlgebra: KafkaClientAlgebra[F],
kAA: KafkaAdminAlgebra[F],
sra: SchemaRegistry[F],
kafkaClientSecurityConfig: KafkaClientSecurityConfig
kafkaClientSecurityConfig: KafkaClientSecurityConfig,
lagPublishInterval: FiniteDuration
) (implicit notificationsService: InternalNotificationSender[F]): F[ConsumerGroupsAlgebra[F]] = {

val dvsConsumersStream: fs2.Stream[F, Record] = {
kafkaClientAlgebra.consumeSafelyMessages(dvsConsumersTopic.value, uniquePerNodeConsumerGroup, commitOffsets = false)

val dvsConsumersStream: fs2.Stream[F, (Record, (Partition, Offset))] = {
kafkaClientAlgebra.consumeSafelyWithOffsetInfo(dvsConsumersTopic.value, uniquePerNodeConsumerGroup, commitOffsets = false)
//Ignore records with errors
.collect { case Right(value) => value }
}

for {
consumerGroupsStorageFacade <- Ref[F].of(ConsumerGroupsStorageFacade.empty)
consumerGroupsOffsetFacade <- Ref[F].of(ConsumerGroupsOffsetFacade.empty)
} yield new ConsumerGroupsAlgebra[F] {

override def getConsumersForTopic(topicName: String): F[TopicConsumers] =
consumerGroupsStorageFacade.get.flatMap(a => addStateToTopicConsumers(a.getConsumersForTopicName(topicName)))

/**
 * Compares the latest offsets of `dvsConsumersTopic` (from the Kafka admin algebra) with the
 * offsets this node has recorded while consuming that topic, and summarises the difference.
 *
 * Only partitions whose latest offset is greater than zero are reported. When no partition
 * qualifies, the totals are zero and the lag percentage is reported as 0.0 rather than NaN.
 *
 * @return per-partition offsets together with total offsets, total lag and lag percentage
 */
override def getLagOnDvsInternalCGTopic: F[PartitionOffsetsWithTotalLag] = {

  // Recorded offsets are bumped by one because, per the original author's note, the Kafka admin
  // algebra's getLatestOffsets behaves the same way. A partition with no recorded offset counts as 0.
  def nextOffsetFor(partition: Int, offsetMap: Map[Partition, Offset]): Long =
    offsetMap.get(partition).map(_ + 1L).getOrElse(0L)

  for {
    groupOffsetMap <- consumerGroupsOffsetFacade.get.map(_.getOffsets())
    latestOffsets <- kAA.getLatestOffsets(dvsConsumersTopic.value)
  } yield {
    val partitionOffsets: List[PartitionOffset] = latestOffsets.toList.collect {
      case (topicAndPartition, latest) if latest.value > 0L =>
        val groupOffset = nextOffsetFor(topicAndPartition.partition, groupOffsetMap)
        PartitionOffset(topicAndPartition.partition, groupOffset, latest.value, latest.value - groupOffset)
    }

    val totalLargestOffset = partitionOffsets.map(_.largestOffset).sum
    val totalGroupOffset = partitionOffsets.map(_.groupOffset).sum
    val totalLag = totalLargestOffset - totalGroupOffset

    // Guard the division: with no active partitions both totals are zero and 0.0 / 0.0 is NaN.
    val lagPercentage: Double =
      if (totalLargestOffset == 0L) 0.0
      else (totalLag.toDouble / totalLargestOffset.toDouble) * 100

    PartitionOffsetsWithTotalLag(totalLargestOffset, totalGroupOffset, totalLag, lagPercentage, partitionOffsets)
  }
}

private def addStateToTopicConsumers(topicConsumers: TopicConsumers): F[TopicConsumers] = {
val detailedF: F[List[Consumer]] = topicConsumers.consumers.traverse { consumer =>
val fState = getConsumerActiveState(consumer.consumerGroupName)
Expand All @@ -166,11 +204,19 @@ object ConsumerGroupsAlgebra {

/**
 * Starts three background fibers and returns once they have been started:
 * the cache/offset consumer for the DVS consumers topic, the consumer-groups offset consumer,
 * and a periodic logger that reports lag details every `lagPublishInterval`.
 */
override def startConsumer: F[Unit] = {

  // One lag report. A failure is logged rather than propagated so that a single failed
  // computation does not terminate the periodic stream.
  val logLagDetails: F[Unit] =
    getLagOnDvsInternalCGTopic.flatMap { lagInfo =>
      Logger[F].info(
        s"Lag details on ${dvsConsumersTopic}. Total_Offset_Lag = ${lagInfo.totalLag.toString}, " +
          f"Lag_percentage = ${lagInfo.lagPercentage}%2.4f, " +
          s"Total_Group_Offset = ${lagInfo.totalGroupOffset}, " +
          s"Total_Largest_Offset = ${lagInfo.totalLargestOffset}, " +
          s"Total_active_partitions = ${Option(lagInfo.partitionOffsets).map(_.size).getOrElse(0)}"
      )
    }.handleErrorWith(e => Logger[F].error(e)("Error while logging lag details in ConsumerGroupsAlgebra"))

  for {
    _ <- Concurrent[F].start(consumeDVSConsumersTopicIntoCache(dvsConsumersStream, consumerGroupsStorageFacade, consumerGroupsOffsetFacade))
    _ <- Concurrent[F].start {
      ConsumerGroupsOffsetConsumer.start(kafkaClientAlgebra, kAA, sra, uniquePerNodeConsumerGroup, consumerOffsetsOffsetsTopicConfig,
        kafkaInternalTopic, dvsConsumersTopic, bootstrapServers, commonConsumerGroup, kafkaClientSecurityConfig)
    }
    // awakeEvery never terminates; draining it in the foreground would keep startConsumer from
    // ever completing, so it is started as a fiber like the two consumers above.
    _ <- Concurrent[F].start(
      fs2.Stream.awakeEvery[F](lagPublishInterval).evalMap(_ => logLagDetails).compile.drain
    )
  } yield ()
}

Expand Down Expand Up @@ -206,17 +252,19 @@ object ConsumerGroupsAlgebra {
}

private def consumeDVSConsumersTopicIntoCache[F[_] : ContextShift : ConcurrentEffect : Timer : Logger](
dvsConsumersStream: fs2.Stream[F, Record],
consumerGroupsStorageFacade: Ref[F, ConsumerGroupsStorageFacade]
dvsConsumersStream: fs2.Stream[F, (Record, (Partition, Offset))],
consumerGroupsStorageFacade: Ref[F, ConsumerGroupsStorageFacade],
consumerGroupsOffsetFacade: Ref[F, ConsumerGroupsOffsetFacade],
)(implicit notificationsService: InternalNotificationSender[F]): F[Unit] = {
dvsConsumersStream.evalTap { case (key, value, _) =>
TopicConsumer.decode[F](key, value).flatMap {
dvsConsumersStream.evalTap {
case ((key, value, _),(partition, offset)) =>
TopicConsumer.decode[F](key, value).flatMap {
case (topicKey, topicValue) =>
topicValue match {
case Some(tV) =>
consumerGroupsStorageFacade.update(_.addConsumerGroup(topicKey, tV))
consumerGroupsStorageFacade.update(_.addConsumerGroup(topicKey, tV)) *> consumerGroupsOffsetFacade.update(_.addOffset(partition, offset))
case None =>
consumerGroupsStorageFacade.update(_.removeConsumerGroup(topicKey))
consumerGroupsStorageFacade.update(_.removeConsumerGroup(topicKey)) *> consumerGroupsOffsetFacade.update(_.addOffset(partition, offset))
}
}.recoverWith {
case e => Logger[F].error(e)("Error in ConsumergroupsAlgebra consumer")
Expand Down Expand Up @@ -258,3 +306,20 @@ private object ConsumerGroupsStorageFacade {
def empty: ConsumerGroupsStorageFacade = ConsumerGroupsStorageFacade(Map.empty)
}

/** Immutable holder for one offset per partition. */
private case class ConsumerGroupsOffsetFacade(offsetMap: Map[Partition, Offset]) {

  /** Returns a copy in which `key` maps to `value`, replacing any earlier entry for `key`. */
  def addOffset(key: Partition, value: Offset): ConsumerGroupsOffsetFacade =
    copy(offsetMap = offsetMap.updated(key, value))

  /** The offsets currently held, keyed by partition. */
  def getOffsets(): Map[Partition, Offset] = offsetMap

  /** Returns a copy without any entry for `key`. */
  def removeOffset(key: Partition): ConsumerGroupsOffsetFacade =
    copy(offsetMap = offsetMap - key)

}

private object ConsumerGroupsOffsetFacade {

  /** A facade holding no offsets. */
  def empty: ConsumerGroupsOffsetFacade =
    new ConsumerGroupsOffsetFacade(Map.empty[Partition, Offset])
}

Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ object KafkaClientAlgebra {
: fs2.Stream[F, Either[Throwable, (StringRecord, (Partition, Offset))]] = ???

override def consumeSafelyWithOffsetInfo(topicName: TopicName, consumerGroup: ConsumerGroup, commitOffsets: Boolean)
: fs2.Stream[F, Either[Throwable, ((GenericRecord, Option[GenericRecord], Option[Headers]), (Partition, Offset))]] = ???
: fs2.Stream[F, Either[Throwable, ((GenericRecord, Option[GenericRecord], Option[Headers]), (Partition, Offset))]] = fs2.Stream.empty

override def withProducerRecordSizeLimit(sizeLimitBytes: Long): F[KafkaClientAlgebra[F]] = Sync[F].delay {
getTestInstance(cache, schemaRegistryUrl, schemaRegistry, sizeLimitBytes.some)
Expand Down