diff --git a/internal/signaling/stores/postgres.go b/internal/signaling/stores/postgres.go index 5835dbd..017e418 100644 --- a/internal/signaling/stores/postgres.go +++ b/internal/signaling/stores/postgres.go @@ -90,12 +90,25 @@ func (s *PostgresStore) listen(ctx context.Context) error { if !ok { continue } - raw := make([]byte, base64.StdEncoding.DecodedLen(len(data))) - l, err := base64.StdEncoding.Decode(raw, []byte(data)) + rawCompressed := make([]byte, base64.StdEncoding.DecodedLen(len(data))) + l, err := base64.StdEncoding.Decode(rawCompressed, []byte(data)) if err != nil { return fmt.Errorf("failed to decode payload: %w", err) } - raw = raw[:l] + rawCompressed = rawCompressed[:l] + + var raw []byte + if util.IsGzipCompressed(rawCompressed) { + raw, err = util.GzipDecompress(rawCompressed) + if err != nil { + return fmt.Errorf("failed to decompress payload: %w", err) + } + } else { + logger := logging.GetLogger(ctx) + logger.Warn("received uncompressed notification", zap.String("topic", topic)) + raw = rawCompressed + } + s.notify(ctx, topic, raw) } } @@ -152,18 +165,19 @@ func (s *PostgresStore) Publish(ctx context.Context, topic string, data []byte) if !topicRegexp.MatchString(topic) { return fmt.Errorf("topic %q is invalid", topic) } - totalLength := base64.StdEncoding.EncodedLen(len(data)) + len(topic) + 1 + + compressedData, err := util.GzipCompress(data) + if err != nil { + return fmt.Errorf("failed to gzip data: %w", err) + } + + totalLength := base64.StdEncoding.EncodedLen(len(compressedData)) + len(topic) + 1 if totalLength > 8000 { - // debug a Poki specific topic to understand why the payload gets so large in some cases - if topic == "2c23b92e-cc51-45f2-8ece-bff5d9b2e2d6" { - logger := logging.GetLogger(ctx) - logger.Warn("data is too long", zap.String("topic", topic), zap.Int("length", totalLength), zap.String("data", string(data))) - } return fmt.Errorf("data too long for topic %q: %d", topic, totalLength) } - encoded := base64.StdEncoding.EncodeToString(data) + encoded := base64.StdEncoding.EncodeToString(compressedData) payload := topic + ":" + encoded - _, err := s.DB.Exec(ctx, `NOTIFY lobbies, '`+payload+`'`) + _, err = s.DB.Exec(ctx, `NOTIFY lobbies, '`+payload+`'`) if err != nil { return fmt.Errorf("failed to publish to lobbies: %w", err) } diff --git a/internal/util/gzip.go b/internal/util/gzip.go new file mode 100644 index 0000000..64e14ca --- /dev/null +++ b/internal/util/gzip.go @@ -0,0 +1,50 @@ +package util + +import ( + "bytes" + "compress/gzip" + "io" +) + +func IsGzipCompressed(data []byte) bool { + // Gzip magic bytes: 0x1f 0x8b + return len(data) >= 2 && data[0] == 0x1f && data[1] == 0x8b +} + +func GzipCompress(input []byte) ([]byte, error) { + var buf bytes.Buffer + writer, err := gzip.NewWriterLevel(&buf, gzip.BestSpeed) + if err != nil { + return nil, err + } + + _, err = writer.Write(input) + if err != nil { + return nil, err + } + + if err := writer.Close(); err != nil { + return nil, err + } + + return buf.Bytes(), nil +} + +func GzipDecompress(compressedInput []byte) ([]byte, error) { + reader, err := gzip.NewReader(bytes.NewReader(compressedInput)) + if err != nil { + return nil, err + } + + decompressed, err := io.ReadAll(reader) + if err != nil { + return nil, err + } + + err = reader.Close() + if err != nil { + return nil, err + } + + return decompressed, nil +}