Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/lib/pq v1.10.9
github.com/pentops/flowtest v0.0.0-20250521181823-71b0be743b08
github.com/pentops/golib v0.0.0-20250326060930-8c83d58ddb63
github.com/pentops/j5 v0.0.0-20250605002250-2add77d73a52
github.com/pentops/j5 v0.0.0-20250610001046-f1c2162a4508
github.com/pentops/log.go v0.0.16
github.com/pentops/o5-messaging v0.0.0-20250520213617-fba07334e9aa
github.com/pentops/pgtest.go v0.0.0-20241223222214-7638cc50e15b
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ github.com/pentops/flowtest v0.0.0-20250521181823-71b0be743b08 h1:Xeip/GxtvcAGFF
github.com/pentops/flowtest v0.0.0-20250521181823-71b0be743b08/go.mod h1:vNp8crAKcH0f/sZU9frkmQLUeDsTIgMqV14kQtkAqC0=
github.com/pentops/golib v0.0.0-20250326060930-8c83d58ddb63 h1:s5qtWT2/s79gy/wm3/bwvKYLK6u2AkW05JiLPqxraP0=
github.com/pentops/golib v0.0.0-20250326060930-8c83d58ddb63/go.mod h1:I58JIVvL1/nP4CEHGKGbBhvWIEA9mVkGeoviemaqanU=
github.com/pentops/j5 v0.0.0-20250605002250-2add77d73a52 h1:natrugFwp8KWyN1W1SHphF+IokRkU+Yf+k/akHk2V1A=
github.com/pentops/j5 v0.0.0-20250605002250-2add77d73a52/go.mod h1:DZbBKepsGataOEtfB8AjkRiejRtLGQcBejTUYJK5wlY=
github.com/pentops/j5 v0.0.0-20250610001046-f1c2162a4508 h1:S80wU/ls85v5UpdFOsUbYwMISyN0DmFvowIpDWUNF7s=
github.com/pentops/j5 v0.0.0-20250610001046-f1c2162a4508/go.mod h1:DZbBKepsGataOEtfB8AjkRiejRtLGQcBejTUYJK5wlY=
github.com/pentops/log.go v0.0.16 h1:oxCuHSBOBPjfUVSXyOSEEdYUwytysj4T29/7T2FBp9Q=
github.com/pentops/log.go v0.0.16/go.mod h1:yR34x8aMlvhdGvqgIU4+0MiLjJTKt0vpcgUnVN2nZV4=
github.com/pentops/o5-messaging v0.0.0-20250520213617-fba07334e9aa h1:Sdnc9mrRSefBbbrwmpq/31ABuXBwtch2KGd68ORJS44=
Expand Down
5 changes: 3 additions & 2 deletions internal/integration/shadow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/google/uuid"
"github.com/pentops/flowtest"
"github.com/pentops/j5/gen/j5/state/v1/psm_j5pb"
"github.com/pentops/j5/lib/id62"
"github.com/pentops/protostate/internal/testproto/gen/test/v1/test_pb"
"github.com/pentops/protostate/internal/testproto/gen/test/v1/test_spb"
"google.golang.org/protobuf/types/known/timestamppb"
Expand All @@ -22,7 +23,7 @@ func TestStateMachineShadow(t *testing.T) {

events := []*test_pb.FooEvent{{
Metadata: &psm_j5pb.EventMetadata{
EventId: uuid.NewString(),
EventId: id62.NewString(),
Sequence: 1,
Cause: &psm_j5pb.Cause{
Type: &psm_j5pb.Cause_ExternalEvent{
Expand All @@ -47,7 +48,7 @@ func TestStateMachineShadow(t *testing.T) {
},
}, {
Metadata: &psm_j5pb.EventMetadata{
EventId: uuid.NewString(),
EventId: id62.NewString(),
Sequence: 1,
Cause: &psm_j5pb.Cause{
Type: &psm_j5pb.Cause_ExternalEvent{
Expand Down
12 changes: 9 additions & 3 deletions internal/pgstore/pgmigrate/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ func (t *CreateTableBuilder) Column(name string, typ ColumnType, options ...Colu
column := &column{
name: name,
typeName: typ,
flags: []string{},
}
for _, opt := range options {
opt(column)
Expand Down Expand Up @@ -59,9 +58,9 @@ type column struct {

primaryKey bool // Multi Primary Key is possible
notNull bool
unique bool

typeName ColumnType
flags []string
}

type ColumnOption func(*column)
Expand All @@ -76,6 +75,10 @@ func NotNull(c *column) {
c.notNull = true
}

func Unique(c *column) {
c.unique = true
}

type ColumnType string

const (
Expand All @@ -102,6 +105,9 @@ func (t *CreateTableBuilder) Build() (*Table, error) {
if col.notNull {
column.Flags = append(column.Flags, "NOT NULL")
}
if col.unique {
column.Flags = append(column.Flags, "UNIQUE")
}
table.Columns = append(table.Columns, column)
}

Expand Down Expand Up @@ -138,7 +144,7 @@ type ForeignKey struct {
}

func (tt *Table) DownSQL() (string, error) {
return fmt.Sprintf("DROP TABLE %s;", tt.Name), nil
return fmt.Sprintf("DROP TABLE IF EXISTS %s;", tt.Name), nil
}

func (table *Table) ToSQL() (string, error) {
Expand Down
6 changes: 4 additions & 2 deletions internal/pgstore/pgmigrate/psm.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func BuildPSMTables(spec psm.QueryTableSpec) (*Table, *Table, error) {
stateTable := CreateTable(spec.State.TableName)

eventTable := CreateTable(spec.Event.TableName).
Column(spec.Event.ID.ColumnName, uuidType, PrimaryKey)
Column(spec.Event.ID.ColumnName, id62Type, PrimaryKey)

eventForeignKey := eventTable.ForeignKey("state", spec.State.TableName)
for _, key := range spec.KeyColumns {
Expand Down Expand Up @@ -252,7 +252,9 @@ func BuildPSMTables(spec psm.QueryTableSpec) (*Table, *Table, error) {

stateTable.Column(spec.State.Root.ColumnName, jsonbType, NotNull)

eventTable.Column(spec.Event.Timestamp.ColumnName, timestamptzType, NotNull).
eventTable.
Column(spec.Event.Timestamp.ColumnName, timestamptzType, NotNull).
Column(spec.Event.IdempotencyHash.ColumnName, textType, NotNull, Unique).
Column(spec.Event.Sequence.ColumnName, intType, NotNull).
Column(spec.Event.Root.ColumnName, jsonbType, NotNull).
Column(spec.Event.StateSnapshot.ColumnName, jsonbType, NotNull)
Expand Down
9 changes: 0 additions & 9 deletions psm/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,6 @@ type StateMachineConfig[
tableName *string
}

// DEPRECATED: This does nothing.
type SystemActor any

// DEPRECATED: This does nothing.
func (smc *StateMachineConfig[K, S, ST, SD, E, IE]) SystemActor(systemActor SystemActor) *StateMachineConfig[K, S, ST, SD, E, IE] {
//smc.systemActor = systemActor
return smc
}

func (smc *StateMachineConfig[K, S, ST, SD, E, IE]) TableMap(tableMap *TableMap) *StateMachineConfig[K, S, ST, SD, E, IE] {
smc.tableMap = tableMap
return smc
Expand Down
155 changes: 147 additions & 8 deletions psm/event.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package psm

import (
"crypto/sha1"
"fmt"
"math/big"
"strings"
"time"

"github.com/pentops/j5/gen/j5/auth/v1/auth_j5pb"
"github.com/pentops/j5/gen/j5/messaging/v1/messaging_j5pb"
"github.com/pentops/j5/gen/j5/state/v1/psm_j5pb"
"github.com/pentops/j5/lib/id62"
"google.golang.org/protobuf/types/known/timestamppb"
)

type EventSpec[
Expand All @@ -20,9 +25,6 @@ type EventSpec[
// Keys must be set, to identify the state machine.
Keys K

// EventID is optional and will be set by the state machine if empty
EventID string

// The inner PSM Event type. Must be set for incoming events.
Event IE

Expand Down Expand Up @@ -81,14 +83,151 @@ func (es *EventSpec[K, S, ST, SD, E, IE]) validateAndPrepare() error {

// check that the cause type is supported.
switch es.Cause.Type.(type) {
case *psm_j5pb.Cause_PsmEvent,
*psm_j5pb.Cause_Command,
*psm_j5pb.Cause_ExternalEvent,
*psm_j5pb.Cause_Message:
// All OK
case *psm_j5pb.Cause_PsmEvent:
case *psm_j5pb.Cause_Command:
case *psm_j5pb.Cause_ExternalEvent:
case *psm_j5pb.Cause_Message:

default:
return fmt.Errorf("EventSpec.Cause.Source must be set")
}

return nil
}

func base62String(id []byte) string {
var i big.Int
i.SetBytes(id)
str := i.Text(62)
return fmt.Sprintf("%022s", str)
}

// hashString generates a sha1 hash from the input strings.
func hashString(input ...string) (string, error) {
if len(input) == 0 {
return "", fmt.Errorf("hashString requires at least one input string")
}
fullInput := strings.Join(input, "")
sum := sha1.Sum([]byte(fullInput))
return base62String(sum[:]), nil
}

func eventIdempotencyKey(event looseEvent) (string, error) {
eventFullType := string(event.ProtoReflect().Descriptor().FullName())

switch cause := event.PSMMetadata().Cause.Type.(type) {
case *psm_j5pb.Cause_PsmEvent:
return hashString(
eventFullType,
cause.PsmEvent.EventId,
)

case *psm_j5pb.Cause_Command:
providedKey := cause.Command.IdempotencyKey
if providedKey == "" {
return event.PSMMetadata().GetEventId(), nil
}
return hashString(
eventFullType,
cause.Command.Actor.Claim.TenantId,
providedKey,
)

case *psm_j5pb.Cause_ExternalEvent:
if cause.ExternalEvent.ExternalId != nil {
return hashString(
eventFullType,
*cause.ExternalEvent.ExternalId,
)
} else {
return event.PSMMetadata().GetEventId(), nil
}

case *psm_j5pb.Cause_Message:
return hashString(
eventFullType,
cause.Message.MessageId,
)

case *psm_j5pb.Cause_Init:
return event.PSMMetadata().GetEventId(), nil

default:
return "", fmt.Errorf("EventSpec.Cause.Source must be set")
}
}

type preparedEvent[
K IKeyset,
S IState[K, ST, SD],
ST IStatusEnum,
SD IStateData,
E IEvent[K, S, ST, SD, IE],
IE IInnerEvent,
] struct {
event E
state S
idempotencyKey string
}

func prepareFollowEvent[
K IKeyset,
S IState[K, ST, SD],
ST IStatusEnum,
SD IStateData,
E IEvent[K, S, ST, SD, IE],
IE IInnerEvent,
](event E, state S) (built preparedEvent[K, S, ST, SD, E, IE], err error) {
idempotencyKey, err := eventIdempotencyKey(event)
if err != nil {
return built, err
}
return preparedEvent[K, S, ST, SD, E, IE]{
event: event,
state: state,
idempotencyKey: idempotencyKey,
}, nil
}

func (es *EventSpec[K, S, ST, SD, E, IE]) buildWrapper(state S) (built preparedEvent[K, S, ST, SD, E, IE], err error) {

evt := (*new(E)).ProtoReflect().New().Interface().(E)
if err := evt.SetPSMEvent(es.Event); err != nil {
return built, fmt.Errorf("set event: %w", err)
}
evt.SetPSMKeys(es.Keys)

eventMeta := evt.PSMMetadata()
eventMeta.EventId = id62.NewString()
eventMeta.Timestamp = timestamppb.Now()
eventMeta.Cause = es.Cause

incrementEventSequence(state, eventMeta)

built.event = evt
built.state = state

idempotencyKey, err := eventIdempotencyKey(evt)
if err != nil {
return built, err
}
built.idempotencyKey = idempotencyKey

return

}

func incrementEventSequence[K IKeyset, S IState[K, ST, SD], ST IStatusEnum, SD IStateData](state S, eventMeta *psm_j5pb.EventMetadata) {
stateMeta := state.PSMMetadata()

eventMeta.Sequence = 0
if state.GetStatus() == 0 {
eventMeta.Sequence = 0
stateMeta.CreatedAt = eventMeta.Timestamp
stateMeta.UpdatedAt = eventMeta.Timestamp
} else {
eventMeta.Sequence = stateMeta.LastSequence + 1
stateMeta.LastSequence = eventMeta.Sequence
stateMeta.UpdatedAt = eventMeta.Timestamp
}
}
6 changes: 5 additions & 1 deletion psm/gen_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,15 @@ type IEvent[
SD IStateData,
Inner any,
] interface {
proto.Message
looseEvent
UnwrapPSMEvent() Inner
SetPSMEvent(Inner) error
PSMKeys() K
SetPSMKeys(K)
}

type looseEvent interface {
proto.Message
PSMMetadata() *psm_j5pb.EventMetadata
PSMIsSet() bool
}
Expand Down
Loading