From c3fa704ab12442656be7d26598980371ae4f4bd5 Mon Sep 17 00:00:00 2001 From: ItsKev Date: Fri, 2 Jan 2026 20:17:26 +0100 Subject: [PATCH] fix: improve service discovery to use redis hashes --- .../gg/grounds/discovery/DiscoveryKeys.kt | 6 +-- .../gg/grounds/discovery/PaperServerEntry.kt | 14 +++-- .../gg/grounds/discovery/ValkeyClient.kt | 32 +++-------- .../gg/grounds/discovery/DiscoveryKeysTest.kt | 9 +--- .../grounds/discovery/PaperServerEntryTest.kt | 10 ++-- .../discovery/PaperDiscoveryPublisher.kt | 7 +-- .../grounds/GroundsPluginServerDiscovery.kt | 5 +- .../discovery/VelocityDiscoveryService.kt | 54 +++++++------------ 8 files changed, 51 insertions(+), 86 deletions(-) diff --git a/common/src/main/kotlin/gg/grounds/discovery/DiscoveryKeys.kt b/common/src/main/kotlin/gg/grounds/discovery/DiscoveryKeys.kt index edbd493..dfe09eb 100644 --- a/common/src/main/kotlin/gg/grounds/discovery/DiscoveryKeys.kt +++ b/common/src/main/kotlin/gg/grounds/discovery/DiscoveryKeys.kt @@ -1,9 +1,7 @@ package gg.grounds.discovery object DiscoveryKeys { - private const val PAPER_PREFIX = "grounds:server-discovery:paper:" + private const val PAPER_HASH_KEY = "grounds:server-discovery:paper" - fun paperServerKey(serverName: String): String = "$PAPER_PREFIX$serverName" - - fun paperServerPattern(): String = "$PAPER_PREFIX*" + fun paperServersHashKey(): String = PAPER_HASH_KEY } diff --git a/common/src/main/kotlin/gg/grounds/discovery/PaperServerEntry.kt b/common/src/main/kotlin/gg/grounds/discovery/PaperServerEntry.kt index f27798e..10f69db 100644 --- a/common/src/main/kotlin/gg/grounds/discovery/PaperServerEntry.kt +++ b/common/src/main/kotlin/gg/grounds/discovery/PaperServerEntry.kt @@ -1,8 +1,13 @@ package gg.grounds.discovery -data class PaperServerEntry(val name: String, val host: String, val port: Int) { +data class PaperServerEntry( + val name: String, + val host: String, + val port: Int, + val lastSeenMillis: Long = 0L, +) { - fun encode(): String = "$name|$host|$port" + fun encode(): String = "$name|$host|$port|$lastSeenMillis" companion object { @JvmStatic @@ -12,15 +17,16 @@ data class PaperServerEntry(val name: String, val host: String, val port: Int) { } val parts = value.split('|') - if (parts.size != 3) { + if (parts.size != 4) { return null } val name = parts[0].trim().takeIf { it.isNotEmpty() } ?: return null val host = parts[1].trim().takeIf { it.isNotEmpty() } ?: return null val port = parts[2].trim().toIntOrNull() ?: return null + val lastSeenMillis = parts[3].trim().toLongOrNull() ?: return null - return PaperServerEntry(name, host, port) + return PaperServerEntry(name, host, port, lastSeenMillis) } } } diff --git a/common/src/main/kotlin/gg/grounds/discovery/ValkeyClient.kt b/common/src/main/kotlin/gg/grounds/discovery/ValkeyClient.kt index c2c606f..67c509c 100644 --- a/common/src/main/kotlin/gg/grounds/discovery/ValkeyClient.kt +++ b/common/src/main/kotlin/gg/grounds/discovery/ValkeyClient.kt @@ -2,8 +2,6 @@ package gg.grounds.discovery import redis.clients.jedis.ConnectionPoolConfig import redis.clients.jedis.RedisClient -import redis.clients.jedis.params.ScanParams -import redis.clients.jedis.resps.ScanResult class ValkeyClient(config: ValkeyConfig) : AutoCloseable { @@ -19,34 +17,16 @@ class ValkeyClient(config: ValkeyConfig) : AutoCloseable { ) .build() - fun setWithTtl(key: String, value: String, ttlSeconds: Long) { - client.setex(key, ttlSeconds, value) + fun hset(key: String, field: String, value: String) { + client.hset(key, field, value) } - fun delete(key: String) { - client.del(key) + fun hgetAll(key: String): Map { + return client.hgetAll(key) } - fun scanValues(pattern: String): Map { - val params = ScanParams().match(pattern) - val values = mutableMapOf() - - var cursor = ScanParams.SCAN_POINTER_START - do { - val page: ScanResult = client.scan(cursor, params) - cursor = page.cursor - - val keys = page.result - if (keys.isEmpty()) continue - - val fetchedValues = client.mget(*keys.toTypedArray()) - for (i in keys.indices) { - val value = fetchedValues[i] - if (value != null) values[keys[i]] = value - } - } while (cursor != ScanParams.SCAN_POINTER_START) - - return values + fun hdel(key: String, field: String) { + client.hdel(key, field) } override fun close() { diff --git a/common/src/test/kotlin/gg/grounds/discovery/DiscoveryKeysTest.kt b/common/src/test/kotlin/gg/grounds/discovery/DiscoveryKeysTest.kt index 6f81744..ecb6a66 100644 --- a/common/src/test/kotlin/gg/grounds/discovery/DiscoveryKeysTest.kt +++ b/common/src/test/kotlin/gg/grounds/discovery/DiscoveryKeysTest.kt @@ -5,12 +5,7 @@ import org.junit.jupiter.api.Test class DiscoveryKeysTest { @Test - fun paperServerKeyBuildsExpectedPrefix() { - assertEquals("grounds:server-discovery:paper:lobby", DiscoveryKeys.paperServerKey("lobby")) - } - - @Test - fun paperServerPatternUsesPrefixWildcard() { - assertEquals("grounds:server-discovery:paper:*", DiscoveryKeys.paperServerPattern()) + fun paperServersHashKeyMatchesExpectedValue() { + assertEquals("grounds:server-discovery:paper", DiscoveryKeys.paperServersHashKey()) } } diff --git a/common/src/test/kotlin/gg/grounds/discovery/PaperServerEntryTest.kt b/common/src/test/kotlin/gg/grounds/discovery/PaperServerEntryTest.kt index 62f762d..53c227f 100644 --- a/common/src/test/kotlin/gg/grounds/discovery/PaperServerEntryTest.kt +++ b/common/src/test/kotlin/gg/grounds/discovery/PaperServerEntryTest.kt @@ -8,14 +8,14 @@ class PaperServerEntryTest { @Test fun encodeFormatsWithPipes() { - val entry = PaperServerEntry("lobby", "127.0.0.1", 25565) - assertEquals("lobby|127.0.0.1|25565", entry.encode()) + val entry = PaperServerEntry("lobby", "127.0.0.1", 25565, 1234L) + assertEquals("lobby|127.0.0.1|25565|1234", entry.encode()) } @Test fun decodeTrimsAndParsesValues() { - val decoded = PaperServerEntry.decode(" lobby | 127.0.0.1 | 25565 ") - assertEquals(PaperServerEntry("lobby", "127.0.0.1", 25565), decoded) + val decoded = PaperServerEntry.decode(" lobby | 127.0.0.1 | 25565 | 555 ") + assertEquals(PaperServerEntry("lobby", "127.0.0.1", 25565, 555L), decoded) } @Test @@ -26,5 +26,7 @@ class PaperServerEntryTest { assertNull(PaperServerEntry.decode("||25565")) assertNull(PaperServerEntry.decode("lobby||25565")) assertNull(PaperServerEntry.decode("lobby|host|not-a-number")) + assertNull(PaperServerEntry.decode("lobby|host|25565")) + assertNull(PaperServerEntry.decode("lobby|host|25565|not-a-number")) } } diff --git a/paper/src/main/kotlin/gg/grounds/discovery/PaperDiscoveryPublisher.kt b/paper/src/main/kotlin/gg/grounds/discovery/PaperDiscoveryPublisher.kt index 9818c45..809f6ce 100644 --- a/paper/src/main/kotlin/gg/grounds/discovery/PaperDiscoveryPublisher.kt +++ b/paper/src/main/kotlin/gg/grounds/discovery/PaperDiscoveryPublisher.kt @@ -8,7 +8,7 @@ class PaperDiscoveryPublisher( private val config: PaperDiscoveryConfig, ) { private val client = ValkeyClient(config.valkeyConfig) - private val entryKey = DiscoveryKeys.paperServerKey(config.baseEntry.name) + private val entryHashKey = DiscoveryKeys.paperServersHashKey() private var heartbeatTask: BukkitTask? = null fun start() { @@ -20,7 +20,7 @@ class PaperDiscoveryPublisher( fun stop() { heartbeatTask?.cancel() try { - client.delete(entryKey) + client.hdel(entryHashKey, config.baseEntry.name) } catch (e: Exception) { plugin.logger.warning("Failed to remove discovery entry: ${e.message}") } finally { @@ -34,7 +34,8 @@ class PaperDiscoveryPublisher( private fun updateEntry() { try { - client.setWithTtl(entryKey, config.baseEntry.encode(), 3L) + val entry = config.baseEntry.copy(lastSeenMillis = System.currentTimeMillis()) + client.hset(entryHashKey, entry.name, entry.encode()) } catch (e: Exception) { plugin.logger.warning("Failed to update discovery entry: ${e.message}") } diff --git a/velocity/src/main/kotlin/gg/grounds/GroundsPluginServerDiscovery.kt b/velocity/src/main/kotlin/gg/grounds/GroundsPluginServerDiscovery.kt index 6c3eac0..decc437 100644 --- a/velocity/src/main/kotlin/gg/grounds/GroundsPluginServerDiscovery.kt +++ b/velocity/src/main/kotlin/gg/grounds/GroundsPluginServerDiscovery.kt @@ -42,9 +42,10 @@ constructor(private val proxyServer: ProxyServer, private val logger: Logger) { @Subscribe fun onPlayerChooseInitialServer(event: PlayerChooseInitialServerEvent) { - if (event.initialServer.isPresent) { + if (event.initialServer.isPresent || proxyServer.allServers.isEmpty()) { return } - discoveryService.pickInitialServer().ifPresent(event::setInitialServer) + + event.setInitialServer(proxyServer.allServers.first()) } } diff --git a/velocity/src/main/kotlin/gg/grounds/discovery/VelocityDiscoveryService.kt b/velocity/src/main/kotlin/gg/grounds/discovery/VelocityDiscoveryService.kt index 88c444e..1c056ca 100644 --- a/velocity/src/main/kotlin/gg/grounds/discovery/VelocityDiscoveryService.kt +++ b/velocity/src/main/kotlin/gg/grounds/discovery/VelocityDiscoveryService.kt @@ -1,12 +1,9 @@ package gg.grounds.discovery import com.velocitypowered.api.proxy.ProxyServer -import com.velocitypowered.api.proxy.server.RegisteredServer import com.velocitypowered.api.proxy.server.ServerInfo import com.velocitypowered.api.scheduler.ScheduledTask import java.net.InetSocketAddress -import java.util.* -import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.TimeUnit import org.slf4j.Logger @@ -16,7 +13,6 @@ class VelocityDiscoveryService( config: VelocityDiscoveryConfig, ) { private val client = ValkeyClient(config.valkeyConfig) - private val registeredServers: MutableMap = ConcurrentHashMap() private var pollTask: ScheduledTask? = null fun start(plugin: Any) { @@ -24,14 +20,13 @@ class VelocityDiscoveryService( pollTask = proxyServer.scheduler .buildTask(plugin, this::refreshServers) - .repeat(5, TimeUnit.SECONDS) + .repeat(1, TimeUnit.SECONDS) .schedule() } fun stop() { pollTask?.cancel() - registeredServers.values.forEach { info -> proxyServer.unregisterServer(info) } - registeredServers.clear() + try { client.close() } catch (e: Exception) { @@ -39,26 +34,29 @@ class VelocityDiscoveryService( } } - fun pickInitialServer(): Optional { - return registeredServers.values.firstOrNull()?.let { proxyServer.getServer(it.name) } - ?: Optional.empty() - } - private fun refreshServers() { val entries = try { - client.scanValues(DiscoveryKeys.paperServerPattern()) + client.hgetAll(DiscoveryKeys.paperServersHashKey()) } catch (e: Exception) { - logger.warn("Failed to scan Valkey for servers: {}", e.message) + logger.warn("Failed to load discovery entries from Valkey: {}", e.message) return } + val now = System.currentTimeMillis() + val staleThresholdMillis = TimeUnit.SECONDS.toMillis(15) val servers = HashSet() - for ((key, value) in entries) { + for ((field, value) in entries) { val parsed = PaperServerEntry.decode(value) if (parsed == null) { - logger.warn("Invalid discovery entry for key {}", key) + logger.warn("Invalid discovery entry for field {}", field) + continue + } + val lastSeen = parsed.lastSeenMillis + if (lastSeen <= 0L || now - lastSeen > staleThresholdMillis) { + logger.info("Removing stale discovery entry for server {}", parsed.name) + client.hdel(DiscoveryKeys.paperServersHashKey(), field) continue } servers.add(parsed.name) @@ -69,13 +67,10 @@ class VelocityDiscoveryService( } private fun unregisterMissing(aliveNames: Set) { - val iterator = registeredServers.entries.iterator() - while (iterator.hasNext()) { - val existing = iterator.next() - if (!aliveNames.contains(existing.key)) { - proxyServer.unregisterServer(existing.value) - iterator.remove() - logger.info("Unregistered stale server {}", existing.key) + for (existing in proxyServer.allServers) { + if (!aliveNames.contains(existing.serverInfo.name)) { + proxyServer.unregisterServer(existing.serverInfo) + logger.info("Unregistered stale server {}", existing.serverInfo.name) } } } @@ -85,17 +80,6 @@ class VelocityDiscoveryService( val address = InetSocketAddress.createUnresolved(entry.host, entry.port) val desiredInfo = ServerInfo(name, address) - val existing = registeredServers[name] - if (existing != null) { - if (existing.address != address) { - proxyServer.unregisterServer(existing) - proxyServer.registerServer(desiredInfo) - registeredServers[name] = desiredInfo - logger.info("Updated discovered server {}", name) - } - return - } - val proxyExisting = proxyServer.getServer(name) if (proxyExisting.isPresent) { val existingInfo = proxyExisting.get().serverInfo @@ -106,12 +90,10 @@ class VelocityDiscoveryService( ) return } - registeredServers[name] = existingInfo return } proxyServer.registerServer(desiredInfo) - registeredServers[name] = desiredInfo logger.info("Registered discovered server {}", name) } }