Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 32 additions & 6 deletions cmd/ges/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ package main

import (
"context"
"fmt"

"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/pentops/ges/internal/service"
"github.com/pentops/grpc.go/grpcbind"
"github.com/pentops/runner/commander"
"github.com/pentops/sqrlx.go/pgenv"
"github.com/pressly/goose"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
Expand Down Expand Up @@ -51,11 +55,33 @@ func runServe(ctx context.Context, cfg struct {
return err
}

grpcServer := grpc.NewServer(grpc.ChainUnaryInterceptor(
service.GRPCMiddleware()...,
))
app.RegisterGRPC(grpcServer)
reflection.Register(grpcServer)
awsConfig, err := config.LoadDefaultConfig(ctx)
if err != nil {
return fmt.Errorf("failed to load configuration: %w", err)
}

sqsClient := sqs.NewFromConfig(awsConfig)

listener, err := service.ReplayListener(cfg.DatabaseConfig.URL, sqsClient)
if err != nil {
return err
}

eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {

grpcServer := grpc.NewServer(grpc.ChainUnaryInterceptor(
service.GRPCMiddleware()...,
))
app.RegisterGRPC(grpcServer)
reflection.Register(grpcServer)

return cfg.ListenAndServe(ctx, grpcServer)
})

eg.Go(func() error {
return listener.Listen(ctx)
})

return cfg.ListenAndServe(ctx, grpcServer)
return eg.Wait()
}
14 changes: 11 additions & 3 deletions ext/db/00002_events.sql
Original file line number Diff line number Diff line change
@@ -1,20 +1,28 @@
-- +goose Up
CREATE TABLE event (
grpc_service text NOT NULL,
grpc_method text NOT NULL,

id text PRIMARY KEY,
timestamp timestamptz NOT NULL,
entity_name text NOT NULL,

data jsonb NOT NULL
data jsonb NOT NULL -- ges.v1.Event

);

CREATE TABLE upsert (
grpc_service text NOT NULL,
grpc_method text NOT NULL,

entity_name text NOT NULL,
entity_id text NOT NULL,
last_event_id text NOT NULL,
last_event_timestamp timestamptz NOT NULL,
data jsonb NOT NULL,

data jsonb NOT NULL, -- ges.v1.Upsert

PRIMARY KEY (entity_name, entity_id)
PRIMARY KEY (grpc_service, grpc_method, entity_id)
);


Expand Down
20 changes: 20 additions & 0 deletions ext/db/00003_replay.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
-- +goose Up

CREATE TABLE replay_event (
replay_id text PRIMARY KEY, -- {queue_url}/{event_id}
event_id text REFERENCES event(id) NOT NULL,
queue_url text NOT NULL
);

CREATE TABLE replay_upsert (
replay_id text PRIMARY KEY, -- {queue_url}/{entity_name}/{entity_id}
grpc_service text NOT NULL,
grpc_method text NOT NULL,
entity_id text NOT NULL,
queue_url text NOT NULL
);

-- +goose Down

DROP TABLE replay_event;
DROP TABLE replay_upsert;
1 change: 1 addition & 0 deletions ext/o5/app.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ runtimes:
envName: "*"
- name: "global/upsert"
envName: "*"
- name: "/ges.v1.topic.ReplayTopic"

containers:
- name: main
Expand Down
40 changes: 29 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,45 @@ go 1.24.0
toolchain go1.24.1

require (
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.5-20250307204501-0409229c3780.1
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.6-20250307204501-0409229c3780.1
github.com/aws/aws-sdk-go-v2/config v1.29.9
github.com/aws/aws-sdk-go-v2/service/sqs v1.38.4
github.com/elgris/sqrl v0.0.0-20210727210741-7e0198b30236
github.com/google/uuid v1.6.0
github.com/pentops/flowtest v0.0.0-20241110231021-42663ac00b63
github.com/pentops/golib v0.0.0-20250107012216-1b5307b3bfe0
github.com/jackc/pgx/v5 v5.7.4
github.com/lib/pq v1.10.9
github.com/pentops/flowtest v0.0.0-20250403234635-311159fa1e81
github.com/pentops/golib v0.0.0-20250326060930-8c83d58ddb63
github.com/pentops/grpc.go v0.0.0-20250326042738-bcdfc2b43fa9
github.com/pentops/j5 v0.0.0-20250326000307-24e2adf77e8e
github.com/pentops/j5 v0.0.0-20250407052915-b2fc017d8ac2
github.com/pentops/log.go v0.0.15
github.com/pentops/o5-messaging v0.0.0-20250317182016-de51c0e702a3
github.com/pentops/o5-messaging v0.0.0-20250408063726-cf9d6419c7cd
github.com/pentops/pgtest.go v0.0.0-20241223222214-7638cc50e15b
github.com/pentops/protostate v0.0.0-20250324023023-a72be074893a
github.com/pentops/protostate v0.0.0-20250403011625-3c2baa2e4af4
github.com/pentops/runner v0.0.0-20250116202335-8635b2a42547
github.com/pentops/sqrlx.go v0.0.0-20250324231942-5f3ef6c21f8e
github.com/pressly/goose v2.7.0+incompatible
google.golang.org/genproto/googleapis/api v0.0.0-20250313205543-e70fdf4c4cb4
golang.org/x/sync v0.12.0
google.golang.org/genproto/googleapis/api v0.0.0-20250324211829-b45e905df463
google.golang.org/grpc v1.71.0
google.golang.org/protobuf v1.36.5
google.golang.org/protobuf v1.36.6
)

require (
cel.dev/expr v0.22.0 // indirect
github.com/antlr4-go/antlr/v4 v4.13.1 // indirect
github.com/aws/aws-sdk-go-v2 v1.36.3 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.62 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.25.1 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.29.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.33.17 // indirect
github.com/aws/smithy-go v1.22.3 // indirect
github.com/bufbuild/protocompile v0.14.1 // indirect
github.com/bufbuild/protovalidate-go v0.9.2 // indirect
github.com/fatih/color v1.18.0 // indirect
Expand All @@ -36,17 +53,18 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.1 // indirect
github.com/iancoleman/strcase v0.3.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jhump/protoreflect v1.17.0 // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/mattn/go-colorable v0.1.14 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/shopspring/decimal v1.4.0 // indirect
github.com/stoewer/go-strcase v1.3.0 // indirect
golang.org/x/crypto v0.36.0 // indirect
golang.org/x/exp v0.0.0-20250305212735-054e65f0b394 // indirect
golang.org/x/net v0.37.0 // indirect
golang.org/x/sync v0.12.0 // indirect
golang.org/x/sys v0.31.0 // indirect
golang.org/x/text v0.23.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250313205543-e70fdf4c4cb4 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250324211829-b45e905df463 // indirect
)
Loading