Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cache/jetstreamkv/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func New(opts ...cache.Option) (cache.RawCache, error) {

js, err := natsConn.JetStream()
if err != nil {
natsConn.Close()
return nil, err
}
// Create the client
Expand All @@ -58,15 +59,18 @@ func New(opts ...cache.Option) (cache.RawCache, error) {
// If the bucket already exists, just get a handle to it.
client, err = js.KeyValue(cacheOpts.Name)
if err != nil {
natsConn.Close()
return nil, err
}
} else {
// Another error occurred during creation.
natsConn.Close()
return nil, err
}
}

if _, err = client.Status(); err != nil {
natsConn.Close()
return nil, err
}

Expand Down
1 change: 1 addition & 0 deletions cache/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func New(opts ...cache.Option) (cache.RawCache, error) {

err = client.Ping(ctx).Err()
if err != nil {
_ = client.Close()
return nil, err
}

Expand Down
12 changes: 7 additions & 5 deletions events/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package events_test
import (
"context"
"fmt"
"sync/atomic"
"testing"
"time"

Expand All @@ -23,7 +24,7 @@ type EventsTestSuite struct {

type MessageToTest struct {
Service *frame.Service
Count int
Count atomic.Int64
}

func (event *MessageToTest) Name() string {
Expand All @@ -46,7 +47,7 @@ func (event *MessageToTest) Execute(ctx context.Context, payload any) error {
message := *m
logger := event.Service.Log(ctx).WithField("payload", message).WithField("type", event.Name())
logger.Info("handling event")
event.Count++
event.Count.Add(1)
return nil
}

Expand Down Expand Up @@ -200,7 +201,8 @@ func (s *EventsTestSuite) TestServiceEventsPublishingWorks() {
frametests.WithNoopDriver(),
)

testEvent := MessageToTest{Service: svc, Count: tc.initialCount}
testEvent := MessageToTest{Service: svc}
testEvent.Count.Store(int64(tc.initialCount))
events := frame.WithRegisterEvents(&testEvent)

svc.Init(ctx, events)
Expand All @@ -215,8 +217,8 @@ func (s *EventsTestSuite) TestServiceEventsPublishingWorks() {
time.Sleep(2 * time.Second)
require.Equal(
t,
tc.expectedCount,
testEvent.Count,
int64(tc.expectedCount),
testEvent.Count.Load(),
"event should be processed and count incremented",
)
}
Expand Down
6 changes: 6 additions & 0 deletions events/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package events
import (
"context"
"errors"
"sync"

"github.com/pitabwire/util"

Expand All @@ -14,14 +15,19 @@ type manager struct {
qm queue.Manager
cfg config.ConfigurationEvents

mu sync.RWMutex
eventRegistry map[string]EventI
}

func (m *manager) Add(evt EventI) {
m.mu.Lock()
defer m.mu.Unlock()
m.eventRegistry[evt.Name()] = evt
}

func (m *manager) Get(eventName string) (EventI, error) {
m.mu.RLock()
defer m.mu.RUnlock()
evt, ok := m.eventRegistry[eventName]
if !ok {
return nil, errors.New("event not found in registry: " + eventName)
Expand Down
10 changes: 10 additions & 0 deletions frametests/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net"
"net/http"
"net/http/httptest"
"sync"

"github.com/pitabwire/util"

Expand All @@ -28,25 +29,34 @@ func GetFreePort(ctx context.Context) (int, error) {
}

type testDriver struct {
mu sync.RWMutex
srv *httptest.Server
}

func (t *testDriver) ListenAndServe(_ string, h http.Handler) error {
t.mu.Lock()
defer t.mu.Unlock()
t.srv = httptest.NewServer(h)

return nil
}
func (t *testDriver) ListenAndServeTLS(_, _, _ string, h http.Handler) error {
t.mu.Lock()
defer t.mu.Unlock()
t.srv = httptest.NewTLSServer(h)
return nil
}

func (t *testDriver) Shutdown(_ context.Context) error {
t.mu.RLock()
defer t.mu.RUnlock()
t.srv.Close()
return nil
}

func (t *testDriver) GetTestServer() *httptest.Server {
t.mu.RLock()
defer t.mu.RUnlock()
return t.srv
}

Expand Down
66 changes: 33 additions & 33 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ require (
github.com/exaring/otelpgx v0.10.0
github.com/golang-jwt/jwt/v5 v5.3.1
github.com/jackc/pgx/v5 v5.8.0
github.com/lmittmann/tint v1.1.2
github.com/lmittmann/tint v1.1.3
github.com/nats-io/nats.go v1.48.0
github.com/nicksnyder/go-i18n/v2 v2.6.1
github.com/panjf2000/ants/v2 v2.11.4
github.com/pitabwire/natspubsub v0.7.11
github.com/pitabwire/natspubsub v0.7.12
github.com/pitabwire/util v0.4.0
github.com/redis/go-redis/v9 v9.17.3
github.com/rs/xid v1.6.0
Expand All @@ -28,17 +28,17 @@ require (
github.com/testcontainers/testcontainers-go/modules/postgres v0.40.0
github.com/testcontainers/testcontainers-go/modules/valkey v0.40.0
github.com/valkey-io/valkey-go v1.0.71
go.opentelemetry.io/contrib/bridges/otelslog v0.14.0
go.opentelemetry.io/contrib/exporters/autoexport v0.64.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.64.0
go.opentelemetry.io/contrib/propagators/autoprop v0.64.0
go.opentelemetry.io/otel v1.39.0
go.opentelemetry.io/otel/log v0.15.0
go.opentelemetry.io/otel/metric v1.39.0
go.opentelemetry.io/otel/sdk v1.39.0
go.opentelemetry.io/otel/sdk/log v0.15.0
go.opentelemetry.io/otel/sdk/metric v1.39.0
go.opentelemetry.io/otel/trace v1.39.0
go.opentelemetry.io/contrib/bridges/otelslog v0.15.0
go.opentelemetry.io/contrib/exporters/autoexport v0.65.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.65.0
go.opentelemetry.io/contrib/propagators/autoprop v0.65.0
go.opentelemetry.io/otel v1.40.0
go.opentelemetry.io/otel/log v0.16.0
go.opentelemetry.io/otel/metric v1.40.0
go.opentelemetry.io/otel/sdk v1.40.0
go.opentelemetry.io/otel/sdk/log v0.16.0
go.opentelemetry.io/otel/sdk/metric v1.40.0
go.opentelemetry.io/otel/trace v1.40.0
gocloud.dev v0.44.0
golang.org/x/net v0.49.0
golang.org/x/oauth2 v0.34.0
Expand Down Expand Up @@ -76,7 +76,7 @@ require (
github.com/go-ole/go-ole v1.3.0 // indirect
github.com/google/cel-go v0.27.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/googleapis/gax-go/v2 v2.16.0 // indirect
github.com/googleapis/gax-go/v2 v2.17.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
Expand Down Expand Up @@ -114,32 +114,32 @@ require (
github.com/tklauser/numcpus v0.11.0 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/contrib/bridges/prometheus v0.64.0 // indirect
go.opentelemetry.io/contrib/propagators/aws v1.39.0 // indirect
go.opentelemetry.io/contrib/propagators/b3 v1.39.0 // indirect
go.opentelemetry.io/contrib/propagators/jaeger v1.39.0 // indirect
go.opentelemetry.io/contrib/propagators/ot v1.39.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.15.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.15.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.39.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.39.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.39.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.39.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.39.0 // indirect
go.opentelemetry.io/otel/exporters/prometheus v0.61.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.15.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.39.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.39.0 // indirect
go.opentelemetry.io/contrib/bridges/prometheus v0.65.0 // indirect
go.opentelemetry.io/contrib/propagators/aws v1.40.0 // indirect
go.opentelemetry.io/contrib/propagators/b3 v1.40.0 // indirect
go.opentelemetry.io/contrib/propagators/jaeger v1.40.0 // indirect
go.opentelemetry.io/contrib/propagators/ot v1.40.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.16.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.16.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.40.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.40.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.40.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.40.0 // indirect
go.opentelemetry.io/otel/exporters/prometheus v0.62.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.16.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.40.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.40.0 // indirect
go.opentelemetry.io/proto/otlp v1.9.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.yaml.in/yaml/v2 v2.4.3 // indirect
golang.org/x/crypto v0.47.0 // indirect
golang.org/x/exp v0.0.0-20251219203646-944ab1f22d93 // indirect
golang.org/x/exp v0.0.0-20260112195511-716be5621a96 // indirect
golang.org/x/sync v0.19.0 // indirect
golang.org/x/sys v0.40.0 // indirect
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
google.golang.org/api v0.264.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20260203192932-546029d2fa20 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260203192932-546029d2fa20 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading