From 343616404f7be6b3fc6b0d2f61efeef0e0e23cfb Mon Sep 17 00:00:00 2001 From: Aaron He Date: Tue, 6 Jan 2026 11:39:22 -0800 Subject: [PATCH] make flink container metric cache concurrent --- .../groovy/org/apache/beam/gradle/BeamModulePlugin.groovy | 2 +- gradle.properties | 4 ++-- .../runners/flink/metrics/FlinkMetricContainerBase.java | 8 ++++---- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index bb22a81ffc20..5c936cf92bcc 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -398,7 +398,7 @@ class BeamModulePlugin implements Plugin { // Automatically use the official release version if we are performing a release // otherwise append '-SNAPSHOT' - project.version = '2.45.48' + project.version = '2.45.49' if (isLinkedin(project)) { project.ext.mavenGroupId = 'com.linkedin.beam' } diff --git a/gradle.properties b/gradle.properties index 5cdcc143165c..07949c59d7f1 100644 --- a/gradle.properties +++ b/gradle.properties @@ -30,8 +30,8 @@ signing.gnupg.useLegacyGpg=true # buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy. # To build a custom Beam version make sure you change it in both places, see # https://github.com/apache/beam/issues/21302. -version=2.45.48 -sdk_version=2.45.48 +version=2.45.49 +sdk_version=2.45.49 javaVersion=1.8 diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerBase.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerBase.java index 88c2d7a3f3ca..c0ff0edf6687 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerBase.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerBase.java @@ -17,9 +17,9 @@ */ package org.apache.beam.runners.flink.metrics; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.apache.beam.model.pipeline.v1.MetricsApi; import org.apache.beam.runners.core.metrics.MetricsContainerImpl; import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; @@ -56,9 +56,9 @@ abstract class FlinkMetricContainerBase { private final Map flinkGaugeCache; public FlinkMetricContainerBase() { - this.flinkCounterCache = new HashMap<>(); - this.flinkDistributionGaugeCache = new HashMap<>(); - this.flinkGaugeCache = new HashMap<>(); + this.flinkCounterCache = new ConcurrentHashMap<>(); + this.flinkDistributionGaugeCache = new ConcurrentHashMap<>(); + this.flinkGaugeCache = new ConcurrentHashMap<>(); this.metricsContainers = new MetricsContainerStepMap(); }