Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ dependencies {
implementation("io.quarkus:quarkus-jdbc-postgresql")
implementation("io.quarkus:quarkus-flyway")
implementation("io.quarkus:quarkus-kotlin")
implementation("gg.grounds:library-grpc-contracts-player:0.1.0")
implementation("io.quarkus:quarkus-scheduler")
implementation("gg.grounds:library-grpc-contracts-player:feat-player-heartbeat-SNAPSHOT")

compileOnly("com.google.protobuf:protobuf-kotlin")

Expand Down
71 changes: 71 additions & 0 deletions src/main/kotlin/gg/grounds/api/PlayerHeartbeatService.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package gg.grounds.api

import gg.grounds.grpc.player.PlayerHeartbeatBatchReply
import gg.grounds.grpc.player.PlayerHeartbeatBatchRequest
import gg.grounds.persistence.PlayerSessionRepository
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import java.time.Instant
import java.util.UUID
import org.jboss.logging.Logger

@ApplicationScoped
class PlayerHeartbeatService @Inject constructor(private val repository: PlayerSessionRepository) {
fun handleHeartbeatBatch(request: PlayerHeartbeatBatchRequest): PlayerHeartbeatBatchReply {
val playerIds =
parsePlayerIds(request.playerIdsList)
?: return PlayerHeartbeatBatchReply.newBuilder()
.setUpdated(0)
.setMissing(0)
.setSuccess(false)
.setMessage("player_ids must be UUIDs")
.also {
LOG.warnf(
"Player heartbeat batch rejected (count=%d, reason=invalid_player_ids)",
request.playerIdsList.size,
)
}
.build()

if (playerIds.isEmpty()) {
LOG.debugf("Player heartbeat batch skipped (count=0, reason=empty_request)")
return PlayerHeartbeatBatchReply.newBuilder()
.setUpdated(0)
.setMissing(0)
.setSuccess(false)
.setMessage("no player ids provided")
.build()
}

val updated = repository.touchSessions(playerIds, Instant.now())
val missing = (playerIds.size - updated).coerceAtLeast(0)
LOG.debugf(
"Player heartbeat batch processed (count=%d, updated=%d, missing=%d)",
playerIds.size,
updated,
missing,
)
return PlayerHeartbeatBatchReply.newBuilder()
.setUpdated(updated)
.setMissing(missing)
.setSuccess(true)
.setMessage("heartbeat accepted")
.build()
}

private fun parsePlayerIds(values: List<String>): List<UUID>? {
if (values.isEmpty()) {
return emptyList()
}
val trimmed = values.map { it.trim() }
if (trimmed.any { it.isEmpty() }) {
return null
}
val parsed = trimmed.map { runCatching { UUID.fromString(it) }.getOrNull() }
return parsed.takeIf { parsedIds -> parsedIds.none { it == null } }?.filterNotNull()
}

companion object {
private val LOG = Logger.getLogger(PlayerHeartbeatService::class.java)
}
}
70 changes: 65 additions & 5 deletions src/main/kotlin/gg/grounds/api/PlayerPresenceGrpcService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package gg.grounds.api

import gg.grounds.domain.PlayerSession
import gg.grounds.grpc.player.LoginStatus
import gg.grounds.grpc.player.PlayerHeartbeatBatchReply
import gg.grounds.grpc.player.PlayerHeartbeatBatchRequest
import gg.grounds.grpc.player.PlayerLoginReply
import gg.grounds.grpc.player.PlayerLoginRequest
import gg.grounds.grpc.player.PlayerLogoutReply
Expand All @@ -13,15 +15,23 @@ import io.quarkus.grpc.GrpcService
import io.smallrye.common.annotation.Blocking
import io.smallrye.mutiny.Uni
import jakarta.inject.Inject
import java.time.Duration
import java.time.Instant
import java.util.UUID
import org.eclipse.microprofile.config.inject.ConfigProperty
import org.jboss.logging.Logger

@GrpcService
@Blocking
class PlayerPresenceGrpcService
@Inject
constructor(private val repository: PlayerSessionRepository) : PlayerPresenceService {
constructor(
private val repository: PlayerSessionRepository,
private val heartbeatService: PlayerHeartbeatService,
) : PlayerPresenceService {
@ConfigProperty(name = "grounds.player.sessions.ttl", defaultValue = "90s")
private lateinit var sessionTtl: Duration

override fun tryPlayerLogin(request: PlayerLoginRequest): Uni<PlayerLoginReply> {
return Uni.createFrom().item { handleLogin(request) }
}
Expand All @@ -30,6 +40,12 @@ constructor(private val repository: PlayerSessionRepository) : PlayerPresenceSer
return Uni.createFrom().item { handleLogout(request) }
}

override fun playerHeartbeatBatch(
request: PlayerHeartbeatBatchRequest
): Uni<PlayerHeartbeatBatchReply> {
return Uni.createFrom().item { heartbeatService.handleHeartbeatBatch(request) }
}

private fun handleLogin(request: PlayerLoginRequest): PlayerLoginReply {
val playerId =
parsePlayerId(request.playerId)
Expand All @@ -38,10 +54,11 @@ constructor(private val repository: PlayerSessionRepository) : PlayerPresenceSer
.setMessage("player_id must be a UUID")
.build()

val session = PlayerSession(playerId, Instant.now())
val now = Instant.now()
val session = PlayerSession(playerId, now, now)
val inserted = repository.insertSession(session)
if (inserted) {
LOG.infof("Player %s logged in", playerId)
LOG.infof("Player session created (playerId=%s, result=accepted)", playerId)
return PlayerLoginReply.newBuilder()
.setStatus(LoginStatus.LOGIN_STATUS_ACCEPTED)
.setMessage("player accepted")
Expand All @@ -50,13 +67,52 @@ constructor(private val repository: PlayerSessionRepository) : PlayerPresenceSer

val existing = repository.findByPlayerId(playerId)
if (existing != null) {
LOG.infof("Player %s rejected: already online", playerId)
if (isStale(existing, now)) {
val removed = repository.deleteSession(playerId)
if (removed == DeleteSessionResult.ERROR) {
return PlayerLoginReply.newBuilder()
.setStatus(LoginStatus.LOGIN_STATUS_ERROR)
.setMessage("unable to remove stale player session")
.build()
}
if (removed == DeleteSessionResult.REMOVED) {
LOG.infof(
"Player session expired (playerId=%s, lastSeenAt=%s)",
playerId,
existing.lastSeenAt,
)
}
if (removed == DeleteSessionResult.NOT_FOUND) {
LOG.infof("Player session missing during stale cleanup (playerId=%s)", playerId)
}
if (repository.insertSession(session)) {
LOG.infof("Player session created (playerId=%s, result=accepted)", playerId)
return PlayerLoginReply.newBuilder()
.setStatus(LoginStatus.LOGIN_STATUS_ACCEPTED)
.setMessage("player accepted")
.build()
}
val recreated = repository.findByPlayerId(playerId)
if (recreated == null) {
LOG.errorf(
"Player session recreation failed (playerId=%s, reason=insert_failed)",
playerId,
)
return PlayerLoginReply.newBuilder()
.setStatus(LoginStatus.LOGIN_STATUS_ERROR)
.setMessage("unable to create player session after stale cleanup")
.build()
}
}

LOG.infof("Player session rejected (playerId=%s, reason=already_online)", playerId)
return PlayerLoginReply.newBuilder()
.setStatus(LoginStatus.LOGIN_STATUS_ALREADY_ONLINE)
.setMessage("player already online")
.build()
}

LOG.errorf("Player session verification failed (playerId=%s)", playerId)
return PlayerLoginReply.newBuilder()
.setStatus(LoginStatus.LOGIN_STATUS_ERROR)
.setMessage("unable to verify player session")
Expand All @@ -73,7 +129,7 @@ constructor(private val repository: PlayerSessionRepository) : PlayerPresenceSer

return when (repository.deleteSession(playerId)) {
DeleteSessionResult.REMOVED -> {
LOG.infof("Player %s logged out", playerId)
LOG.infof("Player session removed (playerId=%s, result=logout)", playerId)
PlayerLogoutReply.newBuilder().setRemoved(true).setMessage("player removed").build()
}
DeleteSessionResult.NOT_FOUND ->
Expand All @@ -96,6 +152,10 @@ constructor(private val repository: PlayerSessionRepository) : PlayerPresenceSer
?.let { runCatching { UUID.fromString(it) }.getOrNull() }
}

private fun isStale(session: PlayerSession, now: Instant): Boolean {
return session.lastSeenAt.isBefore(now.minus(sessionTtl))
}

companion object {
private val LOG = Logger.getLogger(PlayerPresenceGrpcService::class.java)
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/kotlin/gg/grounds/domain/PlayerSession.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ package gg.grounds.domain
import java.time.Instant
import java.util.UUID

data class PlayerSession(val playerId: UUID, val connectedAt: Instant)
data class PlayerSession(val playerId: UUID, val connectedAt: Instant, val lastSeenAt: Instant)
84 changes: 77 additions & 7 deletions src/main/kotlin/gg/grounds/persistence/PlayerSessionRepository.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import jakarta.inject.Inject
import java.sql.ResultSet
import java.sql.SQLException
import java.sql.Timestamp
import java.time.Instant
import java.util.UUID
import javax.sql.DataSource
import org.jboss.logging.Logger
Expand All @@ -24,11 +25,16 @@ class PlayerSessionRepository @Inject constructor(private val dataSource: DataSo
connection.prepareStatement(INSERT_SESSION).use { statement ->
statement.setObject(1, session.playerId)
statement.setTimestamp(2, Timestamp.from(session.connectedAt))
statement.setTimestamp(3, Timestamp.from(session.lastSeenAt))
statement.executeUpdate() > 0
}
}
} catch (error: SQLException) {
LOG.errorf(error, "Failed to insert player session for %s", session.playerId)
LOG.errorf(
error,
"Player session insert failed (playerId=%s, reason=sql_error)",
session.playerId,
)
false
}
}
Expand All @@ -44,7 +50,11 @@ class PlayerSessionRepository @Inject constructor(private val dataSource: DataSo
}
}
} catch (error: SQLException) {
LOG.errorf(error, "Failed to fetch player session for %s", playerId)
LOG.errorf(
error,
"Player session fetch failed (playerId=%s, reason=sql_error)",
playerId,
)
null
}
}
Expand All @@ -59,11 +69,57 @@ class PlayerSessionRepository @Inject constructor(private val dataSource: DataSo
}
}
} catch (error: SQLException) {
LOG.errorf(error, "Failed to delete player session for %s", playerId)
LOG.errorf(
error,
"Player session delete failed (playerId=%s, reason=sql_error)",
playerId,
)
DeleteSessionResult.ERROR
}
}

fun touchSessions(playerIds: Collection<UUID>, lastSeenAt: Instant): Int {
if (playerIds.isEmpty()) {
return 0
}

return try {
dataSource.connection.use { connection ->
connection.prepareStatement(UPDATE_LAST_SEEN_BATCH).use { statement ->
statement.setTimestamp(1, Timestamp.from(lastSeenAt))
val array = connection.createArrayOf("uuid", playerIds.toTypedArray())
statement.setArray(2, array)
statement.executeUpdate()
}
}
} catch (error: SQLException) {
LOG.errorf(
error,
"Player session batch update failed (count=%d, reason=sql_error)",
playerIds.size,
)
0
}
}

fun deleteStaleSessions(cutoff: Instant): Int {
return try {
dataSource.connection.use { connection ->
connection.prepareStatement(DELETE_STALE).use { statement ->
statement.setTimestamp(1, Timestamp.from(cutoff))
statement.executeUpdate()
}
}
} catch (error: SQLException) {
LOG.errorf(
error,
"Stale player session cleanup failed (cutoff=%s, reason=sql_error)",
cutoff,
)
0
}
}

private fun mapSession(resultSet: ResultSet): PlayerSession {
val playerId =
requireNotNull(resultSet.getObject("player_id", UUID::class.java)) {
Expand All @@ -72,21 +128,24 @@ class PlayerSessionRepository @Inject constructor(private val dataSource: DataSo
val connectedAt =
requireNotNull(resultSet.getTimestamp("connected_at")) { "connected_at is null" }
.toInstant()
return PlayerSession(playerId, connectedAt)
val lastSeenAt =
requireNotNull(resultSet.getTimestamp("last_seen_at")) { "last_seen_at is null" }
.toInstant()
return PlayerSession(playerId, connectedAt, lastSeenAt)
}

companion object {
private val LOG = Logger.getLogger(PlayerSessionRepository::class.java)

private const val INSERT_SESSION =
"""
INSERT INTO player_sessions (player_id, connected_at)
VALUES (?, ?)
INSERT INTO player_sessions (player_id, connected_at, last_seen_at)
VALUES (?, ?, ?)
ON CONFLICT (player_id) DO NOTHING
"""
private const val SELECT_BY_PLAYER =
"""
SELECT player_id, connected_at
SELECT player_id, connected_at, last_seen_at
FROM player_sessions
WHERE player_id = ?
"""
Expand All @@ -95,5 +154,16 @@ class PlayerSessionRepository @Inject constructor(private val dataSource: DataSo
DELETE FROM player_sessions
WHERE player_id = ?
"""
private const val UPDATE_LAST_SEEN_BATCH =
"""
UPDATE player_sessions
SET last_seen_at = ?
WHERE player_id = ANY(?)
"""
private const val DELETE_STALE =
"""
DELETE FROM player_sessions
WHERE last_seen_at < ?
"""
}
}
32 changes: 32 additions & 0 deletions src/main/kotlin/gg/grounds/presence/PlayerSessionCleanup.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package gg.grounds.presence

import gg.grounds.persistence.PlayerSessionRepository
import gg.grounds.time.TimeProvider
import io.quarkus.scheduler.Scheduled
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import java.time.Duration
import org.eclipse.microprofile.config.inject.ConfigProperty
import org.jboss.logging.Logger

@ApplicationScoped
class PlayerSessionCleanup
@Inject
constructor(
private val repository: PlayerSessionRepository,
private val timeProvider: TimeProvider,
) {
@ConfigProperty(name = "grounds.player.sessions.ttl", defaultValue = "90s")
private lateinit var sessionTtl: Duration

@Scheduled(every = "{grounds.player.sessions.cleanup-interval}")
fun expireStaleSessions() {
val cutoff = timeProvider.now().minus(sessionTtl)
val removed = repository.deleteStaleSessions(cutoff)
LOG.infof("Player session cleanup completed (removed=%d, cutoff=%s)", removed, cutoff)
}

companion object {
private val LOG = Logger.getLogger(PlayerSessionCleanup::class.java)
}
}
Loading