outBoundMessages = new ConcurrentLinkedDeque<>();
-
- @Getter(AccessLevel.PROTECTED)
- protected AtomicXorShiftRandom random = new AtomicXorShiftRandom();
-
- @Getter
- private long pulseDelayMillies = 0L;
-
- /**
- * Highest sane timestamp value encountered so far.
- *
- * This timestamp is used for automatic self-preservation.
- */
- @Getter
- @Setter(AccessLevel.PROTECTED)
- private long maximumTimestamp;
-
-
- Insect(S settings, boolean onlyTrustedRemotes)
- {
- super(InsectMessage::new, settings);
-
- this.settings = settings;
- this.onlyTrustedRemotes = onlyTrustedRemotes;
-
- this.remoteOnLocalHost = settings.getRemotes().stream()
- .map(InetSocketAddress::getAddress)
- .anyMatch(NetworkUtil::isLocalAddress);
-
- this.pulseDelayCutoff = TimeUnit.MILLISECONDS.toNanos(settings.getPulseDelay() + (settings.getPulseDelay() / 2));
- }
-
- /**
- * Get an empty route.
- */
- protected InsectCollection emptyRoute()
- {
- return EMPTY_ROUTE;
- }
-
-
- @Override
- public void run()
- {
- pulseDelayMillies = getSettings().getPulseDelay();
- routeToInsects.clear();
- outBoundMessages.clear();
-
- try
- {
- // run MessageExchange loop
- super.run();
- }
- catch (Exception e)
- {
- log.error("insect shutdown because of critical error", e);
- }
- }
-
-
- /**
- * Override to perform actions based on a pulse at or below the pulse delay.
- *
- * @param nowNanos The current monotonic clock value as returned by System.nanoTime().
- * @return Maximum time to wait for next pulse in nanoseconds.
- */
- protected long handlePulse(long nowNanos)
- {
- // find and remove all timed out instances
- val deadlineNanos = maximumTimestamp - (pulseDelayCutoff * 2);
- val limitRemovedCount = Math.max(1, routeToInsects.size() / 5); // remove max 20% of instances in one go
-
- int removed = 0;
- for (val alternatives : routeToInsects.values())
- {
- removed += alternatives.truncateOlderThan(deadlineNanos, this::handleTimeout);
- if (removed >= limitRemovedCount)
- break;
- }
-
- // reduce batch size a little by splitting up work over time
- return pulseDelayMillies / 4;
- }
-
-
- @Override
- public void close()
- {
- prepareShutdown();
-
- super.close();
- }
-
-
- /**
- * Override if additional actions are required before shutting down.
- */
- protected void prepareShutdown()
- {
- }
-
-
- /**
- * Override to implement additional logic after a mapping message has been handled by the default handler.
- */
- protected void postHandleMapping(InsectState state, Mapping mapping, boolean isNewMapping, boolean isDependencyMapping)
- {
-
- }
-
-
- /**
- * Override to do something when new dependencies are published.
- */
- protected void handleDependenciesChanged(InsectState state)
- {
-
- }
-
-
- /**
- * Override to do something when an insect timed out and got removed from the pool.
- */
- protected void handleTimeout(InsectState timedOutState)
- {
-
- }
-
-
- /**
- * Override to implement remote shutdown.
- */
- protected void handleShutdown()
- {
-
- }
-
-
- /**
- * Override to implement invalidate (slave needs to send critical information again).
- */
- protected void handleInvalidate()
- {
-
- }
-
-
- /**
- * Override to handle metrics (usually not relayed).
- */
- protected void handleMetrics(Metrics metrics)
- {
-
- }
-
-
- /**
- * Pack and send a message with the specified payload to the given destination.
- */
- protected void addMessage(InetSocketAddress destination, Payload payload)
- {
- val queueControl = getQueueControl();
- if (queueControl != null)
- {
- // general case:
- // message was created from within processMessages() context (same thread)
- ((InsectMessage) queueControl.addOutbound(destination)).setPayload(payload);
- }
- else
- {
- // rare case:
- // queue outbound message from other thread before construction in processMessages()
- outBoundMessages.offerLast(new OutBoundMessage(destination, payload));
-
- // schedule call to processMessages() which actually sends our message
- wakeup();
- }
- }
-
-
- private boolean checkSenderTrust(InetSocketAddress remote)
- {
- return (remote.getAddress().isLoopbackAddress() && remoteOnLocalHost)
- || getSettings().getRemotes().contains(remote);
- }
-
-
- private void handleMessage(long nowNanos, InsectMessage message)
- {
- val remote = message.getRemoteAddress();
- val isTrustedServer = remote.getAddress().isLoopbackAddress() || getSettings().getRemotes().contains(remote);
- if (isTrustedServer || !onlyTrustedRemotes)
- {
- try
- {
- val payload = message.getPayload();
- switch (payload.getType())
- {
- case Mapping.TYPE_MAPPING:
- {
- // validate client address (avoid message spoofing)
- if (isTrustedServer || ((Mapping) payload).isAuthorative(remote))
- {
- handleMapping(nowNanos, (Mapping) payload);
- }
- else
- {
- log.warn("possible spoofing: remote {} not authorized to send message: {}", remote, payload);
- }
- break;
- }
-
- case Invalidate.TYPE_INVALIDATE:
- {
- log.debug("received invalidate message from remote {}", remote);
- handleInvalidate();
- break;
- }
-
- case Shutdown.TYPE_SHUTDOWN:
- {
- log.debug("received shutdown message from remote {}", remote);
- handleShutdown();
- break;
- }
-
- case Metrics.TYPE_METRIC:
- {
- handleMetrics((Metrics) payload);
- }
- }
- }
- catch (CharacterCodingException | IndexOutOfBoundsException e)
- {
- log.warn("received malformed message from: {}", remote);
- }
- }
- else
- {
- log.warn("rejected message from untrusted source: {}", remote);
- }
- }
-
- private void handleMapping(long nowNanos, Mapping mapping)
- {
- val alternatives = routeToInsects.computeIfAbsent(mapping.getRoute(), r -> new InsectCollection(pulseDelayCutoff));
-
- // do we have an existing entry for this slave?
- val nextState = alternatives.compute(mapping.getSocketAddress(), state ->
- {
- val remoteTimestamp = mapping.getTimestamp();
- val nextStateBuilder = InsectState.builder()
- .name(mapping.getName());
-
- final InetSocketAddress socketAddress;
- if (state != null)
- {
- // merge new dependency with those already known
- nextStateBuilder.dependencies(state.getDependencies());
-
- // out-of-service is sticky
- nextStateBuilder.isOutOfService(state.isOutOfService());
-
- // do not keep resolving the same host:port combo
- val oldHost = state.getSocketAddress().getHostString();
- val oldPort = state.getSocketAddress().getPort();
- socketAddress = (Objects.equals(mapping.getHost(), oldHost) && oldPort == mapping.getPort())
- ? state.getSocketAddress()
- : null;
-
- // perform timestamp calculation magic
- // the point is to use the heartbeat timestamp from the REMOTE
- // as a measure of latency/CPU load on that service instance
- val remoteEpoch = state.getTimestampEpochRemote();
- val localEpoch = state.getTimestampEpochLocal();
- val adjustedTimestamp = localEpoch + (remoteTimestamp - remoteEpoch);
- val previousAdjustedTimestamp = state.getTimestamp();
- val pulseDelayNanos = TimeUnit.MILLISECONDS.toNanos(getPulseDelayMillies());
- if (adjustedTimestamp < previousAdjustedTimestamp || adjustedTimestamp > (previousAdjustedTimestamp + pulseDelayNanos + (pulseDelayNanos >>> 1)))
- {
- // missed heartbeat package or service restarted, need to reset epoch
- nextStateBuilder.newEpoch(nowNanos, remoteTimestamp);
- }
- else
- {
- nextStateBuilder.timestampEpochLocal(localEpoch)
- .timestampEpochRemote(remoteEpoch)
- .timestamp(localEpoch + (remoteTimestamp - remoteEpoch));
- }
- }
- else
- {
- socketAddress = null;
-
- nextStateBuilder.newEpoch(nowNanos, remoteTimestamp);
- }
-
- // resolve once
- nextStateBuilder.socketAddress((socketAddress != null) ? socketAddress : new InetSocketAddress(mapping.getHost(), mapping.getPort()));
-
- val newDependency = mapping.getDependency();
- if (!newDependency.isEmpty())
- {
- nextStateBuilder.dependency(newDependency);
- }
-
- // InsectState is key and value at the same time
- return nextStateBuilder.build();
- });
-
- // let descendants add more actions
- final boolean isNewMapping;
- if (nextState != null)
- {
- maximumTimestamp = nextState.getTimestamp();
- isNewMapping = mapping.getTimestamp() == nextState.getTimestampEpochRemote();
- }
- else
- {
- isNewMapping = false;
- }
-
- val isDependencyMapping = !Strings.isNullOrEmpty(mapping.getDependency());
- postHandleMapping(nextState, mapping, isNewMapping, isDependencyMapping);
-
- if (isNewMapping || isDependencyMapping)
- {
- // inform about changed dependencies
- handleDependenciesChanged(nextState);
- }
- }
-
- /**
- * Handle inbound and send outbound messages.
- */
- @Override
- protected long processMessages(MessageQueueControl control)
- {
- // add already queued up messages
- OutBoundMessage outBoundMessage;
- while ((outBoundMessage = outBoundMessages.poll()) != null)
- {
- try
- {
- addMessage(outBoundMessage.getDestination(), outBoundMessage.getPayload());
- }
- catch (Throwable e)
- {
- log.error("error adding outbound message of type {} for remote {}: {}", outBoundMessage.getClass().getSimpleName(), outBoundMessage.getDestination().getHostString(), e.getMessage());
- }
- }
-
- val nowNanos = System.nanoTime();
- InsectMessage inBoundMessage;
- while ((inBoundMessage = control.pollInbound()) != null)
- {
- try
- {
- handleMessage(nowNanos, inBoundMessage);
- }
- catch (Throwable e)
- {
- log.error("error handling message from remote {}: {}: {}", inBoundMessage.getRemoteAddress(), e.getClass().getSimpleName(), e.getMessage());
- }
- }
-
- // call handlePulse() as soon as the last queued inbound message has been processed
- return handlePulse(nowNanos);
- }
-
-
- /**
- * Initialize the specified route with and empty InsectCollection.
- */
- protected InsectCollection initializeRoute(String route)
- {
- return routeToInsects.computeIfAbsent(route, (key) -> new InsectCollection(pulseDelayCutoff));
- }
-
-
- @FunctionalInterface
- protected interface InsectStateUpdateFunction extends Function
- {
-
- }
-
-
- /**
- * Holder for queued up outbound message content.
- */
- @Value
- private static class OutBoundMessage
- {
- private final InetSocketAddress destination;
-
- private final Payload payload;
- }
-
-
- private static final class InsectPool
- {
- private static final InsectPool EMPTY = new InsectPool(new InsectState[0], 0);
-
- private final InsectState[] insects;
-
- private final List allInsects;
-
- private final List activeInsects;
-
-
- private InsectPool(InsectState[] insects, int activeEndIndex)
- {
- this.insects = insects;
- this.allInsects = Arrays.asList(insects);
- this.activeInsects = allInsects.subList(0, activeEndIndex);
- }
- }
-
-
- protected static class InsectCollection
- {
- private static final int ACTION_REPLACE = 0;
-
- private static final int ACTION_ADD = 1;
-
- private static final int ACTION_REMOVE = -1;
-
- private final AtomicReference insects = new AtomicReference<>(InsectPool.EMPTY);
-
- private final long pulseDelayCutoff;
-
-
- private InsectCollection(long pulseDelayCutoff)
- {
- this.pulseDelayCutoff = pulseDelayCutoff;
- }
-
-
- private static int indexOf(InsectState[] array, InetSocketAddress socketAddress)
- {
- for (int i = 0; i < array.length; ++i)
- {
- val otherAddress = array[i].getSocketAddress();
- if (socketAddress.getPort() == otherAddress.getPort())
- {
- val hostString = socketAddress.getHostString();
- val otherHostString = otherAddress.getHostString();
- if (Objects.equals(hostString, otherHostString))
- {
- return i;
- }
- }
- }
-
- return -1;
- }
-
-
- public List getActive()
- {
- return insects.get().activeInsects;
- }
-
-
- public List getAll()
- {
- return insects.get().allInsects;
- }
-
-
- void clear()
- {
- insects.set(InsectPool.EMPTY);
- }
-
-
- /**
- * Truncate the collection, removing all insects whose timestamp is older than the specified deadline.
- *
- * @return Number of timed-out instances that have been removed.
- */
- int truncateOlderThan(long deadlineNanos, Consumer removedConsumer)
- {
- InsectPool pool;
- InsectPool nextPool;
-
- int count = 0;
- do
- {
- pool = insects.get(); // acquire
- val activeEndIndex = pool.activeInsects.size();
- val existing = pool.insects;
-
- int i = existing.length - 1;
- while (i >= 0 && existing[i].getTimestamp() < deadlineNanos)
- {
- --i;
- }
-
- val nextLength = i + 1;
- count = existing.length - nextLength;
- if (nextLength == existing.length)
- {
- // nothing to do
- break;
- }
- else if (nextLength > 0)
- {
- val next = new InsectState[nextLength];
- System.arraycopy(existing, 0, next, 0, Math.min(existing.length, nextLength));
-
- nextPool = new InsectPool(next, activeEndIndex);
- }
- else
- {
- nextPool = InsectPool.EMPTY;
- }
- }
- while (!insects.compareAndSet(pool, nextPool));
-
- // notify about removed instances
- for (int i = pool.insects.length - 1; i >= pool.insects.length - count; --i)
- {
- removedConsumer.accept(pool.insects[i]);
- }
-
- return count;
- }
-
-
- /**
- * Updates this collection of insects.
- *
- * @param key The address of the insect to compute.
- * @param updateFunction Function that will create a new immutable InsectState value, based on the previous value.
- * @return The new value or null if none exists (the previous item has been removed or didn't exist in the first place).
- */
- InsectState compute(InetSocketAddress key, InsectStateUpdateFunction updateFunction)
- {
- InsectPool pool;
- InsectPool nextPool;
- InsectState nextState;
- do
- {
- pool = insects.get(); // acquire
-
- val existing = pool.insects;
- val index = indexOf(existing, key);
- nextState = updateFunction.apply(index >= 0 ? existing[index] : null);
-
- final int action;
- if (index >= 0)
- {
- if (nextState != null)
- {
- action = ACTION_REPLACE;
- }
- else
- {
- action = ACTION_REMOVE;
- }
- }
- else if (nextState != null)
- {
- // add
- action = ACTION_ADD;
- }
- else
- {
- // no-op
- return null;
- }
-
- val nextLength = existing.length + action;
- if (nextLength > 0)
- {
- val next = new InsectState[nextLength];
- System.arraycopy(existing, 0, next, 0, Math.min(existing.length, nextLength));
- next[action == ACTION_ADD ? existing.length : index] = (action != ACTION_REMOVE) ? nextState : existing[existing.length - 1];
-
- // copy of last array has been updated now
-
- // sort by timestamp descending (to allow faster lookup by slave)
- Arrays.sort(next, (s1, s2) -> -Long.compare(s1.getTimestamp(), s2.getTimestamp()));
-
- // find first timed-out insect
- val timestampCutOff = next[0].getTimestamp() - pulseDelayCutoff;
- int i;
- for (i = 1; i < next.length; ++i)
- {
- if (next[i].getTimestamp() < timestampCutOff)
- {
- // cutoff reached
- break;
- }
- }
-
- nextPool = new InsectPool(next, i);
- }
- else
- {
- nextPool = InsectPool.EMPTY;
- }
- }
- while (!insects.compareAndSet(pool, nextPool));
-
- return nextState;
- }
- }
-}
\ No newline at end of file
diff --git a/src/main/java/net/talpidae/base/insect/InsectModule.java b/src/main/java/net/talpidae/base/insect/InsectModule.java
deleted file mode 100644
index 94b0798..0000000
--- a/src/main/java/net/talpidae/base/insect/InsectModule.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright (C) 2017 Jonas Zeiger
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see .
- */
-
-package net.talpidae.base.insect;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.multibindings.OptionalBinder;
-
-import net.talpidae.base.insect.config.DefaultQueenSettings;
-import net.talpidae.base.insect.config.DefaultSlaveSettings;
-import net.talpidae.base.insect.config.QueenSettings;
-import net.talpidae.base.insect.config.SlaveSettings;
-import net.talpidae.base.insect.metrics.MetricsSink;
-
-
-public class InsectModule extends AbstractModule
-{
- @Override
- protected void configure()
- {
- OptionalBinder.newOptionalBinder(binder(), QueenSettings.class).setDefault().to(DefaultQueenSettings.class);
- OptionalBinder.newOptionalBinder(binder(), Queen.class).setDefault().to(AsyncQueen.class);
-
- OptionalBinder.newOptionalBinder(binder(), SlaveSettings.class).setDefault().to(DefaultSlaveSettings.class);
- OptionalBinder.newOptionalBinder(binder(), Slave.class).setDefault().to(AsyncSlave.class);
-
- OptionalBinder.newOptionalBinder(binder(), MetricsSink.class);
- }
-}
\ No newline at end of file
diff --git a/src/main/java/net/talpidae/base/insect/InsectWorker.java b/src/main/java/net/talpidae/base/insect/InsectWorker.java
deleted file mode 100644
index 680c597..0000000
--- a/src/main/java/net/talpidae/base/insect/InsectWorker.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Copyright (C) 2017 Jonas Zeiger
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see .
- */
-
-package net.talpidae.base.insect;
-
-import lombok.extern.slf4j.Slf4j;
-import lombok.val;
-
-import java.util.concurrent.TimeUnit;
-
-import static java.lang.Thread.State.TERMINATED;
-
-
-@Slf4j
-class InsectWorker extends Thread
-{
- private static final long WORKER_TIMEOUT = TimeUnit.MILLISECONDS.toNanos(5000L);
-
-
- private InsectWorker(Runnable task)
- {
- super(task);
- }
-
- public static InsectWorker start(CloseableRunnable task, String name)
- {
- val worker = new InsectWorker(task);
- worker.setPriority(Thread.MIN_PRIORITY);
- worker.setName(name);
-
- val before = System.nanoTime();
- val timeout = before + WORKER_TIMEOUT;
- worker.start();
- try
- {
- //noinspection SynchronizationOnLocalVariableOrMethodParameter
- synchronized (task)
- {
- while (!task.isRunning())
- {
- val remaining = System.nanoTime() - timeout;
- if (remaining <= 0)
- break;
-
- task.wait(remaining);
- }
- }
-
- log.debug("worker {} started within {}ms", name, Math.max(0L, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - before)));
- }
- catch (InterruptedException e)
- {
- log.warn("wait for startup of {} was interrupted", name);
- }
-
- return worker;
- }
-
- public boolean shutdown()
- {
- try
- {
- interrupt();
- join(WORKER_TIMEOUT);
- }
- catch (InterruptedException e)
- {
- interrupt();
- }
-
- val isTerminated = getState() == TERMINATED;
- if (!isTerminated)
- {
- log.warn("failed to stop worker {}", getName());
- }
-
- return isTerminated;
- }
-}
diff --git a/src/main/java/net/talpidae/base/insect/Queen.java b/src/main/java/net/talpidae/base/insect/Queen.java
deleted file mode 100644
index 53936b7..0000000
--- a/src/main/java/net/talpidae/base/insect/Queen.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright (C) 2017 Jonas Zeiger
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see .
- */
-
-package net.talpidae.base.insect;
-
-import net.talpidae.base.insect.state.InsectState;
-
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.stream.Stream;
-
-
-public interface Queen extends CloseableRunnable
-{
- /**
- * Initialize the state before calling run().
- */
- void initializeInsectState(Stream> stateStream);
-
-
- /**
- * Get a (live) stream of all current service state.
- */
- Stream getLiveInsectState();
-
- /**
- * Send a shutdown request to a slave.
- */
- void sendShutdown(InetSocketAddress remote);
-
- /**
- * Set a services out-of-service flag.
- */
- void setIsOutOfService(String route, InetSocketAddress socketAddress, boolean isOutOfService);
-}
\ No newline at end of file
diff --git a/src/main/java/net/talpidae/base/insect/Slave.java b/src/main/java/net/talpidae/base/insect/Slave.java
deleted file mode 100644
index 6e4656e..0000000
--- a/src/main/java/net/talpidae/base/insect/Slave.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Copyright (C) 2017 Jonas Zeiger
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see .
- */
-
-package net.talpidae.base.insect;
-
-import net.talpidae.base.insect.state.ServiceState;
-import net.talpidae.base.util.performance.Metric;
-
-import java.net.InetSocketAddress;
-import java.util.List;
-import java.util.Queue;
-
-
-public interface Slave extends CloseableRunnable
-{
- /**
- * Try to find a service for route, register route as a dependency and block in case it isn't available immediately.
- */
- InetSocketAddress findService(String route) throws InterruptedException;
-
- /**
- * Try to find a service for route, register route as a dependency and block in case it isn't available immediately.
- *
- * @return Address of discovered service if one was discovered before a timeout occurred, null otherwise.
- */
- InetSocketAddress findService(String route, long timeoutMillies) throws InterruptedException;
-
- /**
- * Return all known services for route, register route as a dependency and block in case there are none available immediately.
- *
- * @return Discovered services if any were discovered before a timeout occurred, null otherwise.
- */
- List extends ServiceState> findServices(String route, long timeoutMillies) throws InterruptedException;
-
-
- /**
- * Send some of the specified metrics from the front of the queue.
- */
- void forwardMetrics(Queue metricQueue);
-}
diff --git a/src/main/java/net/talpidae/base/insect/SyncQueen.java b/src/main/java/net/talpidae/base/insect/SyncQueen.java
deleted file mode 100644
index f022717..0000000
--- a/src/main/java/net/talpidae/base/insect/SyncQueen.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Copyright (C) 2017 Jonas Zeiger
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see .
- */
-
-package net.talpidae.base.insect;
-
-import com.google.common.eventbus.EventBus;
-import com.google.common.eventbus.Subscribe;
-
-import net.talpidae.base.event.ServerShutdown;
-import net.talpidae.base.insect.config.QueenSettings;
-import net.talpidae.base.insect.message.payload.Invalidate;
-import net.talpidae.base.insect.message.payload.Mapping;
-import net.talpidae.base.insect.message.payload.Shutdown;
-import net.talpidae.base.insect.state.InsectState;
-
-import java.net.InetSocketAddress;
-import java.util.Collection;
-import java.util.Map;
-import java.util.stream.Stream;
-
-import javax.inject.Inject;
-import javax.inject.Singleton;
-
-import lombok.AccessLevel;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import lombok.val;
-
-
-@Singleton
-@Slf4j
-public class SyncQueen extends Insect implements Queen
-{
- @Getter(AccessLevel.PROTECTED)
- private final EventBus eventBus;
-
-
- @Inject
- public SyncQueen(QueenSettings settings, EventBus eventBus)
- {
- super(settings, false);
-
- this.eventBus = eventBus;
-
- eventBus.register(this);
- }
-
- private static Mapping createMappingFromState(InsectState state, String route)
- {
- return Mapping.builder()
- .name(state.getName())
- .host(state.getSocketAddress().getHostString())
- .port(state.getSocketAddress().getPort())
- .timestamp(state.getTimestamp())
- .route(route)
- .socketAddress(state.getSocketAddress())
- .build();
- }
-
- @Override
- public void initializeInsectState(Stream> stateStream)
- {
- if (!isRunning())
- {
- stateStream.forEach(entry ->
- {
- initializeRoute(entry.getKey())
- .compute(entry.getValue().getSocketAddress(), state ->
- (state != null) ? state : entry.getValue()
- );
- });
- }
- else
- {
- throw new IllegalStateException("queen is already running");
- }
- }
-
- /**
- * Get a (live) stream of all current service state.
- */
- @Override
- public Stream getLiveInsectState()
- {
- return getRouteToInsects().values()
- .stream()
- .map(InsectCollection::getAll)
- .flatMap(Collection::stream);
- }
-
- /**
- * Send a shutdown request to a slave.
- */
- @Override
- public void sendShutdown(InetSocketAddress remote)
- {
- val shutdown = (Shutdown) Shutdown.builder()
- .type(Shutdown.TYPE_SHUTDOWN)
- .magic(Shutdown.MAGIC)
- .build();
-
- addMessage(remote, shutdown);
- }
-
- @Override
- public void setIsOutOfService(String route, InetSocketAddress socketAddress, boolean isOutOfService)
- {
- getRouteToInsects().getOrDefault(route, emptyRoute())
- .compute(socketAddress, state ->
- (state != null) ?
- // copy everything but the isOutOfService flag
- InsectState.builder()
- .name(state.getName())
- .isOutOfService(isOutOfService)
- .timestampEpochRemote(state.getTimestampEpochRemote())
- .timestamp(state.getTimestamp())
- .timestampEpochLocal(state.getTimestampEpochLocal())
- .dependencies(state.getDependencies())
- .socketAddress(state.getSocketAddress())
- .build()
- : null
- );
- }
-
-
- @Override
- protected void postHandleMapping(InsectState state, Mapping mapping, boolean isNewMapping, boolean isDependencyMapping)
- {
- if (isNewMapping)
- {
- sendInvalidate(state.getSocketAddress());
- }
-
- if (isDependencyMapping)
- {
- handleDependencyRequest(state, mapping);
- }
- else
- {
- relayMapping(state, mapping);
- }
- }
-
- /**
- * Send an invalidate request to a slave.
- */
- private void sendInvalidate(InetSocketAddress remote)
- {
- val invalidate = (Invalidate) Invalidate.builder()
- .type(Invalidate.TYPE_INVALIDATE)
- .magic(Invalidate.MAGIC)
- .build();
-
- addMessage(remote, invalidate);
- }
-
-
- @Subscribe
- protected void onServerShutdown(ServerShutdown serverShutdown)
- {
- eventBus.unregister(this);
- close();
- }
-
-
- /**
- * Immediately respond with one of the mappings we have in case a dependency was published.
- * Otherwise the sender would have to wait for the next pulse to reach it.
- */
- private void handleDependencyRequest(InsectState state, Mapping mapping)
- {
- val alternatives = getRouteToInsects().getOrDefault(mapping.getDependency(), emptyRoute()).getActive();
- val size = alternatives.size();
- if (size > 0)
- {
- // find random non out-of-service insect
- val startIndex = getRandom().nextInt(size);
- for (int i = 0; i < size; ++i)
- {
- val candidate = alternatives.get((i + startIndex) % size);
- if (!candidate.isOutOfService())
- {
- addMessage(candidate.getSocketAddress(), createMappingFromState(candidate, mapping.getDependency()));
- }
- }
- }
- }
-
- /**
- * Relay updates to all interested services (those that have this services route in their dependencies).
- */
- private void relayMapping(final InsectState state, final Mapping mapping)
- {
- if (state.isOutOfService())
- {
- // do relay mappings for out-of-service services
- return;
- }
-
- getRouteToInsects().forEach((route, states) ->
- {
- val mappingRoute = mapping.getRoute();
- if (route != null)
- {
- for (val s : states.getActive())
- {
- if (s.getDependencies().contains(mappingRoute))
- {
- val destination = s.getSocketAddress();
-
- if (destination.isUnresolved())
- {
- log.debug("unresolved address {}", destination);
- }
-
- addMessage(destination, mapping);
- }
- }
- }
- });
- }
-}
diff --git a/src/main/java/net/talpidae/base/insect/SyncSlave.java b/src/main/java/net/talpidae/base/insect/SyncSlave.java
deleted file mode 100644
index a32148c..0000000
--- a/src/main/java/net/talpidae/base/insect/SyncSlave.java
+++ /dev/null
@@ -1,403 +0,0 @@
-/*
- * Copyright (C) 2017 Jonas Zeiger
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see .
- */
-
-package net.talpidae.base.insect;
-
-import com.google.common.base.Strings;
-import com.google.common.eventbus.EventBus;
-import com.google.common.eventbus.Subscribe;
-import com.google.inject.Singleton;
-
-import net.talpidae.base.event.Invalidate;
-import net.talpidae.base.event.ServerShutdown;
-import net.talpidae.base.event.Shutdown;
-import net.talpidae.base.insect.config.SlaveSettings;
-import net.talpidae.base.insect.message.payload.Mapping;
-import net.talpidae.base.insect.message.payload.Metrics;
-import net.talpidae.base.insect.message.payload.Payload;
-import net.talpidae.base.insect.state.InsectState;
-import net.talpidae.base.insect.state.ServiceState;
-import net.talpidae.base.util.network.NetworkUtil;
-import net.talpidae.base.util.performance.Metric;
-
-import java.net.InetSocketAddress;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.BiFunction;
-
-import javax.inject.Inject;
-
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import lombok.val;
-
-
-@Singleton
-@Slf4j
-public class SyncSlave extends Insect implements Slave
-{
- private static final long DEPENDENCY_RESEND_MILLIES_MIN = TimeUnit.MILLISECONDS.toMillis(100);
-
- private static final long DEPENDENCY_RESEND_MILLIES_MAX = TimeUnit.SECONDS.toMillis(12);
-
- private final Map dependencies = new ConcurrentHashMap<>();
-
- private final EventBus eventBus;
-
- private final NetworkUtil networkUtil;
-
- private long nextHeartBeatNanos = 0L;
-
- @Getter
- private volatile boolean isRunning = false;
-
- @Inject
- public SyncSlave(SlaveSettings settings, EventBus eventBus, NetworkUtil networkUtil)
- {
- super(settings, true);
-
- this.eventBus = eventBus;
- this.networkUtil = networkUtil;
-
- eventBus.register(this);
- }
-
-
- @Override
- public void run()
- {
- try
- {
- synchronized (this)
- {
- isRunning = true;
- notifyAll();
- }
-
- if (Strings.isNullOrEmpty(getSettings().getRoute()))
- {
- log.debug("argument for parameter \"route\" is empty, won't publish anything");
- }
-
- nextHeartBeatNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(getSettings().getPulseDelay());
- super.run();
- }
- finally
- {
- isRunning = false;
- }
- }
-
- /**
- * Try to find a service for route, register route as a dependency and block in case it isn't available immediately.
- */
- @Override
- public InetSocketAddress findService(String route) throws InterruptedException
- {
- return findService(route, Long.MAX_VALUE);
- }
-
- /**
- * Try to find a service for route, register route as a dependency and block in case it isn't available immediately.
- *
- * @return Address of discovered service if one was discovered before a timeout occurred, null otherwise.
- */
- @Override
- public InetSocketAddress findService(String route, long timeoutMillies) throws InterruptedException
- {
- // we may occasionally get an empty collection from findServices()
- val alternatives = findServices(route, timeoutMillies);
- val size = alternatives.size();
- if (size > 0)
- {
- // pick a random service from the pool
- return alternatives.get(getRandom().nextInt(size)).getSocketAddress();
- }
-
- // timeout
- return null;
- }
-
- /**
- * Return all known services for route, register route as a dependency and block in case there are none available immediately.
- *
- * @return Discovered services if any were discovered before a timeout occurred, empty list otherwise.
- */
- @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
- @Override
- public List extends ServiceState> findServices(String route, long timeoutMillies) throws InterruptedException
- {
- List extends ServiceState> alternatives = lookupServices(route);
- if (!alternatives.isEmpty())
- {
- // fast path
- return alternatives;
- }
-
- long nowNanos = System.nanoTime();
- val timeout = (timeoutMillies >= 0) ? TimeUnit.NANOSECONDS.toMillis(nowNanos) + timeoutMillies : Long.MAX_VALUE;
- long waitInterval = DEPENDENCY_RESEND_MILLIES_MIN;
-
- val routeWaiter = dependencies.computeIfAbsent(route, k -> new RouteWaiter());
- do
- {
- // indicate that we are waiting for this route to be discovered
- switch (routeWaiter.advanceDiscoveryState(nowNanos))
- {
- case SEND:
- // send out discovery request
- requestDependency(nowNanos, route);
-
- // fall-through
-
- case DONE:
- alternatives = lookupServices(route);
- if (!alternatives.isEmpty())
- {
- routeWaiter.setDiscoveryComplete();
- dependencies.remove(route);
-
- return alternatives;
- }
- }
-
- // wait for news on this route
- val maxRemainingMillies = timeout - TimeUnit.NANOSECONDS.toMillis(nowNanos);
- val waitMillies = Math.min(Math.min(waitInterval, maxRemainingMillies), DEPENDENCY_RESEND_MILLIES_MAX);
- if (waitMillies >= 0L)
- {
- synchronized (routeWaiter)
- {
- routeWaiter.wait(waitMillies);
- }
- }
- else
- {
- break;
- }
-
- waitInterval = waitInterval * 2;
- nowNanos = System.nanoTime();
- }
- while (true);
-
- log.warn("findService(): timeout for route: {}", route);
- return alternatives;
- }
-
-
- @Override
- public void forwardMetrics(Queue metricQueue)
- {
- val metrics = Metrics.builder()
- .metrics(metricQueue)
- .build();
-
- sendToRemotes((settings, remote) -> metrics);
- }
-
-
- private List extends ServiceState> lookupServices(String route)
- {
- return getRouteToInsects().getOrDefault(route, emptyRoute()).getActive();
- }
-
-
- @Override
- protected void postHandleMapping(InsectState state, Mapping mapping, boolean isNewMapping, boolean isDependencyMapping)
- {
- if (isNewMapping)
- {
- // notify findService() callers blocking for route discovery
- val routeWaiter = dependencies.get(mapping.getRoute());
- if (routeWaiter != null)
- {
- routeWaiter.setDiscoveryComplete();
- dependencies.remove(mapping.getRoute());
- }
- }
- }
-
- @Override
- protected void handleShutdown()
- {
- // tell listeners that we received a shutdown request
- eventBus.post(new Shutdown());
- }
-
-
- @Subscribe
- protected void onServerShutdown(ServerShutdown serverShutdown)
- {
- eventBus.unregister(this);
- close();
- }
-
-
- @Override
- protected void handleInvalidate()
- {
- // drop cached remotes
- getRouteToInsects().values().forEach(InsectCollection::clear);
-
- // tell listeners that we received an invalidate request
- eventBus.post(new Invalidate());
- }
-
-
- @Override
- protected long handlePulse(long nowNanos)
- {
- val maximumWaitTime = super.handlePulse(nowNanos);
-
- if (nowNanos >= nextHeartBeatNanos)
- {
- sendHeartbeat(nowNanos);
-
- // scheduled next heartbeat, taking overshoot (delay) of this heartbeat into account
- nextHeartBeatNanos = nowNanos + Math.max(0, (TimeUnit.MILLISECONDS.toNanos(getPulseDelayMillies()) - Math.max(0L, (nowNanos - nextHeartBeatNanos))));
- }
-
- return Math.max(1L, Math.min(maximumWaitTime, TimeUnit.NANOSECONDS.toMillis(nextHeartBeatNanos - nowNanos)));
- }
-
-
- private void sendHeartbeat(long nowNanos)
- {
- val bindSocketAddress = getSettings().getBindAddress();
- val hostAddress = getSettings().getBindAddress().getAddress();
- val port = getSettings().getBindAddress().getPort();
-
- sendToRemotes((settings, remote) ->
- {
- val remoteAddress = remote.getAddress();
-
- final String host = (hostAddress != null)
- ? networkUtil.getReachableLocalAddress(hostAddress, remoteAddress).getHostAddress()
- : bindSocketAddress.getHostString();
-
- return Mapping.builder()
- .host(host)
- .port(port)
- .timestamp(nowNanos)
- .route(settings.getRoute())
- .name(settings.getName())
- .socketAddress(InetSocketAddress.createUnresolved(host, port))
- .build();
- });
- }
-
-
- private void requestDependency(long nowNanos, String requestedRoute)
- {
- val bindSocketAddress = getSettings().getBindAddress();
- val hostAddress = getSettings().getBindAddress().getAddress();
- val port = getSettings().getBindAddress().getPort();
-
- sendToRemotes((settings, remote) ->
- {
- val remoteAddress = remote.getAddress();
-
- final String host = (hostAddress != null)
- ? networkUtil.getReachableLocalAddress(hostAddress, remoteAddress).getHostAddress()
- : bindSocketAddress.getHostString();
-
- return Mapping.builder()
- .host(host)
- .port(port)
- .timestamp(nowNanos)
- .route(settings.getRoute())
- .name(settings.getName())
- .dependency(requestedRoute)
- .socketAddress(InetSocketAddress.createUnresolved(host, port))
- .build();
- });
- }
-
-
- private void sendToRemotes(BiFunction payloadProducer)
- {
- val settings = getSettings();
- for (val remote : settings.getRemotes())
- {
- addMessage(remote, payloadProducer.apply(settings, remote));
- }
- }
-
-
- private static final class RouteWaiter
- {
- private final AtomicLong discoveryState = new AtomicLong(0L);
-
- private volatile long resendNanos = TimeUnit.MILLISECONDS.toNanos(DEPENDENCY_RESEND_MILLIES_MIN);
-
-
- RouteWaiter()
- {
- }
-
-
- State advanceDiscoveryState(long currentNanos)
- {
- val state = discoveryState.get();
- if (state <= currentNanos - resendNanos)
- {
- if (discoveryState.compareAndSet(state, currentNanos))
- {
- resendNanos = Math.min(resendNanos * 2, DEPENDENCY_RESEND_MILLIES_MAX);
- return State.SEND;
- }
- }
- else if (state == Long.MAX_VALUE)
- {
- // discovery complete
- return State.DONE;
- }
-
- return State.WAIT;
- }
-
-
- State getDiscoveryState()
- {
- return discoveryState.get() == Long.MAX_VALUE ? State.DONE : State.WAIT;
- }
-
-
- void setDiscoveryComplete()
- {
- discoveryState.set(Long.MAX_VALUE);
-
- synchronized (this)
- {
- notifyAll();
- }
- }
-
-
- enum State
- {
- WAIT,
- DONE,
- SEND
- }
- }
-}
\ No newline at end of file
diff --git a/src/main/java/net/talpidae/base/insect/config/DefaultQueenSettings.java b/src/main/java/net/talpidae/base/insect/config/DefaultQueenSettings.java
deleted file mode 100644
index dc8a9b4..0000000
--- a/src/main/java/net/talpidae/base/insect/config/DefaultQueenSettings.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright (C) 2017 Jonas Zeiger
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see .
- */
-
-package net.talpidae.base.insect.config;
-
-import com.google.inject.Singleton;
-import lombok.Getter;
-import lombok.NonNull;
-import lombok.Setter;
-import lombok.extern.slf4j.Slf4j;
-import net.talpidae.base.server.ServerConfig;
-import net.talpidae.base.util.log.LoggingConfigurer;
-import net.talpidae.base.util.names.InsectNameGenerator;
-
-import javax.inject.Inject;
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.Set;
-
-import static net.talpidae.base.util.log.LoggingConfigurer.CONTEXT_INSECT_NAME_KEY;
-
-
-@Slf4j
-@Singleton
-@Setter
-@Getter
-public class DefaultQueenSettings implements QueenSettings
-{
- @Setter
- @Getter
- private String name;
-
- @NonNull
- private InetSocketAddress bindAddress;
-
- @NonNull
- private Set remotes;
-
- private long pulseDelay = DEFAULT_PULSE_DELAY;
-
- private long restInPeaceTimeout = DEFAULT_REST_IN_PEACE_TIMEOUT;
-
- @Inject
- public DefaultQueenSettings(ServerConfig serverConfig, LoggingConfigurer loggingConfigurer, InsectNameGenerator insectNameGenerator)
- {
- name = insectNameGenerator.compose().replace(' ', '-').intern();
- bindAddress = new InetSocketAddress(serverConfig.getHost(), serverConfig.getPort());
- remotes = Collections.emptySet();
-
- loggingConfigurer.putContext(CONTEXT_INSECT_NAME_KEY, name);
- }
-}
diff --git a/src/main/java/net/talpidae/base/insect/config/DefaultSlaveSettings.java b/src/main/java/net/talpidae/base/insect/config/DefaultSlaveSettings.java
deleted file mode 100644
index 03db680..0000000
--- a/src/main/java/net/talpidae/base/insect/config/DefaultSlaveSettings.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Copyright (C) 2017 Jonas Zeiger
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see .
- */
-
-package net.talpidae.base.insect.config;
-
-import com.google.common.base.Strings;
-import com.google.common.net.HostAndPort;
-import com.google.inject.Singleton;
-
-import net.talpidae.base.server.ServerConfig;
-import net.talpidae.base.util.BaseArguments;
-import net.talpidae.base.util.log.LoggingConfigurer;
-
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-
-import javax.inject.Inject;
-
-import lombok.Getter;
-import lombok.NonNull;
-import lombok.Setter;
-import lombok.extern.slf4j.Slf4j;
-import lombok.val;
-
-import static net.talpidae.base.util.log.LoggingConfigurer.CONTEXT_INSECT_NAME_KEY;
-
-
-@Singleton
-@Setter
-@Getter
-@Slf4j
-public class DefaultSlaveSettings implements SlaveSettings
-{
- @Setter
- @Getter
- private String name;
-
- @NonNull
- private InetSocketAddress bindAddress;
-
- @NonNull
- private Set remotes;
-
- @NonNull
- private String route;
-
- private long pulseDelay = DEFAULT_PULSE_DELAY;
-
- private long restInPeaceTimeout;
-
-
- @Inject
- public DefaultSlaveSettings(ServerConfig serverConfig, BaseArguments baseArguments, LoggingConfigurer loggingConfigurer)
- {
- this.bindAddress = new InetSocketAddress(serverConfig.getHost(), serverConfig.getPort());
-
- val parser = baseArguments.getOptionParser();
- val nameOption = parser.accepts("insect.name").withRequiredArg().required();
- val remoteOption = parser.accepts("insect.slave.remote").withRequiredArg().required();
- val timeoutOption = parser.accepts("insect.slave.timeout").withRequiredArg().ofType(Long.class).defaultsTo(DEFAULT_REST_IN_PEACE_TIMEOUT);
- val options = baseArguments.parse();
-
- this.name = options.valueOf(nameOption).intern();
- this.restInPeaceTimeout = options.valueOf(timeoutOption);
-
- val remotes = new HashSet();
- for (val remoteValue : options.valuesOf(remoteOption))
- {
- val remote = HostAndPort.fromString(remoteValue);
- try
- {
- val host = remote.getHost();
- val port = remote.getPortOrDefault(QueenSettings.DEFAULT_PORT);
- if (!Strings.isNullOrEmpty(host))
- {
- val socketAddress = new InetSocketAddress(host.intern(), port);
- if (socketAddress.isUnresolved())
- {
- throw new IllegalArgumentException("failed to resolve remote host: " + socketAddress.getHostString());
- }
-
- remotes.add(socketAddress);
- continue;
- }
- }
- catch (ArrayIndexOutOfBoundsException | NumberFormatException e)
- {
- // throw below
- }
-
- throw new IllegalArgumentException("invalid host[:port] pair specified: " + remoteValue);
- }
-
- this.remotes = Collections.unmodifiableSet(remotes);
-
- loggingConfigurer.putContext(CONTEXT_INSECT_NAME_KEY, name);
- }
-}
\ No newline at end of file
diff --git a/src/main/java/net/talpidae/base/insect/config/InsectSettings.java b/src/main/java/net/talpidae/base/insect/config/InsectSettings.java
deleted file mode 100644
index 2e7dcbb..0000000
--- a/src/main/java/net/talpidae/base/insect/config/InsectSettings.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright (C) 2017 Jonas Zeiger
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see .
- */
-
-package net.talpidae.base.insect.config;
-
-import java.net.InetSocketAddress;
-import java.util.Set;
-
-
-public interface InsectSettings
-{
- long DEFAULT_PULSE_DELAY = 1001; // 1001ms
- long DEFAULT_REST_IN_PEACE_TIMEOUT = 5 * 60 * 1000; // 5 min
-
- /**
- * Unique name of this instance.
- */
- String getName();
-
- void setName(String name);
-
- /**
- * InetSocketAddress to bind to.
- */
- InetSocketAddress getBindAddress();
-
- void setBindAddress(InetSocketAddress bindAddress);
-
- /**
- * Remote servers that are authorized to update mappings they do not own themselves
- * and are informed about our services.
- */
- Set getRemotes();
-
- void setRemotes(Set remotes);
-
- /**
- * Heart beat / pulse delay.
- */
- long getPulseDelay();
-
- void setPulseDelay(long pulseDelay);
-
- /**
- * Timeout after which a service is declared dead and purged from the mapping.
- * Since we always pick the youngest per route, this can be pretty high and will define
- * the maximum permitted down-time of upstream servers (Queen instances).
- */
- long getRestInPeaceTimeout();
-
- void setRestInPeaceTimeout(long restInPeaceTimeout);
-}
diff --git a/src/main/java/net/talpidae/base/insect/config/QueenSettings.java b/src/main/java/net/talpidae/base/insect/config/QueenSettings.java
deleted file mode 100644
index 02c3f5a..0000000
--- a/src/main/java/net/talpidae/base/insect/config/QueenSettings.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Copyright (C) 2017 Jonas Zeiger
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see .
- */
-
-package net.talpidae.base.insect.config;
-
-
-public interface QueenSettings extends InsectSettings
-{
- int DEFAULT_PORT = 13300;
-}
diff --git a/src/main/java/net/talpidae/base/insect/config/SlaveSettings.java b/src/main/java/net/talpidae/base/insect/config/SlaveSettings.java
deleted file mode 100644
index 54b8bc9..0000000
--- a/src/main/java/net/talpidae/base/insect/config/SlaveSettings.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Copyright (C) 2017 Jonas Zeiger
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see .
- */
-
-package net.talpidae.base.insect.config;
-
-public interface SlaveSettings extends InsectSettings
-{
- /**
- * The route this slave provides.
- */
- String getRoute();
-
- void setRoute(String route);
-}
diff --git a/src/main/java/net/talpidae/base/insect/exchange/BaseMessage.java b/src/main/java/net/talpidae/base/insect/exchange/BaseMessage.java
deleted file mode 100644
index 519c24d..0000000
--- a/src/main/java/net/talpidae/base/insect/exchange/BaseMessage.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Copyright (C) 2017 Jonas Zeiger
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see .
- */
-
-package net.talpidae.base.insect.exchange;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.DatagramChannel;
-
-import lombok.AccessLevel;
-import lombok.Getter;
-
-
-public class BaseMessage
-{
- @Getter(AccessLevel.PROTECTED)
- private final ByteBuffer buffer;
-
- @Getter
- private InetSocketAddress remoteAddress;
-
-
- protected BaseMessage(int maximumSize)
- {
- buffer = ByteBuffer.allocateDirect(maximumSize);
- }
-
-
- /**
- * Override this to perform additional cleanup.
- */
- protected void clear()
- {
-
- }
-
-
- boolean receiveFrom(DatagramChannel channel) throws IOException
- {
- remoteAddress = (InetSocketAddress) channel.receive(buffer);
- if (remoteAddress != null)
- {
- buffer.flip();
- return true;
- }
-
- return false;
- }
-
-
- void setRemoteAddress(InetSocketAddress remoteAddress)
- {
- this.remoteAddress = remoteAddress;
- }
-
-
- boolean sendTo(DatagramChannel channel) throws IOException
- {
- return channel.send(buffer, remoteAddress) != 0;
- }
-
-
- void passivate()
- {
- clear(); // may be overridden
-
- buffer.clear();
- remoteAddress = null;
- }
-}
diff --git a/src/main/java/net/talpidae/base/insect/exchange/MessageExchange.java b/src/main/java/net/talpidae/base/insect/exchange/MessageExchange.java
deleted file mode 100644
index 4f9fe60..0000000
--- a/src/main/java/net/talpidae/base/insect/exchange/MessageExchange.java
+++ /dev/null
@@ -1,424 +0,0 @@
-/*
- * Copyright (C) 2017 Jonas Zeiger
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see .
- */
-
-package net.talpidae.base.insect.exchange;
-
-import net.talpidae.base.insect.CloseableRunnable;
-import net.talpidae.base.insect.config.InsectSettings;
-import net.talpidae.base.util.pool.SoftReferenceObjectPool;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.channels.CancelledKeyException;
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.ClosedSelectorException;
-import java.nio.channels.DatagramChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.UnresolvedAddressException;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Supplier;
-
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import lombok.val;
-
-import static com.google.common.base.Strings.nullToEmpty;
-
-
-@Slf4j
-public abstract class MessageExchange implements CloseableRunnable
-{
- private static final int MESSAGE_POOL_HARD_LIMIT = 512;
-
- private final InsectSettings settings;
-
- private final SoftReferenceObjectPool messagePool;
-
- private final Queue inbound = new ArrayDeque<>();
-
- private final Queue outbound = new ArrayDeque<>();
-
- private final ExchangeMessageQueueControl queueControl = new ExchangeMessageQueueControl();
-
- private final AtomicLong runThreadId = new AtomicLong(-1);
-
- private Selector selector;
-
- private int activeInterestOps = SelectionKey.OP_READ;
-
- @Getter
- private int port = 0;
-
- private String lastErrorMessage;
-
-
- public MessageExchange(Supplier messageFactory, InsectSettings settings)
- {
- this.messagePool = new SoftReferenceObjectPool<>(messageFactory, MESSAGE_POOL_HARD_LIMIT);
- this.settings = settings;
- }
-
-
- /**
- * Wakeup exchange to force sending outbound messages and/or processing.
- */
- protected void wakeup() throws IllegalStateException
- {
- // make sure our raw is processed as soon as possible
- if (selector != null)
- {
- selector.wakeup();
- }
- }
-
- /**
- * Override this to process incoming messages and react to periodic events.
- *
- * This allows to handle almost all logic on the same thread as the exchange and thus avoid
- * extensive locking for thread-safety.
- *
- * @return Returns the maximum delay im milliseconds until the next invocation of this method.
- */
- protected abstract long processMessages(MessageQueueControl control);
-
- public void run()
- {
- try
- {
- if (!runThreadId.compareAndSet(-1L, Thread.currentThread().getId()))
- {
- log.debug("already running, won't enter run again");
- return;
- }
-
- synchronized (this)
- {
- notifyAll();
- }
-
- val key = setup();
- val channel = (DatagramChannel) key.channel();
-
- log.info("MessageExchange running on {}", settings.getBindAddress().toString());
-
- long maxWaitMillies;
- M outboundMessage = null;
- while (!Thread.interrupted())
- {
- try
- {
- maxWaitMillies = processMessages(queueControl);
-
- // recycle messages removed from the inbound queue by processMessages()
- queueControl.recycleConsumedMessages();
-
- if (outboundMessage == null)
- {
- // look for new messages
- outboundMessage = pollAndUpdateInterestSet(key);
- }
-
- selector.select(maxWaitMillies);
- if (key.isValid())
- {
- boolean mayReadMore = key.isReadable();
- boolean mayWriteMore = key.isWritable() && outboundMessage != null;
-
- while (mayReadMore || mayWriteMore)
- {
- mayReadMore = mayReadMore && tryReceive(channel);
-
- mayWriteMore = mayWriteMore
- && trySend(channel, outboundMessage)
- && ((outboundMessage = outbound.poll()) != null);
- }
- }
- }
- catch (IOException e)
- {
- handleSelectError(e);
- }
- }
- }
- catch (Exception e)
- {
- handleRunError(e);
- }
- finally
- {
- close();
- runThreadId.set(-1);
- }
- }
-
-
- public boolean isRunning()
- {
- return runThreadId.get() != -1L;
- }
-
-
- /**
- * Get access to the current queue control.
- *
- * @return Queue control object if currently executing processMessages() and inside the same thread, null otherwise.
- */
- protected MessageQueueControl getQueueControl()
- {
- val threadId = Thread.currentThread().getId();
- if (threadId == runThreadId.get())
- {
- return queueControl;
- }
-
- return null;
- }
-
- @Override
- public void close()
- {
- if (selector != null)
- {
- try { selector.close(); } catch (IOException e) { /* ignore */ }
- }
- }
-
- /**
- * Rate limit by exception message.
- *
- * @return The exceptions message as obtained from getMessage() if not rate-limited, null otherwise.
- */
- private String rateLimitByMessage(Exception e)
- {
- val originalMessage = e.getMessage();
- val message = originalMessage != null ? originalMessage : e.getClass().getName();
- if (lastErrorMessage == null || !lastErrorMessage.equals(message))
- {
- lastErrorMessage = message;
- return message;
- }
-
- // rate-limited
- return null;
- }
-
- /**
- * Handle error in main run() method.
- */
- private void handleRunError(Exception e)
- {
- if (e instanceof ClosedSelectorException
- || e instanceof ClosedChannelException
- || e instanceof CancelledKeyException
- || e instanceof InterruptedException)
- {
- log.info("shutting down, reason: {}: {}", e.getClass().getSimpleName(), e.getMessage());
- }
- else
- {
- log.error("error during socket operation", e);
- }
- }
-
-
- /**
- * Handle error during select().
- */
- private void handleSelectError(IOException e)
- {
- val message = rateLimitByMessage(e);
- if (message != null)
- {
- log.error("error selecting keys: {}", message);
- }
- }
-
- /**
- * Handle error during receive().
- */
- private void handleReceiveError(IOException e)
- {
- val message = rateLimitByMessage(e);
- if (message != null)
- {
- log.error("error during receive: {}", message);
- }
- }
-
- /**
- * Log a send error and information about the dropped message.
- */
- private void handleSendError(Exception e, M outboundMessage)
- {
- val message = rateLimitByMessage(e);
- if (message != null)
- {
- if (message.equals("Invalid argument"))
- {
- log.error("error during send: {}, socket bound to {}", message, settings.getBindAddress());
- }
- else
- {
- log.error("error during send: {}", message);
- }
- }
-
- // last error was same, drop this message
- log.error("dropping outbound {} to {}",
- outboundMessage.getClass().getSimpleName(),
- outboundMessage.getRemoteAddress());
- }
-
-
- private SelectionKey setup() throws IOException
- {
- outbound.clear();
- inbound.clear();
- lastErrorMessage = null;
- selector = Selector.open();
-
- val channel = DatagramChannel.open();
-
- val bindAddress = settings.getBindAddress();
- channel.socket().bind(bindAddress);
- port = channel.socket().getLocalPort();
-
- channel.configureBlocking(false);
-
- return channel.register(selector, activeInterestOps);
- }
-
-
- private boolean tryReceive(DatagramChannel channel)
- {
- val message = messagePool.borrow();
- try
- {
- if (message.receiveFrom(channel))
- {
- inbound.add(message);
- return true;
- }
- }
- catch (IOException e)
- {
- handleReceiveError(e);
- }
-
- // nothing received, return message to pool
- recycleMessage(message);
- return false;
- }
-
-
- private boolean trySend(DatagramChannel channel, M message)
- {
- try
- {
- if (!message.sendTo(channel))
- {
- // not ready for writing, try again later
- return false;
- }
- }
- catch (IOException | UnresolvedAddressException e)
- {
- handleSendError(e, message);
- }
-
- // message handled or dropped because of send error
- recycleMessage(message);
- return true;
- }
-
-
- private M pollAndUpdateInterestSet(SelectionKey key)
- {
- final M message = outbound.poll();
-
- final int interestOps;
- if (message != null)
- {
- // we have packets to write and will still tryReceive
- interestOps = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
- }
- else
- {
- // nothing to trySend, still interested inbound receiving
- interestOps = SelectionKey.OP_READ;
- }
-
- if (interestOps != activeInterestOps)
- {
- activeInterestOps = interestOps;
- key.interestOps(interestOps);
- }
-
- return message;
- }
-
-
- private void recycleMessage(M message)
- {
- message.passivate();
- messagePool.recycle(message);
- }
-
-
- private class ExchangeMessageQueueControl implements MessageQueueControl
- {
- private final List consumedInboundMessages = new ArrayList<>();
-
-
- @Override
- public M addOutbound(InetSocketAddress remoteAddress)
- {
- val message = messagePool.borrow();
-
- message.setRemoteAddress(remoteAddress);
- outbound.add(message);
-
- // caller may fill message now
- return message;
- }
-
- @Override
- public M pollInbound()
- {
- val message = inbound.poll();
- if (message != null)
- {
- consumedInboundMessages.add(message);
- }
-
- return message;
- }
-
- void recycleConsumedMessages()
- {
- for (int i = consumedInboundMessages.size() - 1; i >= 0; --i)
- {
- recycleMessage(consumedInboundMessages.remove(i));
- }
- }
- }
-}
\ No newline at end of file
diff --git a/src/main/java/net/talpidae/base/insect/exchange/MessageQueueControl.java b/src/main/java/net/talpidae/base/insect/exchange/MessageQueueControl.java
deleted file mode 100644
index 196db4d..0000000
--- a/src/main/java/net/talpidae/base/insect/exchange/MessageQueueControl.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright (C) 2017 Jonas Zeiger
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see .
- */
-
-package net.talpidae.base.insect.exchange;
-
-import java.net.InetSocketAddress;
-
-
-public interface MessageQueueControl
-{
- /**
- * Create and add a new outbound message.
- */
- M addOutbound(InetSocketAddress remoteAddress);
-
-
- /**
- * Poll for new inbound messages.
- *
- * @return Reference to an inbound message, valid until the next call to pollInbound().
- * Returns null if there are no inbound messages.
- */
- M pollInbound();
-}
\ No newline at end of file
diff --git a/src/main/java/net/talpidae/base/insect/message/InsectMessage.java b/src/main/java/net/talpidae/base/insect/message/InsectMessage.java
deleted file mode 100644
index bdab16e..0000000
--- a/src/main/java/net/talpidae/base/insect/message/InsectMessage.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Copyright (C) 2017 Jonas Zeiger
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see .
- */
-
-package net.talpidae.base.insect.message;
-
-import net.talpidae.base.insect.exchange.BaseMessage;
-import net.talpidae.base.insect.message.payload.Payload;
-import net.talpidae.base.insect.message.payload.PayloadFactory;
-
-import java.nio.charset.CharacterCodingException;
-
-import lombok.extern.slf4j.Slf4j;
-import lombok.val;
-
-
-@Slf4j
-public class InsectMessage extends BaseMessage
-{
- private Payload payload;
-
-
- public InsectMessage()
- {
- super(PayloadFactory.getMaximumSerializedSize());
- }
-
-
- public Payload getPayload() throws IndexOutOfBoundsException, CharacterCodingException
- {
- if (payload == null)
- {
- val newMapping = PayloadFactory.unpackPayload(getBuffer(), 0);
- if (newMapping != null)
- {
- payload = newMapping;
- }
- }
-
- return payload;
- }
-
-
- public void setPayload(Payload newPayload)
- {
- if (payload != null)
- {
- throw new IllegalStateException("payload has already been set");
- }
-
- payload = newPayload;
- payload.to(getBuffer());
-
- getBuffer().flip();
- }
-
-
- @Override
- protected void clear()
- {
- payload = null;
- }
-}
diff --git a/src/main/java/net/talpidae/base/insect/message/payload/Invalidate.java b/src/main/java/net/talpidae/base/insect/message/payload/Invalidate.java
deleted file mode 100644
index 86c53aa..0000000
--- a/src/main/java/net/talpidae/base/insect/message/payload/Invalidate.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Copyright (C) 2017 Jonas Zeiger
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see .
- */
-
-package net.talpidae.base.insect.message.payload;
-
-import lombok.Builder;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import lombok.val;
-
-import java.nio.ByteBuffer;
-
-
-@Slf4j
-@Builder
-public class Invalidate extends Payload
-{
- public static final int MAXIMUM_SERIALIZED_SIZE = 2;
-
- public static final int TYPE_INVALIDATE = 0x3;
-
- public static final int MAGIC = 0x73;
-
- @Getter
- @Builder.Default
- private final int type = TYPE_INVALIDATE; // 0x3: invalidate known remotes
-
- @Getter
- @Builder.Default
- private final int magic = MAGIC; // magic byte: 0x73
-
-
- static Invalidate from(ByteBuffer buffer, int offset) throws IndexOutOfBoundsException
- {
- val type = buffer.get(offset) & 0xFF;
- if (type != TYPE_INVALIDATE)
- {
- return null;
- }
-
- val magic = buffer.get(offset + 1) & 0xFF;
- if (magic != MAGIC)
- {
- log.debug("encountered invalidate payload with invalid magic");
- return null;
- }
-
- return Invalidate.builder()
- .type(type)
- .magic(magic)
- .build();
- }
-
-
- public void to(ByteBuffer buffer)
- {
- buffer.put((byte) type);
- buffer.put((byte) magic);
- }
-
-
- @Override
- public int getMaximumSize()
- {
- return MAXIMUM_SERIALIZED_SIZE;
- }
-
-
- @Override
- public String toString()
- {
- return Integer.toHexString(getType());
- }
-}
\ No newline at end of file
diff --git a/src/main/java/net/talpidae/base/insect/message/payload/Mapping.java b/src/main/java/net/talpidae/base/insect/message/payload/Mapping.java
deleted file mode 100644
index 70196d8..0000000
--- a/src/main/java/net/talpidae/base/insect/message/payload/Mapping.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Copyright (C) 2017 Jonas Zeiger
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see .
- */
-
-package net.talpidae.base.insect.message.payload;
-
-import com.google.common.base.Strings;
-
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
-
-import lombok.Builder;
-import lombok.Getter;
-import lombok.NonNull;
-import lombok.extern.slf4j.Slf4j;
-import lombok.val;
-
-import static net.talpidae.base.util.protocol.BinaryProtocolHelper.extractString;
-import static net.talpidae.base.util.protocol.BinaryProtocolHelper.putTruncatedUTF8;
-
-
-@Slf4j
-@Builder
-public class Mapping extends Payload
-{
- public static final int MAXIMUM_SERIALIZED_SIZE = 1036;
-
- public static final int TYPE_MAPPING = 0x1;
-
- private static final int STRING_SIZE_MAX = 255;
-
- @Getter
- @Builder.Default
- private final int type = TYPE_MAPPING; // 0x1: mapping
-
- @Builder.Default
- @Getter
- private final int flags = 0; // 0x0
-
- @Getter
- private final long timestamp; // client System.nanoTime()
-
- @Getter
- private final int port; // client port
-
- @Getter
- private final String host; // client IPv4 address
-
- @Getter
- private final String route; // route
-
- @Getter
- private final String name; // name (unique service instance ID)
-
- @Builder.Default
- @Getter
- private final String dependency = ""; // empty or one of the services dependencies
-
- @Getter
- @NonNull
- private final InetSocketAddress socketAddress;
-
-
- static Mapping from(ByteBuffer buffer, int offset) throws IndexOutOfBoundsException, CharacterCodingException
- {
- val type = buffer.get(offset) & 0xFF;
- if (type != TYPE_MAPPING)
- {
- return null;
- }
-
- val hostOffset = offset + 16;
- val hostLength = buffer.get(offset + 2) & 0xFF; // length of host
- val routeOffset = hostOffset + hostLength;
- val routeLength = buffer.get(offset + 3) & 0xFF; // length of route
- val port = buffer.getShort(offset + 12) & 0xFFFF;
- val nameOffset = routeOffset + routeLength;
- val nameLength = buffer.get(offset + 14) & 0xFF; // length of name
- val dependencyOffset = nameOffset + nameLength;
- val dependencyLength = buffer.get(offset + 15) & 0xFF; // length of dependency
-
- val host = extractString(buffer, hostOffset, hostLength).intern();
- return Mapping.builder()
- .type(type)
- .flags(buffer.get(offset + 1) & 0xFF)
- .timestamp(buffer.getLong(offset + 4))
- .port(port)
- .host(host)
- .route(extractString(buffer, routeOffset, routeLength).intern())
- .name(extractString(buffer, nameOffset, nameLength).intern())
- .dependency(extractString(buffer, dependencyOffset, dependencyLength).intern())
- .socketAddress(InetSocketAddress.createUnresolved(host, port))
- .build();
- }
-
-
- @Override
- public void to(ByteBuffer buffer)
- {
- int begin = buffer.position();
-
- // start writing dynamic fields behind static fields first
- int offset = begin + 16;
- val hostLength = putTruncatedUTF8(buffer, offset, host, STRING_SIZE_MAX);
- val routeLength = putTruncatedUTF8(buffer, offset += hostLength, route, STRING_SIZE_MAX);
- val nameLength = putTruncatedUTF8(buffer, offset += routeLength, name, STRING_SIZE_MAX);
- val dependencyLength = putTruncatedUTF8(buffer, offset += nameLength, dependency, STRING_SIZE_MAX);
- offset += dependencyLength;
-
- buffer.put(begin, (byte) type);
- buffer.put(begin + 1, (byte) flags);
- buffer.put(begin + 2, (byte) hostLength);
- buffer.put(begin + 3, (byte) routeLength);
- buffer.putLong(begin + 4, timestamp);
- buffer.putShort(begin + 12, (short) port);
- buffer.put(begin + 14, (byte) nameLength);
- buffer.put(begin + 15, (byte) dependencyLength);
-
- buffer.position(offset); // include dynamic part
- }
-
-
- /**
- * Check subject address against sender address.
- */
- public boolean isAuthorative(InetSocketAddress remoteAddress)
- {
- val authorizedHostOrAddress = getSocketAddress().getHostString();
- val authorizedPort = getSocketAddress().getPort();
-
- return authorizedPort == remoteAddress.getPort()
- && (authorizedHostOrAddress.equals(remoteAddress.getHostString())
- || authorizedHostOrAddress.equals(remoteAddress.getAddress().getHostAddress()));
- }
-
- @Override
- public int getMaximumSize()
- {
- return MAXIMUM_SERIALIZED_SIZE;
- }
-
-
- @Override
- public String toString()
- {
- val dependency = getDependency();
- return Integer.toHexString(getType()) + ", " +
- Integer.toHexString(flags) + ", " +
- getTimestamp() + ", " +
- getPort() + ", " +
- getRoute() + ", " +
- getName() +
- ((Strings.isNullOrEmpty(dependency)) ? "" : ", " + dependency);
- }
-}
\ No newline at end of file
diff --git a/src/main/java/net/talpidae/base/insect/message/payload/Metrics.java b/src/main/java/net/talpidae/base/insect/message/payload/Metrics.java
deleted file mode 100644
index 478d370..0000000
--- a/src/main/java/net/talpidae/base/insect/message/payload/Metrics.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Copyright (C) 2017 Jonas Zeiger
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see .
- */
-
-package net.talpidae.base.insect.message.payload;
-
-import com.google.common.base.Utf8;
-import lombok.Builder;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import lombok.val;
-import net.talpidae.base.util.performance.Metric;
-
-import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Queue;
-
-import static net.talpidae.base.util.protocol.BinaryProtocolHelper.extractString;
-import static net.talpidae.base.util.protocol.BinaryProtocolHelper.putTruncatedUTF8;
-
-
-@Slf4j
-@Builder
-public class Metrics extends Payload
-{
- public static final int MAXIMUM_SERIALIZED_SIZE = 1472;
-
- public static final int TYPE_METRIC = 0x4;
-
- private static final int METRIC_COUNT_MAX = 255;
-
- private static final int STRING_SIZE_MAX = 255;
-
-
- @Getter
- @Builder.Default
- private final int type = TYPE_METRIC; // 0x4: metrics
-
- @Getter
- private final List metrics; // metrics
-
-
- /**
- * Create a Metrics instance from binary data.
- *
- * Basic binary layout is: tnGET/signup/statusCodeTTTTTTTTvvvvvvvv
- */
- static Metrics from(ByteBuffer buffer, int offset) throws IndexOutOfBoundsException, CharacterCodingException
- {
- val type = buffer.get(offset) & 0xFF;
- if (type != TYPE_METRIC)
- {
- return null;
- }
-
- // extract metrics
- val count = buffer.get(++offset) & 0xFF;
- if (count > 0)
- {
- val metrics = new Metric[count];
- ++offset;
- int i;
- for (i = 0; i < count; ++i)
- {
- // extract one (path, value) pair
- val pathOffset = offset + 1;
- val pathLength = buffer.get(offset) & 0xFF;
- val tsOffset = pathOffset + pathLength;
- val valueOffset = tsOffset + 8;
- metrics[i] = Metric.builder()
- .path(extractString(buffer, pathOffset, pathLength)) // don't intern, may differ
- .ts(buffer.getLong(tsOffset))
- .value(buffer.getDouble(valueOffset))
- .build();
-
- offset = valueOffset + 8;
- }
-
- return new Metrics(type, Arrays.asList((i == count) ? metrics : Arrays.copyOf(metrics, i)));
- }
- else
- {
- return new Metrics(type, Collections.emptyList());
- }
- }
-
-
- @Override
- public void to(ByteBuffer buffer)
- {
- val limit = buffer.limit();
-
- // start writing dynamic fields behind static fields first
- // encode as many metrics as fit into one message, just drop the rest
- int offset = 2;
- val countMax = Math.min(metrics.size(), METRIC_COUNT_MAX);
- int count;
- for (count = 0; count < countMax; ++count)
- {
- val metric = metrics.get(count);
- val pathOffset = offset + 1;
-
- // optimistically write dynamic part (automatically limited by buffer's limit)
- val pathLength = putTruncatedUTF8(buffer, pathOffset, metric.getPath(), STRING_SIZE_MAX);
- if (pathLength < metric.getPath().length())
- {
- // encoded path size can't be shorter than the length in characters, it just doesn't fit anymore
- break;
- }
-
- val tsOffset = pathOffset + pathLength;
- val valueOffset = tsOffset + 8;
- val nextMetricOffset = valueOffset + 8;
- if (nextMetricOffset >= limit)
- {
- // doesn't fit anymore, sorry
- break;
- }
-
- // write static part
- buffer.put(offset, (byte) pathLength);
- buffer.putLong(tsOffset, metric.getTs());
- buffer.putDouble(valueOffset, metric.getValue());
-
- // advance
- offset = nextMetricOffset;
- }
-
- buffer.put(0, (byte) type);
- buffer.put(1, (byte) count);
-
- buffer.position(offset); // include dynamic part
- }
-
-
- @Override
- public int getMaximumSize()
- {
- return MAXIMUM_SERIALIZED_SIZE;
- }
-
-
- @Override
- public String toString()
- {
- return Integer.toHexString(getType());
- }
-
-
- public static class MetricsBuilder
- {
- private static int calculateMetricSize(Metric metric)
- {
- return 1 // path length
- + Math.min(STRING_SIZE_MAX, Utf8.encodedLength(metric.getPath()))
- + 8 // timestamp
- + 8; // value
- }
-
- public MetricsBuilder metrics(Queue metrics)
- {
- int totalSize = 2; // header
-
- // estimate serialized size
- val acceptedMetrics = new Metric[METRIC_COUNT_MAX];
- int i = 0;
- for (Metric metric = metrics.peek(); metric != null; metric = metrics.peek())
- {
- totalSize += calculateMetricSize(metric);
-
- // stop if the next metric will definetely not fit anymore
- if (totalSize >= MAXIMUM_SERIALIZED_SIZE)
- {
- break;
- }
-
- acceptedMetrics[i] = metrics.remove();
- ++i;
- }
-
- this.metrics = Arrays.asList(Arrays.copyOf(acceptedMetrics, i));
-
- return this;
- }
- }
-}
\ No newline at end of file
diff --git a/src/main/java/net/talpidae/base/insect/message/payload/Payload.java b/src/main/java/net/talpidae/base/insect/message/payload/Payload.java
deleted file mode 100644
index 6d1d03d..0000000
--- a/src/main/java/net/talpidae/base/insect/message/payload/Payload.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright (C) 2017 Jonas Zeiger
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see .
- */
-
-package net.talpidae.base.insect.message.payload;
-
-import java.nio.ByteBuffer;
-
-
-public abstract class Payload
-{
- /**
- * Return this payloads unique type ID.
- */
- public abstract int getType();
-
- public abstract void to(ByteBuffer buffer);
-
- public abstract int getMaximumSize();
-
- public abstract String toString();
-}
diff --git a/src/main/java/net/talpidae/base/insect/message/payload/PayloadFactory.java b/src/main/java/net/talpidae/base/insect/message/payload/PayloadFactory.java
deleted file mode 100644
index ea46f51..0000000
--- a/src/main/java/net/talpidae/base/insect/message/payload/PayloadFactory.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright (C) 2017 Jonas Zeiger
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see .
- */
-
-package net.talpidae.base.insect.message.payload;
-
-import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
-
-
-public final class PayloadFactory
-{
- private PayloadFactory()
- {
-
- }
-
- /**
- * Identify and unpack the content stored inside buffer (@offset).
- */
- public static Payload unpackPayload(ByteBuffer buffer, int offset) throws IndexOutOfBoundsException, CharacterCodingException
- {
- Payload payload;
-
- // probe message types (most frequent first)
- if ((payload = Metrics.from(buffer, offset)) != null
- || (payload = Mapping.from(buffer, offset)) != null
- || (payload = Invalidate.from(buffer, offset)) != null
- || (payload = Shutdown.from(buffer, offset)) != null)
- {
- return payload;
- }
-
- return null;
- }
-
-
- /**
- * Get maximum buffer size needed to accommodate all known message types.
- */
- public static int getMaximumSerializedSize()
- {
- return Math.max(Metrics.MAXIMUM_SERIALIZED_SIZE,
- Math.max(Mapping.MAXIMUM_SERIALIZED_SIZE,
- Math.max(Invalidate.MAXIMUM_SERIALIZED_SIZE, Shutdown.MAXIMUM_SERIALIZED_SIZE)));
- }
-}
diff --git a/src/main/java/net/talpidae/base/insect/message/payload/Shutdown.java b/src/main/java/net/talpidae/base/insect/message/payload/Shutdown.java
deleted file mode 100644
index d74042f..0000000
--- a/src/main/java/net/talpidae/base/insect/message/payload/Shutdown.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Copyright (C) 2017 Jonas Zeiger
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see .
- */
-
-package net.talpidae.base.insect.message.payload;
-
-import lombok.Builder;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import lombok.val;
-
-import java.nio.ByteBuffer;
-
-
-@Slf4j
-@Builder
-public class Shutdown extends Payload
-{
- public static final int MAXIMUM_SERIALIZED_SIZE = 2;
-
- public static final int TYPE_SHUTDOWN = 0x2;
-
- public static final int MAGIC = 0x86;
-
- @Getter
- @Builder.Default
- private final int type = TYPE_SHUTDOWN; // 0x2: shutdown
-
- @Getter
- @Builder.Default
- private final int magic = MAGIC; // magic byte: 0x86
-
-
- static Shutdown from(ByteBuffer buffer, int offset) throws IndexOutOfBoundsException
- {
- val type = buffer.get(offset) & 0xFF;
- if (type != TYPE_SHUTDOWN)
- {
- return null;
- }
-
- val magic = buffer.get(offset + 1) & 0xFF;
- if (magic != MAGIC)
- {
- log.debug("encountered shutdown payload with invalid magic");
- return null;
- }
-
- return Shutdown.builder()
- .type(type)
- .magic(magic)
- .build();
- }
-
-
- public void to(ByteBuffer buffer)
- {
- buffer.put((byte) type);
- buffer.put((byte) magic);
- }
-
-
- @Override
- public int getMaximumSize()
- {
- return MAXIMUM_SERIALIZED_SIZE;
- }
-
-
- @Override
- public String toString()
- {
- return Integer.toHexString(getType());
- }
-}
\ No newline at end of file
diff --git a/src/main/java/net/talpidae/base/insect/metrics/MetricsSink.java b/src/main/java/net/talpidae/base/insect/metrics/MetricsSink.java
deleted file mode 100644
index 2b20ee0..0000000
--- a/src/main/java/net/talpidae/base/insect/metrics/MetricsSink.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package net.talpidae.base.insect.metrics;
-
-import lombok.NonNull;
-
-
-public interface MetricsSink
-{
- /**
- * Form a metric tuple from the specified path and value and forward it to remote listeners.
- */
- void forward(@NonNull String path, long timestampMillies, double value);
-}
diff --git a/src/main/java/net/talpidae/base/insect/metrics/QueuedMetricsSink.java b/src/main/java/net/talpidae/base/insect/metrics/QueuedMetricsSink.java
deleted file mode 100644
index a8a51bd..0000000
--- a/src/main/java/net/talpidae/base/insect/metrics/QueuedMetricsSink.java
+++ /dev/null
@@ -1,117 +0,0 @@
-package net.talpidae.base.insect.metrics;
-
-import net.talpidae.base.insect.Slave;
-import net.talpidae.base.insect.config.SlaveSettings;
-import net.talpidae.base.util.performance.Metric;
-import net.talpidae.base.util.thread.GeneralScheduler;
-
-import java.util.ArrayDeque;
-import java.util.concurrent.TimeUnit;
-
-import javax.inject.Inject;
-import javax.inject.Singleton;
-
-import lombok.NonNull;
-import lombok.extern.slf4j.Slf4j;
-import lombok.val;
-
-
-@Slf4j
-@Singleton
-public class QueuedMetricsSink implements MetricsSink
-{
- private static final long MAXIMUM_ENQUEUE_DELAY_NANOS = TimeUnit.SECONDS.toNanos(4);
-
- private static final int MAXIMUM_QUEUE_LENGTH = 2048;
-
- /**
- * How many metric elements to leave in the metricQueue without immediately trying to send again.
- */
- private static final int MAXIMUM_DELAYED_ITEM_COUNT = 30;
-
- private final Slave slave;
-
- private final ArrayDeque metricQueue = new ArrayDeque<>(1024);
-
- private final String pathPrefix;
-
- private long lastEnqueueNanos = 0L;
-
-
- @Inject
- public QueuedMetricsSink(Slave slave, SlaveSettings slaveSettings, GeneralScheduler scheduler)
- {
- this.slave = slave;
- this.pathPrefix = "/" + slaveSettings.getName();
-
- scheduler.scheduleWithFixedDelay(this::sendAllMetrics, MAXIMUM_ENQUEUE_DELAY_NANOS, MAXIMUM_ENQUEUE_DELAY_NANOS, TimeUnit.NANOSECONDS);
- }
-
-
- /**
- * Form a metric tuple from the specified path and value and enqueue it for forwarding.
- */
- @Override
- public void forward(@NonNull String path, long timestampMillies, double value)
- {
- val absolutePath = pathPrefix + path;
- val metric = Metric.builder()
- .path(absolutePath.replace("//", "/"))
- .ts(timestampMillies)
- .value(value)
- .build();
-
- synchronized (metricQueue)
- {
- if (metricQueue.size() < MAXIMUM_QUEUE_LENGTH)
- {
- metricQueue.addLast(metric);
- }
- }
- }
-
-
- private int getQueueLength()
- {
- synchronized (metricQueue)
- {
- return metricQueue.size();
- }
- }
-
-
- private boolean isQueueEmpty()
- {
- synchronized (metricQueue)
- {
- return metricQueue.isEmpty();
- }
- }
-
-
- private void sendAllMetrics()
- {
- val now = System.nanoTime();
- if (lastEnqueueNanos < now - MAXIMUM_ENQUEUE_DELAY_NANOS
- && !isQueueEmpty())
- {
- lastEnqueueNanos = now;
-
- try
- {
- do
- {
- synchronized (metricQueue)
- {
- slave.forwardMetrics(metricQueue);
- }
- }
- while (getQueueLength() >= MAXIMUM_DELAYED_ITEM_COUNT);
- }
- catch (Throwable t)
- {
- log.error("failed to forward metrics", t);
- }
- }
- }
-}
diff --git a/src/main/java/net/talpidae/base/insect/state/InsectState.java b/src/main/java/net/talpidae/base/insect/state/InsectState.java
deleted file mode 100644
index 14b1b30..0000000
--- a/src/main/java/net/talpidae/base/insect/state/InsectState.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Copyright (C) 2017 Jonas Zeiger
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see .
- */
-
-package net.talpidae.base.insect.state;
-
-import java.net.InetSocketAddress;
-import java.util.Set;
-
-import lombok.Builder;
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.Singular;
-import lombok.ToString;
-
-
-/**
- * Things the queen knows about each insect.
- */
-@Getter
-@EqualsAndHashCode
-@ToString
-@Builder
-@RequiredArgsConstructor
-public class InsectState implements ServiceState
-{
- /**
- * InetSocketAddress representing host and port.
- */
- @Getter
- private final InetSocketAddress socketAddress;
-
- /**
- * This service instance's unique name.
- */
- @Getter
- private final transient String name;
-
- /**
- * Service monotonic clock timestamp epoch (remote).
- */
- @Getter
- private final transient long timestampEpochRemote;
-
- /**
- * Service monotonic clock timestamp epoch (local).
- */
- @Getter
- private final transient long timestampEpochLocal;
-
- /**
- * Service monotonic clock timestamp (relative to timestampEpoch).
- */
- @Getter
- private final transient long timestamp;
-
- /**
- * Routes of other services this service depends on.
- */
- @Getter
- @Singular
- private final transient Set dependencies;
-
-
- /**
- * Is this insect out-of-service (ie. won't not be propagated to slaves)?
- */
- @Getter
- private final transient boolean isOutOfService;
-
-
- public static class InsectStateBuilder
- {
- public InsectStateBuilder newEpoch(long nowNanos, long remoteTimestampEpoch)
- {
- return timestampEpochLocal(nowNanos)
- .timestampEpochRemote(remoteTimestampEpoch)
- .timestamp(nowNanos);
- }
- }
-}
\ No newline at end of file
diff --git a/src/main/java/net/talpidae/base/insect/state/ServiceState.java b/src/main/java/net/talpidae/base/insect/state/ServiceState.java
deleted file mode 100644
index 2e90223..0000000
--- a/src/main/java/net/talpidae/base/insect/state/ServiceState.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package net.talpidae.base.insect.state;
-
-import java.net.InetSocketAddress;
-
-
-public interface ServiceState
-{
- long getTimestamp();
-
- InetSocketAddress getSocketAddress();
-}
diff --git a/src/main/java/net/talpidae/base/server/UndertowServer.java b/src/main/java/net/talpidae/base/server/UndertowServer.java
index 1eb09dc..7b80531 100644
--- a/src/main/java/net/talpidae/base/server/UndertowServer.java
+++ b/src/main/java/net/talpidae/base/server/UndertowServer.java
@@ -46,10 +46,8 @@
import net.talpidae.base.event.ServerShutdown;
import net.talpidae.base.event.ServerStarted;
import net.talpidae.base.event.Shutdown;
-import net.talpidae.base.insect.metrics.MetricsSink;
import net.talpidae.base.server.cors.CORSFilter;
-import net.talpidae.base.server.performance.MemoryMetricCollector;
-import net.talpidae.base.server.performance.MetricsHandler;
+import net.talpidae.base.server.logging.RequestLoggingHandler;
import net.talpidae.base.util.ssl.SslContextFactory;
import net.talpidae.base.util.thread.GeneralScheduler;
import org.xnio.OptionMap;
@@ -89,8 +87,6 @@ public class UndertowServer implements Server
private final ServerEndpointConfig.Configurator defaultServerEndpointConfigurator;
- private final MetricsSink metricsSink;
-
private final GeneralScheduler scheduler;
private Undertow server = null;
@@ -106,7 +102,6 @@ public UndertowServer(EventBus eventBus,
Optional> annotatedEndpointClass,
Optional programmaticEndpointConfig,
Optional defaultServerEndpointConfigurator,
- Optional metricsSink,
GeneralScheduler scheduler)
{
this.serverConfig = serverConfig;
@@ -114,7 +109,6 @@ public UndertowServer(EventBus eventBus,
this.annotatedEndpointClass = annotatedEndpointClass.orElse(null);
this.programmaticEndpointConfig = programmaticEndpointConfig.orElse(null);
this.defaultServerEndpointConfigurator = defaultServerEndpointConfigurator.orElse(null);
- this.metricsSink = metricsSink.orElse(null);
this.eventBus = eventBus;
this.scheduler = scheduler;
@@ -392,7 +386,7 @@ else if (programmaticEndpointConfig != null)
if (serverConfig.isLoggingFeatureEnabled())
{
// enable extensive logging (make sure to disable for production)
- rootHandler = Handlers.requestDump(rootHandler);
+ rootHandler = new RequestLoggingHandler(rootHandler);
}
if (serverConfig.getCorsOriginPattern() != null)
@@ -400,15 +394,6 @@ else if (programmaticEndpointConfig != null)
rootHandler = new CORSFilter(rootHandler, serverConfig);
}
- if (metricsSink != null)
- {
- // enable metrics
- rootHandler = new MetricsHandler(rootHandler, metricsSink);
-
- // TODO Put this somewhere else.
- scheduler.scheduleWithFixedDelay(new MemoryMetricCollector(metricsSink), MEMORY_METRICS_INTERVAL_SECONDS, MEMORY_METRICS_INTERVAL_SECONDS, TimeUnit.SECONDS);
- }
-
// finally, enhance handler with graceful shutdown capability
builder.setHandler(this.rootHandler = Handlers.gracefulShutdown(rootHandler));
}
diff --git a/src/main/java/net/talpidae/base/server/logging/RequestLoggingHandler.java b/src/main/java/net/talpidae/base/server/logging/RequestLoggingHandler.java
new file mode 100644
index 0000000..93c3545
--- /dev/null
+++ b/src/main/java/net/talpidae/base/server/logging/RequestLoggingHandler.java
@@ -0,0 +1,138 @@
+package net.talpidae.base.server.logging;
+
+import io.undertow.server.HttpHandler;
+import io.undertow.server.HttpServerExchange;
+import io.undertow.util.Headers;
+import io.undertow.util.HttpString;
+import lombok.extern.java.Log;
+import lombok.val;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+
+
+/**
+ * Similar to RequestDumpingHandler but avoids DNS reverse lookups caused by InetSocketAddress.getHostName().
+ */
+@Log
+public class RequestLoggingHandler implements HttpHandler
+{
+ private static final HttpString USER_AGENT = new HttpString("user-agent");
+
+ private static final HttpString AUTHORIZATION = new HttpString("authorization");
+
+ private final HttpHandler next;
+
+ public RequestLoggingHandler(final HttpHandler next)
+ {
+ this.next = next;
+ }
+
+ @Override
+ public void handleRequest(final HttpServerExchange exchange) throws Exception
+ {
+ final StringBuilder sb = new StringBuilder();
+
+ val forwardedFor = exchange.getRequestHeaders().getFirst(Headers.X_FORWARDED_FOR);
+ if (forwardedFor != null)
+ {
+ sb.append(forwardedFor);
+ }
+ else
+ {
+ sb.append(exchange.getSourceAddress().getHostString());
+ }
+
+ sb.append(' ');
+ sb.append(exchange.getProtocol());
+ sb.append(' ');
+ sb.append(exchange.getRequestMethod());
+ sb.append(' ');
+ sb.append(exchange.getRequestURI());
+ if (!"".equals(exchange.getQueryString()))
+ {
+ sb.append('?');
+ sb.append(exchange.getQueryString());
+ }
+
+ exchange.addExchangeCompleteListener((completedExchange, nextListener) -> {
+
+ sb.append(" -> status=");
+ sb.append(completedExchange.getStatusCode());
+ sb.append(" tx=");
+ sb.append(completedExchange.getResponseBytesSent());
+
+ val userAgent = completedExchange.getRequestHeaders().getFirst(USER_AGENT);
+ if (userAgent != null)
+ {
+ sb.append(" ua=");
+ sb.append(userAgent);
+ }
+
+ // extra a possible "sub" (subject) field from an "Authoriztation: Bearer" JWT token, if one exists
+ val authorization = completedExchange.getRequestHeaders().getFirst(USER_AGENT);
+ if (authorization != null)
+ {
+ // TODO: Also support logging Basic Auth username and other schemes/token formats
+ val subject = extractJwtSub(authorization);
+ if (subject != null)
+ {
+ sb.append(" sub=");
+ sb.append(subject);
+ }
+ }
+
+ nextListener.proceed();
+ log.info(sb.toString());
+ });
+
+ next.handleRequest(exchange);
+ }
+
+
+ private static String extractJwtSub(String authorization)
+ {
+ if (authorization != null)
+ {
+ int begin = authorization.indexOf("Bearer");
+ if (begin >= 0)
+ {
+ begin = authorization.indexOf('.', begin + 6);
+ if (begin > 0)
+ {
+ int end = authorization.indexOf('.', begin + 1);
+ if (end > begin)
+ {
+ try
+ {
+ String jwt = new String(Base64.getDecoder().decode(authorization.substring(begin + 1, end)), StandardCharsets.UTF_8);
+ begin = jwt.indexOf("\"sub\"");
+ if (begin > 0)
+ {
+ begin = jwt.indexOf(":", begin + 5);
+ if (begin > 0)
+ {
+ begin = jwt.indexOf('\"', begin + 1);
+ if (begin > 0)
+ {
+ end = jwt.indexOf('\"', begin + 1);
+ if (end > begin)
+ {
+ return jwt.substring(begin + 1, end);
+ }
+ }
+ }
+ }
+ }
+ catch (RuntimeException e)
+ {
+ // ignore, just return null
+ }
+ }
+ }
+ }
+ }
+
+ return null;
+ }
+}
diff --git a/src/main/java/net/talpidae/base/server/performance/MemoryMetricCollector.java b/src/main/java/net/talpidae/base/server/performance/MemoryMetricCollector.java
deleted file mode 100644
index 136b1f2..0000000
--- a/src/main/java/net/talpidae/base/server/performance/MemoryMetricCollector.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package net.talpidae.base.server.performance;
-
-
-import net.talpidae.base.insect.metrics.MetricsSink;
-
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
-
-import lombok.extern.slf4j.Slf4j;
-import lombok.val;
-
-
-/**
- * Collect heap and non-heap memory statistics and forward them using the specified MetricsSink instance.
- */
-@Slf4j
-public class MemoryMetricCollector implements Runnable
-{
- private final MetricsSink metricsSink;
-
- private final MemoryMXBean memoryMXBean;
-
-
- public MemoryMetricCollector(MetricsSink metricsSink)
- {
- this.metricsSink = metricsSink;
- this.memoryMXBean = ManagementFactory.getMemoryMXBean();
- }
-
-
- @Override
- public void run()
- {
- try
- {
- val ts = System.currentTimeMillis();
- val heapCommitted = ((double) memoryMXBean.getHeapMemoryUsage().getCommitted()) / 1024 / 1024; // MBytes
- val nonHeapCommitted = ((double) memoryMXBean.getNonHeapMemoryUsage().getCommitted()) / 1024 / 1024;
-
- metricsSink.forward("/heapCommitted", ts, heapCommitted);
- metricsSink.forward("/nonHeapCommitted", ts, nonHeapCommitted);
- }
- catch (Throwable t)
- {
- log.error("memory metrics collection failed", t);
- }
- }
-}
diff --git a/src/main/java/net/talpidae/base/server/performance/MetricsHandler.java b/src/main/java/net/talpidae/base/server/performance/MetricsHandler.java
deleted file mode 100644
index f51f50a..0000000
--- a/src/main/java/net/talpidae/base/server/performance/MetricsHandler.java
+++ /dev/null
@@ -1,105 +0,0 @@
-package net.talpidae.base.server.performance;
-
-import io.undertow.server.HttpHandler;
-import io.undertow.server.HttpServerExchange;
-import io.undertow.util.AttachmentKey;
-import lombok.Getter;
-import lombok.val;
-import net.talpidae.base.insect.metrics.MetricsSink;
-
-import java.util.concurrent.TimeUnit;
-
-
-public class MetricsHandler implements HttpHandler
-{
- private static final AttachmentKey EXCHANGE_METRIC = AttachmentKey.create(ExchangeMetric.class);
-
- private final HttpHandler next;
-
- private final MetricsSink metricsSink;
-
-
- public MetricsHandler(HttpHandler next, MetricsSink metricsSink)
- {
- this.next = next;
- this.metricsSink = metricsSink;
- }
-
-
- @Override
- public void handleRequest(HttpServerExchange exchange) throws Exception
- {
- if (!exchange.isComplete())
- {
- exchange.putAttachment(EXCHANGE_METRIC, new ExchangeMetric(exchange.getRelativePath()));
-
- exchange.addExchangeCompleteListener((completedExchange, nextListener) ->
- {
- final Runnable finishRequest = () ->
- {
- val exchangeMetric = completedExchange.getAttachment(EXCHANGE_METRIC);
-
- exchangeMetric.complete(completedExchange);
- forwardRequestMetric(exchangeMetric);
-
- nextListener.proceed();
- };
-
- if (completedExchange.isInIoThread())
- {
- completedExchange.dispatch(finishRequest);
- }
- else
- {
- finishRequest.run();
- }
- });
- }
- next.handleRequest(exchange);
- }
-
-
- private void forwardRequestMetric(ExchangeMetric exchangeMetric)
- {
- val ts = exchangeMetric.getTimestampMillies();
- metricsSink.forward(exchangeMetric.getPath() + "/duration", ts, exchangeMetric.getDuration());
- metricsSink.forward(exchangeMetric.getPath() + "/status", ts, exchangeMetric.getStatusCode());
- }
-
-
- private static class ExchangeMetric
- {
- private static final double NANOSECONDS_TO_FRACTIONAL_SECONDS_MULTIPLIER = 1.0 / TimeUnit.SECONDS.toNanos(1);
-
- private final long startNanos;
-
- @Getter
- private final long timestampMillies;
-
- @Getter
- private final String path;
-
- @Getter
- private double duration;
-
- @Getter
- private double statusCode;
-
-
- private ExchangeMetric(String path)
- {
- this.timestampMillies = System.currentTimeMillis();
- this.startNanos = System.nanoTime();
- this.path = path;
- }
-
-
- private void complete(HttpServerExchange exchange)
- {
- val endNanos = System.nanoTime();
-
- duration = (double) (endNanos - startNanos) * NANOSECONDS_TO_FRACTIONAL_SECONDS_MULTIPLIER;
- statusCode = exchange.getStatusCode();
- }
- }
-}
\ No newline at end of file