Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions blocking.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package klevdb

import "context"
import (
"context"

"github.com/klev-dev/klevdb/notify"
)

// BlockingLog enhances [Log] adding blocking consume
type BlockingLog interface {
Expand Down Expand Up @@ -28,12 +32,12 @@ func WrapBlocking(l Log) (BlockingLog, error) {
if err != nil {
return nil, err
}
return &blockingLog{l, NewOffsetNotify(next)}, nil
return &blockingLog{l, notify.NewOffset(next)}, nil
}

type blockingLog struct {
Log
notify *OffsetNotify
notify *notify.Offset
}

func (l *blockingLog) Publish(messages []Message) (int64, error) {
Expand Down
14 changes: 7 additions & 7 deletions notify.go → notify/notify.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package klevdb
package notify

import (
"context"
Expand All @@ -8,13 +8,13 @@ import (

var ErrOffsetNotifyClosed = errors.New("offset notify already closed")

type OffsetNotify struct {
type Offset struct {
nextOffset atomic.Int64
barrier chan chan struct{}
}

func NewOffsetNotify(nextOffset int64) *OffsetNotify {
w := &OffsetNotify{
func NewOffset(nextOffset int64) *Offset {
w := &Offset{
barrier: make(chan chan struct{}, 1),
}

Expand All @@ -24,7 +24,7 @@ func NewOffsetNotify(nextOffset int64) *OffsetNotify {
return w
}

func (w *OffsetNotify) Wait(ctx context.Context, offset int64) error {
func (w *Offset) Wait(ctx context.Context, offset int64) error {
// quick path, just load and check
if w.nextOffset.Load() > offset {
return nil
Expand Down Expand Up @@ -57,7 +57,7 @@ func (w *OffsetNotify) Wait(ctx context.Context, offset int64) error {
}
}

func (w *OffsetNotify) Set(nextOffset int64) {
func (w *Offset) Set(nextOffset int64) {
// acquire current barrier
b, ok := <-w.barrier
if !ok {
Expand All @@ -77,7 +77,7 @@ func (w *OffsetNotify) Set(nextOffset int64) {
w.barrier <- make(chan struct{})
}

func (w *OffsetNotify) Close() error {
func (w *Offset) Close() error {
// acquire current barrier
b, ok := <-w.barrier
if !ok {
Expand Down
12 changes: 6 additions & 6 deletions notify_test.go → notify/notify_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package klevdb
package notify

import (
"context"
Expand All @@ -11,14 +11,14 @@ import (

func TestNotify(t *testing.T) {
t.Run("unblock", func(t *testing.T) {
n := NewOffsetNotify(10)
n := NewOffset(10)

err := n.Wait(context.TODO(), 5)
require.NoError(t, err)
})

t.Run("blocked", func(t *testing.T) {
n := NewOffsetNotify(10)
n := NewOffset(10)
ch := make(chan struct{})
var wg sync.WaitGroup

Expand All @@ -38,7 +38,7 @@ func TestNotify(t *testing.T) {
})

t.Run("cancel", func(t *testing.T) {
n := NewOffsetNotify(10)
n := NewOffset(10)
ch := make(chan struct{})
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.TODO())
Expand All @@ -60,7 +60,7 @@ func TestNotify(t *testing.T) {
})

t.Run("close", func(t *testing.T) {
n := NewOffsetNotify(10)
n := NewOffset(10)
ch := make(chan struct{})
var wg sync.WaitGroup

Expand All @@ -80,7 +80,7 @@ func TestNotify(t *testing.T) {
})

t.Run("close_err", func(t *testing.T) {
n := NewOffsetNotify(10)
n := NewOffset(10)
err := n.Close()
require.NoError(t, err)

Expand Down
10 changes: 7 additions & 3 deletions typed_blocking.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package klevdb

import "context"
import (
"context"

"github.com/klev-dev/klevdb/notify"
)

// TBlockingLog enhances [TLog] adding blocking consume
type TBlockingLog[K any, V any] interface {
Expand Down Expand Up @@ -28,12 +32,12 @@ func WrapTBlocking[K any, V any](l TLog[K, V]) (TBlockingLog[K, V], error) {
if err != nil {
return nil, err
}
return &tlogBlocking[K, V]{l, NewOffsetNotify(next)}, nil
return &tlogBlocking[K, V]{l, notify.NewOffset(next)}, nil
}

type tlogBlocking[K any, V any] struct {
TLog[K, V]
notify *OffsetNotify
notify *notify.Offset
}

func (l *tlogBlocking[K, V]) Publish(tmessages []TMessage[K, V]) (int64, error) {
Expand Down