From b749298919dc978137e2c8ef1cd434a85e19f624 Mon Sep 17 00:00:00 2001 From: Pedro Date: Mon, 22 Dec 2025 13:02:34 +0100 Subject: [PATCH 1/2] Revert "Revert "gzip notify/listen payloads" (#313)" This reverts commit b74def26ace7f246a68bc51e47c6b2eecff628c0. --- internal/signaling/stores/postgres.go | 29 ++++++++++------- internal/util/gzip.go | 45 +++++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 11 deletions(-) create mode 100644 internal/util/gzip.go diff --git a/internal/signaling/stores/postgres.go b/internal/signaling/stores/postgres.go index 5835dbd..65392ed 100644 --- a/internal/signaling/stores/postgres.go +++ b/internal/signaling/stores/postgres.go @@ -90,12 +90,18 @@ func (s *PostgresStore) listen(ctx context.Context) error { if !ok { continue } - raw := make([]byte, base64.StdEncoding.DecodedLen(len(data))) - l, err := base64.StdEncoding.Decode(raw, []byte(data)) + rawCompressed := make([]byte, base64.StdEncoding.DecodedLen(len(data))) + l, err := base64.StdEncoding.Decode(rawCompressed, []byte(data)) if err != nil { return fmt.Errorf("failed to decode payload: %w", err) } - raw = raw[:l] + rawCompressed = rawCompressed[:l] + + raw, err := util.GzipDecompress(rawCompressed) + if err != nil { + return fmt.Errorf("failed to decompress payload: %w", err) + } + s.notify(ctx, topic, raw) } } @@ -152,18 +158,19 @@ func (s *PostgresStore) Publish(ctx context.Context, topic string, data []byte) if !topicRegexp.MatchString(topic) { return fmt.Errorf("topic %q is invalid", topic) } - totalLength := base64.StdEncoding.EncodedLen(len(data)) + len(topic) + 1 + + compressedData, err := util.GzipCompress(data) + if err != nil { + return fmt.Errorf("failed to gzip data: %w", err) + } + + totalLength := base64.StdEncoding.EncodedLen(len(compressedData)) + len(topic) + 1 if totalLength > 8000 { - // debug a Poki specific topic to understand why the payload gets so large in some cases - if topic == "2c23b92e-cc51-45f2-8ece-bff5d9b2e2d6" { - logger := logging.GetLogger(ctx) - logger.Warn("data is too long", zap.String("topic", topic), zap.Int("length", totalLength), zap.String("data", string(data))) - } return fmt.Errorf("data too long for topic %q: %d", topic, totalLength) } - encoded := base64.StdEncoding.EncodeToString(data) + encoded := base64.StdEncoding.EncodeToString(compressedData) payload := topic + ":" + encoded - _, err := s.DB.Exec(ctx, `NOTIFY lobbies, '`+payload+`'`) + _, err = s.DB.Exec(ctx, `NOTIFY lobbies, '`+payload+`'`) if err != nil { return fmt.Errorf("failed to publish to lobbies: %w", err) } diff --git a/internal/util/gzip.go b/internal/util/gzip.go new file mode 100644 index 0000000..5414095 --- /dev/null +++ b/internal/util/gzip.go @@ -0,0 +1,45 @@ +package util + +import ( + "bytes" + "compress/gzip" + "io" +) + +func GzipCompress(input []byte) ([]byte, error) { + var buf bytes.Buffer + writer, err := gzip.NewWriterLevel(&buf, gzip.BestSpeed) + if err != nil { + return nil, err + } + + _, err = writer.Write(input) + if err != nil { + return nil, err + } + + if err := writer.Close(); err != nil { + return nil, err + } + + return buf.Bytes(), nil +} + +func GzipDecompress(compressedInput []byte) ([]byte, error) { + reader, err := gzip.NewReader(bytes.NewReader(compressedInput)) + if err != nil { + return nil, err + } + + decompressed, err := io.ReadAll(reader) + if err != nil { + return nil, err + } + + err = reader.Close() + if err != nil { + return nil, err + } + + return decompressed, nil +} From 03e783f2d913ee712b1d0a5d097027888ab46203 Mon Sep 17 00:00:00 2001 From: Pedr0Rocha Date: Mon, 22 Dec 2025 13:03:47 +0100 Subject: [PATCH 2/2] check if it's compressed before trying to decompress it --- internal/signaling/stores/postgres.go | 13 ++++++++++--- internal/util/gzip.go | 5 +++++ 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/internal/signaling/stores/postgres.go b/internal/signaling/stores/postgres.go index 65392ed..017e418 100644 --- a/internal/signaling/stores/postgres.go +++ b/internal/signaling/stores/postgres.go @@ -97,9 +97,16 @@ func (s *PostgresStore) listen(ctx context.Context) error { } rawCompressed = rawCompressed[:l] - raw, err := util.GzipDecompress(rawCompressed) - if err != nil { - return fmt.Errorf("failed to decompress payload: %w", err) + var raw []byte + if util.IsGzipCompressed(rawCompressed) { + raw, err = util.GzipDecompress(rawCompressed) + if err != nil { + return fmt.Errorf("failed to decompress payload: %w", err) + } + } else { + logger := logging.GetLogger(ctx) + logger.Warn("received uncompressed notification", zap.String("topic", topic)) + raw = rawCompressed } s.notify(ctx, topic, raw) diff --git a/internal/util/gzip.go b/internal/util/gzip.go index 5414095..64e14ca 100644 --- a/internal/util/gzip.go +++ b/internal/util/gzip.go @@ -6,6 +6,11 @@ import ( "io" ) +func IsGzipCompressed(data []byte) bool { + // Gzip magic bytes: 0x1f 0x8b + return len(data) >= 2 && data[0] == 0x1f && data[1] == 0x8b +} + func GzipCompress(input []byte) ([]byte, error) { var buf bytes.Buffer writer, err := gzip.NewWriterLevel(&buf, gzip.BestSpeed)