From 3a872bcd45b99a2c63202315ec5b51213b807801 Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Wed, 12 Mar 2025 08:44:43 -0400 Subject: [PATCH 1/3] use time.now less in hot paths --- log.go | 6 ++++-- reader.go | 20 ++++++++++---------- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/log.go b/log.go index b352b32..fefe369 100644 --- a/log.go +++ b/log.go @@ -259,6 +259,7 @@ func (l *log) GetByKey(key []byte) (message.Message, error) { } hash := index.KeyHashEncoded(index.KeyHash(key)) + tctx := time.Now().UnixMicro() l.readersMu.RLock() defer l.readersMu.RUnlock() @@ -266,7 +267,7 @@ func (l *log) GetByKey(key []byte) (message.Message, error) { for i := len(l.readers) - 1; i >= 0; i-- { rdr := l.readers[i] - switch msg, err := rdr.GetByKey(key, hash); { + switch msg, err := rdr.GetByKey(key, hash, tctx); { case err == nil: return msg, nil case err == index.ErrKeyNotFound: @@ -294,6 +295,7 @@ func (l *log) GetByTime(start time.Time) (message.Message, error) { } ts := start.UnixMicro() + tctx := time.Now().UnixMicro() l.readersMu.RLock() defer l.readersMu.RUnlock() @@ -301,7 +303,7 @@ func (l *log) GetByTime(start time.Time) (message.Message, error) { for i := len(l.readers) - 1; i >= 0; i-- { rdr := l.readers[i] - switch msg, err := rdr.GetByTime(ts); { + switch msg, err := rdr.GetByTime(ts, tctx); { case err == nil: return msg, nil case err == index.ErrTimeBeforeStart: diff --git a/reader.go b/reader.go index 5d94c3f..cdba6ef 100644 --- a/reader.go +++ b/reader.go @@ -75,7 +75,7 @@ func (r *reader) GetOffset() int64 { } func (r *reader) GetNextOffset() (int64, error) { - index, err := r.getIndex() + index, err := r.getIndex(time.Now().UnixMicro()) if err != nil { return 0, err } @@ -83,7 +83,7 @@ func (r *reader) GetNextOffset() (int64, error) { } func (r *reader) Consume(offset, maxCount int64) (int64, []message.Message, error) { - index, err := r.getIndex() + index, err := r.getIndex(time.Now().UnixMicro()) if err != nil { return OffsetInvalid, nil, err } @@ -118,7 +118,7 @@ func (r *reader) Consume(offset, maxCount int64) (int64, []message.Message, erro } func (r *reader) ConsumeByKey(key, keyHash []byte, offset, maxCount int64) (int64, []message.Message, error) { - ix, err := r.getIndex() + ix, err := r.getIndex(time.Now().UnixMicro()) if err != nil { return OffsetInvalid, nil, err } @@ -180,7 +180,7 @@ func (r *reader) ConsumeByKey(key, keyHash []byte, offset, maxCount int64) (int6 } func (r *reader) Get(offset int64) (message.Message, error) { - index, err := r.getIndex() + index, err := r.getIndex(time.Now().UnixMicro()) if err != nil { return message.Invalid, err } @@ -199,8 +199,8 @@ func (r *reader) Get(offset int64) (message.Message, error) { return messages.Get(position) } -func (r *reader) GetByKey(key, keyHash []byte) (message.Message, error) { - ix, err := r.getIndex() +func (r *reader) GetByKey(key, keyHash []byte, tctx int64) (message.Message, error) { + ix, err := r.getIndex(tctx) if err != nil { return message.Invalid, err } @@ -229,8 +229,8 @@ func (r *reader) GetByKey(key, keyHash []byte) (message.Message, error) { return message.Invalid, index.ErrKeyNotFound } -func (r *reader) GetByTime(ts int64) (message.Message, error) { - index, err := r.getIndex() +func (r *reader) GetByTime(ts int64, tctx int64) (message.Message, error) { + index, err := r.getIndex(tctx) if err != nil { return message.Invalid, err } @@ -297,8 +297,8 @@ func (r *reader) Delete(rs *segment.RewriteSegment) (*reader, error) { return r, nil } -func (r *reader) getIndex() (indexer, error) { - atomic.StoreInt64(&r.indexLastAccess, time.Now().UnixMicro()) +func (r *reader) getIndex(tctx int64) (indexer, error) { + atomic.StoreInt64(&r.indexLastAccess, tctx) r.indexMu.RLock() if ix := r.index; ix != nil { From e690f6672eb0c1d11205dbe9261999f7b167a369 Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Wed, 12 Mar 2025 18:33:00 -0400 Subject: [PATCH 2/3] use separate paths for get index --- reader.go | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/reader.go b/reader.go index cdba6ef..0775af2 100644 --- a/reader.go +++ b/reader.go @@ -75,7 +75,7 @@ func (r *reader) GetOffset() int64 { } func (r *reader) GetNextOffset() (int64, error) { - index, err := r.getIndex(time.Now().UnixMicro()) + index, err := r.getIndexNow() if err != nil { return 0, err } @@ -83,7 +83,7 @@ func (r *reader) GetNextOffset() (int64, error) { } func (r *reader) Consume(offset, maxCount int64) (int64, []message.Message, error) { - index, err := r.getIndex(time.Now().UnixMicro()) + index, err := r.getIndexNow() if err != nil { return OffsetInvalid, nil, err } @@ -118,7 +118,7 @@ func (r *reader) Consume(offset, maxCount int64) (int64, []message.Message, erro } func (r *reader) ConsumeByKey(key, keyHash []byte, offset, maxCount int64) (int64, []message.Message, error) { - ix, err := r.getIndex(time.Now().UnixMicro()) + ix, err := r.getIndexNow() if err != nil { return OffsetInvalid, nil, err } @@ -180,7 +180,7 @@ func (r *reader) ConsumeByKey(key, keyHash []byte, offset, maxCount int64) (int6 } func (r *reader) Get(offset int64) (message.Message, error) { - index, err := r.getIndex(time.Now().UnixMicro()) + index, err := r.getIndexNow() if err != nil { return message.Invalid, err } @@ -200,7 +200,7 @@ func (r *reader) Get(offset int64) (message.Message, error) { } func (r *reader) GetByKey(key, keyHash []byte, tctx int64) (message.Message, error) { - ix, err := r.getIndex(tctx) + ix, err := r.getIndexAt(tctx) if err != nil { return message.Invalid, err } @@ -230,7 +230,7 @@ func (r *reader) GetByKey(key, keyHash []byte, tctx int64) (message.Message, err } func (r *reader) GetByTime(ts int64, tctx int64) (message.Message, error) { - index, err := r.getIndex(tctx) + index, err := r.getIndexAt(tctx) if err != nil { return message.Invalid, err } @@ -297,9 +297,17 @@ func (r *reader) Delete(rs *segment.RewriteSegment) (*reader, error) { return r, nil } -func (r *reader) getIndex(tctx int64) (indexer, error) { +func (r *reader) getIndexAt(tctx int64) (indexer, error) { atomic.StoreInt64(&r.indexLastAccess, tctx) + return r.getIndexMarked() +} + +func (r *reader) getIndexNow() (indexer, error) { + atomic.StoreInt64(&r.indexLastAccess, time.Now().UnixMicro()) + return r.getIndexMarked() +} +func (r *reader) getIndexMarked() (indexer, error) { r.indexMu.RLock() if ix := r.index; ix != nil { defer r.indexMu.RUnlock() From 8280b44edd08e3329e76496920e49929de52c385 Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Wed, 12 Mar 2025 18:42:22 -0400 Subject: [PATCH 3/3] use atomic; reorder methods --- reader.go | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/reader.go b/reader.go index 0775af2..c6c1418 100644 --- a/reader.go +++ b/reader.go @@ -20,11 +20,11 @@ type reader struct { messages *message.Reader messagesMu sync.RWMutex - messagesInuse int64 + messagesInuse atomic.Int64 index indexer indexMu sync.RWMutex - indexLastAccess int64 + indexLastAccess atomic.Int64 } type indexer interface { @@ -108,7 +108,7 @@ func (r *reader) Consume(offset, maxCount int64) (int64, []message.Message, erro if err != nil { return OffsetInvalid, nil, err } - defer atomic.AddInt64(&r.messagesInuse, -1) + defer r.messagesInuse.Add(-1) msgs, err := messages.Consume(position, maxPosition, maxCount) if err != nil { @@ -149,7 +149,7 @@ func (r *reader) ConsumeByKey(key, keyHash []byte, offset, maxCount int64) (int6 if err != nil { return OffsetInvalid, nil, err } - defer atomic.AddInt64(&r.messagesInuse, -1) + defer r.messagesInuse.Add(-1) var msgs []message.Message for i := 0; i < len(positions); i++ { @@ -194,7 +194,7 @@ func (r *reader) Get(offset int64) (message.Message, error) { if err != nil { return message.Invalid, err } - defer atomic.AddInt64(&r.messagesInuse, -1) + defer r.messagesInuse.Add(-1) return messages.Get(position) } @@ -214,7 +214,7 @@ func (r *reader) GetByKey(key, keyHash []byte, tctx int64) (message.Message, err if err != nil { return message.Invalid, err } - defer atomic.AddInt64(&r.messagesInuse, -1) + defer r.messagesInuse.Add(-1) for i := len(positions) - 1; i >= 0; i-- { msg, err := messages.Get(positions[i]) @@ -244,7 +244,7 @@ func (r *reader) GetByTime(ts int64, tctx int64) (message.Message, error) { if err != nil { return message.Invalid, err } - defer atomic.AddInt64(&r.messagesInuse, -1) + defer r.messagesInuse.Add(-1) return messages.Get(position) } @@ -297,13 +297,13 @@ func (r *reader) Delete(rs *segment.RewriteSegment) (*reader, error) { return r, nil } -func (r *reader) getIndexAt(tctx int64) (indexer, error) { - atomic.StoreInt64(&r.indexLastAccess, tctx) +func (r *reader) getIndexNow() (indexer, error) { + r.indexLastAccess.Store(time.Now().UnixMicro()) return r.getIndexMarked() } -func (r *reader) getIndexNow() (indexer, error) { - atomic.StoreInt64(&r.indexLastAccess, time.Now().UnixMicro()) +func (r *reader) getIndexAt(tctx int64) (indexer, error) { + r.indexLastAccess.Store(tctx) return r.getIndexMarked() } @@ -334,7 +334,7 @@ func (r *reader) getIndexMarked() (indexer, error) { func (r *reader) getMessages() (*message.Reader, error) { r.messagesMu.RLock() if msgs := r.messages; msgs != nil { - atomic.AddInt64(&r.messagesInuse, 1) + r.messagesInuse.Add(1) r.messagesMu.RUnlock() return msgs, nil } @@ -344,7 +344,7 @@ func (r *reader) getMessages() (*message.Reader, error) { defer r.messagesMu.Unlock() if msgs := r.messages; msgs != nil { - atomic.AddInt64(&r.messagesInuse, 1) + r.messagesInuse.Add(1) return msgs, nil } @@ -354,7 +354,7 @@ func (r *reader) getMessages() (*message.Reader, error) { } r.messages = msgs - atomic.AddInt64(&r.messagesInuse, 1) + r.messagesInuse.Add(1) return msgs, nil } @@ -371,7 +371,7 @@ func (r *reader) GC(unusedFor time.Duration) error { return nil } - indexLastAccess := time.UnixMicro(atomic.LoadInt64(&r.indexLastAccess)) + indexLastAccess := time.UnixMicro(r.indexLastAccess.Load()) if time.Since(indexLastAccess) < unusedFor { // only unload segments unused for defined time return nil @@ -382,7 +382,7 @@ func (r *reader) GC(unusedFor time.Duration) error { r.messagesMu.Lock() defer r.messagesMu.Unlock() - if r.messages == nil || atomic.LoadInt64(&r.messagesInuse) > 0 { + if r.messages == nil || r.messagesInuse.Load() > 0 { return nil }