diff --git a/Makefile b/Makefile index 0296b4f..06f2b97 100644 --- a/Makefile +++ b/Makefile @@ -32,5 +32,5 @@ bench-multi: .PHONY: update-libs update-libs: - go get -u github.com/klev-dev/kleverr@main + go get -u ./... go mod tidy diff --git a/blocking.go b/blocking.go index 5513421..b632a07 100644 --- a/blocking.go +++ b/blocking.go @@ -2,16 +2,18 @@ package klevdb import "context" +// BlockingLog enhances log adding blocking consume type BlockingLog interface { Log - // ConsumeBlocking is similar to Consume, but if offset is equal to the next offsetit will block until next event is produced + // ConsumeBlocking is similar to Consume, but if offset is equal to the next offset it will block until next message is produced ConsumeBlocking(ctx context.Context, offset int64, maxCount int64) (nextOffset int64, messages []Message, err error) // ConsumeByKeyBlocking is similar to ConsumeBlocking, but only returns messages matching the key ConsumeByKeyBlocking(ctx context.Context, key []byte, offset int64, maxCount int64) (nextOffset int64, messages []Message, err error) } +// OpenBlocking opens log and wraps it with support for blocking consume func OpenBlocking(dir string, opts Options) (BlockingLog, error) { l, err := Open(dir, opts) if err != nil { @@ -20,6 +22,7 @@ func OpenBlocking(dir string, opts Options) (BlockingLog, error) { return WrapBlocking(l) } +// WrapBlocking wraps log with support for blocking consume func WrapBlocking(l Log) (BlockingLog, error) { next, err := l.NextOffset() if err != nil { @@ -35,8 +38,12 @@ type blockingLog struct { func (l *blockingLog) Publish(messages []Message) (int64, error) { nextOffset, err := l.Log.Publish(messages) + if err != nil { + return OffsetInvalid, err + } + l.notify.Set(nextOffset) - return nextOffset, err + return nextOffset, nil } func (l *blockingLog) ConsumeBlocking(ctx context.Context, offset int64, maxCount int64) (int64, []Message, error) { diff --git a/go.mod b/go.mod index 8784725..96ffefc 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,6 @@ toolchain go1.23.1 require ( github.com/gofrs/flock v0.12.1 - github.com/klev-dev/kleverr v0.1.0 github.com/mr-tron/base58 v1.2.0 github.com/plar/go-adaptive-radix-tree/v2 v2.0.3 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index de81f44..ef297f7 100644 --- a/go.sum +++ b/go.sum @@ -3,8 +3,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/gofrs/flock v0.12.1 h1:MTLVXXHf8ekldpJk3AKicLij9MdwOWkZ+a/jHHZby9E= github.com/gofrs/flock v0.12.1/go.mod h1:9zxTsyu5xtJ9DK+1tFZyibEV7y3uwDxPPfbxeeHCoD0= -github.com/klev-dev/kleverr v0.1.0 h1:UnBDKFlHFy6bnN5M/fQ3uCI4G91ciCf1jX3dj1EqL9k= -github.com/klev-dev/kleverr v0.1.0/go.mod h1:DV1tEcfsgAzKraeb/7nux27wOJs8w9P8fLB6GT7DmGM= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= diff --git a/index/format.go b/index/format.go index eff23c4..fc6b7d8 100644 --- a/index/format.go +++ b/index/format.go @@ -3,13 +3,13 @@ package index import ( "encoding/binary" "errors" + "fmt" "io" "os" - - "github.com/klev-dev/kleverr" ) var ErrCorrupted = errors.New("index corrupted") +var errIndexSize = fmt.Errorf("%w: unaligned index size", ErrCorrupted) type Writer struct { opts Params @@ -22,20 +22,18 @@ type Writer struct { func OpenWriter(path string, opts Params) (*Writer, error) { f, err := os.OpenFile(path, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600) if err != nil { - return nil, kleverr.Newf("could not open index: %w", err) + return nil, fmt.Errorf("write index open: %w", err) } - pos := int64(0) - if stat, err := f.Stat(); err != nil { - return nil, kleverr.Newf("could not stat index: %w", err) - } else { - pos = stat.Size() + stat, err := f.Stat() + if err != nil { + return nil, fmt.Errorf("write index stat: %w", err) } return &Writer{ opts: opts, f: f, - pos: pos, + pos: stat.Size(), keyOffset: opts.keyOffset(), }, nil } @@ -57,7 +55,7 @@ func (w *Writer) Write(it Item) error { } if n, err := w.f.Write(w.buff); err != nil { - return kleverr.Newf("failed to write index: %w", err) + return fmt.Errorf("write index: %w", err) } else { w.pos += int64(n) } @@ -71,26 +69,23 @@ func (w *Writer) Size() int64 { func (w *Writer) Sync() error { if err := w.f.Sync(); err != nil { - return kleverr.Newf("could not sync index: %w", err) + return fmt.Errorf("write index sync: %w", err) } return nil } func (w *Writer) Close() error { if err := w.f.Close(); err != nil { - return kleverr.Newf("could not close index: %w", err) + return fmt.Errorf("write index close: %w", err) } return nil } func (w *Writer) SyncAndClose() error { - if err := w.f.Sync(); err != nil { - return kleverr.Newf("could not sync index: %w", err) - } - if err := w.f.Close(); err != nil { - return kleverr.Newf("could not close index: %w", err) + if err := w.Sync(); err != nil { + return err } - return nil + return w.Close() } func Write(path string, opts Params, index []Item) error { @@ -112,24 +107,24 @@ func Write(path string, opts Params, index []Item) error { func Read(path string, opts Params) ([]Item, error) { f, err := os.Open(path) if err != nil { - return nil, kleverr.Newf("could not open index: %w", err) + return nil, fmt.Errorf("read index open: %w", err) } defer f.Close() stat, err := os.Stat(path) if err != nil { - return nil, kleverr.Newf("could not stat index: %w", err) + return nil, fmt.Errorf("read index stat: %w", err) } dataSize := stat.Size() itemSize := opts.Size() if dataSize%itemSize > 0 { - return nil, kleverr.Newf("%w: unexpected data len: %d", ErrCorrupted, dataSize) + return nil, errIndexSize } data := make([]byte, dataSize) if _, err = io.ReadFull(f, data); err != nil { - return nil, kleverr.Newf("could not read index: %w", err) + return nil, fmt.Errorf("read index: %w", err) } var keyOffset = opts.keyOffset() diff --git a/log.go b/log.go index a4afdce..f237fae 100644 --- a/log.go +++ b/log.go @@ -2,6 +2,7 @@ package klevdb import ( "errors" + "fmt" "os" "path/filepath" "sync" @@ -14,9 +15,14 @@ import ( "github.com/klev-dev/klevdb/index" "github.com/klev-dev/klevdb/message" "github.com/klev-dev/klevdb/segment" - "github.com/klev-dev/kleverr" ) +var errNoKeyIndex = fmt.Errorf("%w by key", ErrNoIndex) +var errKeyNotFound = fmt.Errorf("key %w", ErrNotFound) +var errNoTimeIndex = fmt.Errorf("%w by time", ErrNoIndex) +var errTimeNotFound = fmt.Errorf("time %w", ErrNotFound) +var errDeleteRelative = fmt.Errorf("%w: delete relative offsets", message.ErrInvalidOffset) + // Open create a log based on a dir and set of options func Open(dir string, opts Options) (result Log, err error) { if opts.Rollover == 0 { @@ -25,7 +31,7 @@ func Open(dir string, opts Options) (result Log, err error) { if opts.CreateDirs { if err := os.MkdirAll(dir, 0700); err != nil { - return nil, kleverr.Newf("could not create log dirs: %w", err) + return nil, fmt.Errorf("open create dirs: %w", err) } } @@ -33,22 +39,22 @@ func Open(dir string, opts Options) (result Log, err error) { if opts.Readonly { switch ok, err := lock.TryRLock(); { case err != nil: - return nil, kleverr.Newf("could not lock: %w", err) + return nil, fmt.Errorf("open read lock: %w", err) case !ok: - return nil, kleverr.Newf("log already writing locked") + return nil, fmt.Errorf("open already writing locked") } } else { switch ok, err := lock.TryLock(); { case err != nil: - return nil, kleverr.Newf("could not lock: %w", err) + return nil, fmt.Errorf("open lock: %w", err) case !ok: - return nil, kleverr.Newf("log already locked") + return nil, fmt.Errorf("open already locked") } } defer func() { if err != nil { if lerr := lock.Unlock(); lerr != nil { - err = kleverr.Newf("%w: could not release lock: %w", err, lerr) + err = fmt.Errorf("%w: open release lock: %w", err, lerr) } } }() @@ -206,7 +212,7 @@ func (l *log) Consume(offset int64, maxCount int64) (int64, []message.Message, e func (l *log) ConsumeByKey(key []byte, offset int64, maxCount int64) (int64, []message.Message, error) { if !l.opts.KeyIndex { - return OffsetInvalid, nil, kleverr.Newf("%w by key", ErrNoIndex) + return OffsetInvalid, nil, errNoKeyIndex } hash := index.KeyHashEncoded(index.KeyHash(key)) @@ -247,7 +253,7 @@ func (l *log) Get(offset int64) (message.Message, error) { func (l *log) GetByKey(key []byte) (message.Message, error) { if !l.opts.KeyIndex { - return message.Invalid, kleverr.Newf("%w by key", ErrNoIndex) + return message.Invalid, errNoKeyIndex } hash := index.KeyHashEncoded(index.KeyHash(key)) @@ -269,7 +275,7 @@ func (l *log) GetByKey(key []byte) (message.Message, error) { } // not in any segment, so just return the error - return message.Invalid, kleverr.Newf("key %w", message.ErrNotFound) + return message.Invalid, errKeyNotFound } func (l *log) OffsetByKey(key []byte) (int64, error) { @@ -282,7 +288,7 @@ func (l *log) OffsetByKey(key []byte) (int64, error) { func (l *log) GetByTime(start time.Time) (message.Message, error) { if !l.opts.TimeIndex { - return message.Invalid, kleverr.Newf("%w by time", ErrNoIndex) + return message.Invalid, errNoTimeIndex } ts := start.UnixMicro() @@ -314,7 +320,7 @@ func (l *log) GetByTime(start time.Time) (message.Message, error) { } } - return message.Invalid, kleverr.Newf("time %w", message.ErrNotFound) + return message.Invalid, errTimeNotFound } func (l *log) OffsetByTime(start time.Time) (int64, time.Time, error) { @@ -371,7 +377,7 @@ func (l *log) Delete(offsets map[int64]struct{}) (map[int64]struct{}, int64, err newWriter, newReader, err := l.writer.Delete(rs) switch { - case errors.Is(err, errSegmentChanged): + case err == errSegmentChanged: return nil, 0, nil case err != nil: return nil, 0, err @@ -419,7 +425,7 @@ func (l *log) findDeleteReader(offsets map[int64]struct{}) (*reader, error) { lowestOffset := orderedOffsets[0] if lowestOffset < 0 { - return nil, kleverr.Newf("%w: cannot delete relative offsets", message.ErrInvalidOffset) + return nil, errDeleteRelative } l.readersMu.RLock() @@ -544,7 +550,7 @@ func (l *log) Close() error { } if err := l.lock.Unlock(); err != nil { - return kleverr.Newf("could not release lock: %w", err) + return fmt.Errorf("close unlock: %w", err) } return nil diff --git a/log_test.go b/log_test.go index fa13567..3409cca 100644 --- a/log_test.go +++ b/log_test.go @@ -16,7 +16,6 @@ import ( "github.com/klev-dev/klevdb/message" "github.com/klev-dev/klevdb/segment" - "github.com/klev-dev/kleverr" ) func publishBatched(t *testing.T, l Log, msgs []Message, batchLen int) { @@ -1556,7 +1555,7 @@ func testConcurrentPubsubRecent(t *testing.T) { for ctx.Err() == nil { next, msgs, err := s.Consume(offset, 32) if err != nil { - return kleverr.Newf("could not consume offset %d: %w", offset, err) + return fmt.Errorf("could not consume offset %d: %w", offset, err) } if offset == next { @@ -1591,11 +1590,7 @@ func testConcurrentPubsubRecent(t *testing.T) { return nil }) - err = g.Wait() - if serr := kleverr.Get(err); serr != nil { - fmt.Println(serr.Print()) - } - require.NoError(t, err) + require.NoError(t, g.Wait()) } func testConcurrentConsume(t *testing.T) { @@ -1742,9 +1737,5 @@ func testConcurrentGC(t *testing.T) { return nil }) - err = g.Wait() - if serr := kleverr.Get(err); serr != nil { - fmt.Println(serr.Print()) - } - require.NoError(t, err) + require.NoError(t, g.Wait()) } diff --git a/message/format.go b/message/format.go index da853bd..028677c 100644 --- a/message/format.go +++ b/message/format.go @@ -3,17 +3,20 @@ package message import ( "encoding/binary" "errors" + "fmt" "hash/crc32" "io" "os" "time" "golang.org/x/exp/mmap" - - "github.com/klev-dev/kleverr" ) -var ErrCorrupted = errors.New("message corrupted") +var ErrCorrupted = errors.New("log corrupted") +var errShortHeader = fmt.Errorf("%w: short header", ErrCorrupted) +var errShortMessage = fmt.Errorf("%w: short message", ErrCorrupted) +var errNoMessage = fmt.Errorf("%w: no message", ErrCorrupted) +var errCrcFailed = fmt.Errorf("%w: crc failed", ErrCorrupted) var crc32cTable = crc32.MakeTable(crc32.Castagnoli) @@ -31,17 +34,15 @@ type Writer struct { func OpenWriter(path string) (*Writer, error) { f, err := os.OpenFile(path, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600) if err != nil { - return nil, kleverr.Newf("could not open log: %w", err) + return nil, fmt.Errorf("write log open: %w", err) } - pos := int64(0) - if stat, err := f.Stat(); err != nil { - return nil, kleverr.Newf("could not stat log: %w", err) - } else { - pos = stat.Size() + stat, err := f.Stat() + if err != nil { + return nil, fmt.Errorf("write log stat: %w", err) } - return &Writer{Path: path, f: f, pos: pos}, nil + return &Writer{Path: path, f: f, pos: stat.Size()}, nil } func (w *Writer) Write(m Message) (int64, error) { @@ -71,7 +72,7 @@ func (w *Writer) Write(m Message) (int64, error) { pos := w.pos if n, err := w.f.Write(w.buff); err != nil { - return 0, kleverr.Newf("log failed write: %w", err) + return 0, fmt.Errorf("write log: %w", err) } else { w.pos += int64(n) } @@ -84,26 +85,23 @@ func (w *Writer) Size() int64 { func (w *Writer) Sync() error { if err := w.f.Sync(); err != nil { - return kleverr.Newf("could not sync log: %w", err) + return fmt.Errorf("write log sync: %w", err) } return nil } func (w *Writer) Close() error { if err := w.f.Close(); err != nil { - return kleverr.Newf("could not close log: %w", err) + return fmt.Errorf("write log close: %w", err) } return nil } func (w *Writer) SyncAndClose() error { - if err := w.f.Sync(); err != nil { - return kleverr.Newf("could not sync log: %w", err) + if err := w.Sync(); err != nil { + return err } - if err := w.f.Close(); err != nil { - return kleverr.Newf("could not close log: %w", err) - } - return nil + return w.Close() } type Reader struct { @@ -115,7 +113,7 @@ type Reader struct { func OpenReader(path string) (*Reader, error) { f, err := os.Open(path) if err != nil { - return nil, kleverr.Newf("could not open log: %w", err) + return nil, fmt.Errorf("read log open: %w", err) } return &Reader{ @@ -127,7 +125,7 @@ func OpenReader(path string) (*Reader, error) { func OpenReaderMem(path string) (*Reader, error) { f, err := mmap.Open(path) if err != nil { - return nil, kleverr.Newf("could not open log: %w", err) + return nil, fmt.Errorf("read mem log open: %w", err) } return &Reader{ @@ -174,9 +172,9 @@ func (r *Reader) read(position int64, msg *Message) (nextPosition int64, err err case err == nil: // all good, continue case errors.Is(err, io.ErrUnexpectedEOF): - return -1, kleverr.Newf("%w: short header", ErrCorrupted) + return -1, errShortHeader default: - return -1, kleverr.Newf("could not read header: %w", err) + return -1, fmt.Errorf("read header: %w", err) } msg.Offset = int64(binary.BigEndian.Uint64(headerBytes[0:])) @@ -196,16 +194,16 @@ func (r *Reader) read(position int64, msg *Message) (nextPosition int64, err err case err == nil: // all good, continue case errors.Is(err, io.ErrUnexpectedEOF): - return -1, kleverr.Newf("%w: short message", ErrCorrupted) + return -1, errShortMessage case errors.Is(err, io.EOF): - return -1, kleverr.Newf("%w: no message", ErrCorrupted) + return -1, errNoMessage default: - return -1, kleverr.Newf("could not read message: %w", err) + return -1, fmt.Errorf("read message: %w", err) } actualCRC := crc32.Checksum(messageBytes, crc32cTable) if expectedCRC != actualCRC { - return -1, kleverr.Newf("%w: invalid crc: expected=%d; actual=%d", ErrCorrupted, expectedCRC, actualCRC) + return -1, errCrcFailed } if keySize > 0 { @@ -222,11 +220,11 @@ func (r *Reader) read(position int64, msg *Message) (nextPosition int64, err err func (r *Reader) Close() error { if r.ra != nil { if err := r.ra.Close(); err != nil { - return kleverr.Newf("could not close log: %w", err) + return fmt.Errorf("write mem log close: %w", err) } } else { if err := r.r.Close(); err != nil { - return kleverr.Newf("could not close log: %w", err) + return fmt.Errorf("write log close: %w", err) } } return nil diff --git a/message/message.go b/message/message.go index 3af2c55..2d7c3c2 100644 --- a/message/message.go +++ b/message/message.go @@ -5,8 +5,6 @@ import ( "fmt" "strings" "time" - - "github.com/klev-dev/kleverr" ) const ( @@ -23,13 +21,6 @@ const ( var ErrInvalidOffset = errors.New("invalid offset") var ErrNotFound = errors.New("not found") -func ValidateOffset(offset int64) error { - if offset < OffsetOldest { - return kleverr.Newf("%w: %d is not a valid offset", ErrInvalidOffset, offset) - } - return nil -} - type Message struct { Offset int64 Time time.Time diff --git a/notify.go b/notify.go index 74f69e7..361801a 100644 --- a/notify.go +++ b/notify.go @@ -4,8 +4,6 @@ import ( "context" "errors" "sync/atomic" - - "github.com/klev-dev/kleverr" ) var ErrOffsetNotifyClosed = errors.New("offset notify already closed") @@ -36,7 +34,7 @@ func (w *OffsetNotify) Wait(ctx context.Context, offset int64) error { b, ok := <-w.barrier if !ok { // already closed, return error - return kleverr.Ret(ErrOffsetNotifyClosed) + return ErrOffsetNotifyClosed } // probe the current offset @@ -55,7 +53,7 @@ func (w *OffsetNotify) Wait(ctx context.Context, offset int64) error { case <-b: return nil case <-ctx.Done(): - return kleverr.Ret(ctx.Err()) + return ctx.Err() } } @@ -84,7 +82,7 @@ func (w *OffsetNotify) Close() error { b, ok := <-w.barrier if !ok { // already closed, return an error - return kleverr.Ret(ErrOffsetNotifyClosed) + return ErrOffsetNotifyClosed } // close the current barrier, e.g. broadcasting update diff --git a/reader.go b/reader.go index 2f71553..fd8749f 100644 --- a/reader.go +++ b/reader.go @@ -2,7 +2,6 @@ package klevdb import ( "bytes" - "errors" "sync" "sync/atomic" "time" @@ -133,14 +132,16 @@ func (r *reader) ConsumeByKey(key, keyHash []byte, offset, maxCount int64) (int6 } positions, err := index.Keys(keyHash) - if err != nil { - if errors.Is(err, message.ErrNotFound) { - nextOffset, err := index.GetNextOffset() - if err != nil { - return OffsetInvalid, nil, err - } - return nextOffset, nil, nil + switch { + case err == nil: + break + case err == message.ErrNotFound: + nextOffset, err := index.GetNextOffset() + if err != nil { + return OffsetInvalid, nil, err } + return nextOffset, nil, nil + default: return OffsetInvalid, nil, err } diff --git a/segment/index.go b/segment/index.go index dadcc3d..7458241 100644 --- a/segment/index.go +++ b/segment/index.go @@ -2,7 +2,6 @@ package segment import ( "github.com/klev-dev/klevdb/message" - "github.com/klev-dev/kleverr" ) type Offsetter interface { @@ -61,11 +60,10 @@ func Get[S ~[]O, O Offsetter](segments S, offset int64) (O, int, error) { switch { case offset < beginSegment.GetOffset(): var v O - err := message.ErrNotFound if beginSegment.GetOffset() == 0 { - err = message.ErrInvalidOffset + return v, -1, message.ErrInvalidOffset } - return v, -1, kleverr.Newf("%w: %d is before beginning", err, offset) + return v, -1, message.ErrNotFound case offset == beginSegment.GetOffset(): return beginSegment, 0, nil } diff --git a/segment/segment.go b/segment/segment.go index 6bdbf33..ba06b64 100644 --- a/segment/segment.go +++ b/segment/segment.go @@ -12,7 +12,6 @@ import ( "github.com/klev-dev/klevdb/index" "github.com/klev-dev/klevdb/message" - "github.com/klev-dev/kleverr" ) type Segment struct { @@ -46,12 +45,12 @@ type Stats struct { func (s Segment) Stat(params index.Params) (Stats, error) { logStat, err := os.Stat(s.Log) if err != nil { - return Stats{}, kleverr.Newf("could not stat log: %w", err) + return Stats{}, fmt.Errorf("stat log: %w", err) } indexStat, err := os.Stat(s.Index) if err != nil { - return Stats{}, kleverr.Newf("could not stat index: %w", err) + return Stats{}, fmt.Errorf("stat index: %w", err) } return Stats{ @@ -61,6 +60,9 @@ func (s Segment) Stat(params index.Params) (Stats, error) { }, nil } +var errIndexSize = fmt.Errorf("%w: incorrect size", index.ErrCorrupted) +var errIndexItem = fmt.Errorf("%w: incorrect item", index.ErrCorrupted) + func (s Segment) Check(params index.Params) error { log, err := message.OpenReader(s.Log) if err != nil { @@ -75,7 +77,7 @@ func (s Segment) Check(params index.Params) error { if errors.Is(err, io.EOF) { break } else if err != nil { - return kleverr.Newf("%s: %w", s.Log, err) + return err } item := params.NewItem(msg, position, indexTime) @@ -91,11 +93,11 @@ func (s Segment) Check(params index.Params) error { case err != nil: return err case len(logIndex) != len(items): - return kleverr.Newf("%s: incorrect index size: %w", s.Index, index.ErrCorrupted) + return errIndexSize default: for i, item := range logIndex { if item != items[i] { - return kleverr.Newf("%s: incorrect index item: %w", s.Index, index.ErrCorrupted) + return errIndexItem } } } @@ -144,20 +146,17 @@ func (s Segment) Recover(params index.Params) error { if err := log.Close(); err != nil { return err } - if err := restore.Sync(); err != nil { - return err - } - if err := restore.Close(); err != nil { + if err := restore.SyncAndClose(); err != nil { return err } if corrupted { if err := os.Rename(restore.Path, log.Path); err != nil { - return kleverr.Newf("could not rename restore: %w", err) + return fmt.Errorf("restore log rename: %w", err) } } else { if err := os.Remove(restore.Path); err != nil { - return kleverr.Newf("could not delete restore: %w", err) + return fmt.Errorf("restore log delete: %w", err) } } @@ -182,7 +181,7 @@ func (s Segment) Recover(params index.Params) error { if corruptedIndex { if err := os.Remove(s.Index); err != nil { - return kleverr.Newf("could not remove corrupted index: %w", err) + return fmt.Errorf("restore index delete: %w", err) } } @@ -194,7 +193,7 @@ func (s Segment) NeedsReindex() (bool, error) { case os.IsNotExist(err): return true, nil case err != nil: - return false, kleverr.Newf("could not stat index: %w", err) + return false, fmt.Errorf("needs reindex stat: %w", err) case info.Size() == 0: return true, nil default: @@ -250,20 +249,20 @@ func (s Segment) ReindexReader(params index.Params, log *message.Reader) ([]inde func (s Segment) Backup(targetDir string) error { logName, err := filepath.Rel(s.Dir, s.Log) if err != nil { - return kleverr.Newf("could not rel log: %w", err) + return fmt.Errorf("backup log rel: %w", err) } targetLog := filepath.Join(targetDir, logName) if err := copyFile(s.Log, targetLog); err != nil { - return kleverr.Newf("could not copy log: %w", err) + return fmt.Errorf("backup log copy: %w", err) } indexName, err := filepath.Rel(s.Dir, s.Index) if err != nil { - return kleverr.Newf("could not rel index: %w", err) + return fmt.Errorf("backup index rel: %w", err) } targetIndex := filepath.Join(targetDir, indexName) if err := copyFile(s.Index, targetIndex); err != nil { - return kleverr.Newf("could not copy index: %w", err) + return fmt.Errorf("backup index copy: %w", err) } return nil @@ -272,7 +271,7 @@ func (s Segment) Backup(targetDir string) error { func (s Segment) ForRewrite() (Segment, error) { randStr, err := randStr(8) if err != nil { - return Segment{}, nil + return Segment{}, err } s.Log = fmt.Sprintf("%s.rewrite.%s", s.Log, randStr) @@ -282,11 +281,11 @@ func (s Segment) ForRewrite() (Segment, error) { func (olds Segment) Rename(news Segment) error { if err := os.Rename(olds.Log, news.Log); err != nil { - return kleverr.Newf("could not rename log: %w", err) + return fmt.Errorf("rename log rename: %w", err) } if err := os.Rename(olds.Index, news.Index); err != nil { - return kleverr.Newf("could not rename index: %w", err) + return fmt.Errorf("rename index rename: %w", err) } return nil @@ -295,15 +294,15 @@ func (olds Segment) Rename(news Segment) error { func (olds Segment) Override(news Segment) error { // remove index segment so we don't have invalid index if err := os.Remove(news.Index); err != nil { - return kleverr.Newf("could not delete index: %w", err) + return fmt.Errorf("override index delete: %w", err) } if err := os.Rename(olds.Log, news.Log); err != nil { - return kleverr.Newf("could not rename log: %w", err) + return fmt.Errorf("override log rename: %w", err) } if err := os.Rename(olds.Index, news.Index); err != nil { - return kleverr.Newf("could not rename index: %w", err) + return fmt.Errorf("override index rename: %w", err) } return nil @@ -311,10 +310,10 @@ func (olds Segment) Override(news Segment) error { func (s Segment) Remove() error { if err := os.Remove(s.Index); err != nil { - return kleverr.Newf("could not delete index: %w", err) + return fmt.Errorf("remove index delete: %w", err) } if err := os.Remove(s.Log); err != nil { - return kleverr.Newf("could not delete log: %w", err) + return fmt.Errorf("remove log delete: %w", err) } return nil } diff --git a/segment/segments.go b/segment/segments.go index c16eceb..e29e4d0 100644 --- a/segment/segments.go +++ b/segment/segments.go @@ -2,18 +2,18 @@ package segment import ( "errors" + "fmt" "os" "strconv" "strings" "github.com/klev-dev/klevdb/index" - "github.com/klev-dev/kleverr" ) func Find(dir string) ([]Segment, error) { files, err := os.ReadDir(dir) if err != nil { - return nil, kleverr.Newf("could not list dir: %w", err) + return nil, fmt.Errorf("find read dir: %w", err) } var segments []Segment @@ -23,7 +23,7 @@ func Find(dir string) ([]Segment, error) { offset, err := strconv.ParseInt(offsetStr, 10, 64) if err != nil { - return nil, kleverr.Newf("parse offset failed: %w", err) + return nil, fmt.Errorf("find parse offset: %w", err) } segments = append(segments, New(dir, offset)) @@ -49,7 +49,7 @@ func Stat(segments []Segment, params index.Params) (Stats, error) { for _, seg := range segments { segStat, err := seg.Stat(params) if err != nil { - return Stats{}, err + return Stats{}, fmt.Errorf("stat %d: %w", seg.Offset, err) } total.Segments += segStat.Segments @@ -68,8 +68,11 @@ func CheckDir(dir string, params index.Params) error { case len(segments) == 0: return nil default: - s := segments[len(segments)-1] - return s.Check(params) + seg := segments[len(segments)-1] + if err := seg.Check(params); err != nil { + return fmt.Errorf("check %d: %w", seg.Offset, err) + } + return nil } } @@ -82,31 +85,33 @@ func RecoverDir(dir string, params index.Params) error { case len(segments) == 0: return nil default: - s := segments[len(segments)-1] - return s.Recover(params) + seg := segments[len(segments)-1] + if err := seg.Recover(params); err != nil { + return fmt.Errorf("recover %d: %w", seg.Offset, err) + } + return nil } } func BackupDir(dir, target string) error { - segments, err := Find(dir) - switch { + switch segments, err := Find(dir); { case errors.Is(err, os.ErrNotExist): return nil case err != nil: return err - } + default: + if err := os.MkdirAll(target, 0700); err != nil { + return fmt.Errorf("backup dir create: %w", err) + } - if err := os.MkdirAll(target, 0700); err != nil { - return kleverr.Newf("could not create backup dir: %w", err) + return Backup(segments, target) } - - return Backup(segments, target) } func Backup(segments []Segment, dir string) error { for _, seg := range segments { if err := seg.Backup(dir); err != nil { - return kleverr.Newf("could not backup segment %d: %w", seg.Offset, err) + return fmt.Errorf("backup %d: %w", seg.Offset, err) } } diff --git a/segment/utils.go b/segment/utils.go index 654e24a..d2dd92e 100644 --- a/segment/utils.go +++ b/segment/utils.go @@ -2,17 +2,17 @@ package segment import ( "crypto/rand" + "fmt" "io" "os" - "github.com/klev-dev/kleverr" "github.com/mr-tron/base58" ) func randStr(length int) (string, error) { k := make([]byte, length) if _, err := io.ReadFull(rand.Reader, k); err != nil { - return "", kleverr.Ret(err) + return "", fmt.Errorf("rand read: %w", err) } return base58.Encode(k), nil } @@ -20,20 +20,20 @@ func randStr(length int) (string, error) { func copyFile(src, dst string) error { fsrc, err := os.Open(src) if err != nil { - return kleverr.Newf("could not open src: %w", err) + return fmt.Errorf("copy src open: %w", err) } defer fsrc.Close() stat, err := fsrc.Stat() if err != nil { - return kleverr.Newf("could not stat src: %w", err) + return fmt.Errorf("copy src stat: %w", err) } fdst, err := os.OpenFile(dst, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0600) if os.IsExist(err) { switch dstStat, err := os.Stat(dst); { case err != nil: - return kleverr.Newf("could not stat dst: %w", err) + return fmt.Errorf("copy dst stat: %w", err) case stat.Size() == dstStat.Size() && stat.ModTime().Equal(dstStat.ModTime()): // TODO do we need a safer version of this? return nil @@ -41,25 +41,25 @@ func copyFile(src, dst string) error { fdst, err = os.OpenFile(dst, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600) } if err != nil { - return kleverr.Newf("could not open dst: %w", err) + return fmt.Errorf("copy dst open: %w", err) } defer fdst.Close() switch n, err := io.Copy(fdst, fsrc); { case err != nil: - return kleverr.Newf("could not copy: %w", err) + return fmt.Errorf("copy: %w", err) case n < stat.Size(): - return kleverr.Newf("could not copy all data (%d/%d)", n, stat.Size()) + return fmt.Errorf("partial copy (%d/%d)", n, stat.Size()) } if err := fdst.Sync(); err != nil { - return kleverr.Newf("could not sync dst: %w", err) + return fmt.Errorf("copy dst sync: %w", err) } if err := fdst.Close(); err != nil { - return kleverr.Newf("could not close dst: %w", err) + return fmt.Errorf("copy dst close: %w", err) } if err := os.Chtimes(dst, stat.ModTime(), stat.ModTime()); err != nil { - return kleverr.Newf("could not set dst time: %w", err) + return fmt.Errorf("copy dst chtimes: %w", err) } return nil diff --git a/typed.go b/typed.go index 8dd956b..8fb3607 100644 --- a/typed.go +++ b/typed.go @@ -13,37 +13,53 @@ type TMessage[K any, V any] struct { // TLog is a typed log type TLog[K any, V any] interface { + // Publish see Log.Publish Publish(messages []TMessage[K, V]) (nextOffset int64, err error) + // NextOffset see Log.NextOffset NextOffset() (nextOffset int64, err error) + // Consume see Log.Consume Consume(offset int64, maxCount int64) (nextOffset int64, messages []TMessage[K, V], err error) + // ConsumeByKey see Log.ConsumeByKey ConsumeByKey(key K, empty bool, offset int64, maxCount int64) (nextOffset int64, messages []TMessage[K, V], err error) + // Get see Log.Get Get(offset int64) (message TMessage[K, V], err error) + // GetByKey see Log.GetByKey GetByKey(key K, empty bool) (message TMessage[K, V], err error) + // GetByTime see Log.GetByTime GetByTime(start time.Time) (message TMessage[K, V], err error) + // Delete see Log.Delete Delete(offsets map[int64]struct{}) (deletedOffsets map[int64]struct{}, deletedSize int64, err error) + // Size see Log.Size Size(m Message) int64 + // Stat see Log.Stat Stat() (Stats, error) + // Backup see Log.Backup Backup(dir string) error + // Sync see Log.Sync Sync() (nextOffset int64, err error) + // GC see Log.GC GC(unusedFor time.Duration) error + // Close see Log.Close Close() error + // Raw returns the wrapped in log Raw() Log } +// OpenT opens a typed log with specified key/value codecs func OpenT[K any, V any](dir string, opts Options, keyCodec Codec[K], valueCodec Codec[V]) (TLog[K, V], error) { l, err := Open(dir, opts) if err != nil { @@ -52,6 +68,7 @@ func OpenT[K any, V any](dir string, opts Options, keyCodec Codec[K], valueCodec return &tlog[K, V]{l, keyCodec, valueCodec}, nil } +// WrapT wraps a log with specified key/value codecs func WrapT[K any, V any](l Log, keyCodec Codec[K], valueCodec Codec[V]) (TLog[K, V], error) { return &tlog[K, V]{l, keyCodec, valueCodec}, nil } diff --git a/typed_blocking.go b/typed_blocking.go index a97e56e..17b9c51 100644 --- a/typed_blocking.go +++ b/typed_blocking.go @@ -2,14 +2,18 @@ package klevdb import "context" +// TBlockingLog enhances tlog adding blocking consume type TBlockingLog[K any, V any] interface { TLog[K, V] + // ConsumeBlocking see BlockingLog.ConsumeBlocking ConsumeBlocking(ctx context.Context, offset int64, maxCount int64) (nextOffset int64, messages []TMessage[K, V], err error) + // ConsumeByKeyBlocking see BlockingLog.ConsumeByKeyBlocking ConsumeByKeyBlocking(ctx context.Context, key K, empty bool, offset int64, maxCount int64) (nextOffset int64, messages []TMessage[K, V], err error) } +// OpenBlocking opens tlog and wraps it with support for blocking consume func OpenTBlocking[K any, V any](dir string, opts Options, keyCodec Codec[K], valueCodec Codec[V]) (TBlockingLog[K, V], error) { l, err := OpenT(dir, opts, keyCodec, valueCodec) if err != nil { @@ -18,6 +22,7 @@ func OpenTBlocking[K any, V any](dir string, opts Options, keyCodec Codec[K], va return WrapTBlocking(l) } +// WrapBlocking wraps tlog with support for blocking consume func WrapTBlocking[K any, V any](l TLog[K, V]) (TBlockingLog[K, V], error) { next, err := l.NextOffset() if err != nil { @@ -33,6 +38,10 @@ type tlogBlocking[K any, V any] struct { func (l *tlogBlocking[K, V]) Publish(tmessages []TMessage[K, V]) (int64, error) { nextOffset, err := l.TLog.Publish(tmessages) + if err != nil { + return OffsetInvalid, err + } + l.notify.Set(nextOffset) return nextOffset, err } diff --git a/typed_codec.go b/typed_codec.go index 30ebf2a..cfa4e5d 100644 --- a/typed_codec.go +++ b/typed_codec.go @@ -3,15 +3,16 @@ package klevdb import ( "encoding/binary" "encoding/json" - - "github.com/klev-dev/kleverr" + "errors" ) +// Codec is interface satisfied by all codecs type Codec[T any] interface { Encode(t T, empty bool) (b []byte, err error) Decode(b []byte) (t T, empty bool, err error) } +// JsonCodec supports coding values as a JSON type JsonCodec[T any] struct{} func (c JsonCodec[T]) Encode(t T, empty bool) ([]byte, error) { @@ -48,6 +49,7 @@ func (c stringOptCodec) Decode(b []byte) (string, bool, error) { return s, false, err } +// StringOptCodec supports coding an optional string, e.g. differantiates between "" and nil strings var StringOptCodec = stringOptCodec{} type stringCodec struct{} @@ -60,6 +62,7 @@ func (c stringCodec) Decode(b []byte) (string, bool, error) { return string(b), false, nil } +// StringCodec supports coding a string var StringCodec = stringCodec{} type varintCodec struct{} @@ -71,15 +74,23 @@ func (c varintCodec) Encode(t int64, empty bool) ([]byte, error) { return binary.AppendVarint(nil, t), nil } +var errShortBuffer = errors.New("varint: short buffer") +var errOverflow = errors.New("varint: overflow") + func (c varintCodec) Decode(b []byte) (int64, bool, error) { if b == nil { return 0, true, nil } t, n := binary.Varint(b) - if n <= 0 { - return 0, true, kleverr.Newf("invalid varint: %d", n) + switch { + case n == 0: + return 0, true, errShortBuffer + case n < 0: + return 0, true, errOverflow + default: + return t, false, nil } - return t, false, nil } +// VarintCodec supports coding integers as varint var VarintCodec = varintCodec{} diff --git a/writer.go b/writer.go index 39d718e..eda6765 100644 --- a/writer.go +++ b/writer.go @@ -11,7 +11,6 @@ import ( "github.com/klev-dev/klevdb/index" "github.com/klev-dev/klevdb/message" "github.com/klev-dev/klevdb/segment" - "github.com/klev-dev/kleverr" ) type writer struct { @@ -113,7 +112,7 @@ func (w *writer) Delete(rs *segment.RewriteSegment) (*writer, *reader, error) { if err := rs.Segment.Remove(); err != nil { return nil, nil, err } - return nil, nil, kleverr.Newf("delete failed: %w", errSegmentChanged) + return nil, nil, errSegmentChanged } if err := w.Close(); err != nil {