From 82b90085ddc07a253bd7142826642e619262e175 Mon Sep 17 00:00:00 2001 From: Akis Kesoglou Date: Wed, 15 Nov 2017 13:07:45 +0200 Subject: [PATCH 1/4] Add methods to efficiently read and write a single byte This reduces allocations and greatly improves performance when circbuf is used in tight loops that work with single byte streams. Signed-off-by: Akis Kesoglou --- circbuf.go | 31 ++++++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/circbuf.go b/circbuf.go index de3cb94..9dd8eb9 100644 --- a/circbuf.go +++ b/circbuf.go @@ -10,6 +10,7 @@ import ( // are retained. type Buffer struct { data []byte + out []byte size int64 writeCursor int64 written int64 @@ -25,6 +26,7 @@ func NewBuffer(size int64) (*Buffer, error) { b := &Buffer{ size: size, data: make([]byte, size), + out: make([]byte, size), } return b, nil } @@ -54,6 +56,12 @@ func (b *Buffer) Write(buf []byte) (int, error) { return n, nil } +func (b *Buffer) WriteByte(c byte) { + b.data[b.writeCursor] = c + b.writeCursor = ((b.writeCursor + 1) % b.size) + b.written += 1 +} + // Size returns the size of the buffer func (b *Buffer) Size() int64 { return b.size @@ -65,21 +73,34 @@ func (b *Buffer) TotalWritten() int64 { } // Bytes provides a slice of the bytes written. This -// slice should not be written to. +// slice should not be written to. The underlying array +// may point to data that will be overwritten by a subsequent +// call to Bytes. It does no allocation. func (b *Buffer) Bytes() []byte { switch { case b.written >= b.size && b.writeCursor == 0: return b.data case b.written > b.size: - out := make([]byte, b.size) - copy(out, b.data[b.writeCursor:]) - copy(out[b.size-b.writeCursor:], b.data[:b.writeCursor]) - return out + copy(b.out, b.data[b.writeCursor:]) + copy(b.out[b.size-b.writeCursor:], b.data[:b.writeCursor]) + return b.out default: return b.data[:b.writeCursor] } } +// Gets a single byte out of the buffer, at the given position. +func (b *Buffer) Get(i int64) (byte, error) { + switch { + case i >= b.written || i >= b.size: + return 0, fmt.Errorf("Index out of bounds: %v", i) + case b.written > b.size: + return b.data[(b.writeCursor + i) % b.size], nil + default: + return b.data[i], nil + } +} + // Reset resets the buffer so it has no content. func (b *Buffer) Reset() { b.writeCursor = 0 From 074857fc0cfa6b58cebfcb1e514c798166a978f3 Mon Sep 17 00:00:00 2001 From: Akis Kesoglou Date: Tue, 21 Nov 2017 14:55:38 +0200 Subject: [PATCH 2/4] Fix linting and formatting issues Signed-off-by: Akis Kesoglou --- circbuf.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/circbuf.go b/circbuf.go index 9dd8eb9..381e096 100644 --- a/circbuf.go +++ b/circbuf.go @@ -26,7 +26,7 @@ func NewBuffer(size int64) (*Buffer, error) { b := &Buffer{ size: size, data: make([]byte, size), - out: make([]byte, size), + out: make([]byte, size), } return b, nil } @@ -56,10 +56,12 @@ func (b *Buffer) Write(buf []byte) (int, error) { return n, nil } -func (b *Buffer) WriteByte(c byte) { +// WriteByte writes a single byte into the buffer. +func (b *Buffer) WriteByte(c byte) error { b.data[b.writeCursor] = c b.writeCursor = ((b.writeCursor + 1) % b.size) - b.written += 1 + b.written++ + return nil } // Size returns the size of the buffer @@ -89,13 +91,13 @@ func (b *Buffer) Bytes() []byte { } } -// Gets a single byte out of the buffer, at the given position. +// Get returns a single byte out of the buffer, at the given position. func (b *Buffer) Get(i int64) (byte, error) { switch { case i >= b.written || i >= b.size: return 0, fmt.Errorf("Index out of bounds: %v", i) case b.written > b.size: - return b.data[(b.writeCursor + i) % b.size], nil + return b.data[(b.writeCursor+i)%b.size], nil default: return b.data[i], nil } From 92de78e4bfe797aa7a4a027a42e644ddf321dd2f Mon Sep 17 00:00:00 2001 From: Akis Kesoglou Date: Tue, 21 Nov 2017 19:45:01 +0200 Subject: [PATCH 3/4] Add tests for single-byte read/write methods Signed-off-by: Akis Kesoglou --- circbuf_test.go | 43 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/circbuf_test.go b/circbuf_test.go index a3ed642..eb22fcf 100644 --- a/circbuf_test.go +++ b/circbuf_test.go @@ -196,3 +196,46 @@ func TestBuffer_Reset(t *testing.T) { t.Fatalf("bad: %v", string(buf.Bytes())) } } + +func TestBuffer_WriteByte(t *testing.T) { + inp := []byte("hello world") + + buf, err := NewBuffer(3) + if err != nil { + t.Fatalf("err: %v", err) + } + + for _, b := range inp { + buf.WriteByte(b) + } + + expect := []byte("rld") + actual := buf.Bytes() + if !bytes.Equal(actual, expect) { + t.Fatalf("bad: %v", actual) + } +} + +func TestBuffer_ReadByte(t *testing.T) { + inp := []byte("hello world") + + buf, err := NewBuffer(int64(len(inp))) + if err != nil { + t.Fatalf("err: %v", err) + } + + if _, err := buf.Write([]byte("hell")); err != nil { + t.Fatalf("err: %v", err) + } + + if _, err := buf.Write(inp); err != nil { + t.Fatalf("err: %v", err) + } + + for i, expect := range inp { + actual, _ := buf.Get(int64(i)) + if expect != actual { + t.Fatalf("bad data at index: buf[%v] = %v", i, actual) + } + } +} From 6912bf9840a2600fd24bd36f96784ca4783a6484 Mon Sep 17 00:00:00 2001 From: Leandro Motta Barros Date: Mon, 17 May 2021 14:35:17 -0300 Subject: [PATCH 4/4] Add optimized path for power-of-two-sized buffers Some operations on the circular buffer make use of modulo arithmetic, which was implemented (not surprisingly!) with the modulo operator `%`. It turns out that the modulo operator is quite slow and therefore, on tight loops, a substantial portion of the run time may be spent on it. This commit adds a separate code path for buffers with power-of-two sizes. For these, we can implement modulo arithmetic substantially more efficiently using some bit manipulation. This is not a breaking change: the public interface is exactly the same as before. An optimized buffer is transparently created under the hood whenever the requested size is a power-of-two. The commit also adds benchmarks that perform various operations on buffers that are and that are not power-of-two-sized. --- LICENSE | 0 anybuf.go | 38 ++++++++++++ basebuf.go | 71 ++++++++++++++++++++++ benchmark_test.go | 148 ++++++++++++++++++++++++++++++++++++++++++++++ circbuf.go | 145 +++++++++++++++------------------------------ circbuf_test.go | 48 ++++++++++++--- po2buf.go | 36 +++++++++++ 7 files changed, 382 insertions(+), 104 deletions(-) mode change 100755 => 100644 LICENSE create mode 100644 anybuf.go create mode 100644 basebuf.go create mode 100644 benchmark_test.go create mode 100644 po2buf.go diff --git a/LICENSE b/LICENSE old mode 100755 new mode 100644 diff --git a/anybuf.go b/anybuf.go new file mode 100644 index 0000000..57e901c --- /dev/null +++ b/anybuf.go @@ -0,0 +1,38 @@ +package circbuf + +import ( + "fmt" +) + +// anyBuffer implements a circular buffer of any size. +type anyBuffer struct { + baseBuffer +} + +// Write writes up to len(buf) bytes to the internal ring, +// overriding older data if necessary. +func (b *anyBuffer) Write(buf []byte) (int, error) { + n := b.write(buf) + b.writeCursor = ((b.writeCursor + n) % b.size) + return len(buf), nil +} + +// WriteByte writes a single byte into the buffer. +func (b *anyBuffer) WriteByte(c byte) error { + b.data[b.writeCursor] = c + b.writeCursor = ((b.writeCursor + 1) % b.size) + b.written++ + return nil +} + +// Get returns a single byte out of the buffer, at the given position. +func (b *anyBuffer) Get(i int64) (byte, error) { + switch { + case i >= b.written || i >= b.size: + return 0, fmt.Errorf("Index out of bounds: %v", i) + case b.written > b.size: + return b.data[(b.writeCursor+i)%b.size], nil + default: + return b.data[i], nil + } +} diff --git a/basebuf.go b/basebuf.go new file mode 100644 index 0000000..a01e4d4 --- /dev/null +++ b/basebuf.go @@ -0,0 +1,71 @@ +package circbuf + +type baseBuffer struct { + data []byte + out []byte + size int64 + writeCursor int64 + written int64 +} + +// Size returns the size of the buffer +func (b *baseBuffer) Size() int64 { + return b.size +} + +// TotalWritten provides the total number of bytes written +func (b *baseBuffer) TotalWritten() int64 { + return b.written +} + +// Bytes provides a slice of the bytes written. This +// slice should not be written to. The underlying array +// may point to data that will be overwritten by a subsequent +// call to Bytes. It does no allocation. +func (b *baseBuffer) Bytes() []byte { + switch { + case b.written >= b.size && b.writeCursor == 0: + return b.data + case b.written > b.size: + copy(b.out, b.data[b.writeCursor:]) + copy(b.out[b.size-b.writeCursor:], b.data[:b.writeCursor]) + return b.out + default: + return b.data[:b.writeCursor] + } +} + +// Reset resets the buffer so it has no content. +func (b *baseBuffer) Reset() { + b.writeCursor = 0 + b.written = 0 +} + +// String returns the contents of the buffer as a string +func (b *baseBuffer) String() string { + return string(b.Bytes()) +} + +// write writes len(buf) bytes to the circular buffer and returns by how much +// the writeCursor must be incremented. (This function does not increment the +// writeCursor!) +func (b *baseBuffer) write(buf []byte) int64 { + // Account for total bytes written + n := len(buf) + b.written += int64(n) + + // If the buffer is larger than ours, then we only care + // about the last size bytes anyways + if int64(n) > b.size { + buf = buf[int64(n)-b.size:] + } + + // Copy in place + remain := b.size - b.writeCursor + copy(b.data[b.writeCursor:], buf) + if int64(len(buf)) > remain { + copy(b.data, buf[remain:]) + } + + return int64(len(buf)) +} diff --git a/benchmark_test.go b/benchmark_test.go new file mode 100644 index 0000000..8064096 --- /dev/null +++ b/benchmark_test.go @@ -0,0 +1,148 @@ +package circbuf + +import ( + "testing" +) + +// k is the number of repetitions per operation. +const k = 1000 + +// benchmarkWrite benchmarks Write, by writing writeSize to a Buffer that is +// bufSize bytes long. +func benchmarkWrite(b *testing.B, bufSize, writeSize int64) { + data := make([]byte, writeSize) + buf, err := NewBuffer(bufSize) + if err != nil { + b.Fatalf("creating buffer of size %v: %v", bufSize, err) + } + b.SetBytes(k * writeSize) + + for i := 0; i < b.N; i++ { + for j := 0; j < k; j++ { + _, _ = buf.Write(data) + } + } +} + +func Benchmark_Write_1024_500(b *testing.B) { + benchmarkWrite(b, 1024, 500) +} + +func Benchmark_Write_1025_500(b *testing.B) { + benchmarkWrite(b, 1025, 500) +} + +func Benchmark_Write_1024_5000(b *testing.B) { + benchmarkWrite(b, 1024, 5000) +} + +func Benchmark_Write_1025_5000(b *testing.B) { + benchmarkWrite(b, 1025, 5000) +} + +func Benchmark_Write_65536_5000(b *testing.B) { + benchmarkWrite(b, 65536, 5000) +} + +func Benchmark_Write_65537_5000(b *testing.B) { + benchmarkWrite(b, 65537, 5000) +} + +func Benchmark_Write_1024_5(b *testing.B) { + benchmarkWrite(b, 1024, 5) +} + +func Benchmark_Write_1025_5(b *testing.B) { + benchmarkWrite(b, 1025, 5) +} + +// benchmarkWriteByte benchmarks WriteByte, on a Buffer that is bufSize bytes +// long. +func benchmarkWriteByte(b *testing.B, bufSize int64) { + buf, err := NewBuffer(bufSize) + if err != nil { + b.Fatalf("creating buffer of size %v: %v", bufSize, err) + } + b.SetBytes(k) + + for i := 0; i < b.N; i++ { + for j := 0; j < k; j++ { + _ = buf.WriteByte(0xba) + } + } +} + +func Benchmark_WriteByte_1024(b *testing.B) { + benchmarkWriteByte(b, 1024) +} + +func Benchmark_WriteByte_1025(b *testing.B) { + benchmarkWriteByte(b, 1025) +} + +func Benchmark_WriteByte_65536(b *testing.B) { + benchmarkWriteByte(b, 65536) +} + +func Benchmark_WriteByte_65537(b *testing.B) { + benchmarkWriteByte(b, 65537) +} + +// benchmarkGet benchmarks Get with a buffer that is bufSize bytes long and was +// written to proportionally to fillRatio (so, for example, fillRatio == 0.5 +// means write to half of the buffer, by fillRatio == 1 means write to every +// byte of the buffer, and fillRatio == 2 means write twice as much data as it +// fits in the buffer). +func benchmarkGet(b *testing.B, bufSize int64, fillRatio float64) { + buf, err := NewBuffer(bufSize) + if err != nil { + b.Fatalf("creating buffer of size %v: %v", bufSize, err) + } + + writeSize := int64(float64(bufSize) * fillRatio) + data := make([]byte, writeSize) + _, err = buf.Write(data) + if err != nil { + b.Fatalf("writing data to buffer: %v", err) + } + + readLimit := bufSize + if bufSize > writeSize { + readLimit = writeSize + } + + b.SetBytes(k) + b.ResetTimer() + + for i := 0; i < b.N; i++ { + for j := 0; j < k; j++ { + for h := int64(0); h < readLimit; h++ { + _, _ = buf.Get(h) + } + } + } +} + +func Benchmark_Get_HalfFull_1024(b *testing.B) { + benchmarkGet(b, 1024, 0.5) +} + +func Benchmark_Get_HalfFull_1025(b *testing.B) { + benchmarkGet(b, 1025, 0.5) +} + +func Benchmark_Get_Full_1024(b *testing.B) { + benchmarkGet(b, 1024, 1.0) +} + +func Benchmark_Get_Full_1025(b *testing.B) { + benchmarkGet(b, 1025, 1.0) +} + +func Benchmark_Get_TwiceFull_1024(b *testing.B) { + benchmarkGet(b, 1024, 2.0) +} + +func Benchmark_Get_TwiceFull_1025(b *testing.B) { + benchmarkGet(b, 1025, 2.0) +} diff --git a/circbuf.go b/circbuf.go index 381e096..251e911 100644 --- a/circbuf.go +++ b/circbuf.go @@ -1,115 +1,66 @@ package circbuf -import ( - "fmt" -) +import "fmt" -// Buffer implements a circular buffer. It is a fixed size, -// and new writes overwrite older data, such that for a buffer -// of size N, for any amount of writes, only the last N bytes -// are retained. -type Buffer struct { - data []byte - out []byte - size int64 - writeCursor int64 - written int64 -} +// Buffer is a circular buffer. It has a fixed size, and new writes overwrite +// older data, such that for a buffer of size N, for any amount of writes, only +// the last N bytes are retained. +type Buffer interface { + // Write writes up to len(buf) bytes to the internal ring, overriding older + // data if necessary. Returns the number of bytes written and any occasional + // error. + Write(buf []byte) (int, error) -// NewBuffer creates a new buffer of a given size. The size -// must be greater than 0. -func NewBuffer(size int64) (*Buffer, error) { - if size <= 0 { - return nil, fmt.Errorf("Size must be positive") - } + // WriteByte writes a single byte into the buffer. + WriteByte(c byte) error - b := &Buffer{ - size: size, - data: make([]byte, size), - out: make([]byte, size), - } - return b, nil -} + // Size returns the size of the buffer + Size() int64 -// Write writes up to len(buf) bytes to the internal ring, -// overriding older data if necessary. -func (b *Buffer) Write(buf []byte) (int, error) { - // Account for total bytes written - n := len(buf) - b.written += int64(n) + // TotalWritten provides the total number of bytes written. + TotalWritten() int64 - // If the buffer is larger than ours, then we only care - // about the last size bytes anyways - if int64(n) > b.size { - buf = buf[int64(n)-b.size:] - } + // Bytes provides a slice of the bytes written. This + // slice should not be written to. The underlying array + // may point to data that will be overwritten by a subsequent + // call to Bytes. It shall do no allocation. + Bytes() []byte - // Copy in place - remain := b.size - b.writeCursor - copy(b.data[b.writeCursor:], buf) - if int64(len(buf)) > remain { - copy(b.data, buf[remain:]) - } + // Get returns a single byte out of the buffer, at the given position. + Get(i int64) (byte, error) - // Update location of the cursor - b.writeCursor = ((b.writeCursor + int64(len(buf))) % b.size) - return n, nil -} + // Reset resets the buffer so it has no content. + Reset() -// WriteByte writes a single byte into the buffer. -func (b *Buffer) WriteByte(c byte) error { - b.data[b.writeCursor] = c - b.writeCursor = ((b.writeCursor + 1) % b.size) - b.written++ - return nil + // String returns the contents of the buffer as a string. + String() string } -// Size returns the size of the buffer -func (b *Buffer) Size() int64 { - return b.size -} - -// TotalWritten provides the total number of bytes written -func (b *Buffer) TotalWritten() int64 { - return b.written -} - -// Bytes provides a slice of the bytes written. This -// slice should not be written to. The underlying array -// may point to data that will be overwritten by a subsequent -// call to Bytes. It does no allocation. -func (b *Buffer) Bytes() []byte { - switch { - case b.written >= b.size && b.writeCursor == 0: - return b.data - case b.written > b.size: - copy(b.out, b.data[b.writeCursor:]) - copy(b.out[b.size-b.writeCursor:], b.data[:b.writeCursor]) - return b.out - default: - return b.data[:b.writeCursor] +// NewBuffer creates a new buffer of a given size. The size +// must be greater than 0. +func NewBuffer(size int64) (Buffer, error) { + if size <= 0 { + return nil, fmt.Errorf("Size must be positive") } -} -// Get returns a single byte out of the buffer, at the given position. -func (b *Buffer) Get(i int64) (byte, error) { - switch { - case i >= b.written || i >= b.size: - return 0, fmt.Errorf("Index out of bounds: %v", i) - case b.written > b.size: - return b.data[(b.writeCursor+i)%b.size], nil - default: - return b.data[i], nil + if (size & (size - 1)) == 0 { + b := &po2Buffer{ + baseBuffer{ + size: size, + data: make([]byte, size), + out: make([]byte, size), + }, + } + return b, nil } -} -// Reset resets the buffer so it has no content. -func (b *Buffer) Reset() { - b.writeCursor = 0 - b.written = 0 -} + b := &anyBuffer{ + baseBuffer{ + size: size, + data: make([]byte, size), + out: make([]byte, size), + }, + } + return b, nil -// String returns the contents of the buffer as a string -func (b *Buffer) String() string { - return string(b.Bytes()) } diff --git a/circbuf_test.go b/circbuf_test.go index eb22fcf..8cd17bd 100644 --- a/circbuf_test.go +++ b/circbuf_test.go @@ -7,7 +7,8 @@ import ( ) func TestBuffer_Impl(t *testing.T) { - var _ io.Writer = &Buffer{} + var _ io.Writer = &anyBuffer{} + var _ io.Writer = &po2Buffer{} } func TestBuffer_ShortWrite(t *testing.T) { @@ -197,10 +198,14 @@ func TestBuffer_Reset(t *testing.T) { } } -func TestBuffer_WriteByte(t *testing.T) { +func testBuffer_WriteByte(t *testing.T, wantPO2 bool) { inp := []byte("hello world") - buf, err := NewBuffer(3) + bufSize := int64(3) + if wantPO2 { + bufSize = 4 + } + buf, err := NewBuffer(bufSize) if err != nil { t.Fatalf("err: %v", err) } @@ -209,24 +214,45 @@ func TestBuffer_WriteByte(t *testing.T) { buf.WriteByte(b) } - expect := []byte("rld") + expect := []byte("rld") // 3 bytes + if wantPO2 { + expect = []byte("orld") // 4 bytes + } + actual := buf.Bytes() if !bytes.Equal(actual, expect) { t.Fatalf("bad: %v", actual) } } -func TestBuffer_ReadByte(t *testing.T) { - inp := []byte("hello world") +func TestBuffer_WriteByte_Any(t *testing.T) { + testBuffer_WriteByte(t, false) +} + +func TestBuffer_WriteByte_PO2(t *testing.T) { + testBuffer_WriteByte(t, true) +} + +func testBuffer_Get(t *testing.T, inp []byte) { + initialData := []byte("hell") + if len(inp) < len(initialData) { + t.Fatalf("input too short for this test case: %q", inp) + } buf, err := NewBuffer(int64(len(inp))) if err != nil { t.Fatalf("err: %v", err) } - if _, err := buf.Write([]byte("hell")); err != nil { + if _, err := buf.Write(initialData); err != nil { t.Fatalf("err: %v", err) } + for i, expect := range initialData { + actual, _ := buf.Get(int64(i)) + if expect != actual { + t.Fatalf("bad data at index: buf[%v] = %v", i, actual) + } + } if _, err := buf.Write(inp); err != nil { t.Fatalf("err: %v", err) @@ -239,3 +265,11 @@ func TestBuffer_ReadByte(t *testing.T) { } } } + +func TestBuffer_Get_Any(t *testing.T) { + testBuffer_Get(t, []byte("hello world")) // 11 bytes +} + +func TestBuffer_Get_PO2(t *testing.T) { + testBuffer_Get(t, []byte("hey, hello world")) // 16 bytes +} diff --git a/po2buf.go b/po2buf.go new file mode 100644 index 0000000..4bc2864 --- /dev/null +++ b/po2buf.go @@ -0,0 +1,36 @@ +package circbuf + +import "fmt" + +// po2Buffer implements a circular buffer with a size that is a power of two. +type po2Buffer struct { + baseBuffer +} + +// Write writes up to len(buf) bytes to the internal ring, +// overriding older data if necessary. +func (b *po2Buffer) Write(buf []byte) (int, error) { + n := b.write(buf) + b.writeCursor = ((b.writeCursor + n) & (b.size - 1)) + return len(buf), nil +} + +// WriteByte writes a single byte into the buffer. +func (b *po2Buffer) WriteByte(c byte) error { + b.data[b.writeCursor] = c + b.writeCursor = ((b.writeCursor + 1) & (b.size - 1)) + b.written++ + return nil +} + +// Get returns a single byte out of the buffer, at the given position. +func (b *po2Buffer) Get(i int64) (byte, error) { + switch { + case i >= b.written || i >= b.size: + return 0, fmt.Errorf("Index out of bounds: %v", i) + case b.written > b.size: + return b.data[(b.writeCursor+i)&(b.size-1)], nil + default: + return b.data[i], nil + } +}