From 9c3803cb1bb494dfb54bd4700f1243546455aad5 Mon Sep 17 00:00:00 2001 From: Lukas Jost Date: Sun, 25 Jan 2026 23:24:30 +0100 Subject: [PATCH 1/3] feat: add player hearbeat processing --- build.gradle.kts | 3 +- .../grounds/api/PlayerPresenceGrpcService.kt | 107 +++++++++++++++- .../kotlin/gg/grounds/domain/PlayerSession.kt | 2 +- .../persistence/PlayerSessionRepository.kt | 84 +++++++++++-- .../grounds/presence/PlayerSessionCleanup.kt | 27 ++++ src/main/resources/application.properties | 4 + .../migration/V1__create_player_sessions.sql | 6 +- .../api/PlayerPresenceGrpcServiceTest.kt | 116 +++++++++++++++++- src/test/resources/application.properties | 3 +- 9 files changed, 336 insertions(+), 16 deletions(-) create mode 100644 src/main/kotlin/gg/grounds/presence/PlayerSessionCleanup.kt diff --git a/build.gradle.kts b/build.gradle.kts index 625fde9..7353ec3 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -21,7 +21,8 @@ dependencies { implementation("io.quarkus:quarkus-jdbc-postgresql") implementation("io.quarkus:quarkus-flyway") implementation("io.quarkus:quarkus-kotlin") - implementation("gg.grounds:library-grpc-contracts-player:0.1.0") + implementation("io.quarkus:quarkus-scheduler") + implementation("gg.grounds:library-grpc-contracts-player:feat-player-heartbeat-SNAPSHOT") compileOnly("com.google.protobuf:protobuf-kotlin") diff --git a/src/main/kotlin/gg/grounds/api/PlayerPresenceGrpcService.kt b/src/main/kotlin/gg/grounds/api/PlayerPresenceGrpcService.kt index d187b2f..f129a77 100644 --- a/src/main/kotlin/gg/grounds/api/PlayerPresenceGrpcService.kt +++ b/src/main/kotlin/gg/grounds/api/PlayerPresenceGrpcService.kt @@ -2,6 +2,8 @@ package gg.grounds.api import gg.grounds.domain.PlayerSession import gg.grounds.grpc.player.LoginStatus +import gg.grounds.grpc.player.PlayerHeartbeatBatchReply +import gg.grounds.grpc.player.PlayerHeartbeatBatchRequest import gg.grounds.grpc.player.PlayerLoginReply import gg.grounds.grpc.player.PlayerLoginRequest import gg.grounds.grpc.player.PlayerLogoutReply @@ -13,8 +15,10 @@ import io.quarkus.grpc.GrpcService import io.smallrye.common.annotation.Blocking import io.smallrye.mutiny.Uni import jakarta.inject.Inject +import java.time.Duration import java.time.Instant import java.util.UUID +import org.eclipse.microprofile.config.inject.ConfigProperty import org.jboss.logging.Logger @GrpcService @@ -22,6 +26,9 @@ import org.jboss.logging.Logger class PlayerPresenceGrpcService @Inject constructor(private val repository: PlayerSessionRepository) : PlayerPresenceService { + @ConfigProperty(name = "grounds.player.sessions.ttl", defaultValue = "90s") + lateinit var sessionTtl: Duration + override fun tryPlayerLogin(request: PlayerLoginRequest): Uni { return Uni.createFrom().item { handleLogin(request) } } @@ -30,6 +37,12 @@ constructor(private val repository: PlayerSessionRepository) : PlayerPresenceSer return Uni.createFrom().item { handleLogout(request) } } + override fun playerHeartbeatBatch( + request: PlayerHeartbeatBatchRequest + ): Uni { + return Uni.createFrom().item { handleHeartbeatBatch(request) } + } + private fun handleLogin(request: PlayerLoginRequest): PlayerLoginReply { val playerId = parsePlayerId(request.playerId) @@ -38,10 +51,11 @@ constructor(private val repository: PlayerSessionRepository) : PlayerPresenceSer .setMessage("player_id must be a UUID") .build() - val session = PlayerSession(playerId, Instant.now()) + val now = Instant.now() + val session = PlayerSession(playerId, now, now) val inserted = repository.insertSession(session) if (inserted) { - LOG.infof("Player %s logged in", playerId) + LOG.infof("Player session created (playerId=%s, result=accepted)", playerId) return PlayerLoginReply.newBuilder() .setStatus(LoginStatus.LOGIN_STATUS_ACCEPTED) .setMessage("player accepted") @@ -50,19 +64,88 @@ constructor(private val repository: PlayerSessionRepository) : PlayerPresenceSer val existing = repository.findByPlayerId(playerId) if (existing != null) { - LOG.infof("Player %s rejected: already online", playerId) + if (isStale(existing, now)) { + val removed = repository.deleteSession(playerId) + if (removed == DeleteSessionResult.ERROR) { + return PlayerLoginReply.newBuilder() + .setStatus(LoginStatus.LOGIN_STATUS_ERROR) + .setMessage("unable to remove stale player session") + .build() + } + if (removed == DeleteSessionResult.REMOVED) { + LOG.infof( + "Player session expired (playerId=%s, lastSeenAt=%s)", + playerId, + existing.lastSeenAt, + ) + } + if (removed == DeleteSessionResult.NOT_FOUND) { + LOG.infof("Player session missing during stale cleanup (playerId=%s)", playerId) + } + if (repository.insertSession(session)) { + LOG.infof("Player session created (playerId=%s, result=accepted)", playerId) + return PlayerLoginReply.newBuilder() + .setStatus(LoginStatus.LOGIN_STATUS_ACCEPTED) + .setMessage("player accepted") + .build() + } + } + + LOG.infof("Player session rejected (playerId=%s, reason=already_online)", playerId) return PlayerLoginReply.newBuilder() .setStatus(LoginStatus.LOGIN_STATUS_ALREADY_ONLINE) .setMessage("player already online") .build() } + LOG.errorf("Player session verification failed (playerId=%s)", playerId) return PlayerLoginReply.newBuilder() .setStatus(LoginStatus.LOGIN_STATUS_ERROR) .setMessage("unable to verify player session") .build() } + private fun handleHeartbeatBatch( + request: PlayerHeartbeatBatchRequest + ): PlayerHeartbeatBatchReply { + val playerIds = + parsePlayerIds(request.playerIdsList) + ?: return PlayerHeartbeatBatchReply.newBuilder() + .setUpdated(0) + .setMissing(0) + .setMessage("player_ids must be UUIDs") + .also { + LOG.warnf( + "Player heartbeat batch rejected (count=%d, reason=invalid_player_ids)", + request.playerIdsList.size, + ) + } + .build() + + if (playerIds.isEmpty()) { + LOG.debugf("Player heartbeat batch skipped (count=0, reason=empty_request)") + return PlayerHeartbeatBatchReply.newBuilder() + .setUpdated(0) + .setMissing(0) + .setMessage("no player ids provided") + .build() + } + + val updated = repository.touchSessions(playerIds, Instant.now()) + val missing = (playerIds.size - updated).coerceAtLeast(0) + LOG.debugf( + "Player heartbeat batch processed (count=%d, updated=%d, missing=%d)", + playerIds.size, + updated, + missing, + ) + return PlayerHeartbeatBatchReply.newBuilder() + .setUpdated(updated) + .setMissing(missing) + .setMessage("heartbeat accepted") + .build() + } + private fun handleLogout(request: PlayerLogoutRequest): PlayerLogoutReply { val playerId = parsePlayerId(request.playerId) @@ -73,7 +156,7 @@ constructor(private val repository: PlayerSessionRepository) : PlayerPresenceSer return when (repository.deleteSession(playerId)) { DeleteSessionResult.REMOVED -> { - LOG.infof("Player %s logged out", playerId) + LOG.infof("Player session removed (playerId=%s, result=logout)", playerId) PlayerLogoutReply.newBuilder().setRemoved(true).setMessage("player removed").build() } DeleteSessionResult.NOT_FOUND -> @@ -96,6 +179,22 @@ constructor(private val repository: PlayerSessionRepository) : PlayerPresenceSer ?.let { runCatching { UUID.fromString(it) }.getOrNull() } } + private fun parsePlayerIds(values: List): List? { + if (values.isEmpty()) { + return emptyList() + } + val trimmed = values.map { it.trim() } + if (trimmed.any { it.isEmpty() }) { + return null + } + val parsed = trimmed.map { runCatching { UUID.fromString(it) }.getOrNull() } + return if (parsed.any { it == null }) null else parsed.filterNotNull() + } + + private fun isStale(session: PlayerSession, now: Instant): Boolean { + return session.lastSeenAt.isBefore(now.minus(sessionTtl)) + } + companion object { private val LOG = Logger.getLogger(PlayerPresenceGrpcService::class.java) } diff --git a/src/main/kotlin/gg/grounds/domain/PlayerSession.kt b/src/main/kotlin/gg/grounds/domain/PlayerSession.kt index 9c75e0c..f2e6598 100644 --- a/src/main/kotlin/gg/grounds/domain/PlayerSession.kt +++ b/src/main/kotlin/gg/grounds/domain/PlayerSession.kt @@ -3,4 +3,4 @@ package gg.grounds.domain import java.time.Instant import java.util.UUID -data class PlayerSession(val playerId: UUID, val connectedAt: Instant) +data class PlayerSession(val playerId: UUID, val connectedAt: Instant, val lastSeenAt: Instant) diff --git a/src/main/kotlin/gg/grounds/persistence/PlayerSessionRepository.kt b/src/main/kotlin/gg/grounds/persistence/PlayerSessionRepository.kt index 3efe97e..fd33d91 100644 --- a/src/main/kotlin/gg/grounds/persistence/PlayerSessionRepository.kt +++ b/src/main/kotlin/gg/grounds/persistence/PlayerSessionRepository.kt @@ -6,6 +6,7 @@ import jakarta.inject.Inject import java.sql.ResultSet import java.sql.SQLException import java.sql.Timestamp +import java.time.Instant import java.util.UUID import javax.sql.DataSource import org.jboss.logging.Logger @@ -24,11 +25,16 @@ class PlayerSessionRepository @Inject constructor(private val dataSource: DataSo connection.prepareStatement(INSERT_SESSION).use { statement -> statement.setObject(1, session.playerId) statement.setTimestamp(2, Timestamp.from(session.connectedAt)) + statement.setTimestamp(3, Timestamp.from(session.lastSeenAt)) statement.executeUpdate() > 0 } } } catch (error: SQLException) { - LOG.errorf(error, "Failed to insert player session for %s", session.playerId) + LOG.errorf( + error, + "Player session insert failed (playerId=%s, reason=sql_error)", + session.playerId, + ) false } } @@ -44,7 +50,11 @@ class PlayerSessionRepository @Inject constructor(private val dataSource: DataSo } } } catch (error: SQLException) { - LOG.errorf(error, "Failed to fetch player session for %s", playerId) + LOG.errorf( + error, + "Player session fetch failed (playerId=%s, reason=sql_error)", + playerId, + ) null } } @@ -59,11 +69,57 @@ class PlayerSessionRepository @Inject constructor(private val dataSource: DataSo } } } catch (error: SQLException) { - LOG.errorf(error, "Failed to delete player session for %s", playerId) + LOG.errorf( + error, + "Player session delete failed (playerId=%s, reason=sql_error)", + playerId, + ) DeleteSessionResult.ERROR } } + fun touchSessions(playerIds: Collection, lastSeenAt: Instant): Int { + if (playerIds.isEmpty()) { + return 0 + } + + return try { + dataSource.connection.use { connection -> + connection.prepareStatement(UPDATE_LAST_SEEN_BATCH).use { statement -> + statement.setTimestamp(1, Timestamp.from(lastSeenAt)) + val array = connection.createArrayOf("uuid", playerIds.toTypedArray()) + statement.setArray(2, array) + statement.executeUpdate() + } + } + } catch (error: SQLException) { + LOG.errorf( + error, + "Player session batch update failed (count=%d, reason=sql_error)", + playerIds.size, + ) + 0 + } + } + + fun deleteStaleSessions(cutoff: Instant): Int { + return try { + dataSource.connection.use { connection -> + connection.prepareStatement(DELETE_STALE).use { statement -> + statement.setTimestamp(1, Timestamp.from(cutoff)) + statement.executeUpdate() + } + } + } catch (error: SQLException) { + LOG.errorf( + error, + "Stale player session cleanup failed (cutoff=%s, reason=sql_error)", + cutoff, + ) + 0 + } + } + private fun mapSession(resultSet: ResultSet): PlayerSession { val playerId = requireNotNull(resultSet.getObject("player_id", UUID::class.java)) { @@ -72,7 +128,10 @@ class PlayerSessionRepository @Inject constructor(private val dataSource: DataSo val connectedAt = requireNotNull(resultSet.getTimestamp("connected_at")) { "connected_at is null" } .toInstant() - return PlayerSession(playerId, connectedAt) + val lastSeenAt = + requireNotNull(resultSet.getTimestamp("last_seen_at")) { "last_seen_at is null" } + .toInstant() + return PlayerSession(playerId, connectedAt, lastSeenAt) } companion object { @@ -80,13 +139,13 @@ class PlayerSessionRepository @Inject constructor(private val dataSource: DataSo private const val INSERT_SESSION = """ - INSERT INTO player_sessions (player_id, connected_at) - VALUES (?, ?) + INSERT INTO player_sessions (player_id, connected_at, last_seen_at) + VALUES (?, ?, ?) ON CONFLICT (player_id) DO NOTHING """ private const val SELECT_BY_PLAYER = """ - SELECT player_id, connected_at + SELECT player_id, connected_at, last_seen_at FROM player_sessions WHERE player_id = ? """ @@ -95,5 +154,16 @@ class PlayerSessionRepository @Inject constructor(private val dataSource: DataSo DELETE FROM player_sessions WHERE player_id = ? """ + private const val UPDATE_LAST_SEEN_BATCH = + """ + UPDATE player_sessions + SET last_seen_at = ? + WHERE player_id = ANY(?) + """ + private const val DELETE_STALE = + """ + DELETE FROM player_sessions + WHERE last_seen_at < ? + """ } } diff --git a/src/main/kotlin/gg/grounds/presence/PlayerSessionCleanup.kt b/src/main/kotlin/gg/grounds/presence/PlayerSessionCleanup.kt new file mode 100644 index 0000000..c878bd5 --- /dev/null +++ b/src/main/kotlin/gg/grounds/presence/PlayerSessionCleanup.kt @@ -0,0 +1,27 @@ +package gg.grounds.presence + +import gg.grounds.persistence.PlayerSessionRepository +import io.quarkus.scheduler.Scheduled +import jakarta.enterprise.context.ApplicationScoped +import jakarta.inject.Inject +import java.time.Duration +import java.time.Instant +import org.eclipse.microprofile.config.inject.ConfigProperty +import org.jboss.logging.Logger + +@ApplicationScoped +class PlayerSessionCleanup @Inject constructor(private val repository: PlayerSessionRepository) { + @ConfigProperty(name = "grounds.player.sessions.ttl", defaultValue = "90s") + private lateinit var sessionTtl: Duration + + @Scheduled(every = "{grounds.player.sessions.cleanup-interval}") + fun expireStaleSessions() { + val cutoff = Instant.now().minus(sessionTtl) + val removed = repository.deleteStaleSessions(cutoff) + LOG.infof("Player session cleanup completed (removed=%d, cutoff=%s)", removed, cutoff) + } + + companion object { + private val LOG = Logger.getLogger(PlayerSessionCleanup::class.java) + } +} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 6d15160..15886c1 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1,6 +1,7 @@ quarkus.grpc.server.use-separate-server=false quarkus.http.host=0.0.0.0 quarkus.http.port=9000 +quarkus.log.level=DEBUG quarkus.generate-code.grpc.scan-for-proto=gg.grounds:library-grpc-contracts-player @@ -10,3 +11,6 @@ quarkus.datasource.username=${POSTGRES_USER:app} quarkus.datasource.password=${POSTGRES_PASSWORD:app} quarkus.flyway.schemas=player quarkus.flyway.migrate-at-start=true + +grounds.player.sessions.ttl=${PLAYER_SESSIONS_TTL:90s} +grounds.player.sessions.cleanup-interval=${PLAYER_SESSIONS_CLEANUP_INTERVAL:30s} diff --git a/src/main/resources/db/migration/V1__create_player_sessions.sql b/src/main/resources/db/migration/V1__create_player_sessions.sql index 4304aab..716b4d6 100644 --- a/src/main/resources/db/migration/V1__create_player_sessions.sql +++ b/src/main/resources/db/migration/V1__create_player_sessions.sql @@ -1,4 +1,8 @@ CREATE TABLE IF NOT EXISTS player_sessions ( player_id UUID PRIMARY KEY, - connected_at TIMESTAMPTZ NOT NULL + connected_at TIMESTAMPTZ NOT NULL, + last_seen_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); + +CREATE INDEX IF NOT EXISTS player_sessions_last_seen_at_idx + ON player_sessions (last_seen_at); diff --git a/src/test/kotlin/gg/grounds/api/PlayerPresenceGrpcServiceTest.kt b/src/test/kotlin/gg/grounds/api/PlayerPresenceGrpcServiceTest.kt index c1b897e..606fc5b 100644 --- a/src/test/kotlin/gg/grounds/api/PlayerPresenceGrpcServiceTest.kt +++ b/src/test/kotlin/gg/grounds/api/PlayerPresenceGrpcServiceTest.kt @@ -2,6 +2,8 @@ package gg.grounds.api import gg.grounds.domain.PlayerSession import gg.grounds.grpc.player.LoginStatus +import gg.grounds.grpc.player.PlayerHeartbeatBatchReply +import gg.grounds.grpc.player.PlayerHeartbeatBatchRequest import gg.grounds.grpc.player.PlayerLoginReply import gg.grounds.grpc.player.PlayerLoginRequest import gg.grounds.grpc.player.PlayerLogoutReply @@ -24,6 +26,7 @@ import org.mockito.Mockito.reset import org.mockito.kotlin.any import org.mockito.kotlin.argumentCaptor import org.mockito.kotlin.eq +import org.mockito.kotlin.times import org.mockito.kotlin.verify import org.mockito.kotlin.verifyNoInteractions import org.mockito.kotlin.whenever @@ -67,6 +70,7 @@ class PlayerPresenceGrpcServiceTest { verify(repository).insertSession(sessionCaptor.capture()) assertEquals(playerId, sessionCaptor.firstValue.playerId) assertNotNull(sessionCaptor.firstValue.connectedAt) + assertNotNull(sessionCaptor.firstValue.lastSeenAt) } @Test @@ -74,7 +78,7 @@ class PlayerPresenceGrpcServiceTest { val playerId = UUID.randomUUID() whenever(repository.insertSession(any())).thenReturn(false) whenever(repository.findByPlayerId(eq(playerId))) - .thenReturn(PlayerSession(playerId, Instant.EPOCH)) + .thenReturn(PlayerSession(playerId, Instant.EPOCH, Instant.now().minusSeconds(5))) val request = PlayerLoginRequest.newBuilder().setPlayerId(playerId.toString()).build() @@ -85,6 +89,116 @@ class PlayerPresenceGrpcServiceTest { verify(repository).findByPlayerId(playerId) } + @Test + fun loginAcceptsWhenExistingSessionIsStale() { + val playerId = UUID.randomUUID() + whenever(repository.insertSession(any())).thenReturn(false, true) + whenever(repository.findByPlayerId(eq(playerId))) + .thenReturn(PlayerSession(playerId, Instant.EPOCH, Instant.EPOCH)) + whenever(repository.deleteSession(eq(playerId))).thenReturn(DeleteSessionResult.REMOVED) + + val request = PlayerLoginRequest.newBuilder().setPlayerId(playerId.toString()).build() + + val reply: PlayerLoginReply = service.tryPlayerLogin(request).await().indefinitely() + + assertEquals(LoginStatus.LOGIN_STATUS_ACCEPTED, reply.status) + assertEquals("player accepted", reply.message) + verify(repository).deleteSession(playerId) + verify(repository, times(2)).insertSession(any()) + } + + @Test + fun loginAcceptsWhenStaleSessionAlreadyRemoved() { + val playerId = UUID.randomUUID() + whenever(repository.insertSession(any())).thenReturn(false, true) + whenever(repository.findByPlayerId(eq(playerId))) + .thenReturn(PlayerSession(playerId, Instant.EPOCH, Instant.EPOCH)) + whenever(repository.deleteSession(eq(playerId))).thenReturn(DeleteSessionResult.NOT_FOUND) + + val request = PlayerLoginRequest.newBuilder().setPlayerId(playerId.toString()).build() + + val reply: PlayerLoginReply = service.tryPlayerLogin(request).await().indefinitely() + + assertEquals(LoginStatus.LOGIN_STATUS_ACCEPTED, reply.status) + assertEquals("player accepted", reply.message) + verify(repository).deleteSession(playerId) + verify(repository, times(2)).insertSession(any()) + } + + @Test + fun loginReturnsErrorWhenSessionCannotBeVerified() { + val playerId = UUID.randomUUID() + whenever(repository.insertSession(any())).thenReturn(false) + whenever(repository.findByPlayerId(eq(playerId))).thenReturn(null) + + val request = PlayerLoginRequest.newBuilder().setPlayerId(playerId.toString()).build() + + val reply: PlayerLoginReply = service.tryPlayerLogin(request).await().indefinitely() + + assertEquals(LoginStatus.LOGIN_STATUS_ERROR, reply.status) + assertEquals("unable to verify player session", reply.message) + verify(repository).findByPlayerId(playerId) + } + + @Test + fun heartbeatBatchRejectsInvalidPlayerIds() { + val request = + PlayerHeartbeatBatchRequest.newBuilder() + .addPlayerIds("bad-id") + .addPlayerIds(UUID.randomUUID().toString()) + .build() + + val reply: PlayerHeartbeatBatchReply = + service.playerHeartbeatBatch(request).await().indefinitely() + + assertEquals(0, reply.updated) + assertEquals(0, reply.missing) + assertEquals("player_ids must be UUIDs", reply.message) + verifyNoInteractions(repository) + } + + @Test + fun heartbeatBatchUpdatesSessions() { + val first = UUID.randomUUID() + val second = UUID.randomUUID() + whenever(repository.touchSessions(eq(listOf(first, second)), any())).thenReturn(2) + + val request = + PlayerHeartbeatBatchRequest.newBuilder() + .addPlayerIds(first.toString()) + .addPlayerIds(second.toString()) + .build() + + val reply: PlayerHeartbeatBatchReply = + service.playerHeartbeatBatch(request).await().indefinitely() + + assertEquals(2, reply.updated) + assertEquals(0, reply.missing) + assertEquals("heartbeat accepted", reply.message) + verify(repository).touchSessions(eq(listOf(first, second)), any()) + } + + @Test + fun heartbeatBatchReportsMissingSessions() { + val first = UUID.randomUUID() + val second = UUID.randomUUID() + whenever(repository.touchSessions(eq(listOf(first, second)), any())).thenReturn(1) + + val request = + PlayerHeartbeatBatchRequest.newBuilder() + .addPlayerIds(first.toString()) + .addPlayerIds(second.toString()) + .build() + + val reply: PlayerHeartbeatBatchReply = + service.playerHeartbeatBatch(request).await().indefinitely() + + assertEquals(1, reply.updated) + assertEquals(1, reply.missing) + assertEquals("heartbeat accepted", reply.message) + verify(repository).touchSessions(eq(listOf(first, second)), any()) + } + @Test fun logoutRejectsInvalidPlayerId() { val request = PlayerLogoutRequest.newBuilder().setPlayerId("bad-id").build() diff --git a/src/test/resources/application.properties b/src/test/resources/application.properties index 8c22e95..ef74614 100644 --- a/src/test/resources/application.properties +++ b/src/test/resources/application.properties @@ -2,4 +2,5 @@ quarkus.grpc.clients.player-presence.host=localhost quarkus.grpc.clients.player-presence.port=9001 quarkus.grpc.clients.player-presence.plain-text=true -quarkus.flyway.migrate-at-start=false \ No newline at end of file +quarkus.flyway.migrate-at-start=false +quarkus.scheduler.enabled=false From 4fd23333d877fe581dc5c0c3943a60cea2d25891 Mon Sep 17 00:00:00 2001 From: Lukas Jost Date: Mon, 26 Jan 2026 11:00:52 +0100 Subject: [PATCH 2/3] fix: implement copilots sugesstions --- .../gg/grounds/api/PlayerHeartbeatService.kt | 68 +++++++++++++ .../grounds/api/PlayerPresenceGrpcService.kt | 73 ++++---------- .../grounds/presence/PlayerSessionCleanup.kt | 11 ++- .../kotlin/gg/grounds/time/TimeProvider.kt | 9 ++ src/main/resources/application.properties | 2 +- .../grounds/api/PlayerHeartbeatServiceTest.kt | 99 +++++++++++++++++++ .../api/PlayerPresenceGrpcServiceTest.kt | 80 ++++----------- .../presence/PlayerSessionCleanupTest.kt | 50 ++++++++++ 8 files changed, 271 insertions(+), 121 deletions(-) create mode 100644 src/main/kotlin/gg/grounds/api/PlayerHeartbeatService.kt create mode 100644 src/main/kotlin/gg/grounds/time/TimeProvider.kt create mode 100644 src/test/kotlin/gg/grounds/api/PlayerHeartbeatServiceTest.kt create mode 100644 src/test/kotlin/gg/grounds/presence/PlayerSessionCleanupTest.kt diff --git a/src/main/kotlin/gg/grounds/api/PlayerHeartbeatService.kt b/src/main/kotlin/gg/grounds/api/PlayerHeartbeatService.kt new file mode 100644 index 0000000..2f11a3c --- /dev/null +++ b/src/main/kotlin/gg/grounds/api/PlayerHeartbeatService.kt @@ -0,0 +1,68 @@ +package gg.grounds.api + +import gg.grounds.grpc.player.PlayerHeartbeatBatchReply +import gg.grounds.grpc.player.PlayerHeartbeatBatchRequest +import gg.grounds.persistence.PlayerSessionRepository +import jakarta.enterprise.context.ApplicationScoped +import jakarta.inject.Inject +import java.time.Instant +import java.util.UUID +import org.jboss.logging.Logger + +@ApplicationScoped +class PlayerHeartbeatService @Inject constructor(private val repository: PlayerSessionRepository) { + fun handleHeartbeatBatch(request: PlayerHeartbeatBatchRequest): PlayerHeartbeatBatchReply { + val playerIds = + parsePlayerIds(request.playerIdsList) + ?: return PlayerHeartbeatBatchReply.newBuilder() + .setUpdated(0) + .setMissing(0) + .setMessage("player_ids must be UUIDs") + .also { + LOG.warnf( + "Player heartbeat batch rejected (count=%d, reason=invalid_player_ids)", + request.playerIdsList.size, + ) + } + .build() + + if (playerIds.isEmpty()) { + LOG.debugf("Player heartbeat batch skipped (count=0, reason=empty_request)") + return PlayerHeartbeatBatchReply.newBuilder() + .setUpdated(0) + .setMissing(0) + .setMessage("no player ids provided") + .build() + } + + val updated = repository.touchSessions(playerIds, Instant.now()) + val missing = (playerIds.size - updated).coerceAtLeast(0) + LOG.debugf( + "Player heartbeat batch processed (count=%d, updated=%d, missing=%d)", + playerIds.size, + updated, + missing, + ) + return PlayerHeartbeatBatchReply.newBuilder() + .setUpdated(updated) + .setMissing(missing) + .setMessage("heartbeat accepted") + .build() + } + + private fun parsePlayerIds(values: List): List? { + if (values.isEmpty()) { + return emptyList() + } + val trimmed = values.map { it.trim() } + if (trimmed.any { it.isEmpty() }) { + return null + } + val parsed = trimmed.map { runCatching { UUID.fromString(it) }.getOrNull() } + return parsed.takeIf { parsedIds -> parsedIds.none { it == null } }?.filterNotNull() + } + + companion object { + private val LOG = Logger.getLogger(PlayerHeartbeatService::class.java) + } +} diff --git a/src/main/kotlin/gg/grounds/api/PlayerPresenceGrpcService.kt b/src/main/kotlin/gg/grounds/api/PlayerPresenceGrpcService.kt index f129a77..1e00c78 100644 --- a/src/main/kotlin/gg/grounds/api/PlayerPresenceGrpcService.kt +++ b/src/main/kotlin/gg/grounds/api/PlayerPresenceGrpcService.kt @@ -25,9 +25,12 @@ import org.jboss.logging.Logger @Blocking class PlayerPresenceGrpcService @Inject -constructor(private val repository: PlayerSessionRepository) : PlayerPresenceService { +constructor( + private val repository: PlayerSessionRepository, + private val heartbeatService: PlayerHeartbeatService, +) : PlayerPresenceService { @ConfigProperty(name = "grounds.player.sessions.ttl", defaultValue = "90s") - lateinit var sessionTtl: Duration + private lateinit var sessionTtl: Duration override fun tryPlayerLogin(request: PlayerLoginRequest): Uni { return Uni.createFrom().item { handleLogin(request) } @@ -40,7 +43,7 @@ constructor(private val repository: PlayerSessionRepository) : PlayerPresenceSer override fun playerHeartbeatBatch( request: PlayerHeartbeatBatchRequest ): Uni { - return Uni.createFrom().item { handleHeartbeatBatch(request) } + return Uni.createFrom().item { heartbeatService.handleHeartbeatBatch(request) } } private fun handleLogin(request: PlayerLoginRequest): PlayerLoginReply { @@ -89,6 +92,17 @@ constructor(private val repository: PlayerSessionRepository) : PlayerPresenceSer .setMessage("player accepted") .build() } + val recreated = repository.findByPlayerId(playerId) + if (recreated == null) { + LOG.errorf( + "Player session recreation failed (playerId=%s, reason=insert_failed)", + playerId, + ) + return PlayerLoginReply.newBuilder() + .setStatus(LoginStatus.LOGIN_STATUS_ERROR) + .setMessage("unable to create player session after stale cleanup") + .build() + } } LOG.infof("Player session rejected (playerId=%s, reason=already_online)", playerId) @@ -105,47 +119,6 @@ constructor(private val repository: PlayerSessionRepository) : PlayerPresenceSer .build() } - private fun handleHeartbeatBatch( - request: PlayerHeartbeatBatchRequest - ): PlayerHeartbeatBatchReply { - val playerIds = - parsePlayerIds(request.playerIdsList) - ?: return PlayerHeartbeatBatchReply.newBuilder() - .setUpdated(0) - .setMissing(0) - .setMessage("player_ids must be UUIDs") - .also { - LOG.warnf( - "Player heartbeat batch rejected (count=%d, reason=invalid_player_ids)", - request.playerIdsList.size, - ) - } - .build() - - if (playerIds.isEmpty()) { - LOG.debugf("Player heartbeat batch skipped (count=0, reason=empty_request)") - return PlayerHeartbeatBatchReply.newBuilder() - .setUpdated(0) - .setMissing(0) - .setMessage("no player ids provided") - .build() - } - - val updated = repository.touchSessions(playerIds, Instant.now()) - val missing = (playerIds.size - updated).coerceAtLeast(0) - LOG.debugf( - "Player heartbeat batch processed (count=%d, updated=%d, missing=%d)", - playerIds.size, - updated, - missing, - ) - return PlayerHeartbeatBatchReply.newBuilder() - .setUpdated(updated) - .setMissing(missing) - .setMessage("heartbeat accepted") - .build() - } - private fun handleLogout(request: PlayerLogoutRequest): PlayerLogoutReply { val playerId = parsePlayerId(request.playerId) @@ -179,18 +152,6 @@ constructor(private val repository: PlayerSessionRepository) : PlayerPresenceSer ?.let { runCatching { UUID.fromString(it) }.getOrNull() } } - private fun parsePlayerIds(values: List): List? { - if (values.isEmpty()) { - return emptyList() - } - val trimmed = values.map { it.trim() } - if (trimmed.any { it.isEmpty() }) { - return null - } - val parsed = trimmed.map { runCatching { UUID.fromString(it) }.getOrNull() } - return if (parsed.any { it == null }) null else parsed.filterNotNull() - } - private fun isStale(session: PlayerSession, now: Instant): Boolean { return session.lastSeenAt.isBefore(now.minus(sessionTtl)) } diff --git a/src/main/kotlin/gg/grounds/presence/PlayerSessionCleanup.kt b/src/main/kotlin/gg/grounds/presence/PlayerSessionCleanup.kt index c878bd5..0203a30 100644 --- a/src/main/kotlin/gg/grounds/presence/PlayerSessionCleanup.kt +++ b/src/main/kotlin/gg/grounds/presence/PlayerSessionCleanup.kt @@ -1,22 +1,27 @@ package gg.grounds.presence import gg.grounds.persistence.PlayerSessionRepository +import gg.grounds.time.TimeProvider import io.quarkus.scheduler.Scheduled import jakarta.enterprise.context.ApplicationScoped import jakarta.inject.Inject import java.time.Duration -import java.time.Instant import org.eclipse.microprofile.config.inject.ConfigProperty import org.jboss.logging.Logger @ApplicationScoped -class PlayerSessionCleanup @Inject constructor(private val repository: PlayerSessionRepository) { +class PlayerSessionCleanup +@Inject +constructor( + private val repository: PlayerSessionRepository, + private val timeProvider: TimeProvider, +) { @ConfigProperty(name = "grounds.player.sessions.ttl", defaultValue = "90s") private lateinit var sessionTtl: Duration @Scheduled(every = "{grounds.player.sessions.cleanup-interval}") fun expireStaleSessions() { - val cutoff = Instant.now().minus(sessionTtl) + val cutoff = timeProvider.now().minus(sessionTtl) val removed = repository.deleteStaleSessions(cutoff) LOG.infof("Player session cleanup completed (removed=%d, cutoff=%s)", removed, cutoff) } diff --git a/src/main/kotlin/gg/grounds/time/TimeProvider.kt b/src/main/kotlin/gg/grounds/time/TimeProvider.kt new file mode 100644 index 0000000..5793a72 --- /dev/null +++ b/src/main/kotlin/gg/grounds/time/TimeProvider.kt @@ -0,0 +1,9 @@ +package gg.grounds.time + +import jakarta.enterprise.context.ApplicationScoped +import java.time.Instant + +@ApplicationScoped +class TimeProvider { + fun now(): Instant = Instant.now() +} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 15886c1..b72e1fa 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1,7 +1,7 @@ quarkus.grpc.server.use-separate-server=false quarkus.http.host=0.0.0.0 quarkus.http.port=9000 -quarkus.log.level=DEBUG +quarkus.log.level=${QUARKUS_LOG_LEVEL:INFO} quarkus.generate-code.grpc.scan-for-proto=gg.grounds:library-grpc-contracts-player diff --git a/src/test/kotlin/gg/grounds/api/PlayerHeartbeatServiceTest.kt b/src/test/kotlin/gg/grounds/api/PlayerHeartbeatServiceTest.kt new file mode 100644 index 0000000..4b3d333 --- /dev/null +++ b/src/test/kotlin/gg/grounds/api/PlayerHeartbeatServiceTest.kt @@ -0,0 +1,99 @@ +package gg.grounds.api + +import gg.grounds.grpc.player.PlayerHeartbeatBatchReply +import gg.grounds.grpc.player.PlayerHeartbeatBatchRequest +import gg.grounds.persistence.PlayerSessionRepository +import io.quarkus.test.InjectMock +import io.quarkus.test.junit.QuarkusTest +import jakarta.inject.Inject +import java.util.UUID +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.mockito.Mockito.reset +import org.mockito.kotlin.any +import org.mockito.kotlin.eq +import org.mockito.kotlin.verify +import org.mockito.kotlin.verifyNoInteractions +import org.mockito.kotlin.whenever + +@QuarkusTest +class PlayerHeartbeatServiceTest { + + @InjectMock lateinit var repository: PlayerSessionRepository + + @Inject lateinit var heartbeatService: PlayerHeartbeatService + + @BeforeEach + fun resetMocks() { + reset(repository) + } + + @Test + fun heartbeatBatchRejectsInvalidPlayerIds() { + val request = + PlayerHeartbeatBatchRequest.newBuilder() + .addPlayerIds("bad-id") + .addPlayerIds(UUID.randomUUID().toString()) + .build() + + val reply: PlayerHeartbeatBatchReply = heartbeatService.handleHeartbeatBatch(request) + + assertEquals(0, reply.updated) + assertEquals(0, reply.missing) + assertEquals("player_ids must be UUIDs", reply.message) + verifyNoInteractions(repository) + } + + @Test + fun heartbeatBatchRejectsEmptyPlayerIds() { + val request = PlayerHeartbeatBatchRequest.newBuilder().build() + + val reply: PlayerHeartbeatBatchReply = heartbeatService.handleHeartbeatBatch(request) + + assertEquals(0, reply.updated) + assertEquals(0, reply.missing) + assertEquals("no player ids provided", reply.message) + verifyNoInteractions(repository) + } + + @Test + fun heartbeatBatchUpdatesSessions() { + val first = UUID.randomUUID() + val second = UUID.randomUUID() + whenever(repository.touchSessions(eq(listOf(first, second)), any())).thenReturn(2) + + val request = + PlayerHeartbeatBatchRequest.newBuilder() + .addPlayerIds(first.toString()) + .addPlayerIds(second.toString()) + .build() + + val reply: PlayerHeartbeatBatchReply = heartbeatService.handleHeartbeatBatch(request) + + assertEquals(2, reply.updated) + assertEquals(0, reply.missing) + assertEquals("heartbeat accepted", reply.message) + verify(repository).touchSessions(eq(listOf(first, second)), any()) + } + + @Test + fun heartbeatBatchReportsMissingSessions() { + val first = UUID.randomUUID() + val second = UUID.randomUUID() + whenever(repository.touchSessions(eq(listOf(first, second)), any())).thenReturn(1) + + val request = + PlayerHeartbeatBatchRequest.newBuilder() + .addPlayerIds(first.toString()) + .addPlayerIds(second.toString()) + .build() + + val reply: PlayerHeartbeatBatchReply = heartbeatService.handleHeartbeatBatch(request) + + assertEquals(1, reply.updated) + assertEquals(1, reply.missing) + assertEquals("heartbeat accepted", reply.message) + verify(repository).touchSessions(eq(listOf(first, second)), any()) + } +} diff --git a/src/test/kotlin/gg/grounds/api/PlayerPresenceGrpcServiceTest.kt b/src/test/kotlin/gg/grounds/api/PlayerPresenceGrpcServiceTest.kt index 606fc5b..926bce9 100644 --- a/src/test/kotlin/gg/grounds/api/PlayerPresenceGrpcServiceTest.kt +++ b/src/test/kotlin/gg/grounds/api/PlayerPresenceGrpcServiceTest.kt @@ -2,8 +2,6 @@ package gg.grounds.api import gg.grounds.domain.PlayerSession import gg.grounds.grpc.player.LoginStatus -import gg.grounds.grpc.player.PlayerHeartbeatBatchReply -import gg.grounds.grpc.player.PlayerHeartbeatBatchRequest import gg.grounds.grpc.player.PlayerLoginReply import gg.grounds.grpc.player.PlayerLoginRequest import gg.grounds.grpc.player.PlayerLogoutReply @@ -126,77 +124,37 @@ class PlayerPresenceGrpcServiceTest { } @Test - fun loginReturnsErrorWhenSessionCannotBeVerified() { + fun loginReturnsErrorWhenStaleSessionReinsertFails() { val playerId = UUID.randomUUID() - whenever(repository.insertSession(any())).thenReturn(false) - whenever(repository.findByPlayerId(eq(playerId))).thenReturn(null) + whenever(repository.insertSession(any())).thenReturn(false, false) + whenever(repository.findByPlayerId(eq(playerId))) + .thenReturn(PlayerSession(playerId, Instant.EPOCH, Instant.EPOCH), null) + whenever(repository.deleteSession(eq(playerId))).thenReturn(DeleteSessionResult.REMOVED) val request = PlayerLoginRequest.newBuilder().setPlayerId(playerId.toString()).build() val reply: PlayerLoginReply = service.tryPlayerLogin(request).await().indefinitely() assertEquals(LoginStatus.LOGIN_STATUS_ERROR, reply.status) - assertEquals("unable to verify player session", reply.message) - verify(repository).findByPlayerId(playerId) + assertEquals("unable to create player session after stale cleanup", reply.message) + verify(repository).deleteSession(playerId) + verify(repository, times(2)).insertSession(any()) + verify(repository, times(2)).findByPlayerId(playerId) } @Test - fun heartbeatBatchRejectsInvalidPlayerIds() { - val request = - PlayerHeartbeatBatchRequest.newBuilder() - .addPlayerIds("bad-id") - .addPlayerIds(UUID.randomUUID().toString()) - .build() - - val reply: PlayerHeartbeatBatchReply = - service.playerHeartbeatBatch(request).await().indefinitely() - - assertEquals(0, reply.updated) - assertEquals(0, reply.missing) - assertEquals("player_ids must be UUIDs", reply.message) - verifyNoInteractions(repository) - } + fun loginReturnsErrorWhenSessionCannotBeVerified() { + val playerId = UUID.randomUUID() + whenever(repository.insertSession(any())).thenReturn(false) + whenever(repository.findByPlayerId(eq(playerId))).thenReturn(null) - @Test - fun heartbeatBatchUpdatesSessions() { - val first = UUID.randomUUID() - val second = UUID.randomUUID() - whenever(repository.touchSessions(eq(listOf(first, second)), any())).thenReturn(2) - - val request = - PlayerHeartbeatBatchRequest.newBuilder() - .addPlayerIds(first.toString()) - .addPlayerIds(second.toString()) - .build() - - val reply: PlayerHeartbeatBatchReply = - service.playerHeartbeatBatch(request).await().indefinitely() - - assertEquals(2, reply.updated) - assertEquals(0, reply.missing) - assertEquals("heartbeat accepted", reply.message) - verify(repository).touchSessions(eq(listOf(first, second)), any()) - } + val request = PlayerLoginRequest.newBuilder().setPlayerId(playerId.toString()).build() - @Test - fun heartbeatBatchReportsMissingSessions() { - val first = UUID.randomUUID() - val second = UUID.randomUUID() - whenever(repository.touchSessions(eq(listOf(first, second)), any())).thenReturn(1) - - val request = - PlayerHeartbeatBatchRequest.newBuilder() - .addPlayerIds(first.toString()) - .addPlayerIds(second.toString()) - .build() - - val reply: PlayerHeartbeatBatchReply = - service.playerHeartbeatBatch(request).await().indefinitely() - - assertEquals(1, reply.updated) - assertEquals(1, reply.missing) - assertEquals("heartbeat accepted", reply.message) - verify(repository).touchSessions(eq(listOf(first, second)), any()) + val reply: PlayerLoginReply = service.tryPlayerLogin(request).await().indefinitely() + + assertEquals(LoginStatus.LOGIN_STATUS_ERROR, reply.status) + assertEquals("unable to verify player session", reply.message) + verify(repository).findByPlayerId(playerId) } @Test diff --git a/src/test/kotlin/gg/grounds/presence/PlayerSessionCleanupTest.kt b/src/test/kotlin/gg/grounds/presence/PlayerSessionCleanupTest.kt new file mode 100644 index 0000000..7333456 --- /dev/null +++ b/src/test/kotlin/gg/grounds/presence/PlayerSessionCleanupTest.kt @@ -0,0 +1,50 @@ +package gg.grounds.presence + +import gg.grounds.persistence.PlayerSessionRepository +import gg.grounds.time.TimeProvider +import io.quarkus.test.InjectMock +import io.quarkus.test.junit.QuarkusTest +import io.quarkus.test.junit.QuarkusTestProfile +import io.quarkus.test.junit.TestProfile +import jakarta.inject.Inject +import java.time.Instant +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.mockito.Mockito.reset +import org.mockito.kotlin.argumentCaptor +import org.mockito.kotlin.verify +import org.mockito.kotlin.whenever + +@QuarkusTest +@TestProfile(PlayerSessionCleanupTestProfile::class) +class PlayerSessionCleanupTest { + + @InjectMock lateinit var repository: PlayerSessionRepository + + @InjectMock lateinit var timeProvider: TimeProvider + + @Inject lateinit var cleanup: PlayerSessionCleanup + + @BeforeEach + fun resetMocks() { + reset(repository, timeProvider) + } + + @Test + fun expireStaleSessionsUsesConfiguredTtl() { + val fixedInstant = Instant.parse("2025-01-02T03:04:05Z") + whenever(timeProvider.now()).thenReturn(fixedInstant) + cleanup.expireStaleSessions() + + val cutoffCaptor = argumentCaptor() + verify(repository).deleteStaleSessions(cutoffCaptor.capture()) + assertEquals(fixedInstant.minusSeconds(120), cutoffCaptor.firstValue) + verify(timeProvider).now() + } +} + +class PlayerSessionCleanupTestProfile : QuarkusTestProfile { + override fun getConfigOverrides(): Map = + mapOf("grounds.player.sessions.ttl" to "120s") +} From 6079f1a851b5aec747a8c069309421537d4a6bf1 Mon Sep 17 00:00:00 2001 From: Lukas Jost Date: Mon, 26 Jan 2026 13:31:56 +0100 Subject: [PATCH 3/3] fix: add success boolean --- src/main/kotlin/gg/grounds/api/PlayerHeartbeatService.kt | 3 +++ src/test/kotlin/gg/grounds/api/PlayerHeartbeatServiceTest.kt | 4 ++++ 2 files changed, 7 insertions(+) diff --git a/src/main/kotlin/gg/grounds/api/PlayerHeartbeatService.kt b/src/main/kotlin/gg/grounds/api/PlayerHeartbeatService.kt index 2f11a3c..8abaaba 100644 --- a/src/main/kotlin/gg/grounds/api/PlayerHeartbeatService.kt +++ b/src/main/kotlin/gg/grounds/api/PlayerHeartbeatService.kt @@ -17,6 +17,7 @@ class PlayerHeartbeatService @Inject constructor(private val repository: PlayerS ?: return PlayerHeartbeatBatchReply.newBuilder() .setUpdated(0) .setMissing(0) + .setSuccess(false) .setMessage("player_ids must be UUIDs") .also { LOG.warnf( @@ -31,6 +32,7 @@ class PlayerHeartbeatService @Inject constructor(private val repository: PlayerS return PlayerHeartbeatBatchReply.newBuilder() .setUpdated(0) .setMissing(0) + .setSuccess(false) .setMessage("no player ids provided") .build() } @@ -46,6 +48,7 @@ class PlayerHeartbeatService @Inject constructor(private val repository: PlayerS return PlayerHeartbeatBatchReply.newBuilder() .setUpdated(updated) .setMissing(missing) + .setSuccess(true) .setMessage("heartbeat accepted") .build() } diff --git a/src/test/kotlin/gg/grounds/api/PlayerHeartbeatServiceTest.kt b/src/test/kotlin/gg/grounds/api/PlayerHeartbeatServiceTest.kt index 4b3d333..a737cdf 100644 --- a/src/test/kotlin/gg/grounds/api/PlayerHeartbeatServiceTest.kt +++ b/src/test/kotlin/gg/grounds/api/PlayerHeartbeatServiceTest.kt @@ -41,6 +41,7 @@ class PlayerHeartbeatServiceTest { assertEquals(0, reply.updated) assertEquals(0, reply.missing) + assertEquals(false, reply.success) assertEquals("player_ids must be UUIDs", reply.message) verifyNoInteractions(repository) } @@ -53,6 +54,7 @@ class PlayerHeartbeatServiceTest { assertEquals(0, reply.updated) assertEquals(0, reply.missing) + assertEquals(false, reply.success) assertEquals("no player ids provided", reply.message) verifyNoInteractions(repository) } @@ -73,6 +75,7 @@ class PlayerHeartbeatServiceTest { assertEquals(2, reply.updated) assertEquals(0, reply.missing) + assertEquals(true, reply.success) assertEquals("heartbeat accepted", reply.message) verify(repository).touchSessions(eq(listOf(first, second)), any()) } @@ -93,6 +96,7 @@ class PlayerHeartbeatServiceTest { assertEquals(1, reply.updated) assertEquals(1, reply.missing) + assertEquals(true, reply.success) assertEquals("heartbeat accepted", reply.message) verify(repository).touchSessions(eq(listOf(first, second)), any()) }