From b98113e2d958805ba99920e825222f10ed4cc53a Mon Sep 17 00:00:00 2001 From: Damien Whitten Date: Tue, 15 Apr 2025 16:57:28 -0700 Subject: [PATCH] Log events for end-to-end testing --- cmd/test/test.go | 15 +- ext/db/00003_message.sql | 12 + go.mod | 3 +- .../gen/test/v1/test_pb/event_log.j5s.pb.go | 195 +++++++++++++ .../test/v1/test_pb/event_log.j5s_sugar.pb.go | 3 + .../test/v1/test_spb/event_log.p.j5s.pb.go | 256 +++++++++++++++++ .../v1/test_spb/event_log.p.j5s_grpc.pb.go | 111 +++++++ .../v1/test_spb/event_log.p.j5s_sugar.pb.go | 3 + .../genclient/j5/state/v1/state/generated.go | 21 ++ internal/genclient/o5/ges/v1/ges/generated.go | 270 ++++++++++++++++++ internal/genclient/test/v1/test/generated.go | 65 +++++ internal/integration/event_log_test.go | 78 +++++ internal/integration/service_test.go | 2 + internal/integration/universe_test.go | 9 +- internal/service/app.go | 17 ++ internal/service/ges_worker.go | 56 ++++ internal/service/test_access.go | 92 ++++++ internal/service/test_worker.go | 30 +- internal/test/000_index.go | 1 + internal/test/004_replay.go | 48 ++++ j5.yaml | 5 +- schema/test/v1/event_log.j5s | 24 ++ schema/test/v1/event_log.j5s.proto | 43 +++ schema/test/v1/service/event_log.p.j5s.proto | 36 +++ 24 files changed, 1364 insertions(+), 31 deletions(-) create mode 100644 ext/db/00003_message.sql create mode 100644 internal/gen/test/v1/test_pb/event_log.j5s.pb.go create mode 100644 internal/gen/test/v1/test_pb/event_log.j5s_sugar.pb.go create mode 100644 internal/gen/test/v1/test_spb/event_log.p.j5s.pb.go create mode 100644 internal/gen/test/v1/test_spb/event_log.p.j5s_grpc.pb.go create mode 100644 internal/gen/test/v1/test_spb/event_log.p.j5s_sugar.pb.go create mode 100644 internal/genclient/o5/ges/v1/ges/generated.go create mode 100644 internal/integration/event_log_test.go create mode 100644 internal/service/ges_worker.go create mode 100644 internal/service/test_access.go create mode 100644 internal/test/004_replay.go create mode 100644 schema/test/v1/event_log.j5s create mode 100644 schema/test/v1/event_log.j5s.proto create mode 100644 schema/test/v1/service/event_log.p.j5s.proto diff --git a/cmd/test/test.go b/cmd/test/test.go index a604a9a..22e6d3e 100644 --- a/cmd/test/test.go +++ b/cmd/test/test.go @@ -23,10 +23,15 @@ func main() { } } - domain := os.Getenv("O5_E2E_DOMAIN") + apiRoot := os.Getenv("O5_API") + if apiRoot == "" { + fmt.Println("O5_API is not set") + os.Exit(1) + } + cfg := &universe.APIConfig{ - APIRoot: fmt.Sprintf("https://o5.%s", domain), - MetaRoot: fmt.Sprintf("https://o5.%s", domain), + APIRoot: apiRoot, + MetaRoot: apiRoot, BearerToken: os.Getenv("O5_BEARER"), } @@ -48,8 +53,8 @@ func loadEnv(filename string) error { return err } - lines := strings.Split(string(fileData), "\n") - for _, line := range lines { + lines := strings.SplitSeq(string(fileData), "\n") + for line := range lines { if strings.HasPrefix(line, "#") { continue } diff --git a/ext/db/00003_message.sql b/ext/db/00003_message.sql new file mode 100644 index 0000000..8cc5b13 --- /dev/null +++ b/ext/db/00003_message.sql @@ -0,0 +1,12 @@ +-- +goose Up + +CREATE TABLE greeting_message ( + greeting_id uuid NOT NULL, + event_id uuid NOT NULL, + message_id uuid NOT NULL, + timestamp timestamptz NOT NULL DEFAULT now() +); + +-- +goose Down + +DROP TABLE greeting_message; \ No newline at end of file diff --git a/go.mod b/go.mod index dffc5ef..2e31150 100644 --- a/go.mod +++ b/go.mod @@ -6,8 +6,10 @@ toolchain go1.24.1 require ( buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.6-20250307204501-0409229c3780.1 + github.com/elgris/sqrl v0.0.0-20210727210741-7e0198b30236 github.com/google/uuid v1.6.0 github.com/pentops/flowtest v0.0.0-20250403234635-311159fa1e81 + github.com/pentops/golib v0.0.0-20250326060930-8c83d58ddb63 github.com/pentops/grpc.go v0.0.0-20250326042738-bcdfc2b43fa9 github.com/pentops/j5 v0.0.0-20250407052915-b2fc017d8ac2 github.com/pentops/log.go v0.0.15 @@ -27,7 +29,6 @@ require ( cel.dev/expr v0.22.0 // indirect github.com/antlr4-go/antlr/v4 v4.13.1 // indirect github.com/bufbuild/protovalidate-go v0.9.2 // indirect - github.com/elgris/sqrl v0.0.0-20210727210741-7e0198b30236 // indirect github.com/fatih/color v1.18.0 // indirect github.com/google/cel-go v0.24.1 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect diff --git a/internal/gen/test/v1/test_pb/event_log.j5s.pb.go b/internal/gen/test/v1/test_pb/event_log.j5s.pb.go new file mode 100644 index 0000000..aaee555 --- /dev/null +++ b/internal/gen/test/v1/test_pb/event_log.j5s.pb.go @@ -0,0 +1,195 @@ +// Generated by j5build v0.0.0-20250403212908-de7c3c2e6cce. DO NOT EDIT + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.32.0 +// protoc (unknown) +// source: test/v1/event_log.j5s.proto + +package test_pb + +import ( + _ "buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go/buf/validate" + _ "github.com/pentops/j5/gen/j5/ext/v1/ext_j5pb" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Message struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + GreetingId string `protobuf:"bytes,1,opt,name=greeting_id,json=greetingId,proto3" json:"greeting_id,omitempty"` + EventId string `protobuf:"bytes,2,opt,name=event_id,json=eventId,proto3" json:"event_id,omitempty"` + MessageId string `protobuf:"bytes,3,opt,name=message_id,json=messageId,proto3" json:"message_id,omitempty"` + Timestamp *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=timestamp,proto3" json:"timestamp,omitempty"` +} + +func (x *Message) Reset() { + *x = Message{} + if protoimpl.UnsafeEnabled { + mi := &file_test_v1_event_log_j5s_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Message) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Message) ProtoMessage() {} + +func (x *Message) ProtoReflect() protoreflect.Message { + mi := &file_test_v1_event_log_j5s_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Message.ProtoReflect.Descriptor instead. +func (*Message) Descriptor() ([]byte, []int) { + return file_test_v1_event_log_j5s_proto_rawDescGZIP(), []int{0} +} + +func (x *Message) GetGreetingId() string { + if x != nil { + return x.GreetingId + } + return "" +} + +func (x *Message) GetEventId() string { + if x != nil { + return x.EventId + } + return "" +} + +func (x *Message) GetMessageId() string { + if x != nil { + return x.MessageId + } + return "" +} + +func (x *Message) GetTimestamp() *timestamppb.Timestamp { + if x != nil { + return x.Timestamp + } + return nil +} + +var File_test_v1_event_log_j5s_proto protoreflect.FileDescriptor + +var file_test_v1_event_log_j5s_proto_rawDesc = []byte{ + 0x0a, 0x1b, 0x74, 0x65, 0x73, 0x74, 0x2f, 0x76, 0x31, 0x2f, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, + 0x6c, 0x6f, 0x67, 0x2e, 0x6a, 0x35, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x07, 0x74, + 0x65, 0x73, 0x74, 0x2e, 0x76, 0x31, 0x1a, 0x1b, 0x62, 0x75, 0x66, 0x2f, 0x76, 0x61, 0x6c, 0x69, + 0x64, 0x61, 0x74, 0x65, 0x2f, 0x76, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x65, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1b, 0x6a, 0x35, 0x2f, 0x65, 0x78, 0x74, 0x2f, 0x76, 0x31, 0x2f, + 0x61, 0x6e, 0x6e, 0x6f, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x22, 0xf1, 0x01, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x34, 0x0a, + 0x0b, 0x67, 0x72, 0x65, 0x65, 0x74, 0x69, 0x6e, 0x67, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x42, 0x13, 0xba, 0x48, 0x08, 0xc8, 0x01, 0x01, 0x72, 0x03, 0xb0, 0x01, 0x01, 0xc2, + 0xff, 0x8e, 0x02, 0x03, 0xb2, 0x02, 0x00, 0x52, 0x0a, 0x67, 0x72, 0x65, 0x65, 0x74, 0x69, 0x6e, + 0x67, 0x49, 0x64, 0x12, 0x2e, 0x0a, 0x08, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x09, 0x42, 0x13, 0xba, 0x48, 0x08, 0xc8, 0x01, 0x01, 0x72, 0x03, 0xb0, + 0x01, 0x01, 0xc2, 0xff, 0x8e, 0x02, 0x03, 0xb2, 0x02, 0x00, 0x52, 0x07, 0x65, 0x76, 0x65, 0x6e, + 0x74, 0x49, 0x64, 0x12, 0x2d, 0x0a, 0x0a, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x5f, 0x69, + 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x42, 0x0e, 0xba, 0x48, 0x03, 0xc8, 0x01, 0x01, 0xc2, + 0xff, 0x8e, 0x02, 0x03, 0xf2, 0x01, 0x00, 0x52, 0x09, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x49, 0x64, 0x12, 0x48, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, + 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, + 0x70, 0x42, 0x0e, 0xba, 0x48, 0x03, 0xc8, 0x01, 0x01, 0xc2, 0xff, 0x8e, 0x02, 0x03, 0xaa, 0x02, + 0x00, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x3a, 0x07, 0xc2, 0xff, + 0x8e, 0x02, 0x02, 0x52, 0x00, 0x42, 0x3d, 0x5a, 0x3b, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, + 0x63, 0x6f, 0x6d, 0x2f, 0x70, 0x65, 0x6e, 0x74, 0x6f, 0x70, 0x73, 0x2f, 0x6f, 0x35, 0x2d, 0x74, + 0x65, 0x73, 0x74, 0x2d, 0x61, 0x70, 0x70, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, + 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x74, 0x65, 0x73, 0x74, 0x2f, 0x76, 0x31, 0x2f, 0x74, 0x65, 0x73, + 0x74, 0x5f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_test_v1_event_log_j5s_proto_rawDescOnce sync.Once + file_test_v1_event_log_j5s_proto_rawDescData = file_test_v1_event_log_j5s_proto_rawDesc +) + +func file_test_v1_event_log_j5s_proto_rawDescGZIP() []byte { + file_test_v1_event_log_j5s_proto_rawDescOnce.Do(func() { + file_test_v1_event_log_j5s_proto_rawDescData = protoimpl.X.CompressGZIP(file_test_v1_event_log_j5s_proto_rawDescData) + }) + return file_test_v1_event_log_j5s_proto_rawDescData +} + +var file_test_v1_event_log_j5s_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_test_v1_event_log_j5s_proto_goTypes = []interface{}{ + (*Message)(nil), // 0: test.v1.Message + (*timestamppb.Timestamp)(nil), // 1: google.protobuf.Timestamp +} +var file_test_v1_event_log_j5s_proto_depIdxs = []int32{ + 1, // 0: test.v1.Message.timestamp:type_name -> google.protobuf.Timestamp + 1, // [1:1] is the sub-list for method output_type + 1, // [1:1] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_test_v1_event_log_j5s_proto_init() } +func file_test_v1_event_log_j5s_proto_init() { + if File_test_v1_event_log_j5s_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_test_v1_event_log_j5s_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Message); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_test_v1_event_log_j5s_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_test_v1_event_log_j5s_proto_goTypes, + DependencyIndexes: file_test_v1_event_log_j5s_proto_depIdxs, + MessageInfos: file_test_v1_event_log_j5s_proto_msgTypes, + }.Build() + File_test_v1_event_log_j5s_proto = out.File + file_test_v1_event_log_j5s_proto_rawDesc = nil + file_test_v1_event_log_j5s_proto_goTypes = nil + file_test_v1_event_log_j5s_proto_depIdxs = nil +} diff --git a/internal/gen/test/v1/test_pb/event_log.j5s_sugar.pb.go b/internal/gen/test/v1/test_pb/event_log.j5s_sugar.pb.go new file mode 100644 index 0000000..f937d68 --- /dev/null +++ b/internal/gen/test/v1/test_pb/event_log.j5s_sugar.pb.go @@ -0,0 +1,3 @@ +// Code generated by protoc-gen-go-sugar. DO NOT EDIT. + +package test_pb diff --git a/internal/gen/test/v1/test_spb/event_log.p.j5s.pb.go b/internal/gen/test/v1/test_spb/event_log.p.j5s.pb.go new file mode 100644 index 0000000..484c1c9 --- /dev/null +++ b/internal/gen/test/v1/test_spb/event_log.p.j5s.pb.go @@ -0,0 +1,256 @@ +// Generated by j5build v0.0.0-20250403212908-de7c3c2e6cce. DO NOT EDIT + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.32.0 +// protoc (unknown) +// source: test/v1/service/event_log.p.j5s.proto + +package test_spb + +import ( + _ "buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go/buf/validate" + _ "github.com/pentops/j5/gen/j5/ext/v1/ext_j5pb" + test_pb "github.com/pentops/o5-test-app/internal/gen/test/v1/test_pb" + _ "google.golang.org/genproto/googleapis/api/annotations" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type GetMessagesRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + GreetingId *string `protobuf:"bytes,1,opt,name=greeting_id,json=greetingId,proto3,oneof" json:"greeting_id,omitempty"` + EventId *string `protobuf:"bytes,2,opt,name=event_id,json=eventId,proto3,oneof" json:"event_id,omitempty"` +} + +func (x *GetMessagesRequest) Reset() { + *x = GetMessagesRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_test_v1_service_event_log_p_j5s_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetMessagesRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetMessagesRequest) ProtoMessage() {} + +func (x *GetMessagesRequest) ProtoReflect() protoreflect.Message { + mi := &file_test_v1_service_event_log_p_j5s_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetMessagesRequest.ProtoReflect.Descriptor instead. +func (*GetMessagesRequest) Descriptor() ([]byte, []int) { + return file_test_v1_service_event_log_p_j5s_proto_rawDescGZIP(), []int{0} +} + +func (x *GetMessagesRequest) GetGreetingId() string { + if x != nil && x.GreetingId != nil { + return *x.GreetingId + } + return "" +} + +func (x *GetMessagesRequest) GetEventId() string { + if x != nil && x.EventId != nil { + return *x.EventId + } + return "" +} + +type GetMessagesResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Messages []*test_pb.Message `protobuf:"bytes,1,rep,name=messages,proto3" json:"messages,omitempty"` +} + +func (x *GetMessagesResponse) Reset() { + *x = GetMessagesResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_test_v1_service_event_log_p_j5s_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetMessagesResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetMessagesResponse) ProtoMessage() {} + +func (x *GetMessagesResponse) ProtoReflect() protoreflect.Message { + mi := &file_test_v1_service_event_log_p_j5s_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetMessagesResponse.ProtoReflect.Descriptor instead. +func (*GetMessagesResponse) Descriptor() ([]byte, []int) { + return file_test_v1_service_event_log_p_j5s_proto_rawDescGZIP(), []int{1} +} + +func (x *GetMessagesResponse) GetMessages() []*test_pb.Message { + if x != nil { + return x.Messages + } + return nil +} + +var File_test_v1_service_event_log_p_j5s_proto protoreflect.FileDescriptor + +var file_test_v1_service_event_log_p_j5s_proto_rawDesc = []byte{ + 0x0a, 0x25, 0x74, 0x65, 0x73, 0x74, 0x2f, 0x76, 0x31, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, + 0x65, 0x2f, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0x2e, 0x70, 0x2e, 0x6a, 0x35, + 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0f, 0x74, 0x65, 0x73, 0x74, 0x2e, 0x76, 0x31, + 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x1a, 0x1b, 0x62, 0x75, 0x66, 0x2f, 0x76, 0x61, + 0x6c, 0x69, 0x64, 0x61, 0x74, 0x65, 0x2f, 0x76, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x65, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1c, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x61, 0x70, + 0x69, 0x2f, 0x61, 0x6e, 0x6e, 0x6f, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x1a, 0x1b, 0x6a, 0x35, 0x2f, 0x65, 0x78, 0x74, 0x2f, 0x76, 0x31, 0x2f, 0x61, + 0x6e, 0x6e, 0x6f, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x1a, 0x1b, 0x74, 0x65, 0x73, 0x74, 0x2f, 0x76, 0x31, 0x2f, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, + 0x6c, 0x6f, 0x67, 0x2e, 0x6a, 0x35, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xa4, 0x01, + 0x0a, 0x12, 0x47, 0x65, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x12, 0x36, 0x0a, 0x0b, 0x67, 0x72, 0x65, 0x65, 0x74, 0x69, 0x6e, 0x67, + 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x42, 0x10, 0xba, 0x48, 0x05, 0x72, 0x03, + 0xb0, 0x01, 0x01, 0xc2, 0xff, 0x8e, 0x02, 0x03, 0xb2, 0x02, 0x00, 0x48, 0x00, 0x52, 0x0a, 0x67, + 0x72, 0x65, 0x65, 0x74, 0x69, 0x6e, 0x67, 0x49, 0x64, 0x88, 0x01, 0x01, 0x12, 0x30, 0x0a, 0x08, + 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x42, 0x10, + 0xba, 0x48, 0x05, 0x72, 0x03, 0xb0, 0x01, 0x01, 0xc2, 0xff, 0x8e, 0x02, 0x03, 0xb2, 0x02, 0x00, + 0x48, 0x01, 0x52, 0x07, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x88, 0x01, 0x01, 0x3a, 0x07, + 0xc2, 0xff, 0x8e, 0x02, 0x02, 0x52, 0x00, 0x42, 0x0e, 0x0a, 0x0c, 0x5f, 0x67, 0x72, 0x65, 0x65, + 0x74, 0x69, 0x6e, 0x67, 0x5f, 0x69, 0x64, 0x42, 0x0b, 0x0a, 0x09, 0x5f, 0x65, 0x76, 0x65, 0x6e, + 0x74, 0x5f, 0x69, 0x64, 0x22, 0x56, 0x0a, 0x13, 0x47, 0x65, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x36, 0x0a, 0x08, 0x6d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x10, 0x2e, + 0x74, 0x65, 0x73, 0x74, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x42, + 0x08, 0xc2, 0xff, 0x8e, 0x02, 0x03, 0xaa, 0x01, 0x00, 0x52, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x65, 0x73, 0x3a, 0x07, 0xc2, 0xff, 0x8e, 0x02, 0x02, 0x52, 0x00, 0x32, 0x8f, 0x01, 0x0a, + 0x0f, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x4c, 0x6f, 0x67, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, + 0x12, 0x7c, 0x0a, 0x0b, 0x47, 0x65, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x12, + 0x23, 0x2e, 0x74, 0x65, 0x73, 0x74, 0x2e, 0x76, 0x31, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, + 0x65, 0x2e, 0x47, 0x65, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x24, 0x2e, 0x74, 0x65, 0x73, 0x74, 0x2e, 0x76, 0x31, 0x2e, 0x73, + 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x47, 0x65, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x22, 0x82, 0xd3, 0xe4, 0x93, + 0x02, 0x1c, 0x12, 0x1a, 0x2f, 0x74, 0x65, 0x73, 0x74, 0x2f, 0x76, 0x31, 0x2f, 0x65, 0x76, 0x65, + 0x6e, 0x74, 0x6c, 0x6f, 0x67, 0x2f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x42, 0x3e, + 0x5a, 0x3c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x70, 0x65, 0x6e, + 0x74, 0x6f, 0x70, 0x73, 0x2f, 0x6f, 0x35, 0x2d, 0x74, 0x65, 0x73, 0x74, 0x2d, 0x61, 0x70, 0x70, + 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x74, 0x65, + 0x73, 0x74, 0x2f, 0x76, 0x31, 0x2f, 0x74, 0x65, 0x73, 0x74, 0x5f, 0x73, 0x70, 0x62, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_test_v1_service_event_log_p_j5s_proto_rawDescOnce sync.Once + file_test_v1_service_event_log_p_j5s_proto_rawDescData = file_test_v1_service_event_log_p_j5s_proto_rawDesc +) + +func file_test_v1_service_event_log_p_j5s_proto_rawDescGZIP() []byte { + file_test_v1_service_event_log_p_j5s_proto_rawDescOnce.Do(func() { + file_test_v1_service_event_log_p_j5s_proto_rawDescData = protoimpl.X.CompressGZIP(file_test_v1_service_event_log_p_j5s_proto_rawDescData) + }) + return file_test_v1_service_event_log_p_j5s_proto_rawDescData +} + +var file_test_v1_service_event_log_p_j5s_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_test_v1_service_event_log_p_j5s_proto_goTypes = []interface{}{ + (*GetMessagesRequest)(nil), // 0: test.v1.service.GetMessagesRequest + (*GetMessagesResponse)(nil), // 1: test.v1.service.GetMessagesResponse + (*test_pb.Message)(nil), // 2: test.v1.Message +} +var file_test_v1_service_event_log_p_j5s_proto_depIdxs = []int32{ + 2, // 0: test.v1.service.GetMessagesResponse.messages:type_name -> test.v1.Message + 0, // 1: test.v1.service.EventLogService.GetMessages:input_type -> test.v1.service.GetMessagesRequest + 1, // 2: test.v1.service.EventLogService.GetMessages:output_type -> test.v1.service.GetMessagesResponse + 2, // [2:3] is the sub-list for method output_type + 1, // [1:2] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_test_v1_service_event_log_p_j5s_proto_init() } +func file_test_v1_service_event_log_p_j5s_proto_init() { + if File_test_v1_service_event_log_p_j5s_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_test_v1_service_event_log_p_j5s_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetMessagesRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_test_v1_service_event_log_p_j5s_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetMessagesResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + file_test_v1_service_event_log_p_j5s_proto_msgTypes[0].OneofWrappers = []interface{}{} + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_test_v1_service_event_log_p_j5s_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_test_v1_service_event_log_p_j5s_proto_goTypes, + DependencyIndexes: file_test_v1_service_event_log_p_j5s_proto_depIdxs, + MessageInfos: file_test_v1_service_event_log_p_j5s_proto_msgTypes, + }.Build() + File_test_v1_service_event_log_p_j5s_proto = out.File + file_test_v1_service_event_log_p_j5s_proto_rawDesc = nil + file_test_v1_service_event_log_p_j5s_proto_goTypes = nil + file_test_v1_service_event_log_p_j5s_proto_depIdxs = nil +} diff --git a/internal/gen/test/v1/test_spb/event_log.p.j5s_grpc.pb.go b/internal/gen/test/v1/test_spb/event_log.p.j5s_grpc.pb.go new file mode 100644 index 0000000..7e96b54 --- /dev/null +++ b/internal/gen/test/v1/test_spb/event_log.p.j5s_grpc.pb.go @@ -0,0 +1,111 @@ +// Generated by j5build v0.0.0-20250403212908-de7c3c2e6cce. DO NOT EDIT + +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.3.0 +// - protoc (unknown) +// source: test/v1/service/event_log.p.j5s.proto + +package test_spb + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +const ( + EventLogService_GetMessages_FullMethodName = "/test.v1.service.EventLogService/GetMessages" +) + +// EventLogServiceClient is the client API for EventLogService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type EventLogServiceClient interface { + GetMessages(ctx context.Context, in *GetMessagesRequest, opts ...grpc.CallOption) (*GetMessagesResponse, error) +} + +type eventLogServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewEventLogServiceClient(cc grpc.ClientConnInterface) EventLogServiceClient { + return &eventLogServiceClient{cc} +} + +func (c *eventLogServiceClient) GetMessages(ctx context.Context, in *GetMessagesRequest, opts ...grpc.CallOption) (*GetMessagesResponse, error) { + out := new(GetMessagesResponse) + err := c.cc.Invoke(ctx, EventLogService_GetMessages_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// EventLogServiceServer is the server API for EventLogService service. +// All implementations must embed UnimplementedEventLogServiceServer +// for forward compatibility +type EventLogServiceServer interface { + GetMessages(context.Context, *GetMessagesRequest) (*GetMessagesResponse, error) + mustEmbedUnimplementedEventLogServiceServer() +} + +// UnimplementedEventLogServiceServer must be embedded to have forward compatible implementations. +type UnimplementedEventLogServiceServer struct { +} + +func (UnimplementedEventLogServiceServer) GetMessages(context.Context, *GetMessagesRequest) (*GetMessagesResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetMessages not implemented") +} +func (UnimplementedEventLogServiceServer) mustEmbedUnimplementedEventLogServiceServer() {} + +// UnsafeEventLogServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to EventLogServiceServer will +// result in compilation errors. +type UnsafeEventLogServiceServer interface { + mustEmbedUnimplementedEventLogServiceServer() +} + +func RegisterEventLogServiceServer(s grpc.ServiceRegistrar, srv EventLogServiceServer) { + s.RegisterService(&EventLogService_ServiceDesc, srv) +} + +func _EventLogService_GetMessages_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetMessagesRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(EventLogServiceServer).GetMessages(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: EventLogService_GetMessages_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(EventLogServiceServer).GetMessages(ctx, req.(*GetMessagesRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// EventLogService_ServiceDesc is the grpc.ServiceDesc for EventLogService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var EventLogService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "test.v1.service.EventLogService", + HandlerType: (*EventLogServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "GetMessages", + Handler: _EventLogService_GetMessages_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "test/v1/service/event_log.p.j5s.proto", +} diff --git a/internal/gen/test/v1/test_spb/event_log.p.j5s_sugar.pb.go b/internal/gen/test/v1/test_spb/event_log.p.j5s_sugar.pb.go new file mode 100644 index 0000000..2bfa669 --- /dev/null +++ b/internal/gen/test/v1/test_spb/event_log.p.j5s_sugar.pb.go @@ -0,0 +1,3 @@ +// Code generated by protoc-gen-go-sugar. DO NOT EDIT. + +package test_spb diff --git a/internal/genclient/j5/state/v1/state/generated.go b/internal/genclient/j5/state/v1/state/generated.go index 91f81f9..ae4aac5 100644 --- a/internal/genclient/j5/state/v1/state/generated.go +++ b/internal/genclient/j5/state/v1/state/generated.go @@ -65,6 +65,21 @@ type EventMetadata struct { Cause *Cause `json:"cause,omitempty"` } +// EventPublishMetadata Proto: EventPublishMetadata +type EventPublishMetadata struct { + EventId string `json:"eventId,omitempty"` + Sequence uint64 `json:"sequence,omitempty,string"` + Timestamp *time.Time `json:"timestamp"` + Cause *Cause `json:"cause,omitempty"` + Auth *PublishAuth `json:"auth,omitempty"` +} + +// EventTenant Proto: EventTenant +type EventTenant struct { + TenantType string `json:"tenantType,omitempty"` + TenantId string `json:"tenantId,omitempty"` +} + // ExternalEventCause Proto: ExternalEventCause type ExternalEventCause struct { SystemName string `json:"systemName,omitempty"` @@ -82,6 +97,12 @@ type PSMEventCause struct { StateMachine string `json:"stateMachine,omitempty"` } +// PublishAuth Proto: PublishAuth +type PublishAuth struct { + RequiredScopes []string `json:"requiredScopes,omitempty"` + TenantKeys []*EventTenant `json:"tenantKeys,omitempty"` +} + // StateMetadata Proto: StateMetadata type StateMetadata struct { CreatedAt *time.Time `json:"createdAt,omitempty"` diff --git a/internal/genclient/o5/ges/v1/ges/generated.go b/internal/genclient/o5/ges/v1/ges/generated.go new file mode 100644 index 0000000..c6dab13 --- /dev/null +++ b/internal/genclient/o5/ges/v1/ges/generated.go @@ -0,0 +1,270 @@ +package ges + +// Code generated by jsonapi. DO NOT EDIT. +// Source: github.com/pentops/o5-test-app/internal/genclient/o5/ges/v1/ges + +import ( + context "context" + json "encoding/json" + list "github.com/pentops/o5-test-app/internal/genclient/j5/list/v1/list" + state "github.com/pentops/o5-test-app/internal/genclient/j5/state/v1/state" + url "net/url" + strings "strings" + time "time" +) + +type Requester interface { + Request(ctx context.Context, method string, path string, body interface{}, response interface{}) error +} + +// CommandService +type CommandService struct { + Requester +} + +func NewCommandService(requester Requester) *CommandService { + return &CommandService{ + Requester: requester, + } +} + +func (s CommandService) ReplayEvents(ctx context.Context, req *ReplayEventsRequest) (*ReplayEventsResponse, error) { + pathParts := make([]string, 5) + pathParts[0] = "" + pathParts[1] = "ges" + pathParts[2] = "v1" + pathParts[3] = "events" + pathParts[4] = "replay" + path := strings.Join(pathParts, "/") + resp := &ReplayEventsResponse{} + err := s.Request(ctx, "POST", path, req, resp) + if err != nil { + return nil, err + } + return resp, nil +} + +func (s CommandService) ReplayUpserts(ctx context.Context, req *ReplayUpsertsRequest) (*ReplayUpsertsResponse, error) { + pathParts := make([]string, 5) + pathParts[0] = "" + pathParts[1] = "ges" + pathParts[2] = "v1" + pathParts[3] = "upserts" + pathParts[4] = "replay" + path := strings.Join(pathParts, "/") + resp := &ReplayUpsertsResponse{} + err := s.Request(ctx, "POST", path, req, resp) + if err != nil { + return nil, err + } + return resp, nil +} + +// ReplayEventsRequest +type ReplayEventsRequest struct { + QueueUrl string `json:"queueUrl"` + GrpcService string `json:"grpcService"` + GrpcMethod string `json:"grpcMethod"` +} + +// ReplayEventsResponse +type ReplayEventsResponse struct { +} + +// ReplayUpsertsRequest +type ReplayUpsertsRequest struct { + QueueUrl string `json:"queueUrl"` + GrpcService string `json:"grpcService"` + GrpcMethod string `json:"grpcMethod"` +} + +// ReplayUpsertsResponse +type ReplayUpsertsResponse struct { +} + +// QueryService +type QueryService struct { + Requester +} + +func NewQueryService(requester Requester) *QueryService { + return &QueryService{ + Requester: requester, + } +} + +func (s QueryService) EventsList(ctx context.Context, req *EventsListRequest) (*EventsListResponse, error) { + pathParts := make([]string, 4) + pathParts[0] = "" + pathParts[1] = "ges" + pathParts[2] = "v1" + pathParts[3] = "events" + path := strings.Join(pathParts, "/") + if query, err := req.QueryParameters(); err != nil { + return nil, err + } else if len(query) > 0 { + path += "?" + query.Encode() + } + resp := &EventsListResponse{} + err := s.Request(ctx, "GET", path, req, resp) + if err != nil { + return nil, err + } + return resp, nil +} + +func (s QueryService) UpsertList(ctx context.Context, req *UpsertListRequest) (*UpsertListResponse, error) { + pathParts := make([]string, 4) + pathParts[0] = "" + pathParts[1] = "ges" + pathParts[2] = "v1" + pathParts[3] = "upsert" + path := strings.Join(pathParts, "/") + if query, err := req.QueryParameters(); err != nil { + return nil, err + } else if len(query) > 0 { + path += "?" + query.Encode() + } + resp := &UpsertListResponse{} + err := s.Request(ctx, "GET", path, req, resp) + if err != nil { + return nil, err + } + return resp, nil +} + +// EventsListRequest +type EventsListRequest struct { + Page *list.PageRequest `json:"-" query:"page"` + Query *list.QueryRequest `json:"-" query:"query"` +} + +func (s *EventsListRequest) SetPageToken(pageToken string) { + if s.Page == nil { + s.Page = &list.PageRequest{} + } + s.Page.Token = &pageToken +} + +func (s EventsListRequest) QueryParameters() (url.Values, error) { + values := url.Values{} + if s.Page != nil { + bb, err := json.Marshal(s.Page) + if err != nil { + return nil, err + } + values.Set("page", string(bb)) + } + if s.Query != nil { + bb, err := json.Marshal(s.Query) + if err != nil { + return nil, err + } + values.Set("query", string(bb)) + } + return values, nil +} + +// EventsListResponse +type EventsListResponse struct { + Page *list.PageResponse `json:"page,omitempty"` + Events []*Event `json:"events,omitempty"` +} + +func (s EventsListResponse) GetPageToken() *string { + if s.Page == nil { + return nil + } + return s.Page.NextToken +} + +func (s EventsListResponse) GetItems() []*Event { + return s.Events +} + +// UpsertListRequest +type UpsertListRequest struct { + Page *list.PageRequest `json:"-" query:"page"` + Query *list.QueryRequest `json:"-" query:"query"` +} + +func (s *UpsertListRequest) SetPageToken(pageToken string) { + if s.Page == nil { + s.Page = &list.PageRequest{} + } + s.Page.Token = &pageToken +} + +func (s UpsertListRequest) QueryParameters() (url.Values, error) { + values := url.Values{} + if s.Page != nil { + bb, err := json.Marshal(s.Page) + if err != nil { + return nil, err + } + values.Set("page", string(bb)) + } + if s.Query != nil { + bb, err := json.Marshal(s.Query) + if err != nil { + return nil, err + } + values.Set("query", string(bb)) + } + return values, nil +} + +// UpsertListResponse +type UpsertListResponse struct { + Page *list.PageResponse `json:"page,omitempty"` + Events []*Upsert `json:"events,omitempty"` +} + +func (s UpsertListResponse) GetPageToken() *string { + if s.Page == nil { + return nil + } + return s.Page.NextToken +} + +func (s UpsertListResponse) GetItems() []*Upsert { + return s.Events +} + +// Event Proto: Event +type Event struct { + EntityName string `json:"entityName"` + Metadata *state.EventPublishMetadata `json:"metadata"` + GrpcMethod string `json:"grpcMethod"` + GrpcService string `json:"grpcService"` + BodyType string `json:"bodyType"` + EventType string `json:"eventType"` + EntityKeys interface{} `json:"entityKeys"` + EventData interface{} `json:"eventData"` + EntityState interface{} `json:"entityState"` + EntityStatus string `json:"entityStatus"` +} + +// Upsert Proto: Upsert +type Upsert struct { + EntityName string `json:"entityName"` + EntityId string `json:"entityId"` + GrpcMethod string `json:"grpcMethod"` + GrpcService string `json:"grpcService"` + LastEventId string `json:"lastEventId"` + LastEventTimestamp *time.Time `json:"lastEventTimestamp"` + Data interface{} `json:"data"` +} + +// CombinedClient +type CombinedClient struct { + *CommandService + *QueryService +} + +func NewCombinedClient(requester Requester) *CombinedClient { + return &CombinedClient{ + CommandService: NewCommandService(requester), + QueryService: NewQueryService(requester), + } +} diff --git a/internal/genclient/test/v1/test/generated.go b/internal/genclient/test/v1/test/generated.go index 9e64e61..aa4c691 100644 --- a/internal/genclient/test/v1/test/generated.go +++ b/internal/genclient/test/v1/test/generated.go @@ -11,12 +11,67 @@ import ( state "github.com/pentops/o5-test-app/internal/genclient/j5/state/v1/state" url "net/url" strings "strings" + time "time" ) type Requester interface { Request(ctx context.Context, method string, path string, body interface{}, response interface{}) error } +// EventLogService +type EventLogService struct { + Requester +} + +func NewEventLogService(requester Requester) *EventLogService { + return &EventLogService{ + Requester: requester, + } +} + +func (s EventLogService) GetMessages(ctx context.Context, req *GetMessagesRequest) (*GetMessagesResponse, error) { + pathParts := make([]string, 5) + pathParts[0] = "" + pathParts[1] = "test" + pathParts[2] = "v1" + pathParts[3] = "eventlog" + pathParts[4] = "messages" + path := strings.Join(pathParts, "/") + if query, err := req.QueryParameters(); err != nil { + return nil, err + } else if len(query) > 0 { + path += "?" + query.Encode() + } + resp := &GetMessagesResponse{} + err := s.Request(ctx, "GET", path, req, resp) + if err != nil { + return nil, err + } + return resp, nil +} + +// GetMessagesRequest +type GetMessagesRequest struct { + GreetingId *string `json:"-" query:"greetingId"` + EventId *string `json:"-" query:"eventId"` +} + +func (s GetMessagesRequest) QueryParameters() (url.Values, error) { + values := url.Values{} + if s.GreetingId != nil { + values.Set("greetingId", *s.GreetingId) + } + if s.EventId != nil { + values.Set("eventId", *s.EventId) + } + return values, nil +} + +// GetMessagesResponse +type GetMessagesResponse struct { + Messages []*Message `json:"messages,omitempty"` +} + // GreetingQueryService type GreetingQueryService struct { Requester @@ -332,6 +387,14 @@ const ( GreetingStatus_REPLIED GreetingStatus = "REPLIED" ) +// Message Proto: Message +type Message struct { + GreetingId string `json:"greetingId"` + EventId string `json:"eventId"` + MessageId string `json:"messageId"` + Timestamp *time.Time `json:"timestamp"` +} + // TestError Proto: TestError type TestError struct { Message string `json:"message,omitempty"` @@ -340,12 +403,14 @@ type TestError struct { // CombinedClient type CombinedClient struct { + *EventLogService *GreetingCommandService *GreetingQueryService } func NewCombinedClient(requester Requester) *CombinedClient { return &CombinedClient{ + EventLogService: NewEventLogService(requester), GreetingCommandService: NewGreetingCommandService(requester), GreetingQueryService: NewGreetingQueryService(requester), } diff --git a/internal/integration/event_log_test.go b/internal/integration/event_log_test.go new file mode 100644 index 0000000..8da25bd --- /dev/null +++ b/internal/integration/event_log_test.go @@ -0,0 +1,78 @@ +package integration + +import ( + "context" + "testing" + + "github.com/google/uuid" + "github.com/pentops/flowtest" + "github.com/pentops/golib/gl" + "github.com/pentops/o5-test-app/internal/gen/test/v1/test_pb" + "github.com/pentops/o5-test-app/internal/gen/test/v1/test_spb" + "github.com/pentops/o5-test-app/internal/gen/test/v1/test_tpb" + "github.com/pentops/realms/authtest" +) + +func TestEventLog(t *testing.T) { + flow, uu := NewUniverse(t) + defer flow.RunSteps(t) + + // Variables which cross step boundaries are declared + var greetingID string + var createdEvent *test_tpb.GreetingEventMessage + + var requestMessage *test_tpb.GreetingMessage + + flow.Step("Hello", func(ctx context.Context, t flowtest.Asserter) { + ctx = authtest.JWTContext(ctx) + + greetingID = uuid.NewString() + + res, err := uu.GreetingCommand.Hello(ctx, &test_spb.HelloRequest{ + GreetingId: greetingID, + Name: "World", + }) + t.NoError(err) + t.Equal("World", res.Greeting.Data.Name) + t.Equal(TestVersion, res.Greeting.Data.AppVersion) + t.Equal(test_pb.GreetingStatus_INITIATED, res.Greeting.Status) + + requestMessage = uu.PopGreeting(t) + t.Equal(greetingID, requestMessage.GreetingId) + + createdEvent = uu.PopGreetingEvent(t) + }) + + flow.Step("Receive First Greeting Event", func(ctx context.Context, t flowtest.Asserter) { + t.MustMessage(uu.GreetingPublish.GreetingEvent(ctx, createdEvent)) + }) + + flow.Step("Check Messages", func(ctx context.Context, t flowtest.Asserter) { + ctx = authtest.JWTContext(ctx) + messages, err := uu.EventLog.GetMessages(ctx, &test_spb.GetMessagesRequest{ + GreetingId: gl.Ptr(greetingID), + EventId: gl.Ptr(createdEvent.Metadata.EventId), + }) + t.NoError(err) + t.Equal(1, len(messages.Messages)) + t.Equal(greetingID, messages.Messages[0].GreetingId) + }) + + flow.Step("Receive Replay Greeting Event", func(ctx context.Context, t flowtest.Asserter) { + t.MustMessage(uu.GreetingPublish.GreetingEvent(ctx, createdEvent)) + }) + + flow.Step("Check Messages", func(ctx context.Context, t flowtest.Asserter) { + ctx = authtest.JWTContext(ctx) + messages, err := uu.EventLog.GetMessages(ctx, &test_spb.GetMessagesRequest{ + GreetingId: gl.Ptr(greetingID), + EventId: gl.Ptr(createdEvent.Metadata.EventId), + }) + t.NoError(err) + t.Equal(2, len(messages.Messages)) + t.Equal(greetingID, messages.Messages[0].GreetingId) + if messages.Messages[0].MessageId == messages.Messages[1].MessageId { + t.Errorf("MessageId should be different") + } + }) +} diff --git a/internal/integration/service_test.go b/internal/integration/service_test.go index 46b6633..1778589 100644 --- a/internal/integration/service_test.go +++ b/internal/integration/service_test.go @@ -21,6 +21,7 @@ func TestService(t *testing.T) { // Variables which cross step boundaries are declared var greetingID string + var requestMessage *test_tpb.GreetingMessage flow.Step("Hello", func(ctx context.Context, t flowtest.Asserter) { @@ -61,6 +62,7 @@ func TestService(t *testing.T) { t.NoError(err) t.Equal("Hello World", *greeting.Greeting.Data.ReplyMessage) }) + } func TestThrowError(t *testing.T) { diff --git a/internal/integration/universe_test.go b/internal/integration/universe_test.go index 75f7784..e85eda9 100644 --- a/internal/integration/universe_test.go +++ b/internal/integration/universe_test.go @@ -11,6 +11,7 @@ import ( "github.com/pentops/o5-test-app/internal/gen/test/v1/test_tpb" "github.com/pentops/o5-test-app/internal/service" "github.com/pentops/pgtest.go/pgtest" + "github.com/pentops/realms/authtest" "github.com/pentops/sqrlx.go/sqrlx" ) @@ -19,6 +20,8 @@ type Universe struct { GreetingCommand test_spb.GreetingCommandServiceClient GreetingQuery test_spb.GreetingQueryServiceClient TestTopic test_tpb.TestTopicClient + EventLog test_spb.EventLogServiceClient + GreetingPublish test_tpb.GreetingPublishTopicClient } func NewUniverse(t *testing.T) (*flowtest.Stepper[*testing.T], *Universe) { @@ -52,7 +55,9 @@ func setupUniverse(ctx context.Context, t flowtest.Asserter, uu *Universe) { uu.Outbox = outboxtest.NewOutboxAsserter(t, conn) - grpcPair := flowtest.NewGRPCPair(t, service.GRPCMiddleware(TestVersion)...) + middleware := service.GRPCMiddleware("testing") + middleware = append(middleware, authtest.AutoMessageGRPCMiddleware) + grpcPair := flowtest.NewGRPCPair(t, middleware...) app, err := service.NewApp(db, TestVersion) if err != nil { @@ -64,6 +69,8 @@ func setupUniverse(ctx context.Context, t flowtest.Asserter, uu *Universe) { uu.GreetingCommand = test_spb.NewGreetingCommandServiceClient(grpcPair.Client) uu.GreetingQuery = test_spb.NewGreetingQueryServiceClient(grpcPair.Client) uu.TestTopic = test_tpb.NewTestTopicClient(grpcPair.Client) + uu.EventLog = test_spb.NewEventLogServiceClient(grpcPair.Client) + uu.GreetingPublish = test_tpb.NewGreetingPublishTopicClient(grpcPair.Client) grpcPair.ServeUntilDone(t, ctx) } diff --git a/internal/service/app.go b/internal/service/app.go index ac13ef6..ada7205 100644 --- a/internal/service/app.go +++ b/internal/service/app.go @@ -13,13 +13,18 @@ import ( type App struct { GreetingCommand *GreetingCommandService GreetingQuery *GreetingQueryService + TestAccess *TestAccess TestWorker *TestWorker + PublishWorker *PublishWorker } func (aa *App) RegisterGRPC(server grpc.ServiceRegistrar) { aa.GreetingCommand.RegisterGRPC(server) aa.GreetingQuery.RegisterGRPC(server) aa.TestWorker.RegisterGRPC(server) + aa.TestAccess.RegisterGRPC(server) + aa.PublishWorker.RegisterGRPC(server) + } func NewApp(db sqrlx.Transactor, appVersion string) (*App, error) { @@ -43,10 +48,22 @@ func NewApp(db sqrlx.Transactor, appVersion string) (*App, error) { return nil, err } + testAccess, err := NewTestAccess(db) + if err != nil { + return nil, err + } + + publishWorker, err := NewPublishWorker(db) + if err != nil { + return nil, err + } + return &App{ GreetingCommand: commandService, GreetingQuery: testQueryService, TestWorker: testWorker, + TestAccess: testAccess, + PublishWorker: publishWorker, }, nil } diff --git a/internal/service/ges_worker.go b/internal/service/ges_worker.go new file mode 100644 index 0000000..ed5d805 --- /dev/null +++ b/internal/service/ges_worker.go @@ -0,0 +1,56 @@ +package service + +import ( + "context" + "database/sql" + + sq "github.com/elgris/sqrl" + "github.com/pentops/o5-test-app/internal/gen/test/v1/test_tpb" + "github.com/pentops/realms/j5auth" + "github.com/pentops/sqrlx.go/sqrlx" + "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/emptypb" +) + +type PublishWorker struct { + db sqrlx.Transactor + test_tpb.UnsafeGreetingPublishTopicServer +} + +func NewPublishWorker(db sqrlx.Transactor) (*PublishWorker, error) { + return &PublishWorker{ + db: db, + }, nil +} + +func (ww *PublishWorker) RegisterGRPC(server grpc.ServiceRegistrar) { + test_tpb.RegisterGreetingPublishTopicServer(server, ww) +} + +func (ww *PublishWorker) GreetingEvent(ctx context.Context, req *test_tpb.GreetingEventMessage) (*emptypb.Empty, error) { + + message, err := j5auth.GetMessageCause(ctx) + if err != nil { + return nil, err + } + err = ww.db.Transact(ctx, &sqrlx.TxOptions{ + ReadOnly: false, + Retryable: true, + Isolation: sql.LevelDefault, + }, func(ctx context.Context, tx sqrlx.Transaction) error { + + _, err := tx.Insert(ctx, sq. + Insert("greeting_message"). + Columns("greeting_id", "event_id", "message_id"). + Values(req.Keys.GreetingId, req.Metadata.EventId, message.MessageId)) + if err != nil { + return err + } + return nil + }) + + if err != nil { + return nil, err + } + return &emptypb.Empty{}, nil +} diff --git a/internal/service/test_access.go b/internal/service/test_access.go new file mode 100644 index 0000000..df056a2 --- /dev/null +++ b/internal/service/test_access.go @@ -0,0 +1,92 @@ +package service + +import ( + + // Replace with the actual path to the generated protobuf package + + "context" + "database/sql" + "time" + + sq "github.com/elgris/sqrl" + "github.com/pentops/o5-test-app/internal/gen/test/v1/test_pb" + "github.com/pentops/o5-test-app/internal/gen/test/v1/test_spb" + "github.com/pentops/realms/j5auth" + "github.com/pentops/sqrlx.go/sqrlx" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/timestamppb" +) + +type TestAccess struct { + db sqrlx.Transactor + test_spb.UnsafeEventLogServiceServer +} + +func NewTestAccess(db sqrlx.Transactor) (*TestAccess, error) { + return &TestAccess{ + db: db, + }, nil +} + +func (ta *TestAccess) RegisterGRPC(server grpc.ServiceRegistrar) { + test_spb.RegisterEventLogServiceServer(server, ta) +} + +func (ta *TestAccess) GetMessages(ctx context.Context, req *test_spb.GetMessagesRequest) (*test_spb.GetMessagesResponse, error) { + + _, err := j5auth.GetAuthenticatedAction(ctx) + if err != nil { + return nil, err + } + + qq := sq.Select("message_id", "greeting_id", "event_id", "timestamp"). + From("greeting_message") + + if req.EventId == nil && req.GreetingId == nil { + return nil, status.Error(codes.InvalidArgument, "Either EventId or GreetingId must be provided") + } + + if req.EventId != nil { + qq.Where("event_id = ?", req.EventId) + } + if req.GreetingId != nil { + qq.Where("greeting_id = ?", req.GreetingId) + } + + res := &test_spb.GetMessagesResponse{} + + err = ta.db.Transact(ctx, &sqrlx.TxOptions{ + ReadOnly: true, + Retryable: true, + Isolation: sql.LevelDefault, + }, func(ctx context.Context, tx sqrlx.Transaction) error { + rows, err := tx.Select(ctx, qq) + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + message := &test_pb.Message{} + var timestamp time.Time + if err := rows.Scan(&message.MessageId, &message.GreetingId, &message.EventId, ×tamp); err != nil { + return err + } + message.Timestamp = timestamppb.New(timestamp) + res.Messages = append(res.Messages, message) + } + if err := rows.Err(); err != nil { + return err + } + return nil + + }) + + if err != nil { + return nil, err + } + + return res, nil +} diff --git a/internal/service/test_worker.go b/internal/service/test_worker.go index 71ef5f1..10246f5 100644 --- a/internal/service/test_worker.go +++ b/internal/service/test_worker.go @@ -3,13 +3,11 @@ package service import ( "context" "fmt" - "time" - "github.com/google/uuid" - "github.com/pentops/j5/gen/j5/state/v1/psm_j5pb" "github.com/pentops/o5-test-app/internal/gen/test/v1/test_pb" "github.com/pentops/o5-test-app/internal/gen/test/v1/test_tpb" "github.com/pentops/o5-test-app/internal/state" + "github.com/pentops/realms/j5auth" "github.com/pentops/sqrlx.go/sqrlx" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -36,12 +34,6 @@ func (ww *TestWorker) RegisterGRPC(server grpc.ServiceRegistrar) { test_tpb.RegisterTestTopicServer(server, ww) } -var replyNamespace = uuid.MustParse("7B4D4FB7-28BA-4848-9EE3-4C3B0B2263E6") - -func replyID(greetingID string) string { - return uuid.NewSHA1(replyNamespace, []byte(greetingID)).String() -} - func (ww *TestWorker) Greeting(ctx context.Context, req *test_tpb.GreetingMessage) (*emptypb.Empty, error) { if req.WorkerError != nil { @@ -53,30 +45,22 @@ func (ww *TestWorker) Greeting(ctx context.Context, req *test_tpb.GreetingMessag return nil, status.Error(codes.Code(req.WorkerError.Code), req.WorkerError.Message) } - // TODO: Greeting should reply to a reply topic with the reply, but for now - // we are just going directly to the state machine. + message, err := j5auth.GetMessageCause(ctx) + if err != nil { + return nil, err + } evt := &test_pb.GreetingPSMEventSpec{ Keys: &test_pb.GreetingKeys{ GreetingId: req.GreetingId, }, - EventID: replyID(req.GreetingId), - Timestamp: time.Now(), - Cause: &psm_j5pb.Cause{ - Type: &psm_j5pb.Cause_ExternalEvent{ - ExternalEvent: &psm_j5pb.ExternalEventCause{ - SystemName: "test", - EventName: "greeting", - }, - }, - }, - + Message: message, Event: &test_pb.GreetingEventType_Replied{ ReplyMessage: fmt.Sprintf("Hello %s", req.Name), }, } - _, err := ww.stateMachines.Greeting.Transition(ctx, ww.db, evt) + _, err = ww.stateMachines.Greeting.Transition(ctx, ww.db, evt) if err != nil { return nil, err } diff --git a/internal/test/000_index.go b/internal/test/000_index.go index 408ee42..dfc2175 100644 --- a/internal/test/000_index.go +++ b/internal/test/000_index.go @@ -14,6 +14,7 @@ func Run(ctx context.Context, cfg *universe.APIConfig, tags []string) error { testSet.Register(2, "HandlerError", universe.UniverseWrapper(cfg, HandlerErrorTests)) testSet.Register(3, "WorkerErrorTests", universe.UniverseWrapper(cfg, WorkerErrorTests), "name=worker-error") + testSet.Register(4, "ReplayTests", universe.UniverseWrapper(cfg, ReplayTests)) return testSet.Run(ctx, tags) } diff --git a/internal/test/004_replay.go b/internal/test/004_replay.go new file mode 100644 index 0000000..e3c5d1a --- /dev/null +++ b/internal/test/004_replay.go @@ -0,0 +1,48 @@ +package test + +import ( + "context" + "time" + + "github.com/google/uuid" + "github.com/pentops/flowtest" + "github.com/pentops/o5-test-app/internal/genclient/test/v1/test" + "github.com/pentops/o5-test-app/internal/test/universe" +) + +func ReplayTests(flow flowtest.StepSetter, uu *universe.Universe) { + greetingID := uuid.NewString() + + flow.Step("Hello", func(ctx context.Context, t flowtest.Asserter) { + greetingClient := test.NewGreetingCommandService(uu.Client) + + reply, err := greetingClient.Hello(ctx, &test.HelloRequest{ + GreetingId: greetingID, + Name: "World", + }) + t.NoError(err) + + t.NotNil(reply, reply.Greeting, reply.Greeting.Data) + t.Equal(reply.Greeting.Data.Name, "World") + }) + + flow.Step("Wait Loop", func(ctx context.Context, t flowtest.Asserter) { + queryClient := test.NewGreetingQueryService(uu.Client) + + for { + reply, err := queryClient.GreetingGet(ctx, &test.GreetingGetRequest{ + GreetingId: greetingID, + }) + t.NoError(err) + + t.NotNil(reply, reply.Greeting, reply.Greeting.Data) + t.Equal(reply.Greeting.Data.Name, "World") + + if reply.Greeting.Data.ReplyMessage != nil { + t.Equal(*reply.Greeting.Data.ReplyMessage, "Hello World") + return + } + time.Sleep(1 * time.Second) + } + }) +} diff --git a/j5.yaml b/j5.yaml index 1a7a19e..0490b68 100644 --- a/j5.yaml +++ b/j5.yaml @@ -9,7 +9,7 @@ generate: - local: test output: . mods: - - goPackageNames: + - goPackageNames: prefix: github.com/pentops/o5-test-app/internal/gen opts: paths: import @@ -27,6 +27,9 @@ generate: - registry: owner: pentops name: dante + - registry: + owner: pentops + name: ges output: ./internal/genclient plugins: - base: go-client diff --git a/schema/test/v1/event_log.j5s b/schema/test/v1/event_log.j5s new file mode 100644 index 0000000..3611c73 --- /dev/null +++ b/schema/test/v1/event_log.j5s @@ -0,0 +1,24 @@ +package test.v1 + +service EventLog { + basePath = "/test/v1/eventlog" + method GetMessages { + httpMethod = "GET" + httpPath = "/messages" + + request { + field greetingId ? key:uuid + field eventId ? key:uuid + } + response { + field messages array:object:Message + } + } +} + +object Message { + field greetingId ! key:uuid + field eventId ! key:uuid + field messageId ! string + field timestamp ! timestamp +} \ No newline at end of file diff --git a/schema/test/v1/event_log.j5s.proto b/schema/test/v1/event_log.j5s.proto new file mode 100644 index 0000000..556a78f --- /dev/null +++ b/schema/test/v1/event_log.j5s.proto @@ -0,0 +1,43 @@ +// Generated by j5build v0.0.0-20250403212908-de7c3c2e6cce. DO NOT EDIT + +syntax = "proto3"; + +package test.v1; + +import "buf/validate/validate.proto"; +import "google/protobuf/timestamp.proto"; +import "j5/ext/v1/annotations.proto"; + +message Message { + option (j5.ext.v1.message).object = {}; + + string greeting_id = 1 [ + (buf.validate.field) = { + required: true + string: { + uuid: true + } + }, + (j5.ext.v1.field).key = {} + ]; + + string event_id = 2 [ + (buf.validate.field) = { + required: true + string: { + uuid: true + } + }, + (j5.ext.v1.field).key = {} + ]; + + string message_id = 3 [ + (buf.validate.field).required = true, + (j5.ext.v1.field).string = {} + ]; + + google.protobuf.Timestamp timestamp = 4 [ + (buf.validate.field).required = true, + (j5.ext.v1.field).timestamp = {} + ]; +} diff --git a/schema/test/v1/service/event_log.p.j5s.proto b/schema/test/v1/service/event_log.p.j5s.proto new file mode 100644 index 0000000..6ae2333 --- /dev/null +++ b/schema/test/v1/service/event_log.p.j5s.proto @@ -0,0 +1,36 @@ +// Generated by j5build v0.0.0-20250403212908-de7c3c2e6cce. DO NOT EDIT + +syntax = "proto3"; + +package test.v1.service; + +import "buf/validate/validate.proto"; +import "google/api/annotations.proto"; +import "j5/ext/v1/annotations.proto"; +import "test/v1/event_log.j5s.proto"; + +service EventLogService { + rpc GetMessages(GetMessagesRequest) returns (GetMessagesResponse) { + option (google.api.http) = {get: "/test/v1/eventlog/messages"}; + } +} + +message GetMessagesRequest { + option (j5.ext.v1.message).object = {}; + + optional string greeting_id = 1 [ + (buf.validate.field).string.uuid = true, + (j5.ext.v1.field).key = {} + ]; + + optional string event_id = 2 [ + (buf.validate.field).string.uuid = true, + (j5.ext.v1.field).key = {} + ]; +} + +message GetMessagesResponse { + option (j5.ext.v1.message).object = {}; + + repeated test.v1.Message messages = 1 [(j5.ext.v1.field).array = {}]; +}