diff --git a/.idea/compiler.xml b/.idea/compiler.xml index ce92787..2d77875 100644 --- a/.idea/compiler.xml +++ b/.idea/compiler.xml @@ -8,10 +8,11 @@ - + + - + diff --git a/.idea/misc.xml b/.idea/misc.xml index 0d18312..ff8932e 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -1,9 +1,12 @@ + + + - + \ No newline at end of file diff --git a/build.gradle b/build.gradle index 5127cbc..67cbf4d 100644 --- a/build.gradle +++ b/build.gradle @@ -32,6 +32,9 @@ repositories { mavenLocal() jcenter() mavenCentral() + maven { + url "https://nexus.somnomedics.eu/nexus/repository/3thparty" + } } dependencies { @@ -57,7 +60,7 @@ dependencies { } compile 'org.ow2.asm:asm:7.3.1' compile 'cglib:cglib:3.3.0' - compile 'com.google.inject:guice:4.2.2:classes' + compile 'com.google.inject:guice:4.2.2' // for class-path scanning (auto-register JAX-RS resources) compile 'org.reflections:reflections:0.9.12' @@ -77,7 +80,7 @@ dependencies { compile 'io.jsonwebtoken:jjwt:0.9.1' - compile 'io.undertow:undertow-websockets-jsr:2.0.30.Final' + compile 'io.undertow:undertow-websockets-jsr:2.2.3.Final' compile 'com.fasterxml.jackson.core:jackson-core:2.9.6' compile 'com.fasterxml.jackson.core:jackson-annotations:2.9.6' diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index c5f89b6..41010d2 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ #Fri Mar 27 16:38:54 CET 2020 -distributionUrl=https\://services.gradle.org/distributions/gradle-4.8-all.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-6.8.2-bin.zip distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStorePath=wrapper/dists diff --git a/src/main/java/net/talpidae/base/Base.java b/src/main/java/net/talpidae/base/Base.java index 6774dae..0d6dfcf 100644 --- a/src/main/java/net/talpidae/base/Base.java +++ b/src/main/java/net/talpidae/base/Base.java @@ -27,7 +27,6 @@ import com.google.inject.name.Names; import net.talpidae.base.database.DataBaseModule; -import net.talpidae.base.insect.InsectModule; import net.talpidae.base.mapper.MapperModule; import net.talpidae.base.server.ServerModule; import net.talpidae.base.util.Application; @@ -144,7 +143,6 @@ protected void configure() install(new DataBaseModule()); install(new MapperModule()); install(new ServerModule()); - install(new InsectModule()); OptionalBinder.newOptionalBinder(binder(), ShutdownHook.class).setDefault().to(DefaultShutdownHook.class); } diff --git a/src/main/java/net/talpidae/base/client/ClientModule.java b/src/main/java/net/talpidae/base/client/ClientModule.java index 2bd8dbb..4f27e7e 100644 --- a/src/main/java/net/talpidae/base/client/ClientModule.java +++ b/src/main/java/net/talpidae/base/client/ClientModule.java @@ -60,8 +60,6 @@ protected void configure() bind(ObjectMapperProvider.class); bind(AuthenticationInheritanceRequestFilter.class); bind(AuthScopeTokenForwardRequestFilter.class); - bind(InsectNameUserAgentRequestFilter.class); - bind(LoadBalancingRequestFilter.class); bind(LoadBalancingWebTargetFactory.class); OptionalBinder.newOptionalBinder(binder(), ClientConfiguration.class).setDefault().to(DefaultClientConfig.class); diff --git a/src/main/java/net/talpidae/base/client/InsectNameUserAgentRequestFilter.java b/src/main/java/net/talpidae/base/client/InsectNameUserAgentRequestFilter.java deleted file mode 100644 index 773b556..0000000 --- a/src/main/java/net/talpidae/base/client/InsectNameUserAgentRequestFilter.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright (C) 2017 Jonas Zeiger - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package net.talpidae.base.client; - -import net.talpidae.base.insect.config.SlaveSettings; - -import java.io.IOException; -import java.util.Optional; - -import javax.inject.Inject; -import javax.inject.Singleton; -import javax.ws.rs.client.ClientRequestContext; -import javax.ws.rs.client.ClientRequestFilter; -import javax.ws.rs.core.HttpHeaders; -import javax.ws.rs.ext.Provider; - -import lombok.extern.slf4j.Slf4j; -import lombok.val; - -import static com.google.common.base.Strings.isNullOrEmpty; - - -/** - * Adds the insect name to the user-agent header value. - */ -@Singleton -@Provider -@Slf4j -public class InsectNameUserAgentRequestFilter implements ClientRequestFilter -{ - private final String insectName; - - - @SuppressWarnings("OptionalUsedAsFieldOrParameterType") - @Inject - public InsectNameUserAgentRequestFilter(Optional slaveSettings) - { - insectName = slaveSettings.map(SlaveSettings::getName).orElse(null); - } - - - @Override - public void filter(ClientRequestContext requestContext) throws IOException - { - if (insectName != null) - { - val userAgent = requestContext.getHeaderString(HttpHeaders.USER_AGENT); - val nextUserAgent = isNullOrEmpty(userAgent) ? insectName : insectName + "/" + userAgent; - - requestContext.getHeaders().putSingle(HttpHeaders.USER_AGENT, nextUserAgent); - } - } -} diff --git a/src/main/java/net/talpidae/base/client/LoadBalancingProxyWebTarget.java b/src/main/java/net/talpidae/base/client/LoadBalancingProxyWebTarget.java index a357e03..54257e8 100644 --- a/src/main/java/net/talpidae/base/client/LoadBalancingProxyWebTarget.java +++ b/src/main/java/net/talpidae/base/client/LoadBalancingProxyWebTarget.java @@ -55,7 +55,7 @@ public T getProxyWebTarget() if (resource == null) { // by convention we always use the fully qualified interface name as route - val newResource = webTargetFactoryProvider.get().newWebTarget(serviceInterface.getName()).proxy(serviceInterface); + val newResource = webTargetFactoryProvider.get().newWebTarget(serviceInterface.getSimpleName()).proxy(serviceInterface); if (resourceRef.compareAndSet(null, newResource)) { return newResource; diff --git a/src/main/java/net/talpidae/base/client/LoadBalancingRequestFilter.java b/src/main/java/net/talpidae/base/client/LoadBalancingRequestFilter.java deleted file mode 100644 index 23726f1..0000000 --- a/src/main/java/net/talpidae/base/client/LoadBalancingRequestFilter.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Copyright (C) 2017 Jonas Zeiger - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package net.talpidae.base.client; - -import net.talpidae.base.insect.Insect; -import net.talpidae.base.insect.Slave; - -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; - -import javax.inject.Inject; -import javax.inject.Singleton; -import javax.ws.rs.client.ClientRequestContext; -import javax.ws.rs.client.ClientRequestFilter; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.UriBuilder; -import javax.ws.rs.core.UriBuilderException; -import javax.ws.rs.ext.Provider; - -import lombok.extern.slf4j.Slf4j; -import lombok.val; - - -@Singleton -@Provider -@Slf4j -public class LoadBalancingRequestFilter implements ClientRequestFilter -{ - // Must be a valid port that indicates route lookup - public static final String ROUTE_PROPERTY_NAME = Insect.class.getPackage().toString() + ".Route"; - - private final Slave slave; - - @Inject - public LoadBalancingRequestFilter(Slave slave) - { - this.slave = slave; - } - - - private static String getRequestRoute(ClientRequestContext requestContext) - { - val routeObject = requestContext.getConfiguration().getProperty(ROUTE_PROPERTY_NAME); - if (routeObject instanceof String) - { - return (String) routeObject; - } - - return null; - } - - - private URI replaceHostAndPort(URI target, String host, int port) - { - try - { - return UriBuilder.fromUri(target) - .host(host) - .port(port) - .build(); - } - catch (UriBuilderException e) - { - log.error("failed to rewrite request for {}:{}", host, port); - } - - return null; - } - - - @Override - public void filter(ClientRequestContext requestContext) throws IOException - { - // if we have no route property attached there is no action to perform - val route = getRequestRoute(requestContext); - if (route != null) - { - try - { - val address = slave.findService(route); - if (address != null) - { - // successfully redirected request - val rewrittenUri = replaceHostAndPort(requestContext.getUri(), address.getHostString(), address.getPort()); - if (rewrittenUri != null) - { - requestContext.setUri(rewrittenUri); - } - - return; - } - } - catch (InterruptedException e) - { - Thread.currentThread().interrupt(); - } - - log.debug("failed to lookup route for "); - requestContext.abortWith(Response.status(Response.Status.SERVICE_UNAVAILABLE) - .entity("Unable to lookup service for route: " + route) - .build()); - } - } -} diff --git a/src/main/java/net/talpidae/base/client/LoadBalancingWebTargetFactory.java b/src/main/java/net/talpidae/base/client/LoadBalancingWebTargetFactory.java index d606932..20d36fd 100644 --- a/src/main/java/net/talpidae/base/client/LoadBalancingWebTargetFactory.java +++ b/src/main/java/net/talpidae/base/client/LoadBalancingWebTargetFactory.java @@ -28,6 +28,8 @@ import lombok.NonNull; import lombok.val; +import java.util.Locale; + @Singleton public class LoadBalancingWebTargetFactory @@ -48,8 +50,7 @@ public LoadBalancingWebTargetFactory(@NonNull ClientConfiguration clientConfig) public ResteasyWebTarget newWebTarget(@NonNull String route) { // host/port are replaced later by LoadBalancingRequestFilter - val webTarget = client.target("http://127.0.0.1:0"); - webTarget.property(LoadBalancingRequestFilter.ROUTE_PROPERTY_NAME, route); + val webTarget = client.target("http://" + route.toLowerCase(Locale.ROOT)); return (ResteasyWebTarget) webTarget; } diff --git a/src/main/java/net/talpidae/base/insect/AsyncInsectWrapper.java b/src/main/java/net/talpidae/base/insect/AsyncInsectWrapper.java deleted file mode 100644 index 462dc7d..0000000 --- a/src/main/java/net/talpidae/base/insect/AsyncInsectWrapper.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Copyright (C) 2017 Jonas Zeiger - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package net.talpidae.base.insect; - -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; -import lombok.val; -import net.talpidae.base.util.log.LoggingConfigurer; - -import java.util.Objects; - -import static net.talpidae.base.util.log.LoggingConfigurer.CONTEXT_INSECT_NAME_KEY; - - -@Slf4j -public class AsyncInsectWrapper> implements CloseableRunnable -{ - @Getter - private final T insect; - - private final LoggingConfigurer loggingConfigurer; - - private InsectWorker insectWorker; - - - AsyncInsectWrapper(T insect, LoggingConfigurer loggingConfigurer) - { - this.insect = insect; - this.loggingConfigurer = loggingConfigurer; - } - - - @Override - public void run() - { - // use actual insect name from now on (for logging) - updateLoggingContext(); - - // spawn worker thread - insectWorker = InsectWorker.start(insect, insect.getClass().getSimpleName()); - } - - - @Override - public void close() - { - if (insectWorker.shutdown()) - { - insectWorker = null; - log.debug("worker shut down"); - } - } - - - public boolean isRunning() - { - return insectWorker != null && insect.isRunning(); - } - - - private void updateLoggingContext() - { - val actualInsectName = getInsect().getSettings().getName(); - val currentInsectName = loggingConfigurer.getContext(CONTEXT_INSECT_NAME_KEY); - if (!Objects.equals(actualInsectName, currentInsectName)) - { - log.debug("context: {} changed from {} to {}", - CONTEXT_INSECT_NAME_KEY, - loggingConfigurer.getContext(CONTEXT_INSECT_NAME_KEY), - actualInsectName); - - loggingConfigurer.putContext(CONTEXT_INSECT_NAME_KEY, actualInsectName); - } - } -} diff --git a/src/main/java/net/talpidae/base/insect/AsyncQueen.java b/src/main/java/net/talpidae/base/insect/AsyncQueen.java deleted file mode 100644 index 07f39f3..0000000 --- a/src/main/java/net/talpidae/base/insect/AsyncQueen.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright (C) 2017 Jonas Zeiger - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package net.talpidae.base.insect; - -import com.google.inject.Singleton; - -import net.talpidae.base.insect.state.InsectState; -import net.talpidae.base.util.log.LoggingConfigurer; - -import java.net.InetSocketAddress; -import java.util.Map; -import java.util.stream.Stream; - -import javax.inject.Inject; - -import lombok.extern.slf4j.Slf4j; - - -@Singleton -@Slf4j -public class AsyncQueen extends AsyncInsectWrapper implements Queen -{ - @Inject - public AsyncQueen(SyncQueen syncQueen, LoggingConfigurer loggingConfigurer) - { - super(syncQueen, loggingConfigurer); - } - - @Override - public void initializeInsectState(Stream> stateStream) - { - getInsect().initializeInsectState(stateStream); - } - - @Override - public Stream getLiveInsectState() - { - return getInsect().getLiveInsectState(); - } - - @Override - public void sendShutdown(InetSocketAddress remote) - { - getInsect().sendShutdown(remote); - } - - @Override - public void setIsOutOfService(String route, InetSocketAddress socketAddress, boolean isOutOfService) - { - getInsect().setIsOutOfService(route, socketAddress, isOutOfService); - } -} diff --git a/src/main/java/net/talpidae/base/insect/AsyncSlave.java b/src/main/java/net/talpidae/base/insect/AsyncSlave.java deleted file mode 100644 index 37adf96..0000000 --- a/src/main/java/net/talpidae/base/insect/AsyncSlave.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright (C) 2017 Jonas Zeiger - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package net.talpidae.base.insect; - -import com.google.inject.Singleton; - -import net.talpidae.base.insect.state.ServiceState; -import net.talpidae.base.util.log.LoggingConfigurer; -import net.talpidae.base.util.performance.Metric; - -import java.net.InetSocketAddress; -import java.util.List; -import java.util.Queue; - -import javax.inject.Inject; - -import lombok.extern.slf4j.Slf4j; - - -@Singleton -@Slf4j -public class AsyncSlave extends AsyncInsectWrapper implements Slave -{ - @Inject - public AsyncSlave(SyncSlave syncSlave, LoggingConfigurer loggingConfigurer) - { - super(syncSlave, loggingConfigurer); - } - - - @Override - public InetSocketAddress findService(String route) throws InterruptedException - { - return getInsect().findService(route); - } - - @Override - public InetSocketAddress findService(String route, long timeoutMillies) throws InterruptedException - { - return getInsect().findService(route, timeoutMillies); - } - - @Override - public List findServices(String route, long timeoutMillies) throws InterruptedException - { - return getInsect().findServices(route, timeoutMillies); - } - - @Override - public void forwardMetrics(Queue metricQueue) - { - getInsect().forwardMetrics(metricQueue); - } -} diff --git a/src/main/java/net/talpidae/base/insect/CloseableRunnable.java b/src/main/java/net/talpidae/base/insect/CloseableRunnable.java deleted file mode 100644 index e65a518..0000000 --- a/src/main/java/net/talpidae/base/insect/CloseableRunnable.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright (C) 2017 Jonas Zeiger - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package net.talpidae.base.insect; - -import java.io.Closeable; - - -public interface CloseableRunnable extends Runnable, Closeable -{ - boolean isRunning(); -} diff --git a/src/main/java/net/talpidae/base/insect/Insect.java b/src/main/java/net/talpidae/base/insect/Insect.java deleted file mode 100644 index 0a8b9ef..0000000 --- a/src/main/java/net/talpidae/base/insect/Insect.java +++ /dev/null @@ -1,693 +0,0 @@ -/* - * Copyright (C) 2017 Jonas Zeiger - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package net.talpidae.base.insect; - -import com.google.common.base.Strings; - -import net.talpidae.base.insect.config.InsectSettings; -import net.talpidae.base.insect.exchange.MessageExchange; -import net.talpidae.base.insect.exchange.MessageQueueControl; -import net.talpidae.base.insect.message.InsectMessage; -import net.talpidae.base.insect.message.payload.Invalidate; -import net.talpidae.base.insect.message.payload.Mapping; -import net.talpidae.base.insect.message.payload.Metrics; -import net.talpidae.base.insect.message.payload.Payload; -import net.talpidae.base.insect.message.payload.Shutdown; -import net.talpidae.base.insect.state.InsectState; -import net.talpidae.base.util.network.NetworkUtil; -import net.talpidae.base.util.random.AtomicXorShiftRandom; - -import java.net.InetSocketAddress; -import java.nio.charset.CharacterCodingException; -import java.util.Arrays; -import java.util.Deque; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; -import java.util.function.Function; - -import lombok.AccessLevel; -import lombok.Getter; -import lombok.Setter; -import lombok.Value; -import lombok.extern.slf4j.Slf4j; -import lombok.val; - - -@Slf4j -public abstract class Insect extends MessageExchange implements CloseableRunnable -{ - private static final InsectCollection EMPTY_ROUTE = new InsectCollection(0L) - { - @Override - InsectState compute(InetSocketAddress key, InsectStateUpdateFunction updateFunction) - { - updateFunction.apply(null); - - return null; - } - }; - - private final boolean onlyTrustedRemotes; - - private final boolean remoteOnLocalHost; - - // route -> Set (we use a map to efficiently lookup insects) - @Getter(AccessLevel.PROTECTED) - private final Map routeToInsects = new ConcurrentHashMap<>(); - - @Getter(AccessLevel.PROTECTED) - private final S settings; - - private final long pulseDelayCutoff; - - private final Deque outBoundMessages = new ConcurrentLinkedDeque<>(); - - @Getter(AccessLevel.PROTECTED) - protected AtomicXorShiftRandom random = new AtomicXorShiftRandom(); - - @Getter - private long pulseDelayMillies = 0L; - - /** - * Highest sane timestamp value encountered so far. - *

- * This timestamp is used for automatic self-preservation. - */ - @Getter - @Setter(AccessLevel.PROTECTED) - private long maximumTimestamp; - - - Insect(S settings, boolean onlyTrustedRemotes) - { - super(InsectMessage::new, settings); - - this.settings = settings; - this.onlyTrustedRemotes = onlyTrustedRemotes; - - this.remoteOnLocalHost = settings.getRemotes().stream() - .map(InetSocketAddress::getAddress) - .anyMatch(NetworkUtil::isLocalAddress); - - this.pulseDelayCutoff = TimeUnit.MILLISECONDS.toNanos(settings.getPulseDelay() + (settings.getPulseDelay() / 2)); - } - - /** - * Get an empty route. - */ - protected InsectCollection emptyRoute() - { - return EMPTY_ROUTE; - } - - - @Override - public void run() - { - pulseDelayMillies = getSettings().getPulseDelay(); - routeToInsects.clear(); - outBoundMessages.clear(); - - try - { - // run MessageExchange loop - super.run(); - } - catch (Exception e) - { - log.error("insect shutdown because of critical error", e); - } - } - - - /** - * Override to perform actions based on a pulse at or below the pulse delay. - * - * @param nowNanos The current monotonic clock value as returned by System.nanoTime(). - * @return Maximum time to wait for next pulse in nanoseconds. - */ - protected long handlePulse(long nowNanos) - { - // find and remove all timed out instances - val deadlineNanos = maximumTimestamp - (pulseDelayCutoff * 2); - val limitRemovedCount = Math.max(1, routeToInsects.size() / 5); // remove max 20% of instances in one go - - int removed = 0; - for (val alternatives : routeToInsects.values()) - { - removed += alternatives.truncateOlderThan(deadlineNanos, this::handleTimeout); - if (removed >= limitRemovedCount) - break; - } - - // reduce batch size a little by splitting up work over time - return pulseDelayMillies / 4; - } - - - @Override - public void close() - { - prepareShutdown(); - - super.close(); - } - - - /** - * Override if additional actions are required before shutting down. - */ - protected void prepareShutdown() - { - } - - - /** - * Override to implement additional logic after a mapping message has been handled by the default handler. - */ - protected void postHandleMapping(InsectState state, Mapping mapping, boolean isNewMapping, boolean isDependencyMapping) - { - - } - - - /** - * Override to do something when new dependencies are published. - */ - protected void handleDependenciesChanged(InsectState state) - { - - } - - - /** - * Override to do something when an insect timed out and got removed from the pool. - */ - protected void handleTimeout(InsectState timedOutState) - { - - } - - - /** - * Override to implement remote shutdown. - */ - protected void handleShutdown() - { - - } - - - /** - * Override to implement invalidate (slave needs to send critical information again). - */ - protected void handleInvalidate() - { - - } - - - /** - * Override to handle metrics (usually not relayed). - */ - protected void handleMetrics(Metrics metrics) - { - - } - - - /** - * Pack and send a message with the specified payload to the given destination. - */ - protected void addMessage(InetSocketAddress destination, Payload payload) - { - val queueControl = getQueueControl(); - if (queueControl != null) - { - // general case: - // message was created from within processMessages() context (same thread) - ((InsectMessage) queueControl.addOutbound(destination)).setPayload(payload); - } - else - { - // rare case: - // queue outbound message from other thread before construction in processMessages() - outBoundMessages.offerLast(new OutBoundMessage(destination, payload)); - - // schedule call to processMessages() which actually sends our message - wakeup(); - } - } - - - private boolean checkSenderTrust(InetSocketAddress remote) - { - return (remote.getAddress().isLoopbackAddress() && remoteOnLocalHost) - || getSettings().getRemotes().contains(remote); - } - - - private void handleMessage(long nowNanos, InsectMessage message) - { - val remote = message.getRemoteAddress(); - val isTrustedServer = remote.getAddress().isLoopbackAddress() || getSettings().getRemotes().contains(remote); - if (isTrustedServer || !onlyTrustedRemotes) - { - try - { - val payload = message.getPayload(); - switch (payload.getType()) - { - case Mapping.TYPE_MAPPING: - { - // validate client address (avoid message spoofing) - if (isTrustedServer || ((Mapping) payload).isAuthorative(remote)) - { - handleMapping(nowNanos, (Mapping) payload); - } - else - { - log.warn("possible spoofing: remote {} not authorized to send message: {}", remote, payload); - } - break; - } - - case Invalidate.TYPE_INVALIDATE: - { - log.debug("received invalidate message from remote {}", remote); - handleInvalidate(); - break; - } - - case Shutdown.TYPE_SHUTDOWN: - { - log.debug("received shutdown message from remote {}", remote); - handleShutdown(); - break; - } - - case Metrics.TYPE_METRIC: - { - handleMetrics((Metrics) payload); - } - } - } - catch (CharacterCodingException | IndexOutOfBoundsException e) - { - log.warn("received malformed message from: {}", remote); - } - } - else - { - log.warn("rejected message from untrusted source: {}", remote); - } - } - - private void handleMapping(long nowNanos, Mapping mapping) - { - val alternatives = routeToInsects.computeIfAbsent(mapping.getRoute(), r -> new InsectCollection(pulseDelayCutoff)); - - // do we have an existing entry for this slave? - val nextState = alternatives.compute(mapping.getSocketAddress(), state -> - { - val remoteTimestamp = mapping.getTimestamp(); - val nextStateBuilder = InsectState.builder() - .name(mapping.getName()); - - final InetSocketAddress socketAddress; - if (state != null) - { - // merge new dependency with those already known - nextStateBuilder.dependencies(state.getDependencies()); - - // out-of-service is sticky - nextStateBuilder.isOutOfService(state.isOutOfService()); - - // do not keep resolving the same host:port combo - val oldHost = state.getSocketAddress().getHostString(); - val oldPort = state.getSocketAddress().getPort(); - socketAddress = (Objects.equals(mapping.getHost(), oldHost) && oldPort == mapping.getPort()) - ? state.getSocketAddress() - : null; - - // perform timestamp calculation magic - // the point is to use the heartbeat timestamp from the REMOTE - // as a measure of latency/CPU load on that service instance - val remoteEpoch = state.getTimestampEpochRemote(); - val localEpoch = state.getTimestampEpochLocal(); - val adjustedTimestamp = localEpoch + (remoteTimestamp - remoteEpoch); - val previousAdjustedTimestamp = state.getTimestamp(); - val pulseDelayNanos = TimeUnit.MILLISECONDS.toNanos(getPulseDelayMillies()); - if (adjustedTimestamp < previousAdjustedTimestamp || adjustedTimestamp > (previousAdjustedTimestamp + pulseDelayNanos + (pulseDelayNanos >>> 1))) - { - // missed heartbeat package or service restarted, need to reset epoch - nextStateBuilder.newEpoch(nowNanos, remoteTimestamp); - } - else - { - nextStateBuilder.timestampEpochLocal(localEpoch) - .timestampEpochRemote(remoteEpoch) - .timestamp(localEpoch + (remoteTimestamp - remoteEpoch)); - } - } - else - { - socketAddress = null; - - nextStateBuilder.newEpoch(nowNanos, remoteTimestamp); - } - - // resolve once - nextStateBuilder.socketAddress((socketAddress != null) ? socketAddress : new InetSocketAddress(mapping.getHost(), mapping.getPort())); - - val newDependency = mapping.getDependency(); - if (!newDependency.isEmpty()) - { - nextStateBuilder.dependency(newDependency); - } - - // InsectState is key and value at the same time - return nextStateBuilder.build(); - }); - - // let descendants add more actions - final boolean isNewMapping; - if (nextState != null) - { - maximumTimestamp = nextState.getTimestamp(); - isNewMapping = mapping.getTimestamp() == nextState.getTimestampEpochRemote(); - } - else - { - isNewMapping = false; - } - - val isDependencyMapping = !Strings.isNullOrEmpty(mapping.getDependency()); - postHandleMapping(nextState, mapping, isNewMapping, isDependencyMapping); - - if (isNewMapping || isDependencyMapping) - { - // inform about changed dependencies - handleDependenciesChanged(nextState); - } - } - - /** - * Handle inbound and send outbound messages. - */ - @Override - protected long processMessages(MessageQueueControl control) - { - // add already queued up messages - OutBoundMessage outBoundMessage; - while ((outBoundMessage = outBoundMessages.poll()) != null) - { - try - { - addMessage(outBoundMessage.getDestination(), outBoundMessage.getPayload()); - } - catch (Throwable e) - { - log.error("error adding outbound message of type {} for remote {}: {}", outBoundMessage.getClass().getSimpleName(), outBoundMessage.getDestination().getHostString(), e.getMessage()); - } - } - - val nowNanos = System.nanoTime(); - InsectMessage inBoundMessage; - while ((inBoundMessage = control.pollInbound()) != null) - { - try - { - handleMessage(nowNanos, inBoundMessage); - } - catch (Throwable e) - { - log.error("error handling message from remote {}: {}: {}", inBoundMessage.getRemoteAddress(), e.getClass().getSimpleName(), e.getMessage()); - } - } - - // call handlePulse() as soon as the last queued inbound message has been processed - return handlePulse(nowNanos); - } - - - /** - * Initialize the specified route with and empty InsectCollection. - */ - protected InsectCollection initializeRoute(String route) - { - return routeToInsects.computeIfAbsent(route, (key) -> new InsectCollection(pulseDelayCutoff)); - } - - - @FunctionalInterface - protected interface InsectStateUpdateFunction extends Function - { - - } - - - /** - * Holder for queued up outbound message content. - */ - @Value - private static class OutBoundMessage - { - private final InetSocketAddress destination; - - private final Payload payload; - } - - - private static final class InsectPool - { - private static final InsectPool EMPTY = new InsectPool(new InsectState[0], 0); - - private final InsectState[] insects; - - private final List allInsects; - - private final List activeInsects; - - - private InsectPool(InsectState[] insects, int activeEndIndex) - { - this.insects = insects; - this.allInsects = Arrays.asList(insects); - this.activeInsects = allInsects.subList(0, activeEndIndex); - } - } - - - protected static class InsectCollection - { - private static final int ACTION_REPLACE = 0; - - private static final int ACTION_ADD = 1; - - private static final int ACTION_REMOVE = -1; - - private final AtomicReference insects = new AtomicReference<>(InsectPool.EMPTY); - - private final long pulseDelayCutoff; - - - private InsectCollection(long pulseDelayCutoff) - { - this.pulseDelayCutoff = pulseDelayCutoff; - } - - - private static int indexOf(InsectState[] array, InetSocketAddress socketAddress) - { - for (int i = 0; i < array.length; ++i) - { - val otherAddress = array[i].getSocketAddress(); - if (socketAddress.getPort() == otherAddress.getPort()) - { - val hostString = socketAddress.getHostString(); - val otherHostString = otherAddress.getHostString(); - if (Objects.equals(hostString, otherHostString)) - { - return i; - } - } - } - - return -1; - } - - - public List getActive() - { - return insects.get().activeInsects; - } - - - public List getAll() - { - return insects.get().allInsects; - } - - - void clear() - { - insects.set(InsectPool.EMPTY); - } - - - /** - * Truncate the collection, removing all insects whose timestamp is older than the specified deadline. - * - * @return Number of timed-out instances that have been removed. - */ - int truncateOlderThan(long deadlineNanos, Consumer removedConsumer) - { - InsectPool pool; - InsectPool nextPool; - - int count = 0; - do - { - pool = insects.get(); // acquire - val activeEndIndex = pool.activeInsects.size(); - val existing = pool.insects; - - int i = existing.length - 1; - while (i >= 0 && existing[i].getTimestamp() < deadlineNanos) - { - --i; - } - - val nextLength = i + 1; - count = existing.length - nextLength; - if (nextLength == existing.length) - { - // nothing to do - break; - } - else if (nextLength > 0) - { - val next = new InsectState[nextLength]; - System.arraycopy(existing, 0, next, 0, Math.min(existing.length, nextLength)); - - nextPool = new InsectPool(next, activeEndIndex); - } - else - { - nextPool = InsectPool.EMPTY; - } - } - while (!insects.compareAndSet(pool, nextPool)); - - // notify about removed instances - for (int i = pool.insects.length - 1; i >= pool.insects.length - count; --i) - { - removedConsumer.accept(pool.insects[i]); - } - - return count; - } - - - /** - * Updates this collection of insects. - * - * @param key The address of the insect to compute. - * @param updateFunction Function that will create a new immutable InsectState value, based on the previous value. - * @return The new value or null if none exists (the previous item has been removed or didn't exist in the first place). - */ - InsectState compute(InetSocketAddress key, InsectStateUpdateFunction updateFunction) - { - InsectPool pool; - InsectPool nextPool; - InsectState nextState; - do - { - pool = insects.get(); // acquire - - val existing = pool.insects; - val index = indexOf(existing, key); - nextState = updateFunction.apply(index >= 0 ? existing[index] : null); - - final int action; - if (index >= 0) - { - if (nextState != null) - { - action = ACTION_REPLACE; - } - else - { - action = ACTION_REMOVE; - } - } - else if (nextState != null) - { - // add - action = ACTION_ADD; - } - else - { - // no-op - return null; - } - - val nextLength = existing.length + action; - if (nextLength > 0) - { - val next = new InsectState[nextLength]; - System.arraycopy(existing, 0, next, 0, Math.min(existing.length, nextLength)); - next[action == ACTION_ADD ? existing.length : index] = (action != ACTION_REMOVE) ? nextState : existing[existing.length - 1]; - - // copy of last array has been updated now - - // sort by timestamp descending (to allow faster lookup by slave) - Arrays.sort(next, (s1, s2) -> -Long.compare(s1.getTimestamp(), s2.getTimestamp())); - - // find first timed-out insect - val timestampCutOff = next[0].getTimestamp() - pulseDelayCutoff; - int i; - for (i = 1; i < next.length; ++i) - { - if (next[i].getTimestamp() < timestampCutOff) - { - // cutoff reached - break; - } - } - - nextPool = new InsectPool(next, i); - } - else - { - nextPool = InsectPool.EMPTY; - } - } - while (!insects.compareAndSet(pool, nextPool)); - - return nextState; - } - } -} \ No newline at end of file diff --git a/src/main/java/net/talpidae/base/insect/InsectModule.java b/src/main/java/net/talpidae/base/insect/InsectModule.java deleted file mode 100644 index 94b0798..0000000 --- a/src/main/java/net/talpidae/base/insect/InsectModule.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (C) 2017 Jonas Zeiger - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package net.talpidae.base.insect; - -import com.google.inject.AbstractModule; -import com.google.inject.multibindings.OptionalBinder; - -import net.talpidae.base.insect.config.DefaultQueenSettings; -import net.talpidae.base.insect.config.DefaultSlaveSettings; -import net.talpidae.base.insect.config.QueenSettings; -import net.talpidae.base.insect.config.SlaveSettings; -import net.talpidae.base.insect.metrics.MetricsSink; - - -public class InsectModule extends AbstractModule -{ - @Override - protected void configure() - { - OptionalBinder.newOptionalBinder(binder(), QueenSettings.class).setDefault().to(DefaultQueenSettings.class); - OptionalBinder.newOptionalBinder(binder(), Queen.class).setDefault().to(AsyncQueen.class); - - OptionalBinder.newOptionalBinder(binder(), SlaveSettings.class).setDefault().to(DefaultSlaveSettings.class); - OptionalBinder.newOptionalBinder(binder(), Slave.class).setDefault().to(AsyncSlave.class); - - OptionalBinder.newOptionalBinder(binder(), MetricsSink.class); - } -} \ No newline at end of file diff --git a/src/main/java/net/talpidae/base/insect/InsectWorker.java b/src/main/java/net/talpidae/base/insect/InsectWorker.java deleted file mode 100644 index 680c597..0000000 --- a/src/main/java/net/talpidae/base/insect/InsectWorker.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Copyright (C) 2017 Jonas Zeiger - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package net.talpidae.base.insect; - -import lombok.extern.slf4j.Slf4j; -import lombok.val; - -import java.util.concurrent.TimeUnit; - -import static java.lang.Thread.State.TERMINATED; - - -@Slf4j -class InsectWorker extends Thread -{ - private static final long WORKER_TIMEOUT = TimeUnit.MILLISECONDS.toNanos(5000L); - - - private InsectWorker(Runnable task) - { - super(task); - } - - public static InsectWorker start(CloseableRunnable task, String name) - { - val worker = new InsectWorker(task); - worker.setPriority(Thread.MIN_PRIORITY); - worker.setName(name); - - val before = System.nanoTime(); - val timeout = before + WORKER_TIMEOUT; - worker.start(); - try - { - //noinspection SynchronizationOnLocalVariableOrMethodParameter - synchronized (task) - { - while (!task.isRunning()) - { - val remaining = System.nanoTime() - timeout; - if (remaining <= 0) - break; - - task.wait(remaining); - } - } - - log.debug("worker {} started within {}ms", name, Math.max(0L, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - before))); - } - catch (InterruptedException e) - { - log.warn("wait for startup of {} was interrupted", name); - } - - return worker; - } - - public boolean shutdown() - { - try - { - interrupt(); - join(WORKER_TIMEOUT); - } - catch (InterruptedException e) - { - interrupt(); - } - - val isTerminated = getState() == TERMINATED; - if (!isTerminated) - { - log.warn("failed to stop worker {}", getName()); - } - - return isTerminated; - } -} diff --git a/src/main/java/net/talpidae/base/insect/Queen.java b/src/main/java/net/talpidae/base/insect/Queen.java deleted file mode 100644 index 53936b7..0000000 --- a/src/main/java/net/talpidae/base/insect/Queen.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright (C) 2017 Jonas Zeiger - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package net.talpidae.base.insect; - -import net.talpidae.base.insect.state.InsectState; - -import java.net.InetSocketAddress; -import java.util.Map; -import java.util.stream.Stream; - - -public interface Queen extends CloseableRunnable -{ - /** - * Initialize the state before calling run(). - */ - void initializeInsectState(Stream> stateStream); - - - /** - * Get a (live) stream of all current service state. - */ - Stream getLiveInsectState(); - - /** - * Send a shutdown request to a slave. - */ - void sendShutdown(InetSocketAddress remote); - - /** - * Set a services out-of-service flag. - */ - void setIsOutOfService(String route, InetSocketAddress socketAddress, boolean isOutOfService); -} \ No newline at end of file diff --git a/src/main/java/net/talpidae/base/insect/Slave.java b/src/main/java/net/talpidae/base/insect/Slave.java deleted file mode 100644 index 6e4656e..0000000 --- a/src/main/java/net/talpidae/base/insect/Slave.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright (C) 2017 Jonas Zeiger - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package net.talpidae.base.insect; - -import net.talpidae.base.insect.state.ServiceState; -import net.talpidae.base.util.performance.Metric; - -import java.net.InetSocketAddress; -import java.util.List; -import java.util.Queue; - - -public interface Slave extends CloseableRunnable -{ - /** - * Try to find a service for route, register route as a dependency and block in case it isn't available immediately. - */ - InetSocketAddress findService(String route) throws InterruptedException; - - /** - * Try to find a service for route, register route as a dependency and block in case it isn't available immediately. - * - * @return Address of discovered service if one was discovered before a timeout occurred, null otherwise. - */ - InetSocketAddress findService(String route, long timeoutMillies) throws InterruptedException; - - /** - * Return all known services for route, register route as a dependency and block in case there are none available immediately. - * - * @return Discovered services if any were discovered before a timeout occurred, null otherwise. - */ - List findServices(String route, long timeoutMillies) throws InterruptedException; - - - /** - * Send some of the specified metrics from the front of the queue. - */ - void forwardMetrics(Queue metricQueue); -} diff --git a/src/main/java/net/talpidae/base/insect/SyncQueen.java b/src/main/java/net/talpidae/base/insect/SyncQueen.java deleted file mode 100644 index f022717..0000000 --- a/src/main/java/net/talpidae/base/insect/SyncQueen.java +++ /dev/null @@ -1,236 +0,0 @@ -/* - * Copyright (C) 2017 Jonas Zeiger - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package net.talpidae.base.insect; - -import com.google.common.eventbus.EventBus; -import com.google.common.eventbus.Subscribe; - -import net.talpidae.base.event.ServerShutdown; -import net.talpidae.base.insect.config.QueenSettings; -import net.talpidae.base.insect.message.payload.Invalidate; -import net.talpidae.base.insect.message.payload.Mapping; -import net.talpidae.base.insect.message.payload.Shutdown; -import net.talpidae.base.insect.state.InsectState; - -import java.net.InetSocketAddress; -import java.util.Collection; -import java.util.Map; -import java.util.stream.Stream; - -import javax.inject.Inject; -import javax.inject.Singleton; - -import lombok.AccessLevel; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; -import lombok.val; - - -@Singleton -@Slf4j -public class SyncQueen extends Insect implements Queen -{ - @Getter(AccessLevel.PROTECTED) - private final EventBus eventBus; - - - @Inject - public SyncQueen(QueenSettings settings, EventBus eventBus) - { - super(settings, false); - - this.eventBus = eventBus; - - eventBus.register(this); - } - - private static Mapping createMappingFromState(InsectState state, String route) - { - return Mapping.builder() - .name(state.getName()) - .host(state.getSocketAddress().getHostString()) - .port(state.getSocketAddress().getPort()) - .timestamp(state.getTimestamp()) - .route(route) - .socketAddress(state.getSocketAddress()) - .build(); - } - - @Override - public void initializeInsectState(Stream> stateStream) - { - if (!isRunning()) - { - stateStream.forEach(entry -> - { - initializeRoute(entry.getKey()) - .compute(entry.getValue().getSocketAddress(), state -> - (state != null) ? state : entry.getValue() - ); - }); - } - else - { - throw new IllegalStateException("queen is already running"); - } - } - - /** - * Get a (live) stream of all current service state. - */ - @Override - public Stream getLiveInsectState() - { - return getRouteToInsects().values() - .stream() - .map(InsectCollection::getAll) - .flatMap(Collection::stream); - } - - /** - * Send a shutdown request to a slave. - */ - @Override - public void sendShutdown(InetSocketAddress remote) - { - val shutdown = (Shutdown) Shutdown.builder() - .type(Shutdown.TYPE_SHUTDOWN) - .magic(Shutdown.MAGIC) - .build(); - - addMessage(remote, shutdown); - } - - @Override - public void setIsOutOfService(String route, InetSocketAddress socketAddress, boolean isOutOfService) - { - getRouteToInsects().getOrDefault(route, emptyRoute()) - .compute(socketAddress, state -> - (state != null) ? - // copy everything but the isOutOfService flag - InsectState.builder() - .name(state.getName()) - .isOutOfService(isOutOfService) - .timestampEpochRemote(state.getTimestampEpochRemote()) - .timestamp(state.getTimestamp()) - .timestampEpochLocal(state.getTimestampEpochLocal()) - .dependencies(state.getDependencies()) - .socketAddress(state.getSocketAddress()) - .build() - : null - ); - } - - - @Override - protected void postHandleMapping(InsectState state, Mapping mapping, boolean isNewMapping, boolean isDependencyMapping) - { - if (isNewMapping) - { - sendInvalidate(state.getSocketAddress()); - } - - if (isDependencyMapping) - { - handleDependencyRequest(state, mapping); - } - else - { - relayMapping(state, mapping); - } - } - - /** - * Send an invalidate request to a slave. - */ - private void sendInvalidate(InetSocketAddress remote) - { - val invalidate = (Invalidate) Invalidate.builder() - .type(Invalidate.TYPE_INVALIDATE) - .magic(Invalidate.MAGIC) - .build(); - - addMessage(remote, invalidate); - } - - - @Subscribe - protected void onServerShutdown(ServerShutdown serverShutdown) - { - eventBus.unregister(this); - close(); - } - - - /** - * Immediately respond with one of the mappings we have in case a dependency was published. - * Otherwise the sender would have to wait for the next pulse to reach it. - */ - private void handleDependencyRequest(InsectState state, Mapping mapping) - { - val alternatives = getRouteToInsects().getOrDefault(mapping.getDependency(), emptyRoute()).getActive(); - val size = alternatives.size(); - if (size > 0) - { - // find random non out-of-service insect - val startIndex = getRandom().nextInt(size); - for (int i = 0; i < size; ++i) - { - val candidate = alternatives.get((i + startIndex) % size); - if (!candidate.isOutOfService()) - { - addMessage(candidate.getSocketAddress(), createMappingFromState(candidate, mapping.getDependency())); - } - } - } - } - - /** - * Relay updates to all interested services (those that have this services route in their dependencies). - */ - private void relayMapping(final InsectState state, final Mapping mapping) - { - if (state.isOutOfService()) - { - // do relay mappings for out-of-service services - return; - } - - getRouteToInsects().forEach((route, states) -> - { - val mappingRoute = mapping.getRoute(); - if (route != null) - { - for (val s : states.getActive()) - { - if (s.getDependencies().contains(mappingRoute)) - { - val destination = s.getSocketAddress(); - - if (destination.isUnresolved()) - { - log.debug("unresolved address {}", destination); - } - - addMessage(destination, mapping); - } - } - } - }); - } -} diff --git a/src/main/java/net/talpidae/base/insect/SyncSlave.java b/src/main/java/net/talpidae/base/insect/SyncSlave.java deleted file mode 100644 index a32148c..0000000 --- a/src/main/java/net/talpidae/base/insect/SyncSlave.java +++ /dev/null @@ -1,403 +0,0 @@ -/* - * Copyright (C) 2017 Jonas Zeiger - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package net.talpidae.base.insect; - -import com.google.common.base.Strings; -import com.google.common.eventbus.EventBus; -import com.google.common.eventbus.Subscribe; -import com.google.inject.Singleton; - -import net.talpidae.base.event.Invalidate; -import net.talpidae.base.event.ServerShutdown; -import net.talpidae.base.event.Shutdown; -import net.talpidae.base.insect.config.SlaveSettings; -import net.talpidae.base.insect.message.payload.Mapping; -import net.talpidae.base.insect.message.payload.Metrics; -import net.talpidae.base.insect.message.payload.Payload; -import net.talpidae.base.insect.state.InsectState; -import net.talpidae.base.insect.state.ServiceState; -import net.talpidae.base.util.network.NetworkUtil; -import net.talpidae.base.util.performance.Metric; - -import java.net.InetSocketAddress; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BiFunction; - -import javax.inject.Inject; - -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; -import lombok.val; - - -@Singleton -@Slf4j -public class SyncSlave extends Insect implements Slave -{ - private static final long DEPENDENCY_RESEND_MILLIES_MIN = TimeUnit.MILLISECONDS.toMillis(100); - - private static final long DEPENDENCY_RESEND_MILLIES_MAX = TimeUnit.SECONDS.toMillis(12); - - private final Map dependencies = new ConcurrentHashMap<>(); - - private final EventBus eventBus; - - private final NetworkUtil networkUtil; - - private long nextHeartBeatNanos = 0L; - - @Getter - private volatile boolean isRunning = false; - - @Inject - public SyncSlave(SlaveSettings settings, EventBus eventBus, NetworkUtil networkUtil) - { - super(settings, true); - - this.eventBus = eventBus; - this.networkUtil = networkUtil; - - eventBus.register(this); - } - - - @Override - public void run() - { - try - { - synchronized (this) - { - isRunning = true; - notifyAll(); - } - - if (Strings.isNullOrEmpty(getSettings().getRoute())) - { - log.debug("argument for parameter \"route\" is empty, won't publish anything"); - } - - nextHeartBeatNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(getSettings().getPulseDelay()); - super.run(); - } - finally - { - isRunning = false; - } - } - - /** - * Try to find a service for route, register route as a dependency and block in case it isn't available immediately. - */ - @Override - public InetSocketAddress findService(String route) throws InterruptedException - { - return findService(route, Long.MAX_VALUE); - } - - /** - * Try to find a service for route, register route as a dependency and block in case it isn't available immediately. - * - * @return Address of discovered service if one was discovered before a timeout occurred, null otherwise. - */ - @Override - public InetSocketAddress findService(String route, long timeoutMillies) throws InterruptedException - { - // we may occasionally get an empty collection from findServices() - val alternatives = findServices(route, timeoutMillies); - val size = alternatives.size(); - if (size > 0) - { - // pick a random service from the pool - return alternatives.get(getRandom().nextInt(size)).getSocketAddress(); - } - - // timeout - return null; - } - - /** - * Return all known services for route, register route as a dependency and block in case there are none available immediately. - * - * @return Discovered services if any were discovered before a timeout occurred, empty list otherwise. - */ - @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") - @Override - public List findServices(String route, long timeoutMillies) throws InterruptedException - { - List alternatives = lookupServices(route); - if (!alternatives.isEmpty()) - { - // fast path - return alternatives; - } - - long nowNanos = System.nanoTime(); - val timeout = (timeoutMillies >= 0) ? TimeUnit.NANOSECONDS.toMillis(nowNanos) + timeoutMillies : Long.MAX_VALUE; - long waitInterval = DEPENDENCY_RESEND_MILLIES_MIN; - - val routeWaiter = dependencies.computeIfAbsent(route, k -> new RouteWaiter()); - do - { - // indicate that we are waiting for this route to be discovered - switch (routeWaiter.advanceDiscoveryState(nowNanos)) - { - case SEND: - // send out discovery request - requestDependency(nowNanos, route); - - // fall-through - - case DONE: - alternatives = lookupServices(route); - if (!alternatives.isEmpty()) - { - routeWaiter.setDiscoveryComplete(); - dependencies.remove(route); - - return alternatives; - } - } - - // wait for news on this route - val maxRemainingMillies = timeout - TimeUnit.NANOSECONDS.toMillis(nowNanos); - val waitMillies = Math.min(Math.min(waitInterval, maxRemainingMillies), DEPENDENCY_RESEND_MILLIES_MAX); - if (waitMillies >= 0L) - { - synchronized (routeWaiter) - { - routeWaiter.wait(waitMillies); - } - } - else - { - break; - } - - waitInterval = waitInterval * 2; - nowNanos = System.nanoTime(); - } - while (true); - - log.warn("findService(): timeout for route: {}", route); - return alternatives; - } - - - @Override - public void forwardMetrics(Queue metricQueue) - { - val metrics = Metrics.builder() - .metrics(metricQueue) - .build(); - - sendToRemotes((settings, remote) -> metrics); - } - - - private List lookupServices(String route) - { - return getRouteToInsects().getOrDefault(route, emptyRoute()).getActive(); - } - - - @Override - protected void postHandleMapping(InsectState state, Mapping mapping, boolean isNewMapping, boolean isDependencyMapping) - { - if (isNewMapping) - { - // notify findService() callers blocking for route discovery - val routeWaiter = dependencies.get(mapping.getRoute()); - if (routeWaiter != null) - { - routeWaiter.setDiscoveryComplete(); - dependencies.remove(mapping.getRoute()); - } - } - } - - @Override - protected void handleShutdown() - { - // tell listeners that we received a shutdown request - eventBus.post(new Shutdown()); - } - - - @Subscribe - protected void onServerShutdown(ServerShutdown serverShutdown) - { - eventBus.unregister(this); - close(); - } - - - @Override - protected void handleInvalidate() - { - // drop cached remotes - getRouteToInsects().values().forEach(InsectCollection::clear); - - // tell listeners that we received an invalidate request - eventBus.post(new Invalidate()); - } - - - @Override - protected long handlePulse(long nowNanos) - { - val maximumWaitTime = super.handlePulse(nowNanos); - - if (nowNanos >= nextHeartBeatNanos) - { - sendHeartbeat(nowNanos); - - // scheduled next heartbeat, taking overshoot (delay) of this heartbeat into account - nextHeartBeatNanos = nowNanos + Math.max(0, (TimeUnit.MILLISECONDS.toNanos(getPulseDelayMillies()) - Math.max(0L, (nowNanos - nextHeartBeatNanos)))); - } - - return Math.max(1L, Math.min(maximumWaitTime, TimeUnit.NANOSECONDS.toMillis(nextHeartBeatNanos - nowNanos))); - } - - - private void sendHeartbeat(long nowNanos) - { - val bindSocketAddress = getSettings().getBindAddress(); - val hostAddress = getSettings().getBindAddress().getAddress(); - val port = getSettings().getBindAddress().getPort(); - - sendToRemotes((settings, remote) -> - { - val remoteAddress = remote.getAddress(); - - final String host = (hostAddress != null) - ? networkUtil.getReachableLocalAddress(hostAddress, remoteAddress).getHostAddress() - : bindSocketAddress.getHostString(); - - return Mapping.builder() - .host(host) - .port(port) - .timestamp(nowNanos) - .route(settings.getRoute()) - .name(settings.getName()) - .socketAddress(InetSocketAddress.createUnresolved(host, port)) - .build(); - }); - } - - - private void requestDependency(long nowNanos, String requestedRoute) - { - val bindSocketAddress = getSettings().getBindAddress(); - val hostAddress = getSettings().getBindAddress().getAddress(); - val port = getSettings().getBindAddress().getPort(); - - sendToRemotes((settings, remote) -> - { - val remoteAddress = remote.getAddress(); - - final String host = (hostAddress != null) - ? networkUtil.getReachableLocalAddress(hostAddress, remoteAddress).getHostAddress() - : bindSocketAddress.getHostString(); - - return Mapping.builder() - .host(host) - .port(port) - .timestamp(nowNanos) - .route(settings.getRoute()) - .name(settings.getName()) - .dependency(requestedRoute) - .socketAddress(InetSocketAddress.createUnresolved(host, port)) - .build(); - }); - } - - - private void sendToRemotes(BiFunction payloadProducer) - { - val settings = getSettings(); - for (val remote : settings.getRemotes()) - { - addMessage(remote, payloadProducer.apply(settings, remote)); - } - } - - - private static final class RouteWaiter - { - private final AtomicLong discoveryState = new AtomicLong(0L); - - private volatile long resendNanos = TimeUnit.MILLISECONDS.toNanos(DEPENDENCY_RESEND_MILLIES_MIN); - - - RouteWaiter() - { - } - - - State advanceDiscoveryState(long currentNanos) - { - val state = discoveryState.get(); - if (state <= currentNanos - resendNanos) - { - if (discoveryState.compareAndSet(state, currentNanos)) - { - resendNanos = Math.min(resendNanos * 2, DEPENDENCY_RESEND_MILLIES_MAX); - return State.SEND; - } - } - else if (state == Long.MAX_VALUE) - { - // discovery complete - return State.DONE; - } - - return State.WAIT; - } - - - State getDiscoveryState() - { - return discoveryState.get() == Long.MAX_VALUE ? State.DONE : State.WAIT; - } - - - void setDiscoveryComplete() - { - discoveryState.set(Long.MAX_VALUE); - - synchronized (this) - { - notifyAll(); - } - } - - - enum State - { - WAIT, - DONE, - SEND - } - } -} \ No newline at end of file diff --git a/src/main/java/net/talpidae/base/insect/config/DefaultQueenSettings.java b/src/main/java/net/talpidae/base/insect/config/DefaultQueenSettings.java deleted file mode 100644 index dc8a9b4..0000000 --- a/src/main/java/net/talpidae/base/insect/config/DefaultQueenSettings.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright (C) 2017 Jonas Zeiger - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package net.talpidae.base.insect.config; - -import com.google.inject.Singleton; -import lombok.Getter; -import lombok.NonNull; -import lombok.Setter; -import lombok.extern.slf4j.Slf4j; -import net.talpidae.base.server.ServerConfig; -import net.talpidae.base.util.log.LoggingConfigurer; -import net.talpidae.base.util.names.InsectNameGenerator; - -import javax.inject.Inject; -import java.net.InetSocketAddress; -import java.util.Collections; -import java.util.Set; - -import static net.talpidae.base.util.log.LoggingConfigurer.CONTEXT_INSECT_NAME_KEY; - - -@Slf4j -@Singleton -@Setter -@Getter -public class DefaultQueenSettings implements QueenSettings -{ - @Setter - @Getter - private String name; - - @NonNull - private InetSocketAddress bindAddress; - - @NonNull - private Set remotes; - - private long pulseDelay = DEFAULT_PULSE_DELAY; - - private long restInPeaceTimeout = DEFAULT_REST_IN_PEACE_TIMEOUT; - - @Inject - public DefaultQueenSettings(ServerConfig serverConfig, LoggingConfigurer loggingConfigurer, InsectNameGenerator insectNameGenerator) - { - name = insectNameGenerator.compose().replace(' ', '-').intern(); - bindAddress = new InetSocketAddress(serverConfig.getHost(), serverConfig.getPort()); - remotes = Collections.emptySet(); - - loggingConfigurer.putContext(CONTEXT_INSECT_NAME_KEY, name); - } -} diff --git a/src/main/java/net/talpidae/base/insect/config/DefaultSlaveSettings.java b/src/main/java/net/talpidae/base/insect/config/DefaultSlaveSettings.java deleted file mode 100644 index 03db680..0000000 --- a/src/main/java/net/talpidae/base/insect/config/DefaultSlaveSettings.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Copyright (C) 2017 Jonas Zeiger - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package net.talpidae.base.insect.config; - -import com.google.common.base.Strings; -import com.google.common.net.HostAndPort; -import com.google.inject.Singleton; - -import net.talpidae.base.server.ServerConfig; -import net.talpidae.base.util.BaseArguments; -import net.talpidae.base.util.log.LoggingConfigurer; - -import java.net.InetSocketAddress; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; - -import javax.inject.Inject; - -import lombok.Getter; -import lombok.NonNull; -import lombok.Setter; -import lombok.extern.slf4j.Slf4j; -import lombok.val; - -import static net.talpidae.base.util.log.LoggingConfigurer.CONTEXT_INSECT_NAME_KEY; - - -@Singleton -@Setter -@Getter -@Slf4j -public class DefaultSlaveSettings implements SlaveSettings -{ - @Setter - @Getter - private String name; - - @NonNull - private InetSocketAddress bindAddress; - - @NonNull - private Set remotes; - - @NonNull - private String route; - - private long pulseDelay = DEFAULT_PULSE_DELAY; - - private long restInPeaceTimeout; - - - @Inject - public DefaultSlaveSettings(ServerConfig serverConfig, BaseArguments baseArguments, LoggingConfigurer loggingConfigurer) - { - this.bindAddress = new InetSocketAddress(serverConfig.getHost(), serverConfig.getPort()); - - val parser = baseArguments.getOptionParser(); - val nameOption = parser.accepts("insect.name").withRequiredArg().required(); - val remoteOption = parser.accepts("insect.slave.remote").withRequiredArg().required(); - val timeoutOption = parser.accepts("insect.slave.timeout").withRequiredArg().ofType(Long.class).defaultsTo(DEFAULT_REST_IN_PEACE_TIMEOUT); - val options = baseArguments.parse(); - - this.name = options.valueOf(nameOption).intern(); - this.restInPeaceTimeout = options.valueOf(timeoutOption); - - val remotes = new HashSet(); - for (val remoteValue : options.valuesOf(remoteOption)) - { - val remote = HostAndPort.fromString(remoteValue); - try - { - val host = remote.getHost(); - val port = remote.getPortOrDefault(QueenSettings.DEFAULT_PORT); - if (!Strings.isNullOrEmpty(host)) - { - val socketAddress = new InetSocketAddress(host.intern(), port); - if (socketAddress.isUnresolved()) - { - throw new IllegalArgumentException("failed to resolve remote host: " + socketAddress.getHostString()); - } - - remotes.add(socketAddress); - continue; - } - } - catch (ArrayIndexOutOfBoundsException | NumberFormatException e) - { - // throw below - } - - throw new IllegalArgumentException("invalid host[:port] pair specified: " + remoteValue); - } - - this.remotes = Collections.unmodifiableSet(remotes); - - loggingConfigurer.putContext(CONTEXT_INSECT_NAME_KEY, name); - } -} \ No newline at end of file diff --git a/src/main/java/net/talpidae/base/insect/config/InsectSettings.java b/src/main/java/net/talpidae/base/insect/config/InsectSettings.java deleted file mode 100644 index 2e7dcbb..0000000 --- a/src/main/java/net/talpidae/base/insect/config/InsectSettings.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright (C) 2017 Jonas Zeiger - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package net.talpidae.base.insect.config; - -import java.net.InetSocketAddress; -import java.util.Set; - - -public interface InsectSettings -{ - long DEFAULT_PULSE_DELAY = 1001; // 1001ms - long DEFAULT_REST_IN_PEACE_TIMEOUT = 5 * 60 * 1000; // 5 min - - /** - * Unique name of this instance. - */ - String getName(); - - void setName(String name); - - /** - * InetSocketAddress to bind to. - */ - InetSocketAddress getBindAddress(); - - void setBindAddress(InetSocketAddress bindAddress); - - /** - * Remote servers that are authorized to update mappings they do not own themselves - * and are informed about our services. - */ - Set getRemotes(); - - void setRemotes(Set remotes); - - /** - * Heart beat / pulse delay. - */ - long getPulseDelay(); - - void setPulseDelay(long pulseDelay); - - /** - * Timeout after which a service is declared dead and purged from the mapping. - * Since we always pick the youngest per route, this can be pretty high and will define - * the maximum permitted down-time of upstream servers (Queen instances). - */ - long getRestInPeaceTimeout(); - - void setRestInPeaceTimeout(long restInPeaceTimeout); -} diff --git a/src/main/java/net/talpidae/base/insect/config/QueenSettings.java b/src/main/java/net/talpidae/base/insect/config/QueenSettings.java deleted file mode 100644 index 02c3f5a..0000000 --- a/src/main/java/net/talpidae/base/insect/config/QueenSettings.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright (C) 2017 Jonas Zeiger - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package net.talpidae.base.insect.config; - - -public interface QueenSettings extends InsectSettings -{ - int DEFAULT_PORT = 13300; -} diff --git a/src/main/java/net/talpidae/base/insect/config/SlaveSettings.java b/src/main/java/net/talpidae/base/insect/config/SlaveSettings.java deleted file mode 100644 index 54b8bc9..0000000 --- a/src/main/java/net/talpidae/base/insect/config/SlaveSettings.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright (C) 2017 Jonas Zeiger - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package net.talpidae.base.insect.config; - -public interface SlaveSettings extends InsectSettings -{ - /** - * The route this slave provides. - */ - String getRoute(); - - void setRoute(String route); -} diff --git a/src/main/java/net/talpidae/base/insect/exchange/BaseMessage.java b/src/main/java/net/talpidae/base/insect/exchange/BaseMessage.java deleted file mode 100644 index 519c24d..0000000 --- a/src/main/java/net/talpidae/base/insect/exchange/BaseMessage.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright (C) 2017 Jonas Zeiger - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package net.talpidae.base.insect.exchange; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.DatagramChannel; - -import lombok.AccessLevel; -import lombok.Getter; - - -public class BaseMessage -{ - @Getter(AccessLevel.PROTECTED) - private final ByteBuffer buffer; - - @Getter - private InetSocketAddress remoteAddress; - - - protected BaseMessage(int maximumSize) - { - buffer = ByteBuffer.allocateDirect(maximumSize); - } - - - /** - * Override this to perform additional cleanup. - */ - protected void clear() - { - - } - - - boolean receiveFrom(DatagramChannel channel) throws IOException - { - remoteAddress = (InetSocketAddress) channel.receive(buffer); - if (remoteAddress != null) - { - buffer.flip(); - return true; - } - - return false; - } - - - void setRemoteAddress(InetSocketAddress remoteAddress) - { - this.remoteAddress = remoteAddress; - } - - - boolean sendTo(DatagramChannel channel) throws IOException - { - return channel.send(buffer, remoteAddress) != 0; - } - - - void passivate() - { - clear(); // may be overridden - - buffer.clear(); - remoteAddress = null; - } -} diff --git a/src/main/java/net/talpidae/base/insect/exchange/MessageExchange.java b/src/main/java/net/talpidae/base/insect/exchange/MessageExchange.java deleted file mode 100644 index 4f9fe60..0000000 --- a/src/main/java/net/talpidae/base/insect/exchange/MessageExchange.java +++ /dev/null @@ -1,424 +0,0 @@ -/* - * Copyright (C) 2017 Jonas Zeiger - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package net.talpidae.base.insect.exchange; - -import net.talpidae.base.insect.CloseableRunnable; -import net.talpidae.base.insect.config.InsectSettings; -import net.talpidae.base.util.pool.SoftReferenceObjectPool; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.channels.CancelledKeyException; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.ClosedSelectorException; -import java.nio.channels.DatagramChannel; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.UnresolvedAddressException; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Supplier; - -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; -import lombok.val; - -import static com.google.common.base.Strings.nullToEmpty; - - -@Slf4j -public abstract class MessageExchange implements CloseableRunnable -{ - private static final int MESSAGE_POOL_HARD_LIMIT = 512; - - private final InsectSettings settings; - - private final SoftReferenceObjectPool messagePool; - - private final Queue inbound = new ArrayDeque<>(); - - private final Queue outbound = new ArrayDeque<>(); - - private final ExchangeMessageQueueControl queueControl = new ExchangeMessageQueueControl(); - - private final AtomicLong runThreadId = new AtomicLong(-1); - - private Selector selector; - - private int activeInterestOps = SelectionKey.OP_READ; - - @Getter - private int port = 0; - - private String lastErrorMessage; - - - public MessageExchange(Supplier messageFactory, InsectSettings settings) - { - this.messagePool = new SoftReferenceObjectPool<>(messageFactory, MESSAGE_POOL_HARD_LIMIT); - this.settings = settings; - } - - - /** - * Wakeup exchange to force sending outbound messages and/or processing. - */ - protected void wakeup() throws IllegalStateException - { - // make sure our raw is processed as soon as possible - if (selector != null) - { - selector.wakeup(); - } - } - - /** - * Override this to process incoming messages and react to periodic events. - *

- * This allows to handle almost all logic on the same thread as the exchange and thus avoid - * extensive locking for thread-safety. - * - * @return Returns the maximum delay im milliseconds until the next invocation of this method. - */ - protected abstract long processMessages(MessageQueueControl control); - - public void run() - { - try - { - if (!runThreadId.compareAndSet(-1L, Thread.currentThread().getId())) - { - log.debug("already running, won't enter run again"); - return; - } - - synchronized (this) - { - notifyAll(); - } - - val key = setup(); - val channel = (DatagramChannel) key.channel(); - - log.info("MessageExchange running on {}", settings.getBindAddress().toString()); - - long maxWaitMillies; - M outboundMessage = null; - while (!Thread.interrupted()) - { - try - { - maxWaitMillies = processMessages(queueControl); - - // recycle messages removed from the inbound queue by processMessages() - queueControl.recycleConsumedMessages(); - - if (outboundMessage == null) - { - // look for new messages - outboundMessage = pollAndUpdateInterestSet(key); - } - - selector.select(maxWaitMillies); - if (key.isValid()) - { - boolean mayReadMore = key.isReadable(); - boolean mayWriteMore = key.isWritable() && outboundMessage != null; - - while (mayReadMore || mayWriteMore) - { - mayReadMore = mayReadMore && tryReceive(channel); - - mayWriteMore = mayWriteMore - && trySend(channel, outboundMessage) - && ((outboundMessage = outbound.poll()) != null); - } - } - } - catch (IOException e) - { - handleSelectError(e); - } - } - } - catch (Exception e) - { - handleRunError(e); - } - finally - { - close(); - runThreadId.set(-1); - } - } - - - public boolean isRunning() - { - return runThreadId.get() != -1L; - } - - - /** - * Get access to the current queue control. - * - * @return Queue control object if currently executing processMessages() and inside the same thread, null otherwise. - */ - protected MessageQueueControl getQueueControl() - { - val threadId = Thread.currentThread().getId(); - if (threadId == runThreadId.get()) - { - return queueControl; - } - - return null; - } - - @Override - public void close() - { - if (selector != null) - { - try { selector.close(); } catch (IOException e) { /* ignore */ } - } - } - - /** - * Rate limit by exception message. - * - * @return The exceptions message as obtained from getMessage() if not rate-limited, null otherwise. - */ - private String rateLimitByMessage(Exception e) - { - val originalMessage = e.getMessage(); - val message = originalMessage != null ? originalMessage : e.getClass().getName(); - if (lastErrorMessage == null || !lastErrorMessage.equals(message)) - { - lastErrorMessage = message; - return message; - } - - // rate-limited - return null; - } - - /** - * Handle error in main run() method. - */ - private void handleRunError(Exception e) - { - if (e instanceof ClosedSelectorException - || e instanceof ClosedChannelException - || e instanceof CancelledKeyException - || e instanceof InterruptedException) - { - log.info("shutting down, reason: {}: {}", e.getClass().getSimpleName(), e.getMessage()); - } - else - { - log.error("error during socket operation", e); - } - } - - - /** - * Handle error during select(). - */ - private void handleSelectError(IOException e) - { - val message = rateLimitByMessage(e); - if (message != null) - { - log.error("error selecting keys: {}", message); - } - } - - /** - * Handle error during receive(). - */ - private void handleReceiveError(IOException e) - { - val message = rateLimitByMessage(e); - if (message != null) - { - log.error("error during receive: {}", message); - } - } - - /** - * Log a send error and information about the dropped message. - */ - private void handleSendError(Exception e, M outboundMessage) - { - val message = rateLimitByMessage(e); - if (message != null) - { - if (message.equals("Invalid argument")) - { - log.error("error during send: {}, socket bound to {}", message, settings.getBindAddress()); - } - else - { - log.error("error during send: {}", message); - } - } - - // last error was same, drop this message - log.error("dropping outbound {} to {}", - outboundMessage.getClass().getSimpleName(), - outboundMessage.getRemoteAddress()); - } - - - private SelectionKey setup() throws IOException - { - outbound.clear(); - inbound.clear(); - lastErrorMessage = null; - selector = Selector.open(); - - val channel = DatagramChannel.open(); - - val bindAddress = settings.getBindAddress(); - channel.socket().bind(bindAddress); - port = channel.socket().getLocalPort(); - - channel.configureBlocking(false); - - return channel.register(selector, activeInterestOps); - } - - - private boolean tryReceive(DatagramChannel channel) - { - val message = messagePool.borrow(); - try - { - if (message.receiveFrom(channel)) - { - inbound.add(message); - return true; - } - } - catch (IOException e) - { - handleReceiveError(e); - } - - // nothing received, return message to pool - recycleMessage(message); - return false; - } - - - private boolean trySend(DatagramChannel channel, M message) - { - try - { - if (!message.sendTo(channel)) - { - // not ready for writing, try again later - return false; - } - } - catch (IOException | UnresolvedAddressException e) - { - handleSendError(e, message); - } - - // message handled or dropped because of send error - recycleMessage(message); - return true; - } - - - private M pollAndUpdateInterestSet(SelectionKey key) - { - final M message = outbound.poll(); - - final int interestOps; - if (message != null) - { - // we have packets to write and will still tryReceive - interestOps = SelectionKey.OP_READ | SelectionKey.OP_WRITE; - } - else - { - // nothing to trySend, still interested inbound receiving - interestOps = SelectionKey.OP_READ; - } - - if (interestOps != activeInterestOps) - { - activeInterestOps = interestOps; - key.interestOps(interestOps); - } - - return message; - } - - - private void recycleMessage(M message) - { - message.passivate(); - messagePool.recycle(message); - } - - - private class ExchangeMessageQueueControl implements MessageQueueControl - { - private final List consumedInboundMessages = new ArrayList<>(); - - - @Override - public M addOutbound(InetSocketAddress remoteAddress) - { - val message = messagePool.borrow(); - - message.setRemoteAddress(remoteAddress); - outbound.add(message); - - // caller may fill message now - return message; - } - - @Override - public M pollInbound() - { - val message = inbound.poll(); - if (message != null) - { - consumedInboundMessages.add(message); - } - - return message; - } - - void recycleConsumedMessages() - { - for (int i = consumedInboundMessages.size() - 1; i >= 0; --i) - { - recycleMessage(consumedInboundMessages.remove(i)); - } - } - } -} \ No newline at end of file diff --git a/src/main/java/net/talpidae/base/insect/exchange/MessageQueueControl.java b/src/main/java/net/talpidae/base/insect/exchange/MessageQueueControl.java deleted file mode 100644 index 196db4d..0000000 --- a/src/main/java/net/talpidae/base/insect/exchange/MessageQueueControl.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright (C) 2017 Jonas Zeiger - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package net.talpidae.base.insect.exchange; - -import java.net.InetSocketAddress; - - -public interface MessageQueueControl -{ - /** - * Create and add a new outbound message. - */ - M addOutbound(InetSocketAddress remoteAddress); - - - /** - * Poll for new inbound messages. - * - * @return Reference to an inbound message, valid until the next call to pollInbound(). - * Returns null if there are no inbound messages. - */ - M pollInbound(); -} \ No newline at end of file diff --git a/src/main/java/net/talpidae/base/insect/message/InsectMessage.java b/src/main/java/net/talpidae/base/insect/message/InsectMessage.java deleted file mode 100644 index bdab16e..0000000 --- a/src/main/java/net/talpidae/base/insect/message/InsectMessage.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright (C) 2017 Jonas Zeiger - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package net.talpidae.base.insect.message; - -import net.talpidae.base.insect.exchange.BaseMessage; -import net.talpidae.base.insect.message.payload.Payload; -import net.talpidae.base.insect.message.payload.PayloadFactory; - -import java.nio.charset.CharacterCodingException; - -import lombok.extern.slf4j.Slf4j; -import lombok.val; - - -@Slf4j -public class InsectMessage extends BaseMessage -{ - private Payload payload; - - - public InsectMessage() - { - super(PayloadFactory.getMaximumSerializedSize()); - } - - - public Payload getPayload() throws IndexOutOfBoundsException, CharacterCodingException - { - if (payload == null) - { - val newMapping = PayloadFactory.unpackPayload(getBuffer(), 0); - if (newMapping != null) - { - payload = newMapping; - } - } - - return payload; - } - - - public void setPayload(Payload newPayload) - { - if (payload != null) - { - throw new IllegalStateException("payload has already been set"); - } - - payload = newPayload; - payload.to(getBuffer()); - - getBuffer().flip(); - } - - - @Override - protected void clear() - { - payload = null; - } -} diff --git a/src/main/java/net/talpidae/base/insect/message/payload/Invalidate.java b/src/main/java/net/talpidae/base/insect/message/payload/Invalidate.java deleted file mode 100644 index 86c53aa..0000000 --- a/src/main/java/net/talpidae/base/insect/message/payload/Invalidate.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Copyright (C) 2017 Jonas Zeiger - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package net.talpidae.base.insect.message.payload; - -import lombok.Builder; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; -import lombok.val; - -import java.nio.ByteBuffer; - - -@Slf4j -@Builder -public class Invalidate extends Payload -{ - public static final int MAXIMUM_SERIALIZED_SIZE = 2; - - public static final int TYPE_INVALIDATE = 0x3; - - public static final int MAGIC = 0x73; - - @Getter - @Builder.Default - private final int type = TYPE_INVALIDATE; // 0x3: invalidate known remotes - - @Getter - @Builder.Default - private final int magic = MAGIC; // magic byte: 0x73 - - - static Invalidate from(ByteBuffer buffer, int offset) throws IndexOutOfBoundsException - { - val type = buffer.get(offset) & 0xFF; - if (type != TYPE_INVALIDATE) - { - return null; - } - - val magic = buffer.get(offset + 1) & 0xFF; - if (magic != MAGIC) - { - log.debug("encountered invalidate payload with invalid magic"); - return null; - } - - return Invalidate.builder() - .type(type) - .magic(magic) - .build(); - } - - - public void to(ByteBuffer buffer) - { - buffer.put((byte) type); - buffer.put((byte) magic); - } - - - @Override - public int getMaximumSize() - { - return MAXIMUM_SERIALIZED_SIZE; - } - - - @Override - public String toString() - { - return Integer.toHexString(getType()); - } -} \ No newline at end of file diff --git a/src/main/java/net/talpidae/base/insect/message/payload/Mapping.java b/src/main/java/net/talpidae/base/insect/message/payload/Mapping.java deleted file mode 100644 index 70196d8..0000000 --- a/src/main/java/net/talpidae/base/insect/message/payload/Mapping.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Copyright (C) 2017 Jonas Zeiger - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package net.talpidae.base.insect.message.payload; - -import com.google.common.base.Strings; - -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.charset.CharacterCodingException; - -import lombok.Builder; -import lombok.Getter; -import lombok.NonNull; -import lombok.extern.slf4j.Slf4j; -import lombok.val; - -import static net.talpidae.base.util.protocol.BinaryProtocolHelper.extractString; -import static net.talpidae.base.util.protocol.BinaryProtocolHelper.putTruncatedUTF8; - - -@Slf4j -@Builder -public class Mapping extends Payload -{ - public static final int MAXIMUM_SERIALIZED_SIZE = 1036; - - public static final int TYPE_MAPPING = 0x1; - - private static final int STRING_SIZE_MAX = 255; - - @Getter - @Builder.Default - private final int type = TYPE_MAPPING; // 0x1: mapping - - @Builder.Default - @Getter - private final int flags = 0; // 0x0 - - @Getter - private final long timestamp; // client System.nanoTime() - - @Getter - private final int port; // client port - - @Getter - private final String host; // client IPv4 address - - @Getter - private final String route; // route - - @Getter - private final String name; // name (unique service instance ID) - - @Builder.Default - @Getter - private final String dependency = ""; // empty or one of the services dependencies - - @Getter - @NonNull - private final InetSocketAddress socketAddress; - - - static Mapping from(ByteBuffer buffer, int offset) throws IndexOutOfBoundsException, CharacterCodingException - { - val type = buffer.get(offset) & 0xFF; - if (type != TYPE_MAPPING) - { - return null; - } - - val hostOffset = offset + 16; - val hostLength = buffer.get(offset + 2) & 0xFF; // length of host - val routeOffset = hostOffset + hostLength; - val routeLength = buffer.get(offset + 3) & 0xFF; // length of route - val port = buffer.getShort(offset + 12) & 0xFFFF; - val nameOffset = routeOffset + routeLength; - val nameLength = buffer.get(offset + 14) & 0xFF; // length of name - val dependencyOffset = nameOffset + nameLength; - val dependencyLength = buffer.get(offset + 15) & 0xFF; // length of dependency - - val host = extractString(buffer, hostOffset, hostLength).intern(); - return Mapping.builder() - .type(type) - .flags(buffer.get(offset + 1) & 0xFF) - .timestamp(buffer.getLong(offset + 4)) - .port(port) - .host(host) - .route(extractString(buffer, routeOffset, routeLength).intern()) - .name(extractString(buffer, nameOffset, nameLength).intern()) - .dependency(extractString(buffer, dependencyOffset, dependencyLength).intern()) - .socketAddress(InetSocketAddress.createUnresolved(host, port)) - .build(); - } - - - @Override - public void to(ByteBuffer buffer) - { - int begin = buffer.position(); - - // start writing dynamic fields behind static fields first - int offset = begin + 16; - val hostLength = putTruncatedUTF8(buffer, offset, host, STRING_SIZE_MAX); - val routeLength = putTruncatedUTF8(buffer, offset += hostLength, route, STRING_SIZE_MAX); - val nameLength = putTruncatedUTF8(buffer, offset += routeLength, name, STRING_SIZE_MAX); - val dependencyLength = putTruncatedUTF8(buffer, offset += nameLength, dependency, STRING_SIZE_MAX); - offset += dependencyLength; - - buffer.put(begin, (byte) type); - buffer.put(begin + 1, (byte) flags); - buffer.put(begin + 2, (byte) hostLength); - buffer.put(begin + 3, (byte) routeLength); - buffer.putLong(begin + 4, timestamp); - buffer.putShort(begin + 12, (short) port); - buffer.put(begin + 14, (byte) nameLength); - buffer.put(begin + 15, (byte) dependencyLength); - - buffer.position(offset); // include dynamic part - } - - - /** - * Check subject address against sender address. - */ - public boolean isAuthorative(InetSocketAddress remoteAddress) - { - val authorizedHostOrAddress = getSocketAddress().getHostString(); - val authorizedPort = getSocketAddress().getPort(); - - return authorizedPort == remoteAddress.getPort() - && (authorizedHostOrAddress.equals(remoteAddress.getHostString()) - || authorizedHostOrAddress.equals(remoteAddress.getAddress().getHostAddress())); - } - - @Override - public int getMaximumSize() - { - return MAXIMUM_SERIALIZED_SIZE; - } - - - @Override - public String toString() - { - val dependency = getDependency(); - return Integer.toHexString(getType()) + ", " + - Integer.toHexString(flags) + ", " + - getTimestamp() + ", " + - getPort() + ", " + - getRoute() + ", " + - getName() + - ((Strings.isNullOrEmpty(dependency)) ? "" : ", " + dependency); - } -} \ No newline at end of file diff --git a/src/main/java/net/talpidae/base/insect/message/payload/Metrics.java b/src/main/java/net/talpidae/base/insect/message/payload/Metrics.java deleted file mode 100644 index 478d370..0000000 --- a/src/main/java/net/talpidae/base/insect/message/payload/Metrics.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Copyright (C) 2017 Jonas Zeiger - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package net.talpidae.base.insect.message.payload; - -import com.google.common.base.Utf8; -import lombok.Builder; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; -import lombok.val; -import net.talpidae.base.util.performance.Metric; - -import java.nio.ByteBuffer; -import java.nio.charset.CharacterCodingException; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Queue; - -import static net.talpidae.base.util.protocol.BinaryProtocolHelper.extractString; -import static net.talpidae.base.util.protocol.BinaryProtocolHelper.putTruncatedUTF8; - - -@Slf4j -@Builder -public class Metrics extends Payload -{ - public static final int MAXIMUM_SERIALIZED_SIZE = 1472; - - public static final int TYPE_METRIC = 0x4; - - private static final int METRIC_COUNT_MAX = 255; - - private static final int STRING_SIZE_MAX = 255; - - - @Getter - @Builder.Default - private final int type = TYPE_METRIC; // 0x4: metrics - - @Getter - private final List metrics; // metrics - - - /** - * Create a Metrics instance from binary data. - *

- * Basic binary layout is: tnGET/signup/statusCodeTTTTTTTTvvvvvvvv - */ - static Metrics from(ByteBuffer buffer, int offset) throws IndexOutOfBoundsException, CharacterCodingException - { - val type = buffer.get(offset) & 0xFF; - if (type != TYPE_METRIC) - { - return null; - } - - // extract metrics - val count = buffer.get(++offset) & 0xFF; - if (count > 0) - { - val metrics = new Metric[count]; - ++offset; - int i; - for (i = 0; i < count; ++i) - { - // extract one (path, value) pair - val pathOffset = offset + 1; - val pathLength = buffer.get(offset) & 0xFF; - val tsOffset = pathOffset + pathLength; - val valueOffset = tsOffset + 8; - metrics[i] = Metric.builder() - .path(extractString(buffer, pathOffset, pathLength)) // don't intern, may differ - .ts(buffer.getLong(tsOffset)) - .value(buffer.getDouble(valueOffset)) - .build(); - - offset = valueOffset + 8; - } - - return new Metrics(type, Arrays.asList((i == count) ? metrics : Arrays.copyOf(metrics, i))); - } - else - { - return new Metrics(type, Collections.emptyList()); - } - } - - - @Override - public void to(ByteBuffer buffer) - { - val limit = buffer.limit(); - - // start writing dynamic fields behind static fields first - // encode as many metrics as fit into one message, just drop the rest - int offset = 2; - val countMax = Math.min(metrics.size(), METRIC_COUNT_MAX); - int count; - for (count = 0; count < countMax; ++count) - { - val metric = metrics.get(count); - val pathOffset = offset + 1; - - // optimistically write dynamic part (automatically limited by buffer's limit) - val pathLength = putTruncatedUTF8(buffer, pathOffset, metric.getPath(), STRING_SIZE_MAX); - if (pathLength < metric.getPath().length()) - { - // encoded path size can't be shorter than the length in characters, it just doesn't fit anymore - break; - } - - val tsOffset = pathOffset + pathLength; - val valueOffset = tsOffset + 8; - val nextMetricOffset = valueOffset + 8; - if (nextMetricOffset >= limit) - { - // doesn't fit anymore, sorry - break; - } - - // write static part - buffer.put(offset, (byte) pathLength); - buffer.putLong(tsOffset, metric.getTs()); - buffer.putDouble(valueOffset, metric.getValue()); - - // advance - offset = nextMetricOffset; - } - - buffer.put(0, (byte) type); - buffer.put(1, (byte) count); - - buffer.position(offset); // include dynamic part - } - - - @Override - public int getMaximumSize() - { - return MAXIMUM_SERIALIZED_SIZE; - } - - - @Override - public String toString() - { - return Integer.toHexString(getType()); - } - - - public static class MetricsBuilder - { - private static int calculateMetricSize(Metric metric) - { - return 1 // path length - + Math.min(STRING_SIZE_MAX, Utf8.encodedLength(metric.getPath())) - + 8 // timestamp - + 8; // value - } - - public MetricsBuilder metrics(Queue metrics) - { - int totalSize = 2; // header - - // estimate serialized size - val acceptedMetrics = new Metric[METRIC_COUNT_MAX]; - int i = 0; - for (Metric metric = metrics.peek(); metric != null; metric = metrics.peek()) - { - totalSize += calculateMetricSize(metric); - - // stop if the next metric will definetely not fit anymore - if (totalSize >= MAXIMUM_SERIALIZED_SIZE) - { - break; - } - - acceptedMetrics[i] = metrics.remove(); - ++i; - } - - this.metrics = Arrays.asList(Arrays.copyOf(acceptedMetrics, i)); - - return this; - } - } -} \ No newline at end of file diff --git a/src/main/java/net/talpidae/base/insect/message/payload/Payload.java b/src/main/java/net/talpidae/base/insect/message/payload/Payload.java deleted file mode 100644 index 6d1d03d..0000000 --- a/src/main/java/net/talpidae/base/insect/message/payload/Payload.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright (C) 2017 Jonas Zeiger - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package net.talpidae.base.insect.message.payload; - -import java.nio.ByteBuffer; - - -public abstract class Payload -{ - /** - * Return this payloads unique type ID. - */ - public abstract int getType(); - - public abstract void to(ByteBuffer buffer); - - public abstract int getMaximumSize(); - - public abstract String toString(); -} diff --git a/src/main/java/net/talpidae/base/insect/message/payload/PayloadFactory.java b/src/main/java/net/talpidae/base/insect/message/payload/PayloadFactory.java deleted file mode 100644 index ea46f51..0000000 --- a/src/main/java/net/talpidae/base/insect/message/payload/PayloadFactory.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright (C) 2017 Jonas Zeiger - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package net.talpidae.base.insect.message.payload; - -import java.nio.ByteBuffer; -import java.nio.charset.CharacterCodingException; - - -public final class PayloadFactory -{ - private PayloadFactory() - { - - } - - /** - * Identify and unpack the content stored inside buffer (@offset). - */ - public static Payload unpackPayload(ByteBuffer buffer, int offset) throws IndexOutOfBoundsException, CharacterCodingException - { - Payload payload; - - // probe message types (most frequent first) - if ((payload = Metrics.from(buffer, offset)) != null - || (payload = Mapping.from(buffer, offset)) != null - || (payload = Invalidate.from(buffer, offset)) != null - || (payload = Shutdown.from(buffer, offset)) != null) - { - return payload; - } - - return null; - } - - - /** - * Get maximum buffer size needed to accommodate all known message types. - */ - public static int getMaximumSerializedSize() - { - return Math.max(Metrics.MAXIMUM_SERIALIZED_SIZE, - Math.max(Mapping.MAXIMUM_SERIALIZED_SIZE, - Math.max(Invalidate.MAXIMUM_SERIALIZED_SIZE, Shutdown.MAXIMUM_SERIALIZED_SIZE))); - } -} diff --git a/src/main/java/net/talpidae/base/insect/message/payload/Shutdown.java b/src/main/java/net/talpidae/base/insect/message/payload/Shutdown.java deleted file mode 100644 index d74042f..0000000 --- a/src/main/java/net/talpidae/base/insect/message/payload/Shutdown.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Copyright (C) 2017 Jonas Zeiger - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package net.talpidae.base.insect.message.payload; - -import lombok.Builder; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; -import lombok.val; - -import java.nio.ByteBuffer; - - -@Slf4j -@Builder -public class Shutdown extends Payload -{ - public static final int MAXIMUM_SERIALIZED_SIZE = 2; - - public static final int TYPE_SHUTDOWN = 0x2; - - public static final int MAGIC = 0x86; - - @Getter - @Builder.Default - private final int type = TYPE_SHUTDOWN; // 0x2: shutdown - - @Getter - @Builder.Default - private final int magic = MAGIC; // magic byte: 0x86 - - - static Shutdown from(ByteBuffer buffer, int offset) throws IndexOutOfBoundsException - { - val type = buffer.get(offset) & 0xFF; - if (type != TYPE_SHUTDOWN) - { - return null; - } - - val magic = buffer.get(offset + 1) & 0xFF; - if (magic != MAGIC) - { - log.debug("encountered shutdown payload with invalid magic"); - return null; - } - - return Shutdown.builder() - .type(type) - .magic(magic) - .build(); - } - - - public void to(ByteBuffer buffer) - { - buffer.put((byte) type); - buffer.put((byte) magic); - } - - - @Override - public int getMaximumSize() - { - return MAXIMUM_SERIALIZED_SIZE; - } - - - @Override - public String toString() - { - return Integer.toHexString(getType()); - } -} \ No newline at end of file diff --git a/src/main/java/net/talpidae/base/insect/metrics/MetricsSink.java b/src/main/java/net/talpidae/base/insect/metrics/MetricsSink.java deleted file mode 100644 index 2b20ee0..0000000 --- a/src/main/java/net/talpidae/base/insect/metrics/MetricsSink.java +++ /dev/null @@ -1,12 +0,0 @@ -package net.talpidae.base.insect.metrics; - -import lombok.NonNull; - - -public interface MetricsSink -{ - /** - * Form a metric tuple from the specified path and value and forward it to remote listeners. - */ - void forward(@NonNull String path, long timestampMillies, double value); -} diff --git a/src/main/java/net/talpidae/base/insect/metrics/QueuedMetricsSink.java b/src/main/java/net/talpidae/base/insect/metrics/QueuedMetricsSink.java deleted file mode 100644 index a8a51bd..0000000 --- a/src/main/java/net/talpidae/base/insect/metrics/QueuedMetricsSink.java +++ /dev/null @@ -1,117 +0,0 @@ -package net.talpidae.base.insect.metrics; - -import net.talpidae.base.insect.Slave; -import net.talpidae.base.insect.config.SlaveSettings; -import net.talpidae.base.util.performance.Metric; -import net.talpidae.base.util.thread.GeneralScheduler; - -import java.util.ArrayDeque; -import java.util.concurrent.TimeUnit; - -import javax.inject.Inject; -import javax.inject.Singleton; - -import lombok.NonNull; -import lombok.extern.slf4j.Slf4j; -import lombok.val; - - -@Slf4j -@Singleton -public class QueuedMetricsSink implements MetricsSink -{ - private static final long MAXIMUM_ENQUEUE_DELAY_NANOS = TimeUnit.SECONDS.toNanos(4); - - private static final int MAXIMUM_QUEUE_LENGTH = 2048; - - /** - * How many metric elements to leave in the metricQueue without immediately trying to send again. - */ - private static final int MAXIMUM_DELAYED_ITEM_COUNT = 30; - - private final Slave slave; - - private final ArrayDeque metricQueue = new ArrayDeque<>(1024); - - private final String pathPrefix; - - private long lastEnqueueNanos = 0L; - - - @Inject - public QueuedMetricsSink(Slave slave, SlaveSettings slaveSettings, GeneralScheduler scheduler) - { - this.slave = slave; - this.pathPrefix = "/" + slaveSettings.getName(); - - scheduler.scheduleWithFixedDelay(this::sendAllMetrics, MAXIMUM_ENQUEUE_DELAY_NANOS, MAXIMUM_ENQUEUE_DELAY_NANOS, TimeUnit.NANOSECONDS); - } - - - /** - * Form a metric tuple from the specified path and value and enqueue it for forwarding. - */ - @Override - public void forward(@NonNull String path, long timestampMillies, double value) - { - val absolutePath = pathPrefix + path; - val metric = Metric.builder() - .path(absolutePath.replace("//", "/")) - .ts(timestampMillies) - .value(value) - .build(); - - synchronized (metricQueue) - { - if (metricQueue.size() < MAXIMUM_QUEUE_LENGTH) - { - metricQueue.addLast(metric); - } - } - } - - - private int getQueueLength() - { - synchronized (metricQueue) - { - return metricQueue.size(); - } - } - - - private boolean isQueueEmpty() - { - synchronized (metricQueue) - { - return metricQueue.isEmpty(); - } - } - - - private void sendAllMetrics() - { - val now = System.nanoTime(); - if (lastEnqueueNanos < now - MAXIMUM_ENQUEUE_DELAY_NANOS - && !isQueueEmpty()) - { - lastEnqueueNanos = now; - - try - { - do - { - synchronized (metricQueue) - { - slave.forwardMetrics(metricQueue); - } - } - while (getQueueLength() >= MAXIMUM_DELAYED_ITEM_COUNT); - } - catch (Throwable t) - { - log.error("failed to forward metrics", t); - } - } - } -} diff --git a/src/main/java/net/talpidae/base/insect/state/InsectState.java b/src/main/java/net/talpidae/base/insect/state/InsectState.java deleted file mode 100644 index 14b1b30..0000000 --- a/src/main/java/net/talpidae/base/insect/state/InsectState.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright (C) 2017 Jonas Zeiger - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package net.talpidae.base.insect.state; - -import java.net.InetSocketAddress; -import java.util.Set; - -import lombok.Builder; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.RequiredArgsConstructor; -import lombok.Singular; -import lombok.ToString; - - -/** - * Things the queen knows about each insect. - */ -@Getter -@EqualsAndHashCode -@ToString -@Builder -@RequiredArgsConstructor -public class InsectState implements ServiceState -{ - /** - * InetSocketAddress representing host and port. - */ - @Getter - private final InetSocketAddress socketAddress; - - /** - * This service instance's unique name. - */ - @Getter - private final transient String name; - - /** - * Service monotonic clock timestamp epoch (remote). - */ - @Getter - private final transient long timestampEpochRemote; - - /** - * Service monotonic clock timestamp epoch (local). - */ - @Getter - private final transient long timestampEpochLocal; - - /** - * Service monotonic clock timestamp (relative to timestampEpoch). - */ - @Getter - private final transient long timestamp; - - /** - * Routes of other services this service depends on. - */ - @Getter - @Singular - private final transient Set dependencies; - - - /** - * Is this insect out-of-service (ie. won't not be propagated to slaves)? - */ - @Getter - private final transient boolean isOutOfService; - - - public static class InsectStateBuilder - { - public InsectStateBuilder newEpoch(long nowNanos, long remoteTimestampEpoch) - { - return timestampEpochLocal(nowNanos) - .timestampEpochRemote(remoteTimestampEpoch) - .timestamp(nowNanos); - } - } -} \ No newline at end of file diff --git a/src/main/java/net/talpidae/base/insect/state/ServiceState.java b/src/main/java/net/talpidae/base/insect/state/ServiceState.java deleted file mode 100644 index 2e90223..0000000 --- a/src/main/java/net/talpidae/base/insect/state/ServiceState.java +++ /dev/null @@ -1,11 +0,0 @@ -package net.talpidae.base.insect.state; - -import java.net.InetSocketAddress; - - -public interface ServiceState -{ - long getTimestamp(); - - InetSocketAddress getSocketAddress(); -} diff --git a/src/main/java/net/talpidae/base/server/UndertowServer.java b/src/main/java/net/talpidae/base/server/UndertowServer.java index 1eb09dc..7b80531 100644 --- a/src/main/java/net/talpidae/base/server/UndertowServer.java +++ b/src/main/java/net/talpidae/base/server/UndertowServer.java @@ -46,10 +46,8 @@ import net.talpidae.base.event.ServerShutdown; import net.talpidae.base.event.ServerStarted; import net.talpidae.base.event.Shutdown; -import net.talpidae.base.insect.metrics.MetricsSink; import net.talpidae.base.server.cors.CORSFilter; -import net.talpidae.base.server.performance.MemoryMetricCollector; -import net.talpidae.base.server.performance.MetricsHandler; +import net.talpidae.base.server.logging.RequestLoggingHandler; import net.talpidae.base.util.ssl.SslContextFactory; import net.talpidae.base.util.thread.GeneralScheduler; import org.xnio.OptionMap; @@ -89,8 +87,6 @@ public class UndertowServer implements Server private final ServerEndpointConfig.Configurator defaultServerEndpointConfigurator; - private final MetricsSink metricsSink; - private final GeneralScheduler scheduler; private Undertow server = null; @@ -106,7 +102,6 @@ public UndertowServer(EventBus eventBus, Optional> annotatedEndpointClass, Optional programmaticEndpointConfig, Optional defaultServerEndpointConfigurator, - Optional metricsSink, GeneralScheduler scheduler) { this.serverConfig = serverConfig; @@ -114,7 +109,6 @@ public UndertowServer(EventBus eventBus, this.annotatedEndpointClass = annotatedEndpointClass.orElse(null); this.programmaticEndpointConfig = programmaticEndpointConfig.orElse(null); this.defaultServerEndpointConfigurator = defaultServerEndpointConfigurator.orElse(null); - this.metricsSink = metricsSink.orElse(null); this.eventBus = eventBus; this.scheduler = scheduler; @@ -392,7 +386,7 @@ else if (programmaticEndpointConfig != null) if (serverConfig.isLoggingFeatureEnabled()) { // enable extensive logging (make sure to disable for production) - rootHandler = Handlers.requestDump(rootHandler); + rootHandler = new RequestLoggingHandler(rootHandler); } if (serverConfig.getCorsOriginPattern() != null) @@ -400,15 +394,6 @@ else if (programmaticEndpointConfig != null) rootHandler = new CORSFilter(rootHandler, serverConfig); } - if (metricsSink != null) - { - // enable metrics - rootHandler = new MetricsHandler(rootHandler, metricsSink); - - // TODO Put this somewhere else. - scheduler.scheduleWithFixedDelay(new MemoryMetricCollector(metricsSink), MEMORY_METRICS_INTERVAL_SECONDS, MEMORY_METRICS_INTERVAL_SECONDS, TimeUnit.SECONDS); - } - // finally, enhance handler with graceful shutdown capability builder.setHandler(this.rootHandler = Handlers.gracefulShutdown(rootHandler)); } diff --git a/src/main/java/net/talpidae/base/server/logging/RequestLoggingHandler.java b/src/main/java/net/talpidae/base/server/logging/RequestLoggingHandler.java new file mode 100644 index 0000000..93c3545 --- /dev/null +++ b/src/main/java/net/talpidae/base/server/logging/RequestLoggingHandler.java @@ -0,0 +1,138 @@ +package net.talpidae.base.server.logging; + +import io.undertow.server.HttpHandler; +import io.undertow.server.HttpServerExchange; +import io.undertow.util.Headers; +import io.undertow.util.HttpString; +import lombok.extern.java.Log; +import lombok.val; + +import java.nio.charset.StandardCharsets; +import java.util.Base64; + + +/** + * Similar to RequestDumpingHandler but avoids DNS reverse lookups caused by InetSocketAddress.getHostName(). + */ +@Log +public class RequestLoggingHandler implements HttpHandler +{ + private static final HttpString USER_AGENT = new HttpString("user-agent"); + + private static final HttpString AUTHORIZATION = new HttpString("authorization"); + + private final HttpHandler next; + + public RequestLoggingHandler(final HttpHandler next) + { + this.next = next; + } + + @Override + public void handleRequest(final HttpServerExchange exchange) throws Exception + { + final StringBuilder sb = new StringBuilder(); + + val forwardedFor = exchange.getRequestHeaders().getFirst(Headers.X_FORWARDED_FOR); + if (forwardedFor != null) + { + sb.append(forwardedFor); + } + else + { + sb.append(exchange.getSourceAddress().getHostString()); + } + + sb.append(' '); + sb.append(exchange.getProtocol()); + sb.append(' '); + sb.append(exchange.getRequestMethod()); + sb.append(' '); + sb.append(exchange.getRequestURI()); + if (!"".equals(exchange.getQueryString())) + { + sb.append('?'); + sb.append(exchange.getQueryString()); + } + + exchange.addExchangeCompleteListener((completedExchange, nextListener) -> { + + sb.append(" -> status="); + sb.append(completedExchange.getStatusCode()); + sb.append(" tx="); + sb.append(completedExchange.getResponseBytesSent()); + + val userAgent = completedExchange.getRequestHeaders().getFirst(USER_AGENT); + if (userAgent != null) + { + sb.append(" ua="); + sb.append(userAgent); + } + + // extra a possible "sub" (subject) field from an "Authoriztation: Bearer" JWT token, if one exists + val authorization = completedExchange.getRequestHeaders().getFirst(USER_AGENT); + if (authorization != null) + { + // TODO: Also support logging Basic Auth username and other schemes/token formats + val subject = extractJwtSub(authorization); + if (subject != null) + { + sb.append(" sub="); + sb.append(subject); + } + } + + nextListener.proceed(); + log.info(sb.toString()); + }); + + next.handleRequest(exchange); + } + + + private static String extractJwtSub(String authorization) + { + if (authorization != null) + { + int begin = authorization.indexOf("Bearer"); + if (begin >= 0) + { + begin = authorization.indexOf('.', begin + 6); + if (begin > 0) + { + int end = authorization.indexOf('.', begin + 1); + if (end > begin) + { + try + { + String jwt = new String(Base64.getDecoder().decode(authorization.substring(begin + 1, end)), StandardCharsets.UTF_8); + begin = jwt.indexOf("\"sub\""); + if (begin > 0) + { + begin = jwt.indexOf(":", begin + 5); + if (begin > 0) + { + begin = jwt.indexOf('\"', begin + 1); + if (begin > 0) + { + end = jwt.indexOf('\"', begin + 1); + if (end > begin) + { + return jwt.substring(begin + 1, end); + } + } + } + } + } + catch (RuntimeException e) + { + // ignore, just return null + } + } + } + } + } + + return null; + } +} diff --git a/src/main/java/net/talpidae/base/server/performance/MemoryMetricCollector.java b/src/main/java/net/talpidae/base/server/performance/MemoryMetricCollector.java deleted file mode 100644 index 136b1f2..0000000 --- a/src/main/java/net/talpidae/base/server/performance/MemoryMetricCollector.java +++ /dev/null @@ -1,48 +0,0 @@ -package net.talpidae.base.server.performance; - - -import net.talpidae.base.insect.metrics.MetricsSink; - -import java.lang.management.ManagementFactory; -import java.lang.management.MemoryMXBean; - -import lombok.extern.slf4j.Slf4j; -import lombok.val; - - -/** - * Collect heap and non-heap memory statistics and forward them using the specified MetricsSink instance. - */ -@Slf4j -public class MemoryMetricCollector implements Runnable -{ - private final MetricsSink metricsSink; - - private final MemoryMXBean memoryMXBean; - - - public MemoryMetricCollector(MetricsSink metricsSink) - { - this.metricsSink = metricsSink; - this.memoryMXBean = ManagementFactory.getMemoryMXBean(); - } - - - @Override - public void run() - { - try - { - val ts = System.currentTimeMillis(); - val heapCommitted = ((double) memoryMXBean.getHeapMemoryUsage().getCommitted()) / 1024 / 1024; // MBytes - val nonHeapCommitted = ((double) memoryMXBean.getNonHeapMemoryUsage().getCommitted()) / 1024 / 1024; - - metricsSink.forward("/heapCommitted", ts, heapCommitted); - metricsSink.forward("/nonHeapCommitted", ts, nonHeapCommitted); - } - catch (Throwable t) - { - log.error("memory metrics collection failed", t); - } - } -} diff --git a/src/main/java/net/talpidae/base/server/performance/MetricsHandler.java b/src/main/java/net/talpidae/base/server/performance/MetricsHandler.java deleted file mode 100644 index f51f50a..0000000 --- a/src/main/java/net/talpidae/base/server/performance/MetricsHandler.java +++ /dev/null @@ -1,105 +0,0 @@ -package net.talpidae.base.server.performance; - -import io.undertow.server.HttpHandler; -import io.undertow.server.HttpServerExchange; -import io.undertow.util.AttachmentKey; -import lombok.Getter; -import lombok.val; -import net.talpidae.base.insect.metrics.MetricsSink; - -import java.util.concurrent.TimeUnit; - - -public class MetricsHandler implements HttpHandler -{ - private static final AttachmentKey EXCHANGE_METRIC = AttachmentKey.create(ExchangeMetric.class); - - private final HttpHandler next; - - private final MetricsSink metricsSink; - - - public MetricsHandler(HttpHandler next, MetricsSink metricsSink) - { - this.next = next; - this.metricsSink = metricsSink; - } - - - @Override - public void handleRequest(HttpServerExchange exchange) throws Exception - { - if (!exchange.isComplete()) - { - exchange.putAttachment(EXCHANGE_METRIC, new ExchangeMetric(exchange.getRelativePath())); - - exchange.addExchangeCompleteListener((completedExchange, nextListener) -> - { - final Runnable finishRequest = () -> - { - val exchangeMetric = completedExchange.getAttachment(EXCHANGE_METRIC); - - exchangeMetric.complete(completedExchange); - forwardRequestMetric(exchangeMetric); - - nextListener.proceed(); - }; - - if (completedExchange.isInIoThread()) - { - completedExchange.dispatch(finishRequest); - } - else - { - finishRequest.run(); - } - }); - } - next.handleRequest(exchange); - } - - - private void forwardRequestMetric(ExchangeMetric exchangeMetric) - { - val ts = exchangeMetric.getTimestampMillies(); - metricsSink.forward(exchangeMetric.getPath() + "/duration", ts, exchangeMetric.getDuration()); - metricsSink.forward(exchangeMetric.getPath() + "/status", ts, exchangeMetric.getStatusCode()); - } - - - private static class ExchangeMetric - { - private static final double NANOSECONDS_TO_FRACTIONAL_SECONDS_MULTIPLIER = 1.0 / TimeUnit.SECONDS.toNanos(1); - - private final long startNanos; - - @Getter - private final long timestampMillies; - - @Getter - private final String path; - - @Getter - private double duration; - - @Getter - private double statusCode; - - - private ExchangeMetric(String path) - { - this.timestampMillies = System.currentTimeMillis(); - this.startNanos = System.nanoTime(); - this.path = path; - } - - - private void complete(HttpServerExchange exchange) - { - val endNanos = System.nanoTime(); - - duration = (double) (endNanos - startNanos) * NANOSECONDS_TO_FRACTIONAL_SECONDS_MULTIPLIER; - statusCode = exchange.getStatusCode(); - } - } -} \ No newline at end of file