diff --git a/build.gradle b/build.gradle
index 75a4354cc9439..4dbe7d7907c99 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1453,6 +1453,8 @@ project(':connect:runtime') {
testCompile libs.powermockEasymock
testCompile project(':clients').sourceSets.test.output
+ testCompile project(':core')
+ testCompile project(':core').sourceSets.test.output
testRuntime libs.slf4jlog4j
}
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index c69f94d25bbce..8c98f8d447865 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -347,8 +347,6 @@
-
-
@@ -366,6 +364,18 @@
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index dd43c37cadda5..a6c6d98facacc 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -20,6 +20,7 @@
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.Connect;
+import org.apache.kafka.connect.runtime.HerderProvider;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
import org.apache.kafka.connect.runtime.WorkerInfo;
@@ -54,62 +55,26 @@
public class ConnectDistributed {
private static final Logger log = LoggerFactory.getLogger(ConnectDistributed.class);
+ private final Time time = Time.SYSTEM;
+ private final long initStart = time.hiResClockMs();
+
public static void main(String[] args) {
+
if (args.length < 1 || Arrays.asList(args).contains("--help")) {
log.info("Usage: ConnectDistributed worker.properties");
Exit.exit(1);
}
try {
- Time time = Time.SYSTEM;
- log.info("Kafka Connect distributed worker initializing ...");
- long initStart = time.hiResClockMs();
WorkerInfo initInfo = new WorkerInfo();
initInfo.logAll();
String workerPropsFile = args[0];
Map workerProps = !workerPropsFile.isEmpty() ?
- Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.emptyMap();
-
- log.info("Scanning for plugin classes. This might take a moment ...");
- Plugins plugins = new Plugins(workerProps);
- plugins.compareAndSwapWithDelegatingLoader();
- DistributedConfig config = new DistributedConfig(workerProps);
-
- String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
- log.debug("Kafka cluster ID: {}", kafkaClusterId);
-
- RestServer rest = new RestServer(config);
- URI advertisedUrl = rest.advertisedUrl();
- String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
-
- KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
- offsetBackingStore.configure(config);
-
- Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore);
- WorkerConfigTransformer configTransformer = worker.configTransformer();
-
- Converter internalValueConverter = worker.getInternalValueConverter();
- StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
- statusBackingStore.configure(config);
-
- ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
- internalValueConverter,
- config,
- configTransformer);
-
- DistributedHerder herder = new DistributedHerder(config, time, worker,
- kafkaClusterId, statusBackingStore, configBackingStore,
- advertisedUrl.toString());
- final Connect connect = new Connect(herder, rest);
- log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs() - initStart);
- try {
- connect.start();
- } catch (Exception e) {
- log.error("Failed to start Connect", e);
- connect.stop();
- Exit.exit(3);
- }
+ Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.emptyMap();
+
+ ConnectDistributed connectDistributed = new ConnectDistributed();
+ Connect connect = connectDistributed.startConnect(workerProps);
// Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
connect.awaitStop();
@@ -119,4 +84,55 @@ public static void main(String[] args) {
Exit.exit(2);
}
}
+
+ public Connect startConnect(Map workerProps) {
+ log.info("Scanning for plugin classes. This might take a moment ...");
+ Plugins plugins = new Plugins(workerProps);
+ plugins.compareAndSwapWithDelegatingLoader();
+ DistributedConfig config = new DistributedConfig(workerProps);
+
+ String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
+ log.debug("Kafka cluster ID: {}", kafkaClusterId);
+
+ RestServer rest = new RestServer(config);
+ HerderProvider provider = new HerderProvider();
+ rest.start(provider, plugins);
+
+ URI advertisedUrl = rest.advertisedUrl();
+ String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
+
+ KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
+ offsetBackingStore.configure(config);
+
+ Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore);
+ WorkerConfigTransformer configTransformer = worker.configTransformer();
+
+ Converter internalValueConverter = worker.getInternalValueConverter();
+ StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
+ statusBackingStore.configure(config);
+
+ ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
+ internalValueConverter,
+ config,
+ configTransformer);
+
+ DistributedHerder herder = new DistributedHerder(config, time, worker,
+ kafkaClusterId, statusBackingStore, configBackingStore,
+ advertisedUrl.toString());
+
+ final Connect connect = new Connect(herder, rest);
+ log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs() - initStart);
+ try {
+ connect.start();
+ // herder has initialized now, and ready to be used by the RestServer.
+ provider.setHerder(herder);
+ } catch (Exception e) {
+ log.error("Failed to start Connect", e);
+ connect.stop();
+ Exit.exit(3);
+ }
+
+ return connect;
+ }
+
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
index 846ed1a883570..965046cccf003 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
@@ -20,6 +20,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -50,7 +51,6 @@ public void start() {
Runtime.getRuntime().addShutdownHook(shutdownHook);
herder.start();
- rest.start(herder);
log.info("Kafka Connect started");
} finally {
@@ -82,6 +82,11 @@ public void awaitStop() {
}
}
+ // Visible for testing
+ public URI restUrl() {
+ return rest.serverUrl();
+ }
+
private class ShutdownHook extends Thread {
@Override
public void run() {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderProvider.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderProvider.java
new file mode 100644
index 0000000000000..42c0925a704a2
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderProvider.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.errors.ConnectException;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A supplier for {@link Herder}s.
+ */
+public class HerderProvider {
+
+ private final CountDownLatch initialized = new CountDownLatch(1);
+ volatile Herder herder = null;
+
+ public HerderProvider() {
+ }
+
+ /**
+ * Create a herder provider with a herder.
+ * @param herder the herder that will be supplied to threads waiting on this provider
+ */
+ public HerderProvider(Herder herder) {
+ this.herder = herder;
+ initialized.countDown();
+ }
+
+ /**
+ * @return the contained herder.
+ * @throws ConnectException if a herder was not available within a duration of calling this method
+ */
+ public Herder get() {
+ try {
+ // wait for herder to be initialized
+ if (!initialized.await(1, TimeUnit.MINUTES)) {
+ throw new ConnectException("Timed out waiting for herder to be initialized.");
+ }
+ } catch (InterruptedException e) {
+ throw new ConnectException("Interrupted while waiting for herder to be initialized.", e);
+ }
+ return herder;
+ }
+
+ /**
+ * @param herder set a herder, and signal to all threads waiting on get().
+ */
+ public void setHerder(Herder herder) {
+ this.herder = herder;
+ initialized.countDown();
+ }
+
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
index a0f7fdeea9fc8..ea93a72d5006e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
@@ -22,7 +22,7 @@
import org.apache.kafka.connect.health.ConnectorState;
import org.apache.kafka.connect.health.ConnectorType;
import org.apache.kafka.connect.health.TaskState;
-import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.HerderProvider;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.util.Callback;
@@ -34,16 +34,16 @@
public class ConnectClusterStateImpl implements ConnectClusterState {
- private Herder herder;
+ private HerderProvider herderProvider;
- public ConnectClusterStateImpl(Herder herder) {
- this.herder = herder;
+ public ConnectClusterStateImpl(HerderProvider herderProvider) {
+ this.herderProvider = herderProvider;
}
@Override
public Collection connectors() {
final Collection connectors = new ArrayList<>();
- herder.connectors(new Callback>() {
+ herderProvider.get().connectors(new Callback>() {
@Override
public void onCompletion(Throwable error, Collection result) {
connectors.addAll(result);
@@ -55,7 +55,7 @@ public void onCompletion(Throwable error, Collection result) {
@Override
public ConnectorHealth connectorHealth(String connName) {
- ConnectorStateInfo state = herder.connectorStatus(connName);
+ ConnectorStateInfo state = herderProvider.get().connectorStatus(connName);
ConnectorState connectorState = new ConnectorState(
state.connector().state(),
state.connector().workerId(),
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index c0d83f291039e..5cc31cd5cca3c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -17,14 +17,14 @@
package org.apache.kafka.connect.runtime.rest;
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
-
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
-import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.HerderProvider;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.health.ConnectClusterStateImpl;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper;
import org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource;
import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
@@ -50,6 +50,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.servlet.DispatcherType;
+import javax.ws.rs.core.UriBuilder;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
@@ -60,9 +62,6 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import javax.servlet.DispatcherType;
-import javax.ws.rs.core.UriBuilder;
-
/**
* Embedded server for the REST API that provides the control plane for Kafka Connect workers.
*/
@@ -161,20 +160,20 @@ public Connector createConnector(String listener) {
return connector;
}
- public void start(Herder herder) {
+ public void start(HerderProvider herderProvider, Plugins plugins) {
log.info("Starting REST server");
ResourceConfig resourceConfig = new ResourceConfig();
resourceConfig.register(new JacksonJsonProvider());
- resourceConfig.register(new RootResource(herder));
- resourceConfig.register(new ConnectorsResource(herder, config));
- resourceConfig.register(new ConnectorPluginsResource(herder));
+ resourceConfig.register(new RootResource(herderProvider));
+ resourceConfig.register(new ConnectorsResource(herderProvider, config));
+ resourceConfig.register(new ConnectorPluginsResource(herderProvider));
resourceConfig.register(ConnectExceptionMapper.class);
resourceConfig.property(ServerProperties.WADL_FEATURE_DISABLE, true);
- registerRestExtensions(herder, resourceConfig);
+ registerRestExtensions(herderProvider, plugins, resourceConfig);
ServletContainer servletContainer = new ServletContainer(resourceConfig);
ServletHolder servletHolder = new ServletHolder(servletContainer);
@@ -220,7 +219,9 @@ public void start(Herder herder) {
log.info("REST server listening at " + jettyServer.getURI() + ", advertising URL " + advertisedUrl());
}
-
+ public URI serverUrl() {
+ return jettyServer.getURI();
+ }
public void stop() {
log.info("Stopping REST server");
@@ -264,7 +265,7 @@ else if (serverConnector != null && serverConnector.getHost() != null && serverC
Integer advertisedPort = config.getInt(WorkerConfig.REST_ADVERTISED_PORT_CONFIG);
if (advertisedPort != null)
builder.port(advertisedPort);
- else if (serverConnector != null)
+ else if (serverConnector != null && serverConnector.getPort() > 0)
builder.port(serverConnector.getPort());
log.info("Advertised URI: {}", builder.build());
@@ -302,15 +303,15 @@ ServerConnector findConnector(String protocol) {
return null;
}
- void registerRestExtensions(Herder herder, ResourceConfig resourceConfig) {
- connectRestExtensions = herder.plugins().newPlugins(
+ void registerRestExtensions(HerderProvider provider, Plugins plugins, ResourceConfig resourceConfig) {
+ connectRestExtensions = plugins.newPlugins(
config.getList(WorkerConfig.REST_EXTENSION_CLASSES_CONFIG),
config, ConnectRestExtension.class);
ConnectRestExtensionContext connectRestExtensionContext =
new ConnectRestExtensionContextImpl(
new ConnectRestConfigurable(resourceConfig),
- new ConnectClusterStateImpl(herder)
+ new ConnectClusterStateImpl(provider)
);
for (ConnectRestExtension connectRestExtension : connectRestExtensions) {
connectRestExtension.register(connectRestExtensionContext);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java
index 80192ca064489..6280473af964d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java
@@ -90,7 +90,10 @@ public String trace() {
}
public static class ConnectorState extends AbstractState {
- public ConnectorState(String state, String worker, String msg) {
+ @JsonCreator
+ public ConnectorState(@JsonProperty("state") String state,
+ @JsonProperty("worker_id") String worker,
+ @JsonProperty("msg") String msg) {
super(state, worker, msg);
}
}
@@ -98,7 +101,11 @@ public ConnectorState(String state, String worker, String msg) {
public static class TaskState extends AbstractState implements Comparable {
private final int id;
- public TaskState(int id, String state, String worker, String msg) {
+ @JsonCreator
+ public TaskState(@JsonProperty("id") int id,
+ @JsonProperty("state") String state,
+ @JsonProperty("worker_id") String worker,
+ @JsonProperty("msg") String msg) {
super(state, worker, msg);
this.id = id;
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
index 24eb93b8c0d5f..87f25b29cb51a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
@@ -18,7 +18,7 @@
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.runtime.ConnectorConfig;
-import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.HerderProvider;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorPluginInfo;
@@ -49,7 +49,7 @@
public class ConnectorPluginsResource {
private static final String ALIAS_SUFFIX = "Connector";
- private final Herder herder;
+ private final HerderProvider herderProvider;
private final List connectorPlugins;
private static final List> CONNECTOR_EXCLUDES = Arrays.asList(
@@ -58,8 +58,8 @@ public class ConnectorPluginsResource {
SchemaSourceConnector.class
);
- public ConnectorPluginsResource(Herder herder) {
- this.herder = herder;
+ public ConnectorPluginsResource(HerderProvider herderProvider) {
+ this.herderProvider = herderProvider;
this.connectorPlugins = new ArrayList<>();
}
@@ -78,7 +78,7 @@ public ConfigInfos validateConfigs(
);
}
- return herder.validateConnectorConfig(connectorConfig);
+ return herderProvider.get().validateConnectorConfig(connectorConfig);
}
@GET
@@ -90,7 +90,7 @@ public List listConnectorPlugins() {
// TODO: improve once plugins are allowed to be added/removed during runtime.
private synchronized List getConnectorPlugins() {
if (connectorPlugins.isEmpty()) {
- for (PluginDesc plugin : herder.plugins().connectors()) {
+ for (PluginDesc plugin : herderProvider.get().plugins().connectors()) {
if (!CONNECTOR_EXCLUDES.contains(plugin.pluginClass())) {
connectorPlugins.add(new ConnectorPluginInfo(plugin));
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index e9661046d31e9..29a8c39028ef7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -19,6 +19,7 @@
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.HerderProvider;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.RebalanceNeededException;
import org.apache.kafka.connect.runtime.distributed.RequestTargetException;
@@ -67,21 +68,25 @@ public class ConnectorsResource {
// but currently a worker simply leaving the group can take this long as well.
private static final long REQUEST_TIMEOUT_MS = 90 * 1000;
- private final Herder herder;
+ private final HerderProvider herderProvider;
private final WorkerConfig config;
@javax.ws.rs.core.Context
private ServletContext context;
- public ConnectorsResource(Herder herder, WorkerConfig config) {
- this.herder = herder;
+ public ConnectorsResource(HerderProvider herder, WorkerConfig config) {
+ this.herderProvider = herder;
this.config = config;
}
+ private Herder herder() {
+ return herderProvider.get();
+ }
+
@GET
@Path("/")
public Collection listConnectors(final @QueryParam("forward") Boolean forward) throws Throwable {
FutureCallback> cb = new FutureCallback<>();
- herder.connectors(cb);
+ herder().connectors(cb);
return completeOrForwardRequest(cb, "/connectors", "GET", null, new TypeReference>() {
}, forward);
}
@@ -99,7 +104,7 @@ public Response createConnector(final @QueryParam("forward") Boolean forward,
checkAndPutConnectorConfigName(name, configs);
FutureCallback> cb = new FutureCallback<>();
- herder.putConnectorConfig(name, configs, false, cb);
+ herder().putConnectorConfig(name, configs, false, cb);
Herder.Created info = completeOrForwardRequest(cb, "/connectors", "POST", createRequest,
new TypeReference() { }, new CreatedConnectorInfoTranslator(), forward);
@@ -112,7 +117,7 @@ public Response createConnector(final @QueryParam("forward") Boolean forward,
public ConnectorInfo getConnector(final @PathParam("connector") String connector,
final @QueryParam("forward") Boolean forward) throws Throwable {
FutureCallback cb = new FutureCallback<>();
- herder.connectorInfo(connector, cb);
+ herder().connectorInfo(connector, cb);
return completeOrForwardRequest(cb, "/connectors/" + connector, "GET", null, forward);
}
@@ -121,14 +126,14 @@ public ConnectorInfo getConnector(final @PathParam("connector") String connector
public Map getConnectorConfig(final @PathParam("connector") String connector,
final @QueryParam("forward") Boolean forward) throws Throwable {
FutureCallback