From 72bd67e012b6bacd0290a6f6f51f2c3321810790 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Fri, 23 Jan 2026 11:56:57 +0800 Subject: [PATCH 1/3] [fix][proxy] Close client connection immediately when credentials expire and forwardAuthorizationCredentials is disabled --- .../pulsar/proxy/server/ProxyConnection.java | 55 ++++-- .../proxy/server/ProxyAuthenticationTest.java | 174 ++++++++++++++++-- 2 files changed, 197 insertions(+), 32 deletions(-) diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java index e479b8ee62268..cfe788e522967 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java @@ -149,7 +149,12 @@ enum State { Closing, - Closed, + Closed; + + boolean isAuthenticatedState() { + return this == ProxyLookupRequests + || this == ProxyConnectionToBroker; + } } ConnectionPool getConnectionPool() { @@ -412,15 +417,7 @@ private synchronized void completeConnect() throws PulsarClientException { state = State.ProxyLookupRequests; lookupProxyHandler = service.newLookupProxyHandler(this); - if (service.getConfiguration().isAuthenticationEnabled() - && service.getConfiguration().getAuthenticationRefreshCheckSeconds() > 0) { - authRefreshTask = ctx.executor().scheduleAtFixedRate( - Runnables.catchingAndLoggingThrowables( - this::refreshAuthenticationCredentialsAndCloseIfTooExpired), - service.getConfiguration().getAuthenticationRefreshCheckSeconds(), - service.getConfiguration().getAuthenticationRefreshCheckSeconds(), - TimeUnit.SECONDS); - } + startAuthRefreshTaskIfNotStarted(); final ByteBuf msg = Commands.newConnected(protocolVersionToAdvertise, false); writeAndFlush(msg); } @@ -436,6 +433,10 @@ private void handleBrokerConnected(DirectProxyHandler directProxyHandler, Comman final ByteBuf msg = Commands.newConnected(connected.getProtocolVersion(), maxMessageSize, connected.hasFeatureFlags() && connected.getFeatureFlags().isSupportsTopicWatchers()); writeAndFlush(msg); + // Start auth refresh task only if we are not forwarding authorization credentials + if (!service.getConfiguration().isForwardAuthorizationCredentials()) { + startAuthRefreshTaskIfNotStarted(); + } } else { LOG.warn("[{}] Channel is {}. ProxyConnection is in {}. " + "Closing connection to broker '{}'.", @@ -517,16 +518,44 @@ protected void authChallengeSuccessCallback(AuthData authChallenge) { } } + private void startAuthRefreshTaskIfNotStarted() { + if (service.getConfiguration().isAuthenticationEnabled() + && service.getConfiguration().getAuthenticationRefreshCheckSeconds() > 0 + && authRefreshTask == null) { + authRefreshTask = ctx.executor().scheduleAtFixedRate( + Runnables.catchingAndLoggingThrowables( + this::refreshAuthenticationCredentialsAndCloseIfTooExpired), + service.getConfiguration().getAuthenticationRefreshCheckSeconds(), + service.getConfiguration().getAuthenticationRefreshCheckSeconds(), + TimeUnit.SECONDS); + } + } + private void refreshAuthenticationCredentialsAndCloseIfTooExpired() { assert ctx.executor().inEventLoop(); - if (state != State.ProxyLookupRequests) { - // Happens when an exception is thrown that causes this connection to close. + + // Only check expiration in authenticated states + if (!state.isAuthenticatedState()) { return; - } else if (!authState.isExpired()) { + } + + if (!authState.isExpired()) { // Credentials are still valid. Nothing to do at this point return; } + // If we are not forwarding authorization credentials to the broker, the broker cannot + // refresh the client's credentials. In this case, we must close the connection immediately + // when credentials expire. + if (!service.getConfiguration().isForwardAuthorizationCredentials()) { + if (LOG.isDebugEnabled()) { + LOG.debug("[{}] Closing connection because client credentials have expired and " + + "forwardAuthorizationCredentials is disabled (broker cannot refresh)", remoteAddress); + } + ctx.close(); + return; + } + if (System.nanoTime() - authChallengeSentTime > TimeUnit.SECONDS.toNanos(service.getConfiguration().getAuthenticationRefreshCheckSeconds())) { LOG.warn("[{}] Closing connection after timeout on refreshing auth credentials", remoteAddress); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java index 6887e9ea234c1..f02ee637b3711 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java @@ -18,11 +18,15 @@ */ package org.apache.pulsar.proxy.server; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.Mockito.spy; import com.google.common.collect.Sets; import com.google.gson.JsonObject; import com.google.gson.JsonParser; import java.io.IOException; +import java.net.SocketAddress; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -30,12 +34,16 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import javax.naming.AuthenticationException; +import javax.net.ssl.SSLSession; import lombok.Cleanup; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.authentication.AuthenticationDataCommand; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authentication.AuthenticationProvider; import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.broker.authentication.AuthenticationState; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationDataProvider; @@ -44,8 +52,11 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.ProducerImpl; +import org.apache.pulsar.common.api.AuthData; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.policies.data.AuthAction; +import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -80,6 +91,7 @@ public boolean hasDataForHttp() { public Set> getHttpHeaders() { Map headers = new HashMap<>(); headers.put("BasicAuthentication", authParam); + headers.put("X-Pulsar-Auth-Method-Name", "BasicAuthentication"); return headers.entrySet(); } } @@ -119,6 +131,71 @@ public void start() throws PulsarClientException { } } + public static class BasicAuthenticationState implements AuthenticationState { + private final long expiryTimeInMillis; + private final String authRole; + private final AuthenticationDataSource authenticationDataSource; + + private static boolean isExpired(long expiryTimeInMillis) { + return System.currentTimeMillis() > expiryTimeInMillis; + } + + private static String[] parseAuthData(String commandData) { + JsonObject element = JsonParser.parseString(commandData).getAsJsonObject(); + long expiryTimeInMillis = Long.parseLong(element.get("expiryTime").getAsString()); + if (isExpired(expiryTimeInMillis)) { + throw new IllegalArgumentException("Credentials have expired"); + } + String role = element.get("entityType").getAsString(); + return new String[]{role, String.valueOf(expiryTimeInMillis)}; + } + + public BasicAuthenticationState(AuthenticationDataSource authData) { + this(authData.hasDataFromCommand()? authData.getCommandData(): authData.getHttpHeader("BasicAuthentication")); + } + + public BasicAuthenticationState(AuthData authData) { + this(new String(authData.getBytes(), StandardCharsets.UTF_8)); + } + + private BasicAuthenticationState(String commandData) { + String[] parsed = parseAuthData(commandData); + this.authRole = parsed[0]; + this.expiryTimeInMillis = Long.parseLong(parsed[1]); + this.authenticationDataSource = new AuthenticationDataCommand(commandData, null, null); + } + + @Override + public String getAuthRole() { + return authRole; + } + + @Override + public AuthData authenticate(AuthData authData) throws AuthenticationException { + return null; // Authentication complete + } + + @Override + public CompletableFuture authenticateAsync(AuthData authData) { + return CompletableFuture.completedFuture(null); // Authentication complete + } + + @Override + public AuthenticationDataSource getAuthDataSource() { + return authenticationDataSource; + } + + @Override + public boolean isComplete() { + return authRole != null; + } + + @Override + public boolean isExpired() { + return isExpired(expiryTimeInMillis); + } + } + public static class BasicAuthenticationProvider implements AuthenticationProvider { @Override @@ -135,26 +212,14 @@ public String getAuthMethodName() { } @Override - public CompletableFuture authenticateAsync(AuthenticationDataSource authData) { - String commandData = null; - if (authData.hasDataFromCommand()) { - commandData = authData.getCommandData(); - } else if (authData.hasDataFromHttp()) { - commandData = authData.getHttpHeader("BasicAuthentication"); - } + public AuthenticationState newAuthState(AuthData authData, SocketAddress remoteAddress, SSLSession sslSession) { + return new BasicAuthenticationState(authData); + } - JsonObject element = JsonParser.parseString(commandData).getAsJsonObject(); - log.info("Have log of {}", element); - long expiryTimeInMillis = Long.parseLong(element.get("expiryTime").getAsString()); - long currentTimeInMillis = System.currentTimeMillis(); - if (expiryTimeInMillis < currentTimeInMillis) { - log.warn("Auth failed due to timeout"); - return CompletableFuture - .failedFuture(new AuthenticationException("Authentication data has been expired")); - } - final String result = element.get("entityType").getAsString(); - // Run in another thread to attempt to test the async logic - return CompletableFuture.supplyAsync(() -> result); + @Override + public CompletableFuture authenticateAsync(AuthenticationDataSource authData) { + BasicAuthenticationState basicAuthenticationState = new BasicAuthenticationState(authData); + return CompletableFuture.supplyAsync(basicAuthenticationState::getAuthRole); } } @@ -271,4 +336,75 @@ private PulsarClient createPulsarClient(String proxyServiceUrl, String authParam .authentication(BasicAuthentication.class.getName(), authParams) .connectionsPerBroker(numberOfConnections).build(); } + + @Test + void testClientDisconnectWhenCredentialsExpireWithoutForwardAuth() throws Exception { + log.info("-- Starting {} test --", methodName); + + String namespaceName = "my-property/my-ns"; + String topicName = "persistent://my-property/my-ns/my-topic1"; + + admin.namespaces().grantPermissionOnNamespace(namespaceName, "proxy", + Sets.newHashSet(AuthAction.consume, AuthAction.produce)); + admin.namespaces().grantPermissionOnNamespace(namespaceName, "client", + Sets.newHashSet(AuthAction.consume, AuthAction.produce)); + + // Important: When forwardAuthorizationCredentials=false, broker should not authenticate original auth data + // because the proxy doesn't forward it. Set authenticateOriginalAuthData=false to match this behavior. + conf.setAuthenticateOriginalAuthData(false); + + ProxyConfiguration proxyConfig = new ProxyConfiguration(); + proxyConfig.setAuthenticationEnabled(true); + proxyConfig.setAuthenticationRefreshCheckSeconds(2); // Check every 2 seconds + proxyConfig.setServicePort(Optional.of(0)); + proxyConfig.setBrokerProxyAllowedTargetPorts("*"); + proxyConfig.setWebServicePort(Optional.of(0)); + proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl()); + proxyConfig.setClusterName(CLUSTER_NAME); + + // Proxy auth with long expiry + String proxyAuthParams = "entityType:proxy,expiryTime:" + (System.currentTimeMillis() + 3600 * 1000); + proxyConfig.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName()); + proxyConfig.setBrokerClientAuthenticationParameters(proxyAuthParams); + + Set providers = new HashSet<>(); + providers.add(BasicAuthenticationProvider.class.getName()); + proxyConfig.setAuthenticationProviders(providers); + proxyConfig.setForwardAuthorizationCredentials(false); + + @Cleanup + AuthenticationService authenticationService = new AuthenticationService( + PulsarConfigurationLoader.convertFrom(proxyConfig)); + @Cleanup + final Authentication proxyClientAuthentication = + AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + @Cleanup + ProxyService proxyService = new ProxyService(proxyConfig, authenticationService, proxyClientAuthentication); + proxyService.start(); + final String proxyServiceUrl = proxyService.getServiceUrl(); + + // Create client with credentials that will expire in 3 seconds + long clientExpireTime = System.currentTimeMillis() + 3 * 1000; + String clientAuthParams = "entityType:client,expiryTime:" + clientExpireTime; + + @Cleanup + PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, clientAuthParams, 1); + + @Cleanup + var producer = + proxyClient.newProducer(Schema.BYTES).topic(topicName).sendTimeout(5, TimeUnit.SECONDS).create(); + producer.send("test message".getBytes()); + + Awaitility.await().untilAsserted(() -> { + assertThatThrownBy(() -> producer.send("test message after expiry".getBytes())) + .isExactlyInstanceOf(PulsarClientException.TimeoutException.class); + }); + + if (producer instanceof ProducerImpl producerImpl) { + long lastDisconnectedTimestamp = producerImpl.getLastDisconnectedTimestamp(); + assertThat(lastDisconnectedTimestamp).isGreaterThan(clientExpireTime); + } + } } From b24899c4a62cc1c8ee16bba8f594dc300121c4d0 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Fri, 23 Jan 2026 15:37:18 +0800 Subject: [PATCH 2/3] Fix code style --- .../apache/pulsar/proxy/server/ProxyAuthenticationTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java index f02ee637b3711..04529629de7f1 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java @@ -151,7 +151,8 @@ private static String[] parseAuthData(String commandData) { } public BasicAuthenticationState(AuthenticationDataSource authData) { - this(authData.hasDataFromCommand()? authData.getCommandData(): authData.getHttpHeader("BasicAuthentication")); + this(authData.hasDataFromCommand() ? authData.getCommandData() + : authData.getHttpHeader("BasicAuthentication")); } public BasicAuthenticationState(AuthData authData) { From f0e6ad1cbf95a1ff628539203f6d4c3f16dcfc7d Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Fri, 23 Jan 2026 18:16:40 +0800 Subject: [PATCH 3/3] Fix ProxyRefreshAuthTest to correctly assert connection state --- .../proxy/server/ProxyRefreshAuthTest.java | 48 +++++++------------ 1 file changed, 18 insertions(+), 30 deletions(-) diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java index f3ff1362fd895..7501eb9306f98 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java @@ -179,37 +179,25 @@ public void testAuthDataRefresh(boolean forwardAuthData) throws Exception { PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) pulsarClient; pulsarClient.getPartitionsForTopic(topic).get(); - Set> connections = pulsarClientImpl.getCnxPool().getConnections(); - Awaitility.await().during(5, SECONDS).untilAsserted(() -> { - pulsarClient.getPartitionsForTopic(topic).get(); - assertTrue(connections.stream().allMatch(n -> { - try { - ClientCnx clientCnx = n.get(); - long timestamp = clientCnx.getLastDisconnectedTimestamp(); - return timestamp == 0; - } catch (Exception e) { - throw new RuntimeException(e); - } - })); - }); + // Verify initial connection state + Set> connections = pulsarClientImpl.getCnxPool().getConnections(); - // Force all connections from proxy to broker to close and therefore require the proxy to re-authenticate with - // the broker. (The client doesn't lose this connection.) - restartBroker(); - - // Rerun assertion to ensure that it still works - Awaitility.await().during(5, SECONDS).untilAsserted(() -> { - pulsarClient.getPartitionsForTopic(topic).get(); - assertTrue(connections.stream().allMatch(n -> { - try { - ClientCnx clientCnx = n.get(); - long timestamp = clientCnx.getLastDisconnectedTimestamp(); - return timestamp == 0; - } catch (Exception e) { - throw new RuntimeException(e); - } - })); - }); + Awaitility.await() + .during(5, SECONDS) + .untilAsserted(() -> { + for (CompletableFuture cf : connections) { + try { + ClientCnx clientCnx = cf.get(); + long timestamp = clientCnx.getLastDisconnectedTimestamp(); + // If forwardAuthData is false, the broker cannot see the client's authentication data. + // As a result, the broker cannot perform any refresh operations on the client's auth data. + // Only the proxy has visibility of the client's connection state. + assertTrue(forwardAuthData ? timestamp == 0 : timestamp > 0); + } catch (Exception e) { + throw new AssertionError("Failed to get connection state", e); + } + } + }); } }