From 2dbc27602c6d03faccaea0dc7203eca25a966ed5 Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Sun, 9 Mar 2025 11:46:22 -0400 Subject: [PATCH 1/4] experiment using std errors --- compact/deletes.go | 17 +++++++++------ compact/updates.go | 17 +++++++++------ index/format.go | 46 ++++++++++++++++++++-------------------- log.go | 4 ++++ message/format.go | 52 +++++++++++++++++++++++----------------------- message/message.go | 9 -------- trim/age.go | 23 +++++++++++--------- trim/count.go | 19 ++++++++++------- trim/offset.go | 17 +++++++++------ trim/size.go | 19 ++++++++++------- 10 files changed, 123 insertions(+), 100 deletions(-) diff --git a/compact/deletes.go b/compact/deletes.go index ea0cbe4..40b71ff 100644 --- a/compact/deletes.go +++ b/compact/deletes.go @@ -2,6 +2,7 @@ package compact import ( "context" + "fmt" "time" art "github.com/plar/go-adaptive-radix-tree/v2" @@ -17,7 +18,7 @@ import ( func FindDeletes(ctx context.Context, l klevdb.Log, before time.Time) (map[int64]struct{}, error) { maxOffset, err := l.NextOffset() if err != nil { - return nil, err + return nil, fmt.Errorf("[compact.FindDeletes] %s next offset: %w", l, err) } var keyOffset = art.New() @@ -27,7 +28,7 @@ SEARCH: for offset := klevdb.OffsetOldest; offset < maxOffset; { nextOffset, msgs, err := l.Consume(offset, 32) if err != nil { - return nil, err + return nil, fmt.Errorf("[compact.FindDeletes] %s consume %d: %w", l, offset, err) } offset = nextOffset @@ -51,12 +52,12 @@ SEARCH: } if err := ctx.Err(); err != nil { - return nil, err + return nil, fmt.Errorf("[compact.FindDeletes] %s canceled %d: %w", l, offset, err) } } if err := ctx.Err(); err != nil { - return nil, err + return nil, fmt.Errorf("[compact.FindDeletes] %s canceled: %w", l, err) } return offsets, nil @@ -72,7 +73,11 @@ SEARCH: func Deletes(ctx context.Context, l klevdb.Log, before time.Time) (map[int64]struct{}, int64, error) { offsets, err := FindDeletes(ctx, l, before) if err != nil { - return nil, 0, err + return nil, 0, fmt.Errorf("[compact.Deletes] %s find: %w", l, err) } - return l.Delete(offsets) + m, sz, err := l.Delete(offsets) + if err != nil { + return nil, 0, fmt.Errorf("[compact.Deletes] %s delete: %w", l, err) + } + return m, sz, nil } diff --git a/compact/updates.go b/compact/updates.go index dd264d0..dd6a693 100644 --- a/compact/updates.go +++ b/compact/updates.go @@ -2,6 +2,7 @@ package compact import ( "context" + "fmt" "time" art "github.com/plar/go-adaptive-radix-tree/v2" @@ -17,7 +18,7 @@ import ( func FindUpdates(ctx context.Context, l klevdb.Log, before time.Time) (map[int64]struct{}, error) { maxOffset, err := l.NextOffset() if err != nil { - return nil, err + return nil, fmt.Errorf("[compact.FindUpdates] %s next offset: %w", l, err) } var keyOffset = art.New() @@ -27,7 +28,7 @@ SEARCH: for offset := klevdb.OffsetOldest; offset < maxOffset; { nextOffset, msgs, err := l.Consume(offset, 32) if err != nil { - return nil, err + return nil, fmt.Errorf("[compact.FindUpdates] %s consume %d: %w", l, offset, err) } offset = nextOffset @@ -42,12 +43,12 @@ SEARCH: } if err := ctx.Err(); err != nil { - return nil, err + return nil, fmt.Errorf("[compact.FindUpdates] %s canceled %d: %w", l, offset, err) } } if err := ctx.Err(); err != nil { - return nil, err + return nil, fmt.Errorf("[compact.FindUpdates] %s canceled: %w", l, err) } return offsets, nil @@ -63,7 +64,11 @@ SEARCH: func Updates(ctx context.Context, l klevdb.Log, before time.Time) (map[int64]struct{}, int64, error) { offsets, err := FindUpdates(ctx, l, before) if err != nil { - return nil, 0, err + return nil, 0, fmt.Errorf("[compact.Updates] %s find: %w", l, err) } - return l.Delete(offsets) + m, sz, err := l.Delete(offsets) + if err != nil { + return nil, 0, fmt.Errorf("[compact.Updates] %s delete: %w", l, err) + } + return m, sz, nil } diff --git a/index/format.go b/index/format.go index eff23c4..ab3bb4c 100644 --- a/index/format.go +++ b/index/format.go @@ -3,10 +3,9 @@ package index import ( "encoding/binary" "errors" + "fmt" "io" "os" - - "github.com/klev-dev/kleverr" ) var ErrCorrupted = errors.New("index corrupted") @@ -22,20 +21,18 @@ type Writer struct { func OpenWriter(path string, opts Params) (*Writer, error) { f, err := os.OpenFile(path, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600) if err != nil { - return nil, kleverr.Newf("could not open index: %w", err) + return nil, fmt.Errorf("[index.OpenWriter] %s open: %w", path, err) } - pos := int64(0) - if stat, err := f.Stat(); err != nil { - return nil, kleverr.Newf("could not stat index: %w", err) - } else { - pos = stat.Size() + stat, err := f.Stat() + if err != nil { + return nil, fmt.Errorf("[index.OpenWriter] %s stat: %w", path, err) } return &Writer{ opts: opts, f: f, - pos: pos, + pos: stat.Size(), keyOffset: opts.keyOffset(), }, nil } @@ -57,7 +54,7 @@ func (w *Writer) Write(it Item) error { } if n, err := w.f.Write(w.buff); err != nil { - return kleverr.Newf("failed to write index: %w", err) + return fmt.Errorf("[index.Writer.Write] %s offset %d write: %w", w.f.Name(), it.Offset, err) } else { w.pos += int64(n) } @@ -71,24 +68,24 @@ func (w *Writer) Size() int64 { func (w *Writer) Sync() error { if err := w.f.Sync(); err != nil { - return kleverr.Newf("could not sync index: %w", err) + return fmt.Errorf("[index.Writer.Sync] %s sync: %w", w.f.Name(), err) } return nil } func (w *Writer) Close() error { if err := w.f.Close(); err != nil { - return kleverr.Newf("could not close index: %w", err) + return fmt.Errorf("[index.Writer.Close] %s close: %w", w.f.Name(), err) } return nil } func (w *Writer) SyncAndClose() error { - if err := w.f.Sync(); err != nil { - return kleverr.Newf("could not sync index: %w", err) + if err := w.Sync(); err != nil { + return fmt.Errorf("[index.Writer.SyncAndClose] sync: %w", err) } - if err := w.f.Close(); err != nil { - return kleverr.Newf("could not close index: %w", err) + if err := w.Close(); err != nil { + return fmt.Errorf("[index.Writer.SyncAndClose] close: %w", err) } return nil } @@ -96,40 +93,43 @@ func (w *Writer) SyncAndClose() error { func Write(path string, opts Params, index []Item) error { w, err := OpenWriter(path, opts) if err != nil { - return err + return fmt.Errorf("[index.Write] open writer: %w", err) } defer w.Close() for _, item := range index { if err := w.Write(item); err != nil { - return err + return fmt.Errorf("[index.Write] write item: %w", err) } } - return w.SyncAndClose() + if err := w.SyncAndClose(); err != nil { + return fmt.Errorf("[index.Write] close: %w", err) + } + return nil } func Read(path string, opts Params) ([]Item, error) { f, err := os.Open(path) if err != nil { - return nil, kleverr.Newf("could not open index: %w", err) + return nil, fmt.Errorf("[index.Read] %s open: %w", path, err) } defer f.Close() stat, err := os.Stat(path) if err != nil { - return nil, kleverr.Newf("could not stat index: %w", err) + return nil, fmt.Errorf("[index.Read] %s stat: %w", path, err) } dataSize := stat.Size() itemSize := opts.Size() if dataSize%itemSize > 0 { - return nil, kleverr.Newf("%w: unexpected data len: %d", ErrCorrupted, dataSize) + return nil, fmt.Errorf("[index.Read] %s unexpected data length %d: %w", path, dataSize, ErrCorrupted) } data := make([]byte, dataSize) if _, err = io.ReadFull(f, data); err != nil { - return nil, kleverr.Newf("could not read index: %w", err) + return nil, fmt.Errorf("[index.Read] %s read: %w", path, err) } var keyOffset = opts.keyOffset() diff --git a/log.go b/log.go index a4afdce..58bd042 100644 --- a/log.go +++ b/log.go @@ -549,3 +549,7 @@ func (l *log) Close() error { return nil } + +func (l *log) String() string { + return l.dir +} diff --git a/message/format.go b/message/format.go index da853bd..e188279 100644 --- a/message/format.go +++ b/message/format.go @@ -3,17 +3,19 @@ package message import ( "encoding/binary" "errors" + "fmt" "hash/crc32" "io" "os" "time" "golang.org/x/exp/mmap" - - "github.com/klev-dev/kleverr" ) -var ErrCorrupted = errors.New("message corrupted") +var ErrCorrupted = errors.New("log corrupted") +var errShortHeader = fmt.Errorf("%w: short header", ErrCorrupted) +var errShortMessage = fmt.Errorf("%w: short message", ErrCorrupted) +var errNoMessage = fmt.Errorf("%w: no message", ErrCorrupted) var crc32cTable = crc32.MakeTable(crc32.Castagnoli) @@ -31,17 +33,15 @@ type Writer struct { func OpenWriter(path string) (*Writer, error) { f, err := os.OpenFile(path, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600) if err != nil { - return nil, kleverr.Newf("could not open log: %w", err) + return nil, fmt.Errorf("[message.OpenWriter] %s open: %w", path, err) } - pos := int64(0) - if stat, err := f.Stat(); err != nil { - return nil, kleverr.Newf("could not stat log: %w", err) - } else { - pos = stat.Size() + stat, err := f.Stat() + if err != nil { + return nil, fmt.Errorf("[message.OpenWriter] %s stat: %w", path, err) } - return &Writer{Path: path, f: f, pos: pos}, nil + return &Writer{Path: path, f: f, pos: stat.Size()}, nil } func (w *Writer) Write(m Message) (int64, error) { @@ -71,7 +71,7 @@ func (w *Writer) Write(m Message) (int64, error) { pos := w.pos if n, err := w.f.Write(w.buff); err != nil { - return 0, kleverr.Newf("log failed write: %w", err) + return 0, fmt.Errorf("[message.Writer.Write] %s [offset=%d] write: %w", w.f.Name(), m.Offset, err) } else { w.pos += int64(n) } @@ -84,24 +84,24 @@ func (w *Writer) Size() int64 { func (w *Writer) Sync() error { if err := w.f.Sync(); err != nil { - return kleverr.Newf("could not sync log: %w", err) + return fmt.Errorf("[message.Writer.Sync] %s sync: %w", w.f.Name(), err) } return nil } func (w *Writer) Close() error { if err := w.f.Close(); err != nil { - return kleverr.Newf("could not close log: %w", err) + return fmt.Errorf("[message.Writer.Close] %s close: %w", w.f.Name(), err) } return nil } func (w *Writer) SyncAndClose() error { - if err := w.f.Sync(); err != nil { - return kleverr.Newf("could not sync log: %w", err) + if err := w.Sync(); err != nil { + return fmt.Errorf("[message.Writer.SyncAndClose] sync: %w", err) } if err := w.f.Close(); err != nil { - return kleverr.Newf("could not close log: %w", err) + return fmt.Errorf("[message.Writer.SyncAndClose] close: %w", err) } return nil } @@ -115,7 +115,7 @@ type Reader struct { func OpenReader(path string) (*Reader, error) { f, err := os.Open(path) if err != nil { - return nil, kleverr.Newf("could not open log: %w", err) + return nil, fmt.Errorf("[message.OpenReader] %s open: %w", path, err) } return &Reader{ @@ -127,7 +127,7 @@ func OpenReader(path string) (*Reader, error) { func OpenReaderMem(path string) (*Reader, error) { f, err := mmap.Open(path) if err != nil { - return nil, kleverr.Newf("could not open log: %w", err) + return nil, fmt.Errorf("[message.OpenReaderMem] %s open: %w", path, err) } return &Reader{ @@ -174,9 +174,9 @@ func (r *Reader) read(position int64, msg *Message) (nextPosition int64, err err case err == nil: // all good, continue case errors.Is(err, io.ErrUnexpectedEOF): - return -1, kleverr.Newf("%w: short header", ErrCorrupted) + return -1, fmt.Errorf("[index.Reader.Read] %s [position=%d] header read: %w", r.Path, position, errShortHeader) default: - return -1, kleverr.Newf("could not read header: %w", err) + return -1, fmt.Errorf("[index.Reader.Read] %s [position=%d] header read: %w", r.Path, position, err) } msg.Offset = int64(binary.BigEndian.Uint64(headerBytes[0:])) @@ -196,16 +196,16 @@ func (r *Reader) read(position int64, msg *Message) (nextPosition int64, err err case err == nil: // all good, continue case errors.Is(err, io.ErrUnexpectedEOF): - return -1, kleverr.Newf("%w: short message", ErrCorrupted) + return -1, fmt.Errorf("[index.Reader.Read] %s [position=%d] message read: %w", r.Path, position, errShortMessage) case errors.Is(err, io.EOF): - return -1, kleverr.Newf("%w: no message", ErrCorrupted) + return -1, fmt.Errorf("[index.Reader.Read] %s [position=%d] message read: %w", r.Path, position, errNoMessage) default: - return -1, kleverr.Newf("could not read message: %w", err) + return -1, fmt.Errorf("[index.Reader.Read] %s [position=%d] message read: %w", r.Path, position, err) } actualCRC := crc32.Checksum(messageBytes, crc32cTable) if expectedCRC != actualCRC { - return -1, kleverr.Newf("%w: invalid crc: expected=%d; actual=%d", ErrCorrupted, expectedCRC, actualCRC) + return -1, fmt.Errorf("[index.Reader.Read] %s [position=%d] message failed checksum: %w", r.Path, position, ErrCorrupted) } if keySize > 0 { @@ -222,11 +222,11 @@ func (r *Reader) read(position int64, msg *Message) (nextPosition int64, err err func (r *Reader) Close() error { if r.ra != nil { if err := r.ra.Close(); err != nil { - return kleverr.Newf("could not close log: %w", err) + return fmt.Errorf("[message.Reader.Close] %s - mem close: %w", r.Path, err) } } else { if err := r.r.Close(); err != nil { - return kleverr.Newf("could not close log: %w", err) + return fmt.Errorf("[message.Reader.Close] %s - close: %w", r.Path, err) } } return nil diff --git a/message/message.go b/message/message.go index 3af2c55..2d7c3c2 100644 --- a/message/message.go +++ b/message/message.go @@ -5,8 +5,6 @@ import ( "fmt" "strings" "time" - - "github.com/klev-dev/kleverr" ) const ( @@ -23,13 +21,6 @@ const ( var ErrInvalidOffset = errors.New("invalid offset") var ErrNotFound = errors.New("not found") -func ValidateOffset(offset int64) error { - if offset < OffsetOldest { - return kleverr.Newf("%w: %d is not a valid offset", ErrInvalidOffset, offset) - } - return nil -} - type Message struct { Offset int64 Time time.Time diff --git a/trim/age.go b/trim/age.go index de11bb2..b5e9b70 100644 --- a/trim/age.go +++ b/trim/age.go @@ -3,6 +3,7 @@ package trim import ( "context" "errors" + "fmt" "time" "github.com/klev-dev/klevdb" @@ -20,19 +21,17 @@ func FindByAge(ctx context.Context, l klevdb.Log, before time.Time) (map[int64]s // this log is not indexed by time, use the max as a bound maxOffset, err = l.NextOffset() if err != nil { - return nil, err + return nil, fmt.Errorf("[trim.FindByAge] %s next offset, no index: %w", l, err) } case errors.Is(err, klevdb.ErrNotFound): // all messages are before, again use the max as a bound maxOffset, err = l.NextOffset() if err != nil { - return nil, err + return nil, fmt.Errorf("[trim.FindByAge] %s next offset, not found: %w", l, err) } default: // something else went wrong - if err != nil { - return nil, err - } + return nil, fmt.Errorf("[trim.FindByAge] %s offset by time: %w", l, err) } var offsets = map[int64]struct{}{} @@ -41,7 +40,7 @@ SEARCH: for offset := klevdb.OffsetOldest; offset < maxOffset; { nextOffset, msgs, err := l.Consume(offset, 32) if err != nil { - return nil, err + return nil, fmt.Errorf("[trim.FindByAge] %s consume %d: %w", l, offset, err) } offset = nextOffset @@ -54,12 +53,12 @@ SEARCH: } if err := ctx.Err(); err != nil { - return nil, err + return nil, fmt.Errorf("[trim.FindByAge] %s canceled %d: %w", l, offset, err) } } if err := ctx.Err(); err != nil { - return nil, err + return nil, fmt.Errorf("[trim.FindByAge] %s canceled: %w", l, err) } return offsets, nil @@ -71,7 +70,11 @@ SEARCH: func ByAge(ctx context.Context, l klevdb.Log, before time.Time) (map[int64]struct{}, int64, error) { offsets, err := FindByAge(ctx, l, before) if err != nil { - return nil, 0, err + return nil, 0, fmt.Errorf("[trim.ByAge] %s find: %w", l, err) + } + m, sz, err := l.Delete(offsets) + if err != nil { + return nil, 0, fmt.Errorf("[trim.ByAge] %s delete: %w", l, err) } - return l.Delete(offsets) + return m, sz, nil } diff --git a/trim/count.go b/trim/count.go index a3a88c3..c1192ca 100644 --- a/trim/count.go +++ b/trim/count.go @@ -2,6 +2,7 @@ package trim import ( "context" + "fmt" "github.com/klev-dev/klevdb" ) @@ -12,14 +13,14 @@ func FindByCount(ctx context.Context, l klevdb.Log, max int) (map[int64]struct{} stats, err := l.Stat() switch { case err != nil: - return nil, err + return nil, fmt.Errorf("[trim.FindByCount] %s stat: %w", l, err) case stats.Messages <= max: return nil, nil } maxOffset, err := l.NextOffset() if err != nil { - return nil, err + return nil, fmt.Errorf("[trim.FindByCount] %s next offset: %w", l, err) } var offsets = map[int64]struct{}{} @@ -28,7 +29,7 @@ func FindByCount(ctx context.Context, l klevdb.Log, max int) (map[int64]struct{} for offset := klevdb.OffsetOldest; offset < maxOffset && toRemove > 0; { nextOffset, msgs, err := l.Consume(offset, 32) if err != nil { - return nil, err + return nil, fmt.Errorf("[trim.FindByCount] %s consume %d: %w", l, offset, err) } offset = nextOffset @@ -42,12 +43,12 @@ func FindByCount(ctx context.Context, l klevdb.Log, max int) (map[int64]struct{} } if err := ctx.Err(); err != nil { - return nil, err + return nil, fmt.Errorf("[trim.FindByCount] %s canceled %d: %w", l, offset, err) } } if err := ctx.Err(); err != nil { - return nil, err + return nil, fmt.Errorf("[trim.FindByCount] %s canceled: %w", l, err) } return offsets, nil @@ -60,7 +61,11 @@ func FindByCount(ctx context.Context, l klevdb.Log, max int) (map[int64]struct{} func ByCount(ctx context.Context, l klevdb.Log, max int) (map[int64]struct{}, int64, error) { offsets, err := FindByCount(ctx, l, max) if err != nil { - return nil, 0, err + return nil, 0, fmt.Errorf("[trim.ByCount] %s find: %w", l, err) } - return l.Delete(offsets) + m, sz, err := l.Delete(offsets) + if err != nil { + return nil, 0, fmt.Errorf("[trim.ByCount] %s delete: %w", l, err) + } + return m, sz, nil } diff --git a/trim/offset.go b/trim/offset.go index 4412a30..e6cfd4d 100644 --- a/trim/offset.go +++ b/trim/offset.go @@ -2,6 +2,7 @@ package trim import ( "context" + "fmt" "github.com/klev-dev/klevdb" "github.com/klev-dev/klevdb/message" @@ -16,7 +17,7 @@ func FindByOffset(ctx context.Context, l klevdb.Log, before int64) (map[int64]st maxOffset, err := l.NextOffset() if err != nil { - return nil, err + return nil, fmt.Errorf("[trim.FindByOffset] %s next offset: %w", l, err) } if before == message.OffsetNewest { before = maxOffset @@ -28,7 +29,7 @@ func FindByOffset(ctx context.Context, l klevdb.Log, before int64) (map[int64]st for offset := klevdb.OffsetOldest; offset < maxOffset; { nextOffset, msgs, err := l.Consume(offset, 32) if err != nil { - return nil, err + return nil, fmt.Errorf("[trim.FindByOffset] %s consume %d: %w", l, offset, err) } offset = nextOffset @@ -40,12 +41,12 @@ func FindByOffset(ctx context.Context, l klevdb.Log, before int64) (map[int64]st } if err := ctx.Err(); err != nil { - return nil, err + return nil, fmt.Errorf("[trim.FindByOffset] %s canceled %d: %w", l, offset, err) } } if err := ctx.Err(); err != nil { - return nil, err + return nil, fmt.Errorf("[trim.FindByOffset] %s canceled: %w", l, err) } return offsets, nil @@ -57,7 +58,11 @@ func FindByOffset(ctx context.Context, l klevdb.Log, before int64) (map[int64]st func ByOffset(ctx context.Context, l klevdb.Log, before int64) (map[int64]struct{}, int64, error) { offsets, err := FindByOffset(ctx, l, before) if err != nil { - return nil, 0, err + return nil, 0, fmt.Errorf("[trim.ByOffset] %s find: %w", l, err) } - return l.Delete(offsets) + m, sz, err := l.Delete(offsets) + if err != nil { + return nil, 0, fmt.Errorf("[trim.ByOffset] %s delete: %w", l, err) + } + return m, sz, nil } diff --git a/trim/size.go b/trim/size.go index 893807a..41a5bbb 100644 --- a/trim/size.go +++ b/trim/size.go @@ -2,6 +2,7 @@ package trim import ( "context" + "fmt" "github.com/klev-dev/klevdb" ) @@ -12,14 +13,14 @@ func FindBySize(ctx context.Context, l klevdb.Log, sz int64) (map[int64]struct{} stats, err := l.Stat() switch { case err != nil: - return nil, err + return nil, fmt.Errorf("[trim.FindBySize] %s stat: %w", l, err) case stats.Size < sz: return nil, nil } maxOffset, err := l.NextOffset() if err != nil { - return nil, err + return nil, fmt.Errorf("[trim.FindBySize] %s next offset: %w", l, err) } var offsets = map[int64]struct{}{} @@ -28,7 +29,7 @@ func FindBySize(ctx context.Context, l klevdb.Log, sz int64) (map[int64]struct{} for offset := klevdb.OffsetOldest; offset < maxOffset && total >= sz; { nextOffset, msgs, err := l.Consume(offset, 32) if err != nil { - return nil, err + return nil, fmt.Errorf("[trim.FindBySize] %s consume %d: %w", l, offset, err) } offset = nextOffset @@ -42,12 +43,12 @@ func FindBySize(ctx context.Context, l klevdb.Log, sz int64) (map[int64]struct{} } if err := ctx.Err(); err != nil { - return nil, err + return nil, fmt.Errorf("[trim.FindBySize] %s canceled %d: %w", l, offset, err) } } if err := ctx.Err(); err != nil { - return nil, err + return nil, fmt.Errorf("[trim.FindBySize] %s canceled: %w", l, err) } return offsets, nil @@ -59,7 +60,11 @@ func FindBySize(ctx context.Context, l klevdb.Log, sz int64) (map[int64]struct{} func BySize(ctx context.Context, l klevdb.Log, sz int64) (map[int64]struct{}, int64, error) { offsets, err := FindBySize(ctx, l, sz) if err != nil { - return nil, 0, err + return nil, 0, fmt.Errorf("[trim.BySize] %s find: %w", l, err) } - return l.Delete(offsets) + m, sz, err := l.Delete(offsets) + if err != nil { + return nil, 0, fmt.Errorf("[trim.BySize] %s delete: %w", l, err) + } + return m, sz, nil } From ad2aa7feab5f8be5af0024f561b317939570c381 Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Sun, 9 Mar 2025 13:37:51 -0400 Subject: [PATCH 2/4] migrate segment --- segment/index.go | 8 +-- segment/segment.go | 120 ++++++++++++++++++++++++-------------------- segment/segments.go | 39 ++++++++------ segment/utils.go | 22 ++++---- 4 files changed, 104 insertions(+), 85 deletions(-) diff --git a/segment/index.go b/segment/index.go index dadcc3d..8b5fc94 100644 --- a/segment/index.go +++ b/segment/index.go @@ -1,8 +1,9 @@ package segment import ( + "fmt" + "github.com/klev-dev/klevdb/message" - "github.com/klev-dev/kleverr" ) type Offsetter interface { @@ -61,11 +62,10 @@ func Get[S ~[]O, O Offsetter](segments S, offset int64) (O, int, error) { switch { case offset < beginSegment.GetOffset(): var v O - err := message.ErrNotFound if beginSegment.GetOffset() == 0 { - err = message.ErrInvalidOffset + return v, -1, fmt.Errorf("[segment.Get] invalid offset %d: %w", offset, message.ErrInvalidOffset) } - return v, -1, kleverr.Newf("%w: %d is before beginning", err, offset) + return v, -1, fmt.Errorf("[segment.Get] offset %d is before beginning: %w", offset, message.ErrNotFound) case offset == beginSegment.GetOffset(): return beginSegment, 0, nil } diff --git a/segment/segment.go b/segment/segment.go index 6bdbf33..06d7660 100644 --- a/segment/segment.go +++ b/segment/segment.go @@ -12,7 +12,6 @@ import ( "github.com/klev-dev/klevdb/index" "github.com/klev-dev/klevdb/message" - "github.com/klev-dev/kleverr" ) type Segment struct { @@ -46,12 +45,12 @@ type Stats struct { func (s Segment) Stat(params index.Params) (Stats, error) { logStat, err := os.Stat(s.Log) if err != nil { - return Stats{}, kleverr.Newf("could not stat log: %w", err) + return Stats{}, fmt.Errorf("[segment.Segment.Stat] %s stat: %w", s.Log, err) } indexStat, err := os.Stat(s.Index) if err != nil { - return Stats{}, kleverr.Newf("could not stat index: %w", err) + return Stats{}, fmt.Errorf("[segment.Segment.Stat] %s stat: %w", s.Index, err) } return Stats{ @@ -64,7 +63,7 @@ func (s Segment) Stat(params index.Params) (Stats, error) { func (s Segment) Check(params index.Params) error { log, err := message.OpenReader(s.Log) if err != nil { - return err + return fmt.Errorf("[segment.Segment.Check] %s open reader: %w", s.Log, err) } defer log.Close() @@ -75,7 +74,7 @@ func (s Segment) Check(params index.Params) error { if errors.Is(err, io.EOF) { break } else if err != nil { - return kleverr.Newf("%s: %w", s.Log, err) + return fmt.Errorf("[segment.Segment.Check] %s read: %w", s.Log, err) } item := params.NewItem(msg, position, indexTime) @@ -89,13 +88,13 @@ func (s Segment) Check(params index.Params) error { case errors.Is(err, os.ErrNotExist): return nil case err != nil: - return err + return fmt.Errorf("[segment.Segment.Check] %s read: %w", s.Index, err) case len(logIndex) != len(items): - return kleverr.Newf("%s: incorrect index size: %w", s.Index, index.ErrCorrupted) + return fmt.Errorf("[segment.Segment.Check] %s incorrect index size: %w", s.Index, index.ErrCorrupted) default: for i, item := range logIndex { if item != items[i] { - return kleverr.Newf("%s: incorrect index item: %w", s.Index, index.ErrCorrupted) + return fmt.Errorf("[segment.Segment.Check] %s incorrect index item %d: %w", s.Index, i, index.ErrCorrupted) } } } @@ -106,13 +105,13 @@ func (s Segment) Check(params index.Params) error { func (s Segment) Recover(params index.Params) error { log, err := message.OpenReader(s.Log) if err != nil { - return err + return fmt.Errorf("[segment.Segment.Recover] %s open reader: %w", s.Log, err) } defer log.Close() restore, err := message.OpenWriter(s.Log + ".recover") if err != nil { - return err + return fmt.Errorf("[segment.Segment.Recover] %s.recover open writer: %w", s.Log, err) } defer restore.Close() @@ -127,11 +126,11 @@ func (s Segment) Recover(params index.Params) error { corrupted = true break } else if err != nil { - return err + return fmt.Errorf("[segment.Segment.Recover] %s read %d: %w", s.Log, position, err) } if _, err := restore.Write(msg); err != nil { - return err + return fmt.Errorf("[segment.Segment.Recover] %s.recover write %d: %w", s.Log, position, err) } item := params.NewItem(msg, position, indexTime) @@ -142,22 +141,19 @@ func (s Segment) Recover(params index.Params) error { } if err := log.Close(); err != nil { - return err + return fmt.Errorf("[segment.Segment.Recover] %s close: %w", s.Log, err) } - if err := restore.Sync(); err != nil { - return err - } - if err := restore.Close(); err != nil { - return err + if err := restore.SyncAndClose(); err != nil { + return fmt.Errorf("[segment.Segment.Recover] %s.recover sync close: %w", s.Log, err) } if corrupted { if err := os.Rename(restore.Path, log.Path); err != nil { - return kleverr.Newf("could not rename restore: %w", err) + return fmt.Errorf("[segment.Segment.Recover] %s.recover rename to %s: %w", s.Log, s.Log, err) } } else { if err := os.Remove(restore.Path); err != nil { - return kleverr.Newf("could not delete restore: %w", err) + return fmt.Errorf("[segment.Segment.Recover] %s.recover remove: %w", s.Log, err) } } @@ -168,7 +164,7 @@ func (s Segment) Recover(params index.Params) error { case errors.Is(err, index.ErrCorrupted): corruptedIndex = true case err != nil: - return err + return fmt.Errorf("[segment.Segment.Recover] %s read: %w", s.Index, err) case len(logIndex) != len(items): corruptedIndex = true default: @@ -182,7 +178,7 @@ func (s Segment) Recover(params index.Params) error { if corruptedIndex { if err := os.Remove(s.Index); err != nil { - return kleverr.Newf("could not remove corrupted index: %w", err) + return fmt.Errorf("[segment.Segment.Recover] %s remove: %w", s.Index, err) } } @@ -194,7 +190,7 @@ func (s Segment) NeedsReindex() (bool, error) { case os.IsNotExist(err): return true, nil case err != nil: - return false, kleverr.Newf("could not stat index: %w", err) + return false, fmt.Errorf("[segment.Segment.NeedsReindex] %s stat: %w", s.Index, err) case info.Size() == 0: return true, nil default: @@ -205,22 +201,34 @@ func (s Segment) NeedsReindex() (bool, error) { func (s Segment) ReindexAndReadIndex(params index.Params) ([]index.Item, error) { switch reindex, err := s.NeedsReindex(); { case err != nil: - return nil, err + return nil, fmt.Errorf("[segment.Segment.ReindexAndReadIndex] %s needs reindex: %w", s.Index, err) case reindex: - return s.Reindex(params) + items, err := s.Reindex(params) + if err != nil { + return nil, fmt.Errorf("[segment.Segment.ReindexAndReadIndex] %s reindex: %w", s.Index, err) + } + return items, nil default: - return index.Read(s.Index, params) + items, err := index.Read(s.Index, params) + if err != nil { + return nil, fmt.Errorf("[segment.Segment.ReindexAndReadIndex] %s read: %w", s.Index, err) + } + return items, nil } } func (s Segment) Reindex(params index.Params) ([]index.Item, error) { log, err := message.OpenReader(s.Log) if err != nil { - return nil, err + return nil, fmt.Errorf("[segment.Segment.Reindex] %s open reader: %w", s.Log, err) } defer log.Close() - return s.ReindexReader(params, log) + items, err := s.ReindexReader(params, log) + if err != nil { + return nil, fmt.Errorf("[segment.Segment.Reindex] %s reindex reader: %w", s.Index, err) + } + return items, nil } func (s Segment) ReindexReader(params index.Params, log *message.Reader) ([]index.Item, error) { @@ -231,7 +239,7 @@ func (s Segment) ReindexReader(params index.Params, log *message.Reader) ([]inde if errors.Is(err, io.EOF) { break } else if err != nil { - return nil, err + return nil, fmt.Errorf("[segment.Segment.ReindexReader] %s read: %w", s.Log, err) } item := params.NewItem(msg, position, indexTime) @@ -242,7 +250,7 @@ func (s Segment) ReindexReader(params index.Params, log *message.Reader) ([]inde } if err := index.Write(s.Index, params, items); err != nil { - return nil, err + return nil, fmt.Errorf("[segment.Segment.ReindexReader] %s write: %w", s.Index, err) } return items, nil } @@ -250,20 +258,20 @@ func (s Segment) ReindexReader(params index.Params, log *message.Reader) ([]inde func (s Segment) Backup(targetDir string) error { logName, err := filepath.Rel(s.Dir, s.Log) if err != nil { - return kleverr.Newf("could not rel log: %w", err) + return fmt.Errorf("[segment.Segment.Backup] %s log rel: %w", s.Log, err) } targetLog := filepath.Join(targetDir, logName) if err := copyFile(s.Log, targetLog); err != nil { - return kleverr.Newf("could not copy log: %w", err) + return fmt.Errorf("[segment.Segment.Backup] %s copy log to %s: %w", s.Log, targetLog, err) } indexName, err := filepath.Rel(s.Dir, s.Index) if err != nil { - return kleverr.Newf("could not rel index: %w", err) + return fmt.Errorf("[segment.Segment.Backup] %s index rel: %w", s.Index, err) } targetIndex := filepath.Join(targetDir, indexName) if err := copyFile(s.Index, targetIndex); err != nil { - return kleverr.Newf("could not copy index: %w", err) + return fmt.Errorf("[segment.Segment.Backup] %s copy index to %s: %w", s.Index, targetIndex, err) } return nil @@ -272,7 +280,7 @@ func (s Segment) Backup(targetDir string) error { func (s Segment) ForRewrite() (Segment, error) { randStr, err := randStr(8) if err != nil { - return Segment{}, nil + return Segment{}, fmt.Errorf("[segment.Segment.ForRewrite] %d rand: %w", s.Offset, err) } s.Log = fmt.Sprintf("%s.rewrite.%s", s.Log, randStr) @@ -282,11 +290,11 @@ func (s Segment) ForRewrite() (Segment, error) { func (olds Segment) Rename(news Segment) error { if err := os.Rename(olds.Log, news.Log); err != nil { - return kleverr.Newf("could not rename log: %w", err) + return fmt.Errorf("[segment.Segment.Rename] %s log rename to %s: %w", olds.Log, news.Log, err) } if err := os.Rename(olds.Index, news.Index); err != nil { - return kleverr.Newf("could not rename index: %w", err) + return fmt.Errorf("[segment.Segment.Rename] %s index rename to %s: %w", olds.Index, news.Index, err) } return nil @@ -295,15 +303,15 @@ func (olds Segment) Rename(news Segment) error { func (olds Segment) Override(news Segment) error { // remove index segment so we don't have invalid index if err := os.Remove(news.Index); err != nil { - return kleverr.Newf("could not delete index: %w", err) + return fmt.Errorf("[segment.Segment.Override] %s index remove: %w", news.Index, err) } if err := os.Rename(olds.Log, news.Log); err != nil { - return kleverr.Newf("could not rename log: %w", err) + return fmt.Errorf("[segment.Segment.Override] %s log rename to %s: %w", olds.Log, news.Log, err) } if err := os.Rename(olds.Index, news.Index); err != nil { - return kleverr.Newf("could not rename index: %w", err) + return fmt.Errorf("[segment.Segment.Override] %s index rename to %s: %w", olds.Index, news.Index, err) } return nil @@ -311,10 +319,10 @@ func (olds Segment) Override(news Segment) error { func (s Segment) Remove() error { if err := os.Remove(s.Index); err != nil { - return kleverr.Newf("could not delete index: %w", err) + return fmt.Errorf("[segment.Segment.Remove] %s index remove: %w", s.Index, err) } if err := os.Remove(s.Log); err != nil { - return kleverr.Newf("could not delete log: %w", err) + return fmt.Errorf("[segment.Segment.Remove] %s log remove: %w", s.Log, err) } return nil } @@ -338,20 +346,20 @@ func (r *RewriteSegment) GetNewSegment() Segment { func (src Segment) Rewrite(dropOffsets map[int64]struct{}, params index.Params) (*RewriteSegment, error) { dst, err := src.ForRewrite() if err != nil { - return nil, err + return nil, fmt.Errorf("[segment.Segment.Rewrite] %d for rewrite: %w", src.Offset, err) } result := &RewriteSegment{Segment: dst} srcLog, err := message.OpenReader(src.Log) if err != nil { - return nil, err + return nil, fmt.Errorf("[segment.Segment.Rewrite] %s open reader: %w", src.Log, err) } defer srcLog.Close() dstLog, err := message.OpenWriter(dst.Log) if err != nil { - return nil, err + return nil, fmt.Errorf("[segment.Segment.Rewrite] %s open writer: %w", dst.Log, err) } defer dstLog.Close() @@ -360,13 +368,15 @@ func (src Segment) Rewrite(dropOffsets map[int64]struct{}, params index.Params) var srcPosition, indexTime int64 var dstItems []index.Item +LOOP: for { msg, nextSrcPosition, err := srcLog.Read(srcPosition) - if err != nil { - if errors.Is(err, io.EOF) { - break - } - return nil, err + switch { + case err == nil: + case errors.Is(err, io.EOF): + break LOOP + default: + return nil, fmt.Errorf("[segment.Segment.Rewrite] %s read %d: %w", src.Log, srcPosition, err) } if _, ok := dropOffsets[msg.Offset]; ok { @@ -375,7 +385,7 @@ func (src Segment) Rewrite(dropOffsets map[int64]struct{}, params index.Params) } else { dstPosition, err := dstLog.Write(msg) if err != nil { - return nil, err + return nil, fmt.Errorf("[segment.Segment.Rewrite] %s write: %w", dst.Log, err) } result.SurviveOffsets[msg.Offset] = struct{}{} @@ -388,18 +398,18 @@ func (src Segment) Rewrite(dropOffsets map[int64]struct{}, params index.Params) } if err := srcLog.Close(); err != nil { - return nil, err + return nil, fmt.Errorf("[segment.Segment.Rewrite] %s close: %w", src.Log, err) } if err := dstLog.SyncAndClose(); err != nil { - return nil, err + return nil, fmt.Errorf("[segment.Segment.Rewrite] %s sync close: %w", dst.Log, err) } if err := index.Write(dst.Index, params, dstItems); err != nil { - return nil, err + return nil, fmt.Errorf("[segment.Segment.Rewrite] %s write: %w", dst.Index, err) } result.Stats, err = dst.Stat(params) if err != nil { - return nil, err + return nil, fmt.Errorf("[segment.Segment.Rewrite] %s stat: %w", dst.Log, err) } return result, nil } diff --git a/segment/segments.go b/segment/segments.go index c16eceb..efef108 100644 --- a/segment/segments.go +++ b/segment/segments.go @@ -2,18 +2,18 @@ package segment import ( "errors" + "fmt" "os" "strconv" "strings" "github.com/klev-dev/klevdb/index" - "github.com/klev-dev/kleverr" ) func Find(dir string) ([]Segment, error) { files, err := os.ReadDir(dir) if err != nil { - return nil, kleverr.Newf("could not list dir: %w", err) + return nil, fmt.Errorf("[segment.Find] %s read: %w", dir, err) } var segments []Segment @@ -23,7 +23,7 @@ func Find(dir string) ([]Segment, error) { offset, err := strconv.ParseInt(offsetStr, 10, 64) if err != nil { - return nil, kleverr.Newf("parse offset failed: %w", err) + return nil, fmt.Errorf("[segment.Find] %s parse offset: %w", f.Name(), err) } segments = append(segments, New(dir, offset)) @@ -39,7 +39,7 @@ func StatDir(dir string, params index.Params) (Stats, error) { case errors.Is(err, os.ErrNotExist): return Stats{}, nil case err != nil: - return Stats{}, err + return Stats{}, fmt.Errorf("[segment.StatDir] %s find: %w", dir, err) } return Stat(segments, params) } @@ -49,7 +49,7 @@ func Stat(segments []Segment, params index.Params) (Stats, error) { for _, seg := range segments { segStat, err := seg.Stat(params) if err != nil { - return Stats{}, err + return Stats{}, fmt.Errorf("[segment.Stat] %d stat: %w", seg.Offset, err) } total.Segments += segStat.Segments @@ -64,12 +64,15 @@ func CheckDir(dir string, params index.Params) error { case errors.Is(err, os.ErrNotExist): return nil case err != nil: - return err + return fmt.Errorf("[segment.CheckDir] %s find: %w", dir, err) case len(segments) == 0: return nil default: s := segments[len(segments)-1] - return s.Check(params) + if err := s.Check(params); err != nil { + return fmt.Errorf("[segment.CheckDir] %s check: %w", dir, err) + } + return nil } } @@ -78,12 +81,15 @@ func RecoverDir(dir string, params index.Params) error { case errors.Is(err, os.ErrNotExist): return nil case err != nil: - return err + return fmt.Errorf("[segment.RecoverDir] %s find: %w", dir, err) case len(segments) == 0: return nil default: s := segments[len(segments)-1] - return s.Recover(params) + if err := s.Recover(params); err != nil { + return fmt.Errorf("[segment.RecoverDir] %s recover: %w", dir, err) + } + return nil } } @@ -93,20 +99,23 @@ func BackupDir(dir, target string) error { case errors.Is(err, os.ErrNotExist): return nil case err != nil: - return err + return fmt.Errorf("[segment.BackupDir] %s find: %w", dir, err) } if err := os.MkdirAll(target, 0700); err != nil { - return kleverr.Newf("could not create backup dir: %w", err) + return fmt.Errorf("[segment.BackupDir] %s mkdir: %w", target, err) } - return Backup(segments, target) + if err := Backup(segments, target); err != nil { + return fmt.Errorf("[segment.BackupDir] %s backup to %s: %w", dir, target, err) + } + return nil } -func Backup(segments []Segment, dir string) error { +func Backup(segments []Segment, target string) error { for _, seg := range segments { - if err := seg.Backup(dir); err != nil { - return kleverr.Newf("could not backup segment %d: %w", seg.Offset, err) + if err := seg.Backup(target); err != nil { + return fmt.Errorf("[segment.Backup] %s backup of %d: %w", target, seg.Offset, err) } } diff --git a/segment/utils.go b/segment/utils.go index 654e24a..aab98e6 100644 --- a/segment/utils.go +++ b/segment/utils.go @@ -2,17 +2,17 @@ package segment import ( "crypto/rand" + "fmt" "io" "os" - "github.com/klev-dev/kleverr" "github.com/mr-tron/base58" ) func randStr(length int) (string, error) { k := make([]byte, length) if _, err := io.ReadFull(rand.Reader, k); err != nil { - return "", kleverr.Ret(err) + return "", fmt.Errorf("[segment.Rand] read full: %w", err) } return base58.Encode(k), nil } @@ -20,20 +20,20 @@ func randStr(length int) (string, error) { func copyFile(src, dst string) error { fsrc, err := os.Open(src) if err != nil { - return kleverr.Newf("could not open src: %w", err) + return fmt.Errorf("[segment.Copy] %s open src: %w", src, err) } defer fsrc.Close() stat, err := fsrc.Stat() if err != nil { - return kleverr.Newf("could not stat src: %w", err) + return fmt.Errorf("[segment.Copy] %s stat src: %w", src, err) } fdst, err := os.OpenFile(dst, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0600) if os.IsExist(err) { switch dstStat, err := os.Stat(dst); { case err != nil: - return kleverr.Newf("could not stat dst: %w", err) + return fmt.Errorf("[segment.Copy] %s stat dst: %w", dst, err) case stat.Size() == dstStat.Size() && stat.ModTime().Equal(dstStat.ModTime()): // TODO do we need a safer version of this? return nil @@ -41,25 +41,25 @@ func copyFile(src, dst string) error { fdst, err = os.OpenFile(dst, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600) } if err != nil { - return kleverr.Newf("could not open dst: %w", err) + return fmt.Errorf("[segment.Copy] %s open dst: %w", dst, err) } defer fdst.Close() switch n, err := io.Copy(fdst, fsrc); { case err != nil: - return kleverr.Newf("could not copy: %w", err) + return fmt.Errorf("[segment.Copy] %s copy to %s: %w", src, dst, err) case n < stat.Size(): - return kleverr.Newf("could not copy all data (%d/%d)", n, stat.Size()) + return fmt.Errorf("[segment.Copy] %s (%d) incomplete copy to %s (%d): %w", src, stat.Size(), dst, n, err) } if err := fdst.Sync(); err != nil { - return kleverr.Newf("could not sync dst: %w", err) + return fmt.Errorf("[segment.Copy] %s sync: %w", dst, err) } if err := fdst.Close(); err != nil { - return kleverr.Newf("could not close dst: %w", err) + return fmt.Errorf("[segment.Copy] %s close: %w", dst, err) } if err := os.Chtimes(dst, stat.ModTime(), stat.ModTime()); err != nil { - return kleverr.Newf("could not set dst time: %w", err) + return fmt.Errorf("[segment.Copy] %s chtimes: %w", dst, err) } return nil From 696fd1fe15986d93411b416efaf939be55bd2f21 Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Sun, 9 Mar 2025 16:02:58 -0400 Subject: [PATCH 3/4] more wrapping --- api.go | 25 ++++++++++++++++---- blocking.go | 46 +++++++++++++++++++++++++++--------- delete.go | 7 +++--- notify.go | 9 ++++---- segment/utils.go | 20 ++++++++-------- typed.go | 59 +++++++++++++++++++++++++++++++---------------- typed_blocking.go | 48 +++++++++++++++++++++++++++++--------- typed_codec.go | 5 ++-- 8 files changed, 152 insertions(+), 67 deletions(-) diff --git a/api.go b/api.go index bb7996e..3c11f1a 100644 --- a/api.go +++ b/api.go @@ -2,6 +2,7 @@ package klevdb import ( "errors" + "fmt" "time" "github.com/klev-dev/klevdb/index" @@ -139,29 +140,45 @@ type Log interface { // Stat stats a store directory, without opening the store func Stat(dir string, opts Options) (Stats, error) { - return segment.StatDir(dir, index.Params{ + stat, err := segment.StatDir(dir, index.Params{ Times: opts.TimeIndex, Keys: opts.KeyIndex, }) + if err != nil { + return Stats{}, fmt.Errorf("[klevdb.Stat] %s stat: %w", dir, err) + } + return stat, nil } // Backup backups a store directory to another location, without opening the store func Backup(src, dst string) error { - return segment.BackupDir(src, dst) + err := segment.BackupDir(src, dst) + if err != nil { + return fmt.Errorf("[klevdb.Backup] %s backup to %s: %w", src, dst, err) + } + return nil } // Check runs an integrity check, without opening the store func Check(dir string, opts Options) error { - return segment.CheckDir(dir, index.Params{ + err := segment.CheckDir(dir, index.Params{ Times: opts.TimeIndex, Keys: opts.KeyIndex, }) + if err != nil { + return fmt.Errorf("[klevdb.Check] %s check: %w", dir, err) + } + return nil } // Recover rewrites the storage to include all messages prior the first that fails an integrity check func Recover(dir string, opts Options) error { - return segment.RecoverDir(dir, index.Params{ + err := segment.RecoverDir(dir, index.Params{ Times: opts.TimeIndex, Keys: opts.KeyIndex, }) + if err != nil { + return fmt.Errorf("[klevdb.Record] %s recover: %w", dir, err) + } + return nil } diff --git a/blocking.go b/blocking.go index 5513421..84bcf8a 100644 --- a/blocking.go +++ b/blocking.go @@ -1,6 +1,9 @@ package klevdb -import "context" +import ( + "context" + "fmt" +) type BlockingLog interface { Log @@ -12,18 +15,24 @@ type BlockingLog interface { ConsumeByKeyBlocking(ctx context.Context, key []byte, offset int64, maxCount int64) (nextOffset int64, messages []Message, err error) } +// TODO docs func OpenBlocking(dir string, opts Options) (BlockingLog, error) { l, err := Open(dir, opts) if err != nil { - return nil, err + return nil, fmt.Errorf("[klevdb.OpenBlocking] %s open: %w", dir, err) } - return WrapBlocking(l) + w, err := WrapBlocking(l) + if err != nil { + return nil, fmt.Errorf("[klevdb.OpenBlocking] %s wrap: %w", dir, err) + } + return w, nil } +// TODO docs func WrapBlocking(l Log) (BlockingLog, error) { next, err := l.NextOffset() if err != nil { - return nil, err + return nil, fmt.Errorf("[klevdb.WrapBlocking] %s next offset: %w", l, err) } return &blockingLog{l, NewOffsetNotify(next)}, nil } @@ -35,27 +44,42 @@ type blockingLog struct { func (l *blockingLog) Publish(messages []Message) (int64, error) { nextOffset, err := l.Log.Publish(messages) + if err != nil { + return OffsetInvalid, fmt.Errorf("[klevdb.BlockingLog.Publish] %s publish: %w", l.Log, err) + } + l.notify.Set(nextOffset) - return nextOffset, err + return nextOffset, nil } func (l *blockingLog) ConsumeBlocking(ctx context.Context, offset int64, maxCount int64) (int64, []Message, error) { if err := l.notify.Wait(ctx, offset); err != nil { - return 0, nil, err + return OffsetInvalid, nil, fmt.Errorf("[klevdb.BlockingLog.ConsumeBlocking] %s wait: %w", l.Log, err) } - return l.Log.Consume(offset, maxCount) + next, msgs, err := l.Log.Consume(offset, maxCount) + if err != nil { + return OffsetInvalid, nil, fmt.Errorf("[klevdb.BlockingLog.ConsumeBlocking] %s consume: %w", l.Log, err) + } + return next, msgs, nil } func (l *blockingLog) ConsumeByKeyBlocking(ctx context.Context, key []byte, offset int64, maxCount int64) (int64, []Message, error) { if err := l.notify.Wait(ctx, offset); err != nil { - return 0, nil, err + return OffsetInvalid, nil, fmt.Errorf("[klevdb.BlockingLog.ConsumeByKeyBlocking] %s wait: %w", l.Log, err) } - return l.Log.ConsumeByKey(key, offset, maxCount) + next, msgs, err := l.Log.ConsumeByKey(key, offset, maxCount) + if err != nil { + return OffsetInvalid, nil, fmt.Errorf("[klevdb.BlockingLog.ConsumeByKeyBlocking] %s consume: %w", l.Log, err) + } + return next, msgs, nil } func (l *blockingLog) Close() error { if err := l.notify.Close(); err != nil { - return err + return fmt.Errorf("[klevdb.BlockingLog.Close] %s notify close: %w", l.Log, err) + } + if err := l.Log.Close(); err != nil { + return fmt.Errorf("[klevdb.BlockingLog.Close] %s close: %w", l.Log, err) } - return l.Log.Close() + return nil } diff --git a/delete.go b/delete.go index fb380a5..5a1b11c 100644 --- a/delete.go +++ b/delete.go @@ -2,6 +2,7 @@ package klevdb import ( "context" + "fmt" "time" "golang.org/x/exp/maps" @@ -24,7 +25,7 @@ func DeleteMultiWithWait(d time.Duration) DeleteMultiBackoff { case <-t.C: return nil case <-ctx.Done(): - return ctx.Err() + return fmt.Errorf("[klevdb.DeleteMultiWithWait] canceled: %w", ctx.Err()) } } } @@ -48,7 +49,7 @@ func DeleteMulti(ctx context.Context, l Log, offsets map[int64]struct{}, backoff deleted, size, err := l.Delete(offsets) switch { case err != nil: - return deletedOffsets, deletedSize, err + return deletedOffsets, deletedSize, fmt.Errorf("[klevdb.DeleteMulti] delete: %w", err) case len(deleted) == 0: return deletedOffsets, deletedSize, nil } @@ -61,7 +62,7 @@ func DeleteMulti(ctx context.Context, l Log, offsets map[int64]struct{}, backoff }) if err := backoff(ctx); err != nil { - return deletedOffsets, deletedSize, err + return deletedOffsets, deletedSize, fmt.Errorf("[klevdb.DeleteMulti] backoff: %w", err) } } diff --git a/notify.go b/notify.go index 74f69e7..fbb89f7 100644 --- a/notify.go +++ b/notify.go @@ -3,9 +3,8 @@ package klevdb import ( "context" "errors" + "fmt" "sync/atomic" - - "github.com/klev-dev/kleverr" ) var ErrOffsetNotifyClosed = errors.New("offset notify already closed") @@ -36,7 +35,7 @@ func (w *OffsetNotify) Wait(ctx context.Context, offset int64) error { b, ok := <-w.barrier if !ok { // already closed, return error - return kleverr.Ret(ErrOffsetNotifyClosed) + return fmt.Errorf("[klevdb.OffsetNotify.Read] aquire barrier: %w", ErrOffsetNotifyClosed) } // probe the current offset @@ -55,7 +54,7 @@ func (w *OffsetNotify) Wait(ctx context.Context, offset int64) error { case <-b: return nil case <-ctx.Done(): - return kleverr.Ret(ctx.Err()) + return fmt.Errorf("[klevdb.OffsetNotify.Read] wait canceled: %w", ctx.Err()) } } @@ -84,7 +83,7 @@ func (w *OffsetNotify) Close() error { b, ok := <-w.barrier if !ok { // already closed, return an error - return kleverr.Ret(ErrOffsetNotifyClosed) + return fmt.Errorf("[klevdb.OffsetNotify.Close] aquire barrier: %w", ErrOffsetNotifyClosed) } // close the current barrier, e.g. broadcasting update diff --git a/segment/utils.go b/segment/utils.go index aab98e6..a375f49 100644 --- a/segment/utils.go +++ b/segment/utils.go @@ -12,7 +12,7 @@ import ( func randStr(length int) (string, error) { k := make([]byte, length) if _, err := io.ReadFull(rand.Reader, k); err != nil { - return "", fmt.Errorf("[segment.Rand] read full: %w", err) + return "", fmt.Errorf("[segment.rand] read full: %w", err) } return base58.Encode(k), nil } @@ -20,20 +20,20 @@ func randStr(length int) (string, error) { func copyFile(src, dst string) error { fsrc, err := os.Open(src) if err != nil { - return fmt.Errorf("[segment.Copy] %s open src: %w", src, err) + return fmt.Errorf("[segment.copy] %s open src: %w", src, err) } defer fsrc.Close() stat, err := fsrc.Stat() if err != nil { - return fmt.Errorf("[segment.Copy] %s stat src: %w", src, err) + return fmt.Errorf("[segment.copy] %s stat src: %w", src, err) } fdst, err := os.OpenFile(dst, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0600) if os.IsExist(err) { switch dstStat, err := os.Stat(dst); { case err != nil: - return fmt.Errorf("[segment.Copy] %s stat dst: %w", dst, err) + return fmt.Errorf("[segment.copy] %s stat dst: %w", dst, err) case stat.Size() == dstStat.Size() && stat.ModTime().Equal(dstStat.ModTime()): // TODO do we need a safer version of this? return nil @@ -41,25 +41,25 @@ func copyFile(src, dst string) error { fdst, err = os.OpenFile(dst, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600) } if err != nil { - return fmt.Errorf("[segment.Copy] %s open dst: %w", dst, err) + return fmt.Errorf("[segment.copy] %s open dst: %w", dst, err) } defer fdst.Close() switch n, err := io.Copy(fdst, fsrc); { case err != nil: - return fmt.Errorf("[segment.Copy] %s copy to %s: %w", src, dst, err) + return fmt.Errorf("[segment.copy] %s copy to %s: %w", src, dst, err) case n < stat.Size(): - return fmt.Errorf("[segment.Copy] %s (%d) incomplete copy to %s (%d): %w", src, stat.Size(), dst, n, err) + return fmt.Errorf("[segment.copy] %s (%d) incomplete copy to %s (%d): %w", src, stat.Size(), dst, n, err) } if err := fdst.Sync(); err != nil { - return fmt.Errorf("[segment.Copy] %s sync: %w", dst, err) + return fmt.Errorf("[segment.copy] %s sync: %w", dst, err) } if err := fdst.Close(); err != nil { - return fmt.Errorf("[segment.Copy] %s close: %w", dst, err) + return fmt.Errorf("[segment.copy] %s close: %w", dst, err) } if err := os.Chtimes(dst, stat.ModTime(), stat.ModTime()); err != nil { - return fmt.Errorf("[segment.Copy] %s chtimes: %w", dst, err) + return fmt.Errorf("[segment.copy] %s chtimes: %w", dst, err) } return nil diff --git a/typed.go b/typed.go index 8dd956b..34e9c2b 100644 --- a/typed.go +++ b/typed.go @@ -1,6 +1,9 @@ package klevdb -import "time" +import ( + "fmt" + "time" +) type TMessage[K any, V any] struct { Offset int64 @@ -47,7 +50,7 @@ type TLog[K any, V any] interface { func OpenT[K any, V any](dir string, opts Options, keyCodec Codec[K], valueCodec Codec[V]) (TLog[K, V], error) { l, err := Open(dir, opts) if err != nil { - return nil, err + return nil, fmt.Errorf("[OpenT] %s open: %w", dir, err) } return &tlog[K, V]{l, keyCodec, valueCodec}, nil } @@ -69,17 +72,21 @@ func (l *tlog[K, V]) Publish(tmessages []TMessage[K, V]) (int64, error) { for i, tmsg := range tmessages { messages[i], err = l.encode(tmsg) if err != nil { - return OffsetInvalid, err + return OffsetInvalid, fmt.Errorf("[klevdb.TLog.Publish] encode: %w", err) } } - return l.Log.Publish(messages) + offset, err := l.Log.Publish(messages) + if err != nil { + return OffsetInvalid, fmt.Errorf("[klevdb.TLog.Publish] publish: %w", err) + } + return offset, nil } func (l *tlog[K, V]) Consume(offset int64, maxCount int64) (int64, []TMessage[K, V], error) { nextOffset, messages, err := l.Log.Consume(offset, maxCount) if err != nil { - return OffsetInvalid, nil, err + return OffsetInvalid, nil, fmt.Errorf("[klevdb.TLog.Consume] consume: %w", err) } if len(messages) == 0 { return nextOffset, nil, nil @@ -89,7 +96,7 @@ func (l *tlog[K, V]) Consume(offset int64, maxCount int64) (int64, []TMessage[K, for i, msg := range messages { tmessages[i], err = l.decode(msg) if err != nil { - return OffsetInvalid, nil, err + return OffsetInvalid, nil, fmt.Errorf("[klevdb.TLog.Consume] decode: %w", err) } } return nextOffset, tmessages, nil @@ -98,12 +105,12 @@ func (l *tlog[K, V]) Consume(offset int64, maxCount int64) (int64, []TMessage[K, func (l *tlog[K, V]) ConsumeByKey(key K, empty bool, offset int64, maxCount int64) (int64, []TMessage[K, V], error) { kbytes, err := l.keyCodec.Encode(key, empty) if err != nil { - return OffsetInvalid, nil, err + return OffsetInvalid, nil, fmt.Errorf("[klevdb.TLog.ConsumeByKey] encode: %w", err) } nextOffset, messages, err := l.Log.ConsumeByKey(kbytes, offset, maxCount) if err != nil { - return OffsetInvalid, nil, err + return OffsetInvalid, nil, fmt.Errorf("[klevdb.TLog.ConsumeByKey] consume: %w", err) } if len(messages) == 0 { return nextOffset, nil, nil @@ -113,7 +120,7 @@ func (l *tlog[K, V]) ConsumeByKey(key K, empty bool, offset int64, maxCount int6 for i, msg := range messages { tmessages[i], err = l.decode(msg) if err != nil { - return OffsetInvalid, nil, err + return OffsetInvalid, nil, fmt.Errorf("[klevdb.TLog.ConsumeByKey] decode: %w", err) } } return nextOffset, tmessages, nil @@ -122,29 +129,41 @@ func (l *tlog[K, V]) ConsumeByKey(key K, empty bool, offset int64, maxCount int6 func (l *tlog[K, V]) Get(offset int64) (TMessage[K, V], error) { msg, err := l.Log.Get(offset) if err != nil { - return TMessage[K, V]{Offset: OffsetInvalid}, err + return TMessage[K, V]{Offset: OffsetInvalid}, fmt.Errorf("[klevdb.TLog.Get] get: %w", err) + } + tmsg, err := l.decode(msg) + if err != nil { + return TMessage[K, V]{Offset: OffsetInvalid}, fmt.Errorf("[klevdb.TLog.Get] decode: %w", err) } - return l.decode(msg) + return tmsg, nil } func (l *tlog[K, V]) GetByKey(key K, empty bool) (TMessage[K, V], error) { kbytes, err := l.keyCodec.Encode(key, empty) if err != nil { - return TMessage[K, V]{Offset: OffsetInvalid}, err + return TMessage[K, V]{Offset: OffsetInvalid}, fmt.Errorf("[klevdb.TLog.GetByKey] encode: %w", err) } msg, err := l.Log.GetByKey(kbytes) if err != nil { - return TMessage[K, V]{Offset: OffsetInvalid}, err + return TMessage[K, V]{Offset: OffsetInvalid}, fmt.Errorf("[klevdb.TLog.GetByKey] get: %w", err) } - return l.decode(msg) + tmsg, err := l.decode(msg) + if err != nil { + return TMessage[K, V]{Offset: OffsetInvalid}, fmt.Errorf("[klevdb.TLog.GetByKey] decode: %w", err) + } + return tmsg, nil } func (l *tlog[K, V]) GetByTime(start time.Time) (TMessage[K, V], error) { msg, err := l.Log.GetByTime(start) if err != nil { - return TMessage[K, V]{Offset: OffsetInvalid}, err + return TMessage[K, V]{Offset: OffsetInvalid}, fmt.Errorf("[klevdb.TLog.GetByTime] get: %w", err) } - return l.decode(msg) + tmsg, err := l.decode(msg) + if err != nil { + return TMessage[K, V]{Offset: OffsetInvalid}, fmt.Errorf("[klevdb.TLog.GetByTime] decode: %w", err) + } + return tmsg, nil } func (l *tlog[K, V]) Raw() Log { @@ -157,12 +176,12 @@ func (l *tlog[K, V]) encode(tmsg TMessage[K, V]) (msg Message, err error) { msg.Key, err = l.keyCodec.Encode(tmsg.Key, tmsg.KeyEmpty) if err != nil { - return InvalidMessage, err + return InvalidMessage, fmt.Errorf("[klevdb.TLog.encode] key: %w", err) } msg.Value, err = l.valueCodec.Encode(tmsg.Value, tmsg.ValueEmpty) if err != nil { - return InvalidMessage, err + return InvalidMessage, fmt.Errorf("[klevdb.TLog.encode] value: %w", err) } return msg, nil @@ -174,12 +193,12 @@ func (l *tlog[K, V]) decode(msg Message) (tmsg TMessage[K, V], err error) { tmsg.Key, tmsg.KeyEmpty, err = l.keyCodec.Decode(msg.Key) if err != nil { - return TMessage[K, V]{Offset: OffsetInvalid}, err + return TMessage[K, V]{Offset: OffsetInvalid}, fmt.Errorf("[klevdb.TLog.decode] key: %w", err) } tmsg.Value, tmsg.ValueEmpty, err = l.valueCodec.Decode(msg.Value) if err != nil { - return TMessage[K, V]{Offset: OffsetInvalid}, err + return TMessage[K, V]{Offset: OffsetInvalid}, fmt.Errorf("[klevdb.TLog.decode] value: %w", err) } return tmsg, nil diff --git a/typed_blocking.go b/typed_blocking.go index a97e56e..57eee29 100644 --- a/typed_blocking.go +++ b/typed_blocking.go @@ -1,27 +1,38 @@ package klevdb -import "context" +import ( + "context" + "fmt" +) type TBlockingLog[K any, V any] interface { TLog[K, V] + // ConsumeBlocking is similar to Consume, but if offset is equal to the next offsetit will block until next event is produced ConsumeBlocking(ctx context.Context, offset int64, maxCount int64) (nextOffset int64, messages []TMessage[K, V], err error) + // ConsumeByKeyBlocking is similar to ConsumeBlocking, but only returns messages matching the key ConsumeByKeyBlocking(ctx context.Context, key K, empty bool, offset int64, maxCount int64) (nextOffset int64, messages []TMessage[K, V], err error) } +// TODO docs func OpenTBlocking[K any, V any](dir string, opts Options, keyCodec Codec[K], valueCodec Codec[V]) (TBlockingLog[K, V], error) { l, err := OpenT(dir, opts, keyCodec, valueCodec) if err != nil { - return nil, err + return nil, fmt.Errorf("[klevdb.OpenTBlocking] %s open: %w", dir, err) } - return WrapTBlocking(l) + w, err := WrapTBlocking(l) + if err != nil { + return nil, fmt.Errorf("[klevdb.OpenTBlocking] %s wrap: %w", dir, err) + } + return w, nil } +// TODO docs func WrapTBlocking[K any, V any](l TLog[K, V]) (TBlockingLog[K, V], error) { next, err := l.NextOffset() if err != nil { - return nil, err + return nil, fmt.Errorf("[klevdb.WrapTBlocking] %s next offset: %w", l, err) } return &tlogBlocking[K, V]{l, NewOffsetNotify(next)}, nil } @@ -33,27 +44,42 @@ type tlogBlocking[K any, V any] struct { func (l *tlogBlocking[K, V]) Publish(tmessages []TMessage[K, V]) (int64, error) { nextOffset, err := l.TLog.Publish(tmessages) + if err != nil { + return OffsetInvalid, fmt.Errorf("[klevdb.TBlockingLog.Publish] %s publish: %w", l.TLog, err) + } + l.notify.Set(nextOffset) - return nextOffset, err + return nextOffset, nil } func (l *tlogBlocking[K, V]) ConsumeBlocking(ctx context.Context, offset int64, maxCount int64) (int64, []TMessage[K, V], error) { if err := l.notify.Wait(ctx, offset); err != nil { - return 0, nil, err + return OffsetInvalid, nil, fmt.Errorf("[klevdb.TBlockingLog.ConsumeBlocking] %s wait: %w", l.TLog, err) } - return l.TLog.Consume(offset, maxCount) + next, msgs, err := l.TLog.Consume(offset, maxCount) + if err != nil { + return OffsetInvalid, nil, fmt.Errorf("[klevdb.TBlockingLog.ConsumeBlocking] %s consume: %w", l.TLog, err) + } + return next, msgs, nil } func (l *tlogBlocking[K, V]) ConsumeByKeyBlocking(ctx context.Context, key K, empty bool, offset int64, maxCount int64) (int64, []TMessage[K, V], error) { if err := l.notify.Wait(ctx, offset); err != nil { - return 0, nil, err + return OffsetInvalid, nil, fmt.Errorf("[klevdb.TBlockingLog.ConsumeByKeyBlocking] %s wait: %w", l.TLog, err) } - return l.TLog.ConsumeByKey(key, empty, offset, maxCount) + next, msgs, err := l.TLog.ConsumeByKey(key, empty, offset, maxCount) + if err != nil { + return OffsetInvalid, nil, fmt.Errorf("[klevdb.TBlockingLog.ConsumeByKeyBlocking] %s consume: %w", l.TLog, err) + } + return next, msgs, nil } func (l *tlogBlocking[K, V]) Close() error { if err := l.notify.Close(); err != nil { - return err + return fmt.Errorf("[klevdb.TBlockingLog.Close] %s notify close: %w", l.TLog, err) + } + if err := l.TLog.Close(); err != nil { + return fmt.Errorf("[klevdb.TBlockingLog.Close] %s close: %w", l.TLog, err) } - return l.TLog.Close() + return nil } diff --git a/typed_codec.go b/typed_codec.go index 30ebf2a..b6889c5 100644 --- a/typed_codec.go +++ b/typed_codec.go @@ -3,8 +3,7 @@ package klevdb import ( "encoding/binary" "encoding/json" - - "github.com/klev-dev/kleverr" + "fmt" ) type Codec[T any] interface { @@ -77,7 +76,7 @@ func (c varintCodec) Decode(b []byte) (int64, bool, error) { } t, n := binary.Varint(b) if n <= 0 { - return 0, true, kleverr.Newf("invalid varint: %d", n) + return 0, true, fmt.Errorf("[klevdb.VarintCodec.Decode] invalid varint %d", n) } return t, false, nil } From 04559b76a195469cfff5dc36500bd5c3d327602b Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Sun, 9 Mar 2025 17:27:57 -0400 Subject: [PATCH 4/4] more wrapping --- log.go | 6 ++--- writer.go | 66 +++++++++++++++++++++++++++++++++++-------------------- 2 files changed, 45 insertions(+), 27 deletions(-) diff --git a/log.go b/log.go index 58bd042..7a6e79e 100644 --- a/log.go +++ b/log.go @@ -261,7 +261,7 @@ func (l *log) GetByKey(key []byte) (message.Message, error) { switch msg, err := rdr.GetByKey(key, hash); { case err == nil: return msg, nil - case err == message.ErrNotFound: + case errors.Is(err, message.ErrNotFound): // not in this segment, try the rest default: return message.Invalid, err @@ -296,12 +296,12 @@ func (l *log) GetByTime(start time.Time) (message.Message, error) { switch msg, err := rdr.GetByTime(ts); { case err == nil: return msg, nil - case err == message.ErrInvalidOffset: + case errors.Is(err, message.ErrInvalidOffset): // not in this segment, try the rest if i == 0 { return rdr.Get(message.OffsetOldest) } - case err == message.ErrNotFound: + case errors.Is(err, message.ErrNotFound): // time is between end of this and begin next if i < len(l.readers)-1 { nextRdr := l.readers[i+1] diff --git a/writer.go b/writer.go index 39d718e..f45f543 100644 --- a/writer.go +++ b/writer.go @@ -2,6 +2,7 @@ package klevdb import ( "errors" + "fmt" "sync" "sync/atomic" "time" @@ -11,7 +12,6 @@ import ( "github.com/klev-dev/klevdb/index" "github.com/klev-dev/klevdb/message" "github.com/klev-dev/klevdb/segment" - "github.com/klev-dev/kleverr" ) type writer struct { @@ -27,14 +27,14 @@ type writer struct { func openWriter(seg segment.Segment, params index.Params, nextTime int64) (*writer, error) { messages, err := message.OpenWriter(seg.Log) if err != nil { - return nil, err + return nil, fmt.Errorf("[klevdb.openWriter] %s messages open: %w", seg.Log, err) } var ix *writerIndex if messages.Size() > 0 { indexItems, err := seg.ReindexAndReadIndex(params) if err != nil { - return nil, err + return nil, fmt.Errorf("[klevdb.openWriter] %s reindex: %w", seg.Index, err) } ix = newWriterIndex(indexItems, params.Keys, seg.Offset, nextTime) } else { @@ -43,12 +43,12 @@ func openWriter(seg segment.Segment, params index.Params, nextTime int64) (*writ items, err := index.OpenWriter(seg.Index, params) if err != nil { - return nil, err + return nil, fmt.Errorf("[klevdb.openWriter] %s index open: %w", seg.Index, err) } reader, err := openReaderAppend(seg, params, ix) if err != nil { - return nil, err + return nil, fmt.Errorf("[klevdb.openWriter] %d reader open: %w", seg.Offset, err) } return &writer{ @@ -63,7 +63,11 @@ func openWriter(seg segment.Segment, params index.Params, nextTime int64) (*writ } func (w *writer) GetNextOffset() (int64, error) { - return w.index.GetNextOffset() + nextOffset, err := w.index.GetNextOffset() + if err != nil { + return OffsetInvalid, fmt.Errorf("[klevdb.writer.GetNextOffset] get: %w", err) + } + return nextOffset, nil } func (w *writer) NeedsRollover(rollover int64) bool { @@ -82,12 +86,12 @@ func (w *writer) Publish(msgs []message.Message) (int64, error) { position, err := w.messages.Write(msgs[i]) if err != nil { - return OffsetInvalid, err + return OffsetInvalid, fmt.Errorf("[klevdb.writer.Publish] messages write: %w", err) } items[i] = w.params.NewItem(msgs[i], position, indexTime) if err := w.items.Write(items[i]); err != nil { - return OffsetInvalid, err + return OffsetInvalid, fmt.Errorf("[klevdb.writer.Publish] index write: %w", err) } indexTime = items[i].Timestamp } @@ -105,19 +109,19 @@ var errSegmentChanged = errors.New("writing segment changed") func (w *writer) Delete(rs *segment.RewriteSegment) (*writer, *reader, error) { if err := w.Sync(); err != nil { - return nil, nil, err + return nil, nil, fmt.Errorf("[klevdb.writer.Delete] sync: %w", err) } if len(rs.SurviveOffsets)+len(rs.DeletedOffsets) != w.index.Len() { // the number of messages changed, nothing to drop if err := rs.Segment.Remove(); err != nil { - return nil, nil, err + return nil, nil, fmt.Errorf("[klevdb.writer.Delete] rewrite remove: %w", err) } - return nil, nil, kleverr.Newf("delete failed: %w", errSegmentChanged) + return nil, nil, fmt.Errorf("[klevdb.writer.Delete] rewrite check: %w", errSegmentChanged) } if err := w.Close(); err != nil { - return nil, nil, err + return nil, nil, fmt.Errorf("[klevdb.writer.Delete] close: %w", err) } if len(rs.SurviveOffsets) == 0 { @@ -179,20 +183,20 @@ func (w *writer) Delete(rs *segment.RewriteSegment) (*writer, *reader, error) { func (w *writer) Sync() error { if err := w.messages.Sync(); err != nil { - return err + return fmt.Errorf("[klevdb.writer.Sync] messages: %w", err) } if err := w.items.Sync(); err != nil { - return err + return fmt.Errorf("[klevdb.writer.Sync] index: %w", err) } return nil } func (w *writer) Close() error { if err := w.messages.Close(); err != nil { - return err + return fmt.Errorf("[klevdb.writer.Close] messages: %w", err) } if err := w.items.Close(); err != nil { - return err + return fmt.Errorf("[klevdb.writer.Close] index: %w", err) } return w.reader.Close() @@ -273,16 +277,19 @@ func (ix *writerIndex) Consume(offset int64) (int64, int64, int64, error) { defer ix.mu.RUnlock() position, maxPosition, err := index.Consume(ix.items, offset) - if err == index.ErrIndexEmpty { + switch { + case err == nil: + return position, maxPosition, offset, nil + case errors.Is(err, index.ErrIndexEmpty): if nextOffset := ix.nextOffset.Load(); offset <= nextOffset { return -1, -1, nextOffset, nil } - } else if err == message.ErrInvalidOffset { + case errors.Is(err, message.ErrInvalidOffset): if nextOffset := ix.nextOffset.Load(); offset == nextOffset { return -1, -1, nextOffset, nil } } - return position, maxPosition, offset, err + return -1, -1, OffsetInvalid, fmt.Errorf("[klevdb.writerIndex.Consume] consume: %w", err) } func (ix *writerIndex) Get(offset int64) (int64, error) { @@ -290,26 +297,37 @@ func (ix *writerIndex) Get(offset int64) (int64, error) { defer ix.mu.RUnlock() position, err := index.Get(ix.items, offset) - if err == message.ErrNotFound { + switch { + case err == nil: + return position, nil + case errors.Is(err, message.ErrNotFound): if nextOffset := ix.nextOffset.Load(); offset >= nextOffset { - return 0, message.ErrInvalidOffset + return -1, fmt.Errorf("[klevdb.writerIndex.Get] get offset too big: %w", message.ErrInvalidOffset) } } - return position, err + return -1, fmt.Errorf("[klevdb.writerIndex.Get] get: %w", err) } func (ix *writerIndex) Keys(keyHash []byte) ([]int64, error) { ix.mu.RLock() defer ix.mu.RUnlock() - return index.Keys(ix.keys, keyHash) + offsets, err := index.Keys(ix.keys, keyHash) + if err != nil { + return nil, fmt.Errorf("[klevdb.writerIndex.Keys] keys: %w", err) + } + return offsets, nil } func (ix *writerIndex) Time(ts int64) (int64, error) { ix.mu.RLock() defer ix.mu.RUnlock() - return index.Time(ix.items, ts) + offset, err := index.Time(ix.items, ts) + if err != nil { + return OffsetInvalid, fmt.Errorf("[klevdb.writerIndex.Time] time: %w", err) + } + return offset, nil } func (ix *writerIndex) Len() int {