diff --git a/blocking.go b/blocking.go index d54baff..b18eb26 100644 --- a/blocking.go +++ b/blocking.go @@ -2,18 +2,18 @@ package klevdb import "context" -// BlockingLog enhances log adding blocking consume +// BlockingLog enhances [Log] adding blocking consume type BlockingLog interface { Log - // ConsumeBlocking is similar to Consume, but if offset is equal to the next offset it will block until next message is produced + // ConsumeBlocking is similar to [Consume], but if offset is equal to the next offset it will block until next message is produced ConsumeBlocking(ctx context.Context, offset int64, maxCount int64) (nextOffset int64, messages []Message, err error) - // ConsumeByKeyBlocking is similar to ConsumeBlocking, but only returns messages matching the key + // ConsumeByKeyBlocking is similar to [ConsumeBlocking], but only returns messages matching the key ConsumeByKeyBlocking(ctx context.Context, key []byte, offset int64, maxCount int64) (nextOffset int64, messages []Message, err error) } -// OpenBlocking opens log and wraps it with support for blocking consume +// OpenBlocking opens a [Log] and wraps it with support for blocking consume func OpenBlocking(dir string, opts Options) (BlockingLog, error) { l, err := Open(dir, opts) if err != nil { @@ -22,7 +22,7 @@ func OpenBlocking(dir string, opts Options) (BlockingLog, error) { return WrapBlocking(l) } -// WrapBlocking wraps log with support for blocking consume +// WrapBlocking wraps a [Log] with support for blocking consume func WrapBlocking(l Log) (BlockingLog, error) { next, err := l.NextOffset() if err != nil { diff --git a/compact/deletes_test.go b/compact/deletes_test.go index e48e012..0aad84c 100644 --- a/compact/deletes_test.go +++ b/compact/deletes_test.go @@ -2,13 +2,13 @@ package compact import ( "context" + "slices" "testing" "time" "github.com/klev-dev/klevdb" "github.com/klev-dev/klevdb/message" "github.com/stretchr/testify/require" - "golang.org/x/exp/slices" ) func TestDeletes(t *testing.T) { diff --git a/compact/updates_test.go b/compact/updates_test.go index 8c01b2b..a2b23e5 100644 --- a/compact/updates_test.go +++ b/compact/updates_test.go @@ -2,13 +2,13 @@ package compact import ( "context" + "slices" "testing" "time" "github.com/klev-dev/klevdb" "github.com/klev-dev/klevdb/message" "github.com/stretchr/testify/require" - "golang.org/x/exp/slices" ) func TestUpdates(t *testing.T) { diff --git a/delete.go b/delete.go index 1c1d506..6147a55 100644 --- a/delete.go +++ b/delete.go @@ -2,14 +2,12 @@ package klevdb import ( "context" + "maps" "time" - - "golang.org/x/exp/maps" ) -// DeleteMultiBackoff is call on each iteration of -// DeleteMulti to give applications opportunity to not overload -// the target log with deletes +// DeleteMultiBackoff is call on each iteration of [DeleteMulti] to give applications +// opportunity to not overload the target log with deletes type DeleteMultiBackoff func(context.Context) error // DeleteMultiWithWait returns a backoff func that sleeps/waits @@ -27,16 +25,13 @@ func DeleteMultiWithWait(d time.Duration) DeleteMultiBackoff { } // DeleteMulti tries to delete all messages with offsets -// -// from the log and returns the amount of storage deleted +// from the log and returns the amount of storage deleted // // If error is encountered, it will return the deleted offsets +// and size, together with the error // -// and size, together with the error -// -// DeleteMultiBackoff is called on each iteration to give -// -// others a chanse to work with the log, while being deleted +// [DeleteMultiBackoff] is called on each iteration to give +// others a chanse to work with the log, while being deleted func DeleteMulti(ctx context.Context, l Log, offsets map[int64]struct{}, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) { var deletedOffsets = map[int64]struct{}{} var deletedSize int64 diff --git a/log.go b/log.go index 5af27dc..b0c00c5 100644 --- a/log.go +++ b/log.go @@ -3,14 +3,14 @@ package klevdb import ( "errors" "fmt" + "maps" "os" "path/filepath" + "slices" "sync" "time" "github.com/gofrs/flock" - "golang.org/x/exp/maps" - "golang.org/x/exp/slices" "github.com/klev-dev/klevdb/index" "github.com/klev-dev/klevdb/message" @@ -23,7 +23,7 @@ var errNoTimeIndex = fmt.Errorf("%w by time", ErrNoIndex) var errTimeNotFound = fmt.Errorf("time %w", message.ErrNotFound) var errDeleteRelative = fmt.Errorf("%w: delete relative offsets", message.ErrInvalidOffset) -// Open create a log based on a dir and set of options +// Open opens or creates a [Log] based on a dir and set of options func Open(dir string, opts Options) (result Log, err error) { if opts.Rollover == 0 { opts.Rollover = 1024 * 1024 @@ -423,7 +423,7 @@ func (l *log) Delete(offsets map[int64]struct{}) (map[int64]struct{}, int64, err } func (l *log) findDeleteReader(offsets map[int64]struct{}) (*reader, error) { - orderedOffsets := maps.Keys(offsets) + orderedOffsets := slices.Collect(maps.Keys(offsets)) slices.Sort(orderedOffsets) lowestOffset := orderedOffsets[0] diff --git a/segment/segment.go b/segment/segment.go index 55fa17f..ce80d7b 100644 --- a/segment/segment.go +++ b/segment/segment.go @@ -4,11 +4,10 @@ import ( "errors" "fmt" "io" + "maps" "os" "path/filepath" - - "golang.org/x/exp/maps" - "golang.org/x/exp/slices" + "slices" "github.com/klev-dev/klevdb/index" "github.com/klev-dev/klevdb/message" @@ -328,7 +327,7 @@ type RewriteSegment struct { } func (r *RewriteSegment) GetNewSegment() Segment { - orderedOffsets := maps.Keys(r.SurviveOffsets) + orderedOffsets := slices.Collect(maps.Keys(r.SurviveOffsets)) slices.Sort(orderedOffsets) lowestOffset := orderedOffsets[0] return New(r.Segment.Dir, lowestOffset) diff --git a/typed.go b/typed.go index d63a2d3..26d248d 100644 --- a/typed.go +++ b/typed.go @@ -2,6 +2,7 @@ package klevdb import "time" +// TMessage represents a typed [Message] type TMessage[K any, V any] struct { Offset int64 Time time.Time @@ -11,7 +12,7 @@ type TMessage[K any, V any] struct { ValueEmpty bool } -// TLog is a typed log +// TLog is a typed [Log] which encodes/decodes keys and values to bytes. type TLog[K any, V any] interface { // Publish see [Log.Publish] Publish(messages []TMessage[K, V]) (nextOffset int64, err error) diff --git a/typed_blocking.go b/typed_blocking.go index 7df734e..8db0b08 100644 --- a/typed_blocking.go +++ b/typed_blocking.go @@ -2,7 +2,7 @@ package klevdb import "context" -// TBlockingLog enhances tlog adding blocking consume +// TBlockingLog enhances [TLog] adding blocking consume type TBlockingLog[K any, V any] interface { TLog[K, V] @@ -13,7 +13,7 @@ type TBlockingLog[K any, V any] interface { ConsumeByKeyBlocking(ctx context.Context, key K, empty bool, offset int64, maxCount int64) (nextOffset int64, messages []TMessage[K, V], err error) } -// OpenBlocking opens tlog and wraps it with support for blocking consume +// OpenTBlocking opens tlog and wraps it with support for blocking consume func OpenTBlocking[K any, V any](dir string, opts Options, keyCodec Codec[K], valueCodec Codec[V]) (TBlockingLog[K, V], error) { l, err := OpenT(dir, opts, keyCodec, valueCodec) if err != nil { @@ -22,7 +22,7 @@ func OpenTBlocking[K any, V any](dir string, opts Options, keyCodec Codec[K], va return WrapTBlocking(l) } -// WrapBlocking wraps tlog with support for blocking consume +// WrapTBlocking wraps tlog with support for blocking consume func WrapTBlocking[K any, V any](l TLog[K, V]) (TBlockingLog[K, V], error) { next, err := l.NextOffset() if err != nil {