diff --git a/CHANGELOG.md b/CHANGELOG.md index bb61c87b..527a82cd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,11 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). +## 8.1.15 - 2025-11-28 +### Changed +- When sending the events we hash based on db and table name. By default we are taking the newest table name. + We realised this could potentially break the order of operations because it will send the event to a different kafka partition. + ## 8.1.14 - 2025-11-25 ### Changed - Remove metric relocations for apiary-gluesync-listener because Spring doesn't find those in bean initialisation. diff --git a/apiary-metastore-events/kafka-metastore-events/kafka-metastore-listener/src/main/java/com/expediagroup/apiary/extensions/events/metastore/kafka/listener/KafkaMetaStoreEventListener.java b/apiary-metastore-events/kafka-metastore-events/kafka-metastore-listener/src/main/java/com/expediagroup/apiary/extensions/events/metastore/kafka/listener/KafkaMetaStoreEventListener.java index 03339485..2f08b3e7 100644 --- a/apiary-metastore-events/kafka-metastore-events/kafka-metastore-listener/src/main/java/com/expediagroup/apiary/extensions/events/metastore/kafka/listener/KafkaMetaStoreEventListener.java +++ b/apiary-metastore-events/kafka-metastore-events/kafka-metastore-listener/src/main/java/com/expediagroup/apiary/extensions/events/metastore/kafka/listener/KafkaMetaStoreEventListener.java @@ -43,6 +43,7 @@ import com.google.common.annotations.VisibleForTesting; +import com.expediagroup.apiary.extensions.events.metastore.event.ApiaryAlterTableEvent; import com.expediagroup.apiary.extensions.events.metastore.event.ApiaryListenerEvent; import com.expediagroup.apiary.extensions.events.metastore.event.ApiaryListenerEventFactory; import com.expediagroup.apiary.extensions.events.metastore.io.MetaStoreEventSerDe; @@ -50,6 +51,7 @@ import com.expediagroup.apiary.extensions.events.metastore.kafka.messaging.KafkaMessageSender; public class KafkaMetaStoreEventListener extends MetaStoreEventListener { + private static final Logger log = LoggerFactory.getLogger(KafkaMetaStoreEventListener.class); private final MetaStoreEventSerDe eventSerDe; @@ -58,7 +60,7 @@ public class KafkaMetaStoreEventListener extends MetaStoreEventListener { public KafkaMetaStoreEventListener(Configuration config) { this(config, new ApiaryListenerEventFactory(), serDeForClassName(stringProperty(config, SERDE_CLASS)), - new KafkaMessageSender(config)); + new KafkaMessageSender(config)); } @VisibleForTesting @@ -74,12 +76,19 @@ public KafkaMetaStoreEventListener(Configuration config) { } private KafkaMessage withPayload(ApiaryListenerEvent event) { + String database = event.getDatabaseName(); + String table = event.getTableName(); + // Ensuring ALTER_TABLE events will use old table name for partition hashing + if (event instanceof ApiaryAlterTableEvent) { + database = ((ApiaryAlterTableEvent) event).getOldTable().getDbName(); + table = ((ApiaryAlterTableEvent) event).getOldTable().getTableName(); + } return KafkaMessage - .builder() - .database(event.getDatabaseName()) - .table(event.getTableName()) - .payload(eventSerDe.marshal(event)) - .build(); + .builder() + .database(database) + .table(table) + .payload(eventSerDe.marshal(event)) + .build(); } @Override @@ -178,5 +187,4 @@ public void onCreateFunction(CreateFunctionEvent fnEvent) {} @Override public void onDropFunction(DropFunctionEvent fnEvent) {} - } diff --git a/apiary-metastore-events/kafka-metastore-events/kafka-metastore-listener/src/main/java/com/expediagroup/apiary/extensions/events/metastore/kafka/listener/MskMetaStoreEventListener.java b/apiary-metastore-events/kafka-metastore-events/kafka-metastore-listener/src/main/java/com/expediagroup/apiary/extensions/events/metastore/kafka/listener/MskMetaStoreEventListener.java index aaf50311..64cd66b9 100644 --- a/apiary-metastore-events/kafka-metastore-events/kafka-metastore-listener/src/main/java/com/expediagroup/apiary/extensions/events/metastore/kafka/listener/MskMetaStoreEventListener.java +++ b/apiary-metastore-events/kafka-metastore-events/kafka-metastore-listener/src/main/java/com/expediagroup/apiary/extensions/events/metastore/kafka/listener/MskMetaStoreEventListener.java @@ -22,12 +22,28 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.MetaStoreEventListener; -import org.apache.hadoop.hive.metastore.events.*; +import org.apache.hadoop.hive.metastore.events.AddIndexEvent; +import org.apache.hadoop.hive.metastore.events.AddPartitionEvent; +import org.apache.hadoop.hive.metastore.events.AlterIndexEvent; +import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; +import org.apache.hadoop.hive.metastore.events.AlterTableEvent; +import org.apache.hadoop.hive.metastore.events.ConfigChangeEvent; +import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent; +import org.apache.hadoop.hive.metastore.events.CreateFunctionEvent; +import org.apache.hadoop.hive.metastore.events.CreateTableEvent; +import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent; +import org.apache.hadoop.hive.metastore.events.DropFunctionEvent; +import org.apache.hadoop.hive.metastore.events.DropIndexEvent; +import org.apache.hadoop.hive.metastore.events.DropPartitionEvent; +import org.apache.hadoop.hive.metastore.events.DropTableEvent; +import org.apache.hadoop.hive.metastore.events.InsertEvent; +import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; +import com.expediagroup.apiary.extensions.events.metastore.event.ApiaryAlterTableEvent; import com.expediagroup.apiary.extensions.events.metastore.event.ApiaryListenerEvent; import com.expediagroup.apiary.extensions.events.metastore.event.ApiaryListenerEventFactory; import com.expediagroup.apiary.extensions.events.metastore.io.MetaStoreEventSerDe; @@ -35,6 +51,7 @@ import com.expediagroup.apiary.extensions.events.metastore.kafka.messaging.MskMessageSender; public class MskMetaStoreEventListener extends MetaStoreEventListener { + private static final Logger log = LoggerFactory.getLogger(MskMetaStoreEventListener.class); private final MetaStoreEventSerDe eventSerDe; @@ -43,7 +60,7 @@ public class MskMetaStoreEventListener extends MetaStoreEventListener { public MskMetaStoreEventListener(Configuration config) { this(config, new ApiaryListenerEventFactory(), serDeForClassName(stringProperty(config, SERDE_CLASS)), - new MskMessageSender(config)); + new MskMessageSender(config)); } @VisibleForTesting @@ -59,12 +76,19 @@ public MskMetaStoreEventListener(Configuration config) { } private KafkaMessage withPayload(ApiaryListenerEvent event) { + String database = event.getDatabaseName(); + String table = event.getTableName(); + // Ensuring ALTER_TABLE events will use old table name for partition hashing + if (event instanceof ApiaryAlterTableEvent) { + database = ((ApiaryAlterTableEvent) event).getOldTable().getDbName(); + table = ((ApiaryAlterTableEvent) event).getOldTable().getTableName(); + } return KafkaMessage - .builder() - .database(event.getDatabaseName()) - .table(event.getTableName()) - .payload(eventSerDe.marshal(event)) - .build(); + .builder() + .database(database) + .table(table) + .payload(eventSerDe.marshal(event)) + .build(); } @Override @@ -163,5 +187,4 @@ public void onCreateFunction(CreateFunctionEvent fnEvent) {} @Override public void onDropFunction(DropFunctionEvent fnEvent) {} - } diff --git a/apiary-metastore-events/kafka-metastore-events/kafka-metastore-listener/src/test/java/com/expediagroup/apiary/extensions/events/metastore/kafka/listener/KafkaMetaStoreEventListenerTest.java b/apiary-metastore-events/kafka-metastore-events/kafka-metastore-listener/src/test/java/com/expediagroup/apiary/extensions/events/metastore/kafka/listener/KafkaMetaStoreEventListenerTest.java index a38b530f..46da2882 100644 --- a/apiary-metastore-events/kafka-metastore-events/kafka-metastore-listener/src/test/java/com/expediagroup/apiary/extensions/events/metastore/kafka/listener/KafkaMetaStoreEventListenerTest.java +++ b/apiary-metastore-events/kafka-metastore-events/kafka-metastore-listener/src/test/java/com/expediagroup/apiary/extensions/events/metastore/kafka/listener/KafkaMetaStoreEventListenerTest.java @@ -22,6 +22,7 @@ import static org.mockito.Mockito.when; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.events.AddIndexEvent; import org.apache.hadoop.hive.metastore.events.AddPartitionEvent; import org.apache.hadoop.hive.metastore.events.AlterIndexEvent; @@ -92,8 +93,12 @@ public void onCreateTable() { public void onAlterTable() { AlterTableEvent event = mock(AlterTableEvent.class); ApiaryAlterTableEvent apiaryEvent = mock(ApiaryAlterTableEvent.class); + Table oldTable = mock(Table.class); + when(apiaryEvent.getOldTable()).thenReturn(oldTable); when(apiaryEvent.getDatabaseName()).thenReturn(DATABASE); when(apiaryEvent.getTableName()).thenReturn(TABLE); + when(oldTable.getDbName()).thenReturn(DATABASE); + when(oldTable.getTableName()).thenReturn(TABLE); when(apiaryListenerEventFactory.create(event)).thenReturn(apiaryEvent); listener.onAlterTable(event); verify(kafkaMessageSender).send(any(KafkaMessage.class)); @@ -216,5 +221,4 @@ public void onDropFunction() { verify(kafkaMessageSender, never()).send(any(KafkaMessage.class)); verify(eventSerDe, never()).marshal(any(ApiaryListenerEvent.class)); } - } diff --git a/apiary-metastore-events/kafka-metastore-events/kafka-metastore-listener/src/test/java/com/expediagroup/apiary/extensions/events/metastore/kafka/listener/MskMetaStoreEventListenerTest.java b/apiary-metastore-events/kafka-metastore-events/kafka-metastore-listener/src/test/java/com/expediagroup/apiary/extensions/events/metastore/kafka/listener/MskMetaStoreEventListenerTest.java index fe77b0a7..cd730a6e 100644 --- a/apiary-metastore-events/kafka-metastore-events/kafka-metastore-listener/src/test/java/com/expediagroup/apiary/extensions/events/metastore/kafka/listener/MskMetaStoreEventListenerTest.java +++ b/apiary-metastore-events/kafka-metastore-events/kafka-metastore-listener/src/test/java/com/expediagroup/apiary/extensions/events/metastore/kafka/listener/MskMetaStoreEventListenerTest.java @@ -16,17 +16,44 @@ package com.expediagroup.apiary.extensions.events.metastore.kafka.listener; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.metastore.events.*; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.events.AddIndexEvent; +import org.apache.hadoop.hive.metastore.events.AddPartitionEvent; +import org.apache.hadoop.hive.metastore.events.AlterIndexEvent; +import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; +import org.apache.hadoop.hive.metastore.events.AlterTableEvent; +import org.apache.hadoop.hive.metastore.events.ConfigChangeEvent; +import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent; +import org.apache.hadoop.hive.metastore.events.CreateFunctionEvent; +import org.apache.hadoop.hive.metastore.events.CreateTableEvent; +import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent; +import org.apache.hadoop.hive.metastore.events.DropFunctionEvent; +import org.apache.hadoop.hive.metastore.events.DropIndexEvent; +import org.apache.hadoop.hive.metastore.events.DropPartitionEvent; +import org.apache.hadoop.hive.metastore.events.DropTableEvent; +import org.apache.hadoop.hive.metastore.events.InsertEvent; +import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; -import com.expediagroup.apiary.extensions.events.metastore.event.*; +import com.expediagroup.apiary.extensions.events.metastore.event.ApiaryAddPartitionEvent; +import com.expediagroup.apiary.extensions.events.metastore.event.ApiaryAlterPartitionEvent; +import com.expediagroup.apiary.extensions.events.metastore.event.ApiaryAlterTableEvent; +import com.expediagroup.apiary.extensions.events.metastore.event.ApiaryCreateTableEvent; +import com.expediagroup.apiary.extensions.events.metastore.event.ApiaryDropPartitionEvent; +import com.expediagroup.apiary.extensions.events.metastore.event.ApiaryDropTableEvent; +import com.expediagroup.apiary.extensions.events.metastore.event.ApiaryInsertEvent; +import com.expediagroup.apiary.extensions.events.metastore.event.ApiaryListenerEvent; +import com.expediagroup.apiary.extensions.events.metastore.event.ApiaryListenerEventFactory; import com.expediagroup.apiary.extensions.events.metastore.io.MetaStoreEventSerDe; import com.expediagroup.apiary.extensions.events.metastore.kafka.messaging.KafkaMessage; import com.expediagroup.apiary.extensions.events.metastore.kafka.messaging.MskMessageSender; @@ -66,8 +93,12 @@ public void onCreateTable() { public void onAlterTable() { AlterTableEvent event = mock(AlterTableEvent.class); ApiaryAlterTableEvent apiaryEvent = mock(ApiaryAlterTableEvent.class); + Table oldTable = mock(Table.class); + when(apiaryEvent.getOldTable()).thenReturn(oldTable); when(apiaryEvent.getDatabaseName()).thenReturn(DATABASE); when(apiaryEvent.getTableName()).thenReturn(TABLE); + when(oldTable.getDbName()).thenReturn(DATABASE); + when(oldTable.getTableName()).thenReturn(TABLE); when(apiaryListenerEventFactory.create(event)).thenReturn(apiaryEvent); listener.onAlterTable(event); verify(mskMessageSender).send(any(KafkaMessage.class)); @@ -190,5 +221,4 @@ public void onDropFunction() { verify(mskMessageSender, never()).send(any(KafkaMessage.class)); verify(eventSerDe, never()).marshal(any(ApiaryListenerEvent.class)); } - }