Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,12 @@ enum State {

Closing,

Closed,
Closed;

boolean isAuthenticatedState() {
return this == ProxyLookupRequests
|| this == ProxyConnectionToBroker;
}
}

ConnectionPool getConnectionPool() {
Expand Down Expand Up @@ -412,15 +417,7 @@ private synchronized void completeConnect() throws PulsarClientException {

state = State.ProxyLookupRequests;
lookupProxyHandler = service.newLookupProxyHandler(this);
if (service.getConfiguration().isAuthenticationEnabled()
&& service.getConfiguration().getAuthenticationRefreshCheckSeconds() > 0) {
authRefreshTask = ctx.executor().scheduleAtFixedRate(
Runnables.catchingAndLoggingThrowables(
this::refreshAuthenticationCredentialsAndCloseIfTooExpired),
service.getConfiguration().getAuthenticationRefreshCheckSeconds(),
service.getConfiguration().getAuthenticationRefreshCheckSeconds(),
TimeUnit.SECONDS);
}
startAuthRefreshTaskIfNotStarted();
final ByteBuf msg = Commands.newConnected(protocolVersionToAdvertise, false);
writeAndFlush(msg);
}
Expand All @@ -436,6 +433,10 @@ private void handleBrokerConnected(DirectProxyHandler directProxyHandler, Comman
final ByteBuf msg = Commands.newConnected(connected.getProtocolVersion(), maxMessageSize,
connected.hasFeatureFlags() && connected.getFeatureFlags().isSupportsTopicWatchers());
writeAndFlush(msg);
// Start auth refresh task only if we are not forwarding authorization credentials
if (!service.getConfiguration().isForwardAuthorizationCredentials()) {
startAuthRefreshTaskIfNotStarted();
}
} else {
LOG.warn("[{}] Channel is {}. ProxyConnection is in {}. "
+ "Closing connection to broker '{}'.",
Expand Down Expand Up @@ -517,16 +518,44 @@ protected void authChallengeSuccessCallback(AuthData authChallenge) {
}
}

private void startAuthRefreshTaskIfNotStarted() {
if (service.getConfiguration().isAuthenticationEnabled()
&& service.getConfiguration().getAuthenticationRefreshCheckSeconds() > 0
&& authRefreshTask == null) {
authRefreshTask = ctx.executor().scheduleAtFixedRate(
Runnables.catchingAndLoggingThrowables(
this::refreshAuthenticationCredentialsAndCloseIfTooExpired),
service.getConfiguration().getAuthenticationRefreshCheckSeconds(),
service.getConfiguration().getAuthenticationRefreshCheckSeconds(),
TimeUnit.SECONDS);
}
}

private void refreshAuthenticationCredentialsAndCloseIfTooExpired() {
assert ctx.executor().inEventLoop();
if (state != State.ProxyLookupRequests) {
// Happens when an exception is thrown that causes this connection to close.

// Only check expiration in authenticated states
if (!state.isAuthenticatedState()) {
return;
} else if (!authState.isExpired()) {
}

if (!authState.isExpired()) {
// Credentials are still valid. Nothing to do at this point
return;
}

// If we are not forwarding authorization credentials to the broker, the broker cannot
// refresh the client's credentials. In this case, we must close the connection immediately
// when credentials expire.
if (!service.getConfiguration().isForwardAuthorizationCredentials()) {
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Closing connection because client credentials have expired and "
+ "forwardAuthorizationCredentials is disabled (broker cannot refresh)", remoteAddress);
}
ctx.close();
return;
}

if (System.nanoTime() - authChallengeSentTime
> TimeUnit.SECONDS.toNanos(service.getConfiguration().getAuthenticationRefreshCheckSeconds())) {
LOG.warn("[{}] Closing connection after timeout on refreshing auth credentials", remoteAddress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,32 @@
*/
package org.apache.pulsar.proxy.server;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.spy;
import com.google.common.collect.Sets;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import lombok.Cleanup;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
Expand All @@ -44,8 +52,11 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
Expand Down Expand Up @@ -80,6 +91,7 @@ public boolean hasDataForHttp() {
public Set<Entry<String, String>> getHttpHeaders() {
Map<String, String> headers = new HashMap<>();
headers.put("BasicAuthentication", authParam);
headers.put("X-Pulsar-Auth-Method-Name", "BasicAuthentication");
return headers.entrySet();
}
}
Expand Down Expand Up @@ -119,6 +131,72 @@ public void start() throws PulsarClientException {
}
}

public static class BasicAuthenticationState implements AuthenticationState {
private final long expiryTimeInMillis;
private final String authRole;
private final AuthenticationDataSource authenticationDataSource;

private static boolean isExpired(long expiryTimeInMillis) {
return System.currentTimeMillis() > expiryTimeInMillis;
}

private static String[] parseAuthData(String commandData) {
JsonObject element = JsonParser.parseString(commandData).getAsJsonObject();
long expiryTimeInMillis = Long.parseLong(element.get("expiryTime").getAsString());
if (isExpired(expiryTimeInMillis)) {
throw new IllegalArgumentException("Credentials have expired");
}
String role = element.get("entityType").getAsString();
return new String[]{role, String.valueOf(expiryTimeInMillis)};
}

public BasicAuthenticationState(AuthenticationDataSource authData) {
this(authData.hasDataFromCommand() ? authData.getCommandData()
: authData.getHttpHeader("BasicAuthentication"));
}

public BasicAuthenticationState(AuthData authData) {
this(new String(authData.getBytes(), StandardCharsets.UTF_8));
}

private BasicAuthenticationState(String commandData) {
String[] parsed = parseAuthData(commandData);
this.authRole = parsed[0];
this.expiryTimeInMillis = Long.parseLong(parsed[1]);
this.authenticationDataSource = new AuthenticationDataCommand(commandData, null, null);
}

@Override
public String getAuthRole() {
return authRole;
}

@Override
public AuthData authenticate(AuthData authData) throws AuthenticationException {
return null; // Authentication complete
}

@Override
public CompletableFuture<AuthData> authenticateAsync(AuthData authData) {
return CompletableFuture.completedFuture(null); // Authentication complete
}

@Override
public AuthenticationDataSource getAuthDataSource() {
return authenticationDataSource;
}

@Override
public boolean isComplete() {
return authRole != null;
}

@Override
public boolean isExpired() {
return isExpired(expiryTimeInMillis);
}
}

public static class BasicAuthenticationProvider implements AuthenticationProvider {

@Override
Expand All @@ -135,26 +213,14 @@ public String getAuthMethodName() {
}

@Override
public CompletableFuture<String> authenticateAsync(AuthenticationDataSource authData) {
String commandData = null;
if (authData.hasDataFromCommand()) {
commandData = authData.getCommandData();
} else if (authData.hasDataFromHttp()) {
commandData = authData.getHttpHeader("BasicAuthentication");
}
public AuthenticationState newAuthState(AuthData authData, SocketAddress remoteAddress, SSLSession sslSession) {
return new BasicAuthenticationState(authData);
}

JsonObject element = JsonParser.parseString(commandData).getAsJsonObject();
log.info("Have log of {}", element);
long expiryTimeInMillis = Long.parseLong(element.get("expiryTime").getAsString());
long currentTimeInMillis = System.currentTimeMillis();
if (expiryTimeInMillis < currentTimeInMillis) {
log.warn("Auth failed due to timeout");
return CompletableFuture
.failedFuture(new AuthenticationException("Authentication data has been expired"));
}
final String result = element.get("entityType").getAsString();
// Run in another thread to attempt to test the async logic
return CompletableFuture.supplyAsync(() -> result);
@Override
public CompletableFuture<String> authenticateAsync(AuthenticationDataSource authData) {
BasicAuthenticationState basicAuthenticationState = new BasicAuthenticationState(authData);
return CompletableFuture.supplyAsync(basicAuthenticationState::getAuthRole);
}
}

Expand Down Expand Up @@ -271,4 +337,75 @@ private PulsarClient createPulsarClient(String proxyServiceUrl, String authParam
.authentication(BasicAuthentication.class.getName(), authParams)
.connectionsPerBroker(numberOfConnections).build();
}

@Test
void testClientDisconnectWhenCredentialsExpireWithoutForwardAuth() throws Exception {
log.info("-- Starting {} test --", methodName);

String namespaceName = "my-property/my-ns";
String topicName = "persistent://my-property/my-ns/my-topic1";

admin.namespaces().grantPermissionOnNamespace(namespaceName, "proxy",
Sets.newHashSet(AuthAction.consume, AuthAction.produce));
admin.namespaces().grantPermissionOnNamespace(namespaceName, "client",
Sets.newHashSet(AuthAction.consume, AuthAction.produce));

// Important: When forwardAuthorizationCredentials=false, broker should not authenticate original auth data
// because the proxy doesn't forward it. Set authenticateOriginalAuthData=false to match this behavior.
conf.setAuthenticateOriginalAuthData(false);

ProxyConfiguration proxyConfig = new ProxyConfiguration();
proxyConfig.setAuthenticationEnabled(true);
proxyConfig.setAuthenticationRefreshCheckSeconds(2); // Check every 2 seconds
proxyConfig.setServicePort(Optional.of(0));
proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setWebServicePort(Optional.of(0));
proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
proxyConfig.setClusterName(CLUSTER_NAME);

// Proxy auth with long expiry
String proxyAuthParams = "entityType:proxy,expiryTime:" + (System.currentTimeMillis() + 3600 * 1000);
proxyConfig.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
proxyConfig.setBrokerClientAuthenticationParameters(proxyAuthParams);

Set<String> providers = new HashSet<>();
providers.add(BasicAuthenticationProvider.class.getName());
proxyConfig.setAuthenticationProviders(providers);
proxyConfig.setForwardAuthorizationCredentials(false);

@Cleanup
AuthenticationService authenticationService = new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig));
@Cleanup
final Authentication proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
proxyConfig.getBrokerClientAuthenticationParameters());
proxyClientAuthentication.start();
@Cleanup
ProxyService proxyService = new ProxyService(proxyConfig, authenticationService, proxyClientAuthentication);
proxyService.start();
final String proxyServiceUrl = proxyService.getServiceUrl();

// Create client with credentials that will expire in 3 seconds
long clientExpireTime = System.currentTimeMillis() + 3 * 1000;
String clientAuthParams = "entityType:client,expiryTime:" + clientExpireTime;

@Cleanup
PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, clientAuthParams, 1);

@Cleanup
var producer =
proxyClient.newProducer(Schema.BYTES).topic(topicName).sendTimeout(5, TimeUnit.SECONDS).create();
producer.send("test message".getBytes());

Awaitility.await().untilAsserted(() -> {
assertThatThrownBy(() -> producer.send("test message after expiry".getBytes()))
.isExactlyInstanceOf(PulsarClientException.TimeoutException.class);
});

if (producer instanceof ProducerImpl<byte[]> producerImpl) {
long lastDisconnectedTimestamp = producerImpl.getLastDisconnectedTimestamp();
assertThat(lastDisconnectedTimestamp).isGreaterThan(clientExpireTime);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,37 +179,25 @@ public void testAuthDataRefresh(boolean forwardAuthData) throws Exception {

PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) pulsarClient;
pulsarClient.getPartitionsForTopic(topic).get();
Set<CompletableFuture<ClientCnx>> connections = pulsarClientImpl.getCnxPool().getConnections();

Awaitility.await().during(5, SECONDS).untilAsserted(() -> {
pulsarClient.getPartitionsForTopic(topic).get();
assertTrue(connections.stream().allMatch(n -> {
try {
ClientCnx clientCnx = n.get();
long timestamp = clientCnx.getLastDisconnectedTimestamp();
return timestamp == 0;
} catch (Exception e) {
throw new RuntimeException(e);
}
}));
});
// Verify initial connection state
Set<CompletableFuture<ClientCnx>> connections = pulsarClientImpl.getCnxPool().getConnections();

// Force all connections from proxy to broker to close and therefore require the proxy to re-authenticate with
// the broker. (The client doesn't lose this connection.)
restartBroker();

// Rerun assertion to ensure that it still works
Awaitility.await().during(5, SECONDS).untilAsserted(() -> {
pulsarClient.getPartitionsForTopic(topic).get();
assertTrue(connections.stream().allMatch(n -> {
try {
ClientCnx clientCnx = n.get();
long timestamp = clientCnx.getLastDisconnectedTimestamp();
return timestamp == 0;
} catch (Exception e) {
throw new RuntimeException(e);
}
}));
});
Awaitility.await()
.during(5, SECONDS)
.untilAsserted(() -> {
for (CompletableFuture<ClientCnx> cf : connections) {
try {
ClientCnx clientCnx = cf.get();
long timestamp = clientCnx.getLastDisconnectedTimestamp();
// If forwardAuthData is false, the broker cannot see the client's authentication data.
// As a result, the broker cannot perform any refresh operations on the client's auth data.
// Only the proxy has visibility of the client's connection state.
assertTrue(forwardAuthData ? timestamp == 0 : timestamp > 0);
} catch (Exception e) {
throw new AssertionError("Failed to get connection state", e);
}
}
});
}
}
Loading