From e2af8f41de2d2db11ac0521f2ebb60156e96985a Mon Sep 17 00:00:00 2001 From: FishGoddess Date: Sun, 28 Dec 2025 22:05:32 +0800 Subject: [PATCH 1/7] =?UTF-8?q?=E5=8E=BB=E9=99=A4=E4=BB=A3=E7=A0=81?= =?UTF-8?q?=E5=87=86=E5=A4=87=E9=87=8D=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .github/workflows/test.yml | 2 +- LICENSE | 2 +- .../{performance_test.go => basic_test.go} | 0 _examples/purge.go | 58 ----- _examples/spin_lock.go | 33 --- config.go | 68 ----- config_test.go | 137 ---------- executor.go | 206 --------------- executor_test.go | 238 ------------------ go.mod | 2 +- limiter.go | 2 +- limiter_test.go | 2 +- option.go | 93 ++----- option_test.go | 136 ++-------- pkg/spinlock/spin_lock.go | 36 --- pkg/spinlock/spin_lock_test.go | 59 ----- random.go | 40 --- random_test.go | 55 ---- round_robin.go | 38 --- round_robin_test.go | 55 ---- worker.go | 88 ------- worker_test.go | 108 -------- 22 files changed, 48 insertions(+), 1410 deletions(-) rename _examples/{performance_test.go => basic_test.go} (100%) delete mode 100644 _examples/purge.go delete mode 100644 _examples/spin_lock.go delete mode 100644 config.go delete mode 100644 config_test.go delete mode 100644 executor.go delete mode 100644 executor_test.go delete mode 100644 pkg/spinlock/spin_lock.go delete mode 100644 pkg/spinlock/spin_lock_test.go delete mode 100644 random.go delete mode 100644 random_test.go delete mode 100644 round_robin.go delete mode 100644 round_robin_test.go delete mode 100644 worker.go delete mode 100644 worker_test.go diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index caced66..6c44d3d 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -15,7 +15,7 @@ jobs: - name: Setup uses: actions/setup-go@v4 with: - go-version: "1.23" + go-version: "1.25" - run: go version diff --git a/LICENSE b/LICENSE index e69ceec..4becfb5 100755 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ The MIT License (MIT) -Copyright (c) 2023 FishGoddess +Copyright (c) 2025 FishGoddess Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), diff --git a/_examples/performance_test.go b/_examples/basic_test.go similarity index 100% rename from _examples/performance_test.go rename to _examples/basic_test.go diff --git a/_examples/purge.go b/_examples/purge.go deleted file mode 100644 index 76d3e87..0000000 --- a/_examples/purge.go +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright 2025 FishGoddess. All rights reserved. -// Use of this source code is governed by a MIT style -// license that can be found in the LICENSE file. - -package main - -import ( - "fmt" - "math/rand" - "time" - - "github.com/FishGoddess/goes" -) - -func watchExecutorWorkers(executor *goes.Executor) { - for { - fmt.Printf("workers --> %d\n", executor.AvailableWorkers()) - time.Sleep(time.Second) - } -} - -func main() { - // Creates a purge-active executor with 16 workers. - // The purge task will be executed every purge interval and purge workers not in lifetime. - purgeInterval := time.Minute - workerLifetime := time.Minute - - executor := goes.NewExecutor(16, goes.WithPurgeActive(purgeInterval, workerLifetime)) - defer executor.Close() - - go watchExecutorWorkers(executor) - - // Submit some tasks. - for i := 0; i < 200; i++ { - no := i - - executor.Submit(func() { - r := 3000 + rand.Intn(1000) - fmt.Printf("task %d --> %d\n", no, r) - time.Sleep(time.Duration(r) * time.Millisecond) - }) - } - - time.Sleep(time.Minute + 5*time.Second) - fmt.Println("Wait! You will see the workers are decreasing and increasing...") - - for i := 0; i < 10; i++ { - no := i - - executor.Submit(func() { - r := 10000 + rand.Intn(1000) - fmt.Printf("task %d --> %d\n", no, r) - time.Sleep(time.Duration(r) * time.Millisecond) - }) - } - - time.Sleep(2 * time.Minute) -} diff --git a/_examples/spin_lock.go b/_examples/spin_lock.go deleted file mode 100644 index bb4bbb3..0000000 --- a/_examples/spin_lock.go +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright 2025 FishGoddess. All rights reserved. -// Use of this source code is governed by a MIT style -// license that can be found in the LICENSE file. - -package main - -import ( - "fmt" - "sync" - - "github.com/FishGoddess/goes/pkg/spinlock" -) - -func main() { - // It's an implementation of sync.Locker, so you can use it as a mutex. - spin := spinlock.New() - - var total int - var wg sync.WaitGroup - for i := 0; i < 10000; i++ { - wg.Add(1) - go func() { - defer wg.Done() - - spin.Lock() - total++ - spin.Unlock() - }() - } - - wg.Wait() - fmt.Printf("total is %d\n", total) -} diff --git a/config.go b/config.go deleted file mode 100644 index 1750ebe..0000000 --- a/config.go +++ /dev/null @@ -1,68 +0,0 @@ -// Copyright 2025 FishGoddess. All rights reserved. -// Use of this source code is governed by a MIT style -// license that can be found in the LICENSE file. - -package goes - -import ( - "sync" - "time" - - "github.com/FishGoddess/goes/pkg/spinlock" -) - -type config struct { - workerNum int - workerQueueSize int - workerLifetime time.Duration - purgeInterval time.Duration - nowFunc func() time.Time - recoverFunc func(r any) - newLockerFunc func() sync.Locker - newSchedulerFunc func(workers ...*worker) scheduler -} - -func newDefaultConfig(workerNum int) *config { - return &config{ - workerNum: workerNum, - workerQueueSize: 64, - workerLifetime: 0, - purgeInterval: 0, - nowFunc: nil, - recoverFunc: nil, - newLockerFunc: nil, - newSchedulerFunc: nil, - } -} - -func (c *config) now() time.Time { - if c.nowFunc == nil { - return time.Now() - } - - return c.nowFunc() -} - -func (c *config) recoverable() bool { - return c.recoverFunc != nil -} - -func (c *config) recover(r any) { - c.recoverFunc(r) -} - -func (c *config) newLocker() sync.Locker { - if c.newLockerFunc == nil { - return spinlock.New() - } - - return c.newLockerFunc() -} - -func (c *config) newScheduler(workers ...*worker) scheduler { - if c.newSchedulerFunc == nil { - return newRoundRobinScheduler(workers...) - } - - return c.newSchedulerFunc(workers...) -} diff --git a/config_test.go b/config_test.go deleted file mode 100644 index 26ce362..0000000 --- a/config_test.go +++ /dev/null @@ -1,137 +0,0 @@ -// Copyright 2025 FishGoddess. All rights reserved. -// Use of this source code is governed by a MIT style -// license that can be found in the LICENSE file. - -package goes - -import ( - "fmt" - "sync" - "testing" - "time" - - "github.com/FishGoddess/goes/pkg/spinlock" -) - -// go test -v -cover -run=^TestNewDefaultConfig$ -func TestNewDefaultConfig(t *testing.T) { - workerNum := 16 - conf := newDefaultConfig(workerNum) - - if conf.workerNum != workerNum { - t.Fatalf("conf.workerNum %d != workerNum %d", conf.workerNum, workerNum) - } -} - -// go test -v -cover -run=^TestConfigNow$ -func TestConfigNow(t *testing.T) { - workerNum := 16 - conf := newDefaultConfig(workerNum) - - got := conf.now().Unix() - want := time.Now().Unix() - if got != want { - t.Fatalf("got %v != want %v", got, want) - } - - wantTime := time.Unix(123456789, 0) - conf.nowFunc = func() time.Time { - return wantTime - } - - got = conf.now().Unix() - want = wantTime.Unix() - if got != want { - t.Fatalf("got %v != want %v", got, want) - } -} - -// go test -v -cover -run=^TestConfigRecover$ -func TestConfigRecover(t *testing.T) { - workerNum := 16 - conf := newDefaultConfig(workerNum) - - if conf.recoverable() { - t.Fatalf("conf.recoverable() is wrong") - } - - got := 0 - conf.recoverFunc = func(r any) { - got = r.(int) - } - - if !conf.recoverable() { - t.Fatalf("conf.recoverable() is wrong") - } - - want := 1 - conf.recover(want) - - if got != want { - t.Fatalf("got %d != want %d", got, want) - } - - defer func() { - if r := recover(); r == nil { - t.Fatal("conf.recover should panic") - } - }() - - conf.recoverFunc = nil - conf.recover(0) -} - -// go test -v -cover -run=^TestConfigNewLocker$ -func TestConfigNewLocker(t *testing.T) { - workerNum := 16 - conf := newDefaultConfig(workerNum) - - got := conf.newLocker() - if _, ok := got.(*spinlock.Lock); !ok { - t.Fatalf("got %T is not *spinlock.Lock", got) - } - - want := &sync.Mutex{} - conf.newLockerFunc = func() sync.Locker { - return want - } - - got = conf.newLocker() - if fmt.Sprintf("%p", got) != fmt.Sprintf("%p", want) { - t.Fatalf("got %p != want %p", got, want) - } -} - -// go test -v -cover -run=^TestConfigNewScheduler$ -func TestConfigNewScheduler(t *testing.T) { - workerNum := 16 - conf := newDefaultConfig(workerNum) - - workers := make([]*worker, workerNum) - got := conf.newScheduler(workers...) - - if _, ok := got.(*roundRobinScheduler); !ok { - t.Fatalf("got %T is not *roundRobinScheduler", got) - } - - want := &roundRobinScheduler{} - conf.newSchedulerFunc = func(workers ...*worker) scheduler { - want.workers = workers - return want - } - - got = conf.newScheduler(workers...) - if fmt.Sprintf("%p", got) != fmt.Sprintf("%p", want) { - t.Fatalf("got %p != want %p", got, want) - } - - scheduler, ok := got.(*roundRobinScheduler) - if !ok { - t.Fatalf("got %T is not *roundRobinScheduler", got) - } - - gotLen := len(scheduler.workers) - if gotLen != workerNum { - t.Fatalf("gotLen %d != workerNum %d", gotLen, workerNum) - } -} diff --git a/executor.go b/executor.go deleted file mode 100644 index bfbab7e..0000000 --- a/executor.go +++ /dev/null @@ -1,206 +0,0 @@ -// Copyright 2025 FishGoddess. All rights reserved. -// Use of this source code is governed by a MIT style -// license that can be found in the LICENSE file. - -package goes - -import ( - "errors" - "sync" - "time" -) - -var ( - ErrExecutorIsClosed = errors.New("goes: executor is closed") - ErrWorkerIsNil = errors.New("goes: worker is nil") -) - -// Task is a function can be executed by executor. -type Task = func() - -// Executor executes tasks concurrently using limited goroutines. -// You can specify the number of workers and the queue size of each worker. -type Executor struct { - conf *config - - workers []*worker - scheduler scheduler - closeCh chan struct{} - closed bool - - wg sync.WaitGroup - lock sync.Locker -} - -// NewExecutor creates a new executor with given worker number and options. -func NewExecutor(workerNum int, opts ...Option) *Executor { - conf := newDefaultConfig(workerNum) - for _, opt := range opts { - opt.applyTo(conf) - } - - if conf.workerNum <= 0 { - panic("goes: executor's worker num <= 0") - } - - if conf.workerQueueSize <= 0 { - panic("goes: worker's queue size <= 0") - } - - if conf.purgeInterval > 0 && conf.purgeInterval < time.Minute { - panic("goes: executor's purge interval < 1 minute") - } - - if conf.workerLifetime > 0 && conf.workerLifetime < time.Minute { - panic("goes: executor's worker lifetime < 1 minute") - } - - workers := make([]*worker, 0, conf.workerNum) - executor := &Executor{ - conf: conf, - workers: workers, - scheduler: conf.newScheduler(), - closeCh: make(chan struct{}, 1), - closed: false, - lock: conf.newLocker(), - } - - executor.spawnWorker() - executor.runPurgeTask() - return executor -} - -func (e *Executor) spawnWorker() *worker { - worker := newWorker(e) - - e.workers = append(e.workers, worker) - e.scheduler.Set(e.workers) - return worker -} - -func (e *Executor) purgeActive() bool { - return e.conf.purgeInterval > 0 && e.conf.workerLifetime > 0 -} - -func (e *Executor) purgeWorkers() { - e.lock.Lock() - defer e.lock.Unlock() - - now := e.conf.now() - purgeable := false - - isPurgeable := func(worker *worker) bool { - return worker.WaitingTasks() <= 0 && now.Sub(worker.AcceptTime()) >= e.conf.workerLifetime - } - - // Check if we need to purge workers. - for _, worker := range e.workers { - if isPurgeable(worker) { - purgeable = true - break - } - } - - if !purgeable { - return - } - - // Purge workers and we will keep one worker at least. - newWorkers := make([]*worker, 0, len(e.workers)) - for _, worker := range e.workers { - if len(newWorkers) <= 0 { - newWorkers = append(newWorkers, worker) - continue - } - - if isPurgeable(worker) { - worker.Done() - } else { - newWorkers = append(newWorkers, worker) - } - } - - e.workers = newWorkers - e.scheduler.Set(e.workers) -} - -func (e *Executor) runPurgeTask() { - if !e.purgeActive() { - return - } - - go func() { - ticker := time.NewTicker(e.conf.purgeInterval) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - e.purgeWorkers() - case <-e.closeCh: - return - } - } - }() -} - -// AvailableWorkers returns the number of workers available. -func (e *Executor) AvailableWorkers() int { - e.lock.Lock() - defer e.lock.Unlock() - - return len(e.workers) -} - -// Submit submits a task to be handled by workers. -func (e *Executor) Submit(task Task) error { - e.lock.Lock() - - if e.closed { - e.lock.Unlock() - - return ErrExecutorIsClosed - } - - worker := e.scheduler.Get() - if worker == nil { - e.lock.Unlock() - - return ErrWorkerIsNil - } - - // 1. We don't need to create a new worker if we got a worker with no tasks. - // 2. The number of workers has reached the limit, so we can only use the worker we got. - if worker.WaitingTasks() > 0 && len(e.workers) < e.conf.workerNum { - worker = e.spawnWorker() - } - - if e.purgeActive() { - worker.SetAcceptTime(e.conf.now()) - } - - e.lock.Unlock() - worker.Accept(task) - return nil -} - -// Wait waits all tasks to be handled. -func (e *Executor) Wait() { - e.wg.Wait() -} - -// Close closes the executor after handling all tasks. -func (e *Executor) Close() { - e.lock.Lock() - defer e.lock.Unlock() - - for _, worker := range e.workers { - worker.Done() - } - - close(e.closeCh) - e.closed = true - e.workers = nil - e.scheduler.Set(nil) - e.Wait() -} diff --git a/executor_test.go b/executor_test.go deleted file mode 100644 index f3cbb50..0000000 --- a/executor_test.go +++ /dev/null @@ -1,238 +0,0 @@ -// Copyright 2025 FishGoddess. All rights reserved. -// Use of this source code is governed by a MIT style -// license that can be found in the LICENSE file. - -package goes - -import ( - "fmt" - "sync" - "testing" - "time" -) - -// go test -v -cover -run=^TestNewExecutor$ -func TestNewExecutor(t *testing.T) { - workerNum := 16 - - t.Run("ok", func(t *testing.T) { - defer func() { - if r := recover(); r != nil { - t.Fatalf("r should not panic") - } - }() - - executor := NewExecutor(workerNum, WithWorkerQueueSize(256), WithPurgeActive(time.Minute, time.Minute)) - defer executor.Close() - }) - - testCase := func(t *testing.T, workerNum int, opts ...Option) { - defer func() { - if r := recover(); r == nil { - t.Fatalf("r should panic") - } - }() - - executor := NewExecutor(workerNum, opts...) - defer executor.Close() - } - - t.Run("worker num", func(t *testing.T) { - testCase(t, 0) - }) - - t.Run("worker queue size", func(t *testing.T) { - testCase(t, workerNum, WithWorkerQueueSize(0)) - }) - - t.Run("purge task 1", func(t *testing.T) { - testCase(t, workerNum, WithPurgeActive(time.Millisecond, 0)) - }) - - t.Run("purge task 2", func(t *testing.T) { - testCase(t, workerNum, WithPurgeActive(0, time.Millisecond)) - }) - - t.Run("purge task 3", func(t *testing.T) { - testCase(t, workerNum, WithPurgeActive(time.Millisecond, time.Millisecond)) - }) -} - -// go test -v -cover -run=^TestExecutor$ -func TestExecutor(t *testing.T) { - workerNum := 16 - executor := NewExecutor(workerNum) - - var countMap = make(map[int64]int, 16) - var lock sync.Mutex - - totalCount := 10 * workerNum - for i := 0; i < totalCount; i++ { - executor.Submit(func() { - now := time.Now().UnixMilli() / 10 - - lock.Lock() - countMap[now] = countMap[now] + 1 - lock.Unlock() - - time.Sleep(10 * time.Millisecond) - }) - } - - executor.Close() - executor.Wait() - - gotTotalCount := 0 - for now, count := range countMap { - gotTotalCount = gotTotalCount + count - - if count != workerNum { - t.Fatalf("now %d: count %d != workerNum %d", now, count, workerNum) - } - } - - if gotTotalCount != totalCount { - t.Fatalf("gotTotalCount %d != totalCount %d", gotTotalCount, totalCount) - } -} - -// go test -v -cover -run=^TestExecutorError$ -func TestExecutorError(t *testing.T) { - workerNum := 16 - executor := NewExecutor(workerNum) - executor.Close() - - err := executor.Submit(func() {}) - if err != ErrExecutorIsClosed { - t.Fatalf("err %v != ErrExecutorIsClosed %v", err, ErrExecutorIsClosed) - } - - executor = NewExecutor(workerNum) - executor.scheduler = &roundRobinScheduler{} - - err = executor.Submit(func() {}) - if err != ErrWorkerIsNil { - t.Fatalf("err %v != ErrWorkerIsNil %v", err, ErrWorkerIsNil) - } -} - -// go test -v -cover -run=^TestExecutorAvailableWorkers$ -func TestExecutorAvailableWorkers(t *testing.T) { - workerNum := 16 - executor := NewExecutor(workerNum) - - if len(executor.workers) != 1 { - t.Fatalf("len(executor.workers) %d != 1", len(executor.workers)) - } - - got := executor.AvailableWorkers() - if got != 1 { - t.Fatalf("got %d != 1", got) - } - - executor.workers = make([]*worker, workerNum) - - got = executor.AvailableWorkers() - if got != workerNum { - t.Fatalf("got %d != workerNum %d", got, workerNum) - } -} - -// go test -v -cover -run=^TestExecutorSpawnWorker$ -func TestExecutorSpawnWorker(t *testing.T) { - workerNum := 16 - executor := NewExecutor(workerNum) - - got := executor.AvailableWorkers() - if got != 1 { - t.Fatalf("got %d != 1", got) - } - - for range workerNum { - executor.Submit(func() {}) - time.Sleep(time.Millisecond) - } - - got = executor.AvailableWorkers() - if got != 1 { - t.Fatalf("got %d != 1", got) - } - - for range workerNum * 2 { - executor.Submit(func() { - time.Sleep(time.Millisecond) - }) - } - - got = executor.AvailableWorkers() - if got != workerNum { - t.Fatalf("got %d != workerNum %d", got, workerNum) - } -} - -// go test -v -cover -run=^TestExecutorDynamicScaling$ -func TestExecutorDynamicScaling(t *testing.T) { - testCase := func(t *testing.T, workerNum int) { - executor := NewExecutor(workerNum) - executor.conf.purgeInterval = time.Millisecond - executor.conf.workerLifetime = 2 * time.Millisecond - executor.runPurgeTask() - defer executor.Close() - - got := executor.AvailableWorkers() - if got != 1 { - t.Fatalf("got %d != 1", got) - } - - for range workerNum * 2 { - executor.Submit(func() { - time.Sleep(time.Millisecond) - }) - } - - got = executor.AvailableWorkers() - if got != workerNum { - t.Fatalf("got %d != workerNum %d", got, workerNum) - } - - time.Sleep(500 * time.Microsecond) - - got = executor.AvailableWorkers() - if got != workerNum { - t.Fatalf("got %d != workerNum %d", got, workerNum) - } - - time.Sleep(5 * time.Millisecond) - - got = executor.AvailableWorkers() - if got != 1 { - t.Fatalf("got %d != workerNum %d", got, workerNum) - } - - for range workerNum * 2 { - executor.Submit(func() { - time.Sleep(time.Millisecond) - }) - } - - got = executor.AvailableWorkers() - if got != workerNum { - t.Fatalf("got %d != workerNum %d", got, workerNum) - } - - time.Sleep(5 * time.Millisecond) - - got = executor.AvailableWorkers() - if got != 1 { - t.Fatalf("got %d != workerNum %d", got, workerNum) - } - } - - workerNums := []int{1, 2, 4, 16, 64, 256, 1024} - for _, workerNum := range workerNums { - name := fmt.Sprintf("worker num %d", workerNum) - t.Run(name, func(t *testing.T) { - testCase(t, workerNum) - }) - } -} diff --git a/go.mod b/go.mod index bd0e63a..828f3cf 100755 --- a/go.mod +++ b/go.mod @@ -1,3 +1,3 @@ module github.com/FishGoddess/goes -go 1.23 +go 1.25 diff --git a/limiter.go b/limiter.go index 8eb3fb3..2be40dc 100644 --- a/limiter.go +++ b/limiter.go @@ -1,4 +1,4 @@ -// Copyright 2023 FishGoddess. All rights reserved. +// Copyright 2025 FishGoddess. All rights reserved. // Use of this source code is governed by a MIT style // license that can be found in the LICENSE file. diff --git a/limiter_test.go b/limiter_test.go index 3096605..a5f7c42 100644 --- a/limiter_test.go +++ b/limiter_test.go @@ -1,4 +1,4 @@ -// Copyright 2023 FishGoddess. All rights reserved. +// Copyright 2025 FishGoddess. All rights reserved. // Use of this source code is governed by a MIT style // license that can be found in the LICENSE file. diff --git a/option.go b/option.go index 2924716..f4fcb6d 100644 --- a/option.go +++ b/option.go @@ -5,95 +5,44 @@ package goes import ( - "sync" "time" - - "github.com/FishGoddess/goes/pkg/spinlock" ) -// Option is for setting config. -type Option func(conf *config) - -func (o Option) applyTo(conf *config) { - o(conf) -} - -// WithWorkerQueueSize sets the queue size of worker. -func WithWorkerQueueSize(size int) Option { - return func(conf *config) { - conf.workerQueueSize = size - } -} - -// WithPurgeActive sets the purge interval of executor and the lifetime of worker. -func WithPurgeActive(purgeInterval time.Duration, workerLifetime time.Duration) Option { - return func(conf *config) { - conf.purgeInterval = purgeInterval - conf.workerLifetime = workerLifetime - } -} - -// WithNowFunc sets the now function. -func WithNowFunc(nowFunc func() time.Time) Option { - return func(conf *config) { - conf.nowFunc = nowFunc - } -} - -// WithRecoverFunc sets the recover function. -func WithRecoverFunc(recoverFunc func(r any)) Option { - return func(conf *config) { - conf.recoverFunc = recoverFunc - } -} - -// WithNewLockerFunc sets the new locker function. -func WithNewLockerFunc(newLockerFunc func() sync.Locker) Option { - return func(conf *config) { - conf.newLockerFunc = newLockerFunc - } +type config struct { + workerNum int + workerQueueSize int + now func() time.Time + handlePanic func(r any) } -// WithSpinLock sets the new locker function returns spin lock. -func WithSpinLock() Option { - newLockerFunc := func() sync.Locker { - return spinlock.New() - } - - return func(conf *config) { - conf.newLockerFunc = newLockerFunc +func newConfig(workerNum int) *config { + return &config{ + workerNum: workerNum, + workerQueueSize: 256, + now: time.Now, + handlePanic: nil, } } -// WithSyncMutex sets the new locker function returns sync mutex. -func WithSyncMutex() Option { - newLockerFunc := func() sync.Locker { - return new(sync.Mutex) - } +type Option func(conf *config) +// WithWorkerQueueSize sets the queue size of worker. +func WithWorkerQueueSize(size int) Option { return func(conf *config) { - conf.newLockerFunc = newLockerFunc + conf.workerQueueSize = size } } -// WithRoundRobinScheduler sets the new scheduler function using round robin strategy. -func WithRoundRobinScheduler() Option { - newSchedulerFunc := func(workers ...*worker) scheduler { - return newRoundRobinScheduler(workers...) - } - +// WithNow sets the now function. +func WithNow(now func() time.Time) Option { return func(conf *config) { - conf.newSchedulerFunc = newSchedulerFunc + conf.now = now } } -// WithRandomScheduler sets the new scheduler function using random strategy. -func WithRandomScheduler() Option { - newSchedulerFunc := func(workers ...*worker) scheduler { - return newRandomScheduler(workers...) - } - +// WithHandlePanic sets the handle panic function. +func WithHandlePanic(handlePanic func(r any)) Option { return func(conf *config) { - conf.newSchedulerFunc = newSchedulerFunc + conf.handlePanic = handlePanic } } diff --git a/option_test.go b/option_test.go index de3c571..433d11c 100644 --- a/option_test.go +++ b/option_test.go @@ -6,138 +6,46 @@ package goes import ( "fmt" - "sync" "testing" "time" - - "github.com/FishGoddess/goes/pkg/spinlock" ) // go test -v -cover -run=^TestWithWorkerQueueSize$ func TestWithWorkerQueueSize(t *testing.T) { - workerNum := 16 - workerQueueSize := 256 - conf := newDefaultConfig(workerNum) + conf := &config{workerQueueSize: 0} + + workerQueueSize := 1024 WithWorkerQueueSize(workerQueueSize)(conf) if conf.workerQueueSize != workerQueueSize { - t.Fatalf("conf.workerQueueSize %d != workerQueueSize %d", conf.workerQueueSize, workerQueueSize) - } -} - -// go test -v -cover -run=^TestWithPurgeActive$ -func TestWithPurgeActive(t *testing.T) { - workerNum := 16 - purgeInterval := time.Minute - workerLifetime := 3 * time.Minute - conf := newDefaultConfig(workerNum) - WithPurgeActive(purgeInterval, workerLifetime)(conf) - - if conf.purgeInterval != purgeInterval { - t.Fatalf("conf.purgeInterval %d != purgeInterval %d", conf.purgeInterval, purgeInterval) - } - - if conf.workerLifetime != workerLifetime { - t.Fatalf("conf.workerLifetime %d != workerLifetime %d", conf.workerLifetime, workerLifetime) - } -} - -// go test -v -cover -run=^TestWithNowFunc$ -func TestWithNowFunc(t *testing.T) { - workerNum := 16 - nowFunc := func() time.Time { return time.Now() } - conf := newDefaultConfig(workerNum) - WithNowFunc(nowFunc)(conf) - - if fmt.Sprintf("%p", conf.nowFunc) != fmt.Sprintf("%p", nowFunc) { - t.Fatalf("conf.nowFunc %p != nowFunc %p", conf.nowFunc, nowFunc) - } -} - -// go test -v -cover -run=^TestWithRecoverFunc$ -func TestWithRecoverFunc(t *testing.T) { - workerNum := 16 - recoverFunc := func(r any) {} - conf := newDefaultConfig(workerNum) - WithRecoverFunc(recoverFunc)(conf) - - if fmt.Sprintf("%p", conf.recoverFunc) != fmt.Sprintf("%p", recoverFunc) { - t.Fatalf("conf.recoverFunc %p != recoverFunc %p", conf.recoverFunc, recoverFunc) + t.Fatalf("got %d != want %d", conf.workerQueueSize, workerQueueSize) } } -// go test -v -cover -run=^TestWithNewLockerFunc$ -func TestWithNewLockerFunc(t *testing.T) { - workerNum := 16 - newLockerFunc := func() sync.Locker { return nil } - conf := newDefaultConfig(workerNum) - WithNewLockerFunc(newLockerFunc)(conf) +// go test -v -cover -run=^TestWithNow$ +func TestWithNow(t *testing.T) { + conf := &config{now: nil} - if fmt.Sprintf("%p", conf.newLockerFunc) != fmt.Sprintf("%p", newLockerFunc) { - t.Fatalf("conf.newLockerFunc %p != newLockerFunc %p", conf.newLockerFunc, newLockerFunc) - } -} - -// go test -v -cover -run=^TestWithSpinLock$ -func TestWithSpinLock(t *testing.T) { - workerNum := 16 - conf := newDefaultConfig(workerNum) - WithSpinLock()(conf) - - got := conf.newLockerFunc() - if _, ok := got.(*spinlock.Lock); !ok { - t.Fatalf("got %T is not *spinlock.Lock", got) - } -} - -// go test -v -cover -run=^TestWithSyncMutex$ -func TestWithSyncMutex(t *testing.T) { - workerNum := 16 - conf := newDefaultConfig(workerNum) - WithSyncMutex()(conf) - - got := conf.newLockerFunc() - if _, ok := got.(*sync.Mutex); !ok { - t.Fatalf("got %T is not *sync.Mutex", got) - } -} - -// go test -v -cover -run=^TestWithRoundRobinScheduler$ -func TestWithRoundRobinScheduler(t *testing.T) { - workerNum := 16 - conf := newDefaultConfig(workerNum) - WithRoundRobinScheduler()(conf) - - workers := make([]*worker, 0, workerNum) - got := conf.newSchedulerFunc(workers...) + now := func() time.Time { return time.Now() } + WithNow(now)(conf) - scheduler, ok := got.(*roundRobinScheduler) - if !ok { - t.Fatalf("got %T is not *roundRobinScheduler", got) - } - - gotCap := cap(scheduler.workers) - if gotCap != workerNum { - t.Fatalf("gotCap %d != workerNum %d", gotCap, workerNum) + got := fmt.Sprintf("%p", conf.now) + want := fmt.Sprintf("%p", now) + if got != want { + t.Fatalf("got %s != want %s", got, want) } } -// go test -v -cover -run=^TestWithRandomScheduler$ -func TestWithRandomScheduler(t *testing.T) { - workerNum := 16 - conf := newDefaultConfig(workerNum) - WithRandomScheduler()(conf) +// go test -v -cover -run=^TestWithHandlePanic$ +func TestWithHandlePanic(t *testing.T) { + conf := &config{handlePanic: nil} - workers := make([]*worker, 0, workerNum) - got := conf.newSchedulerFunc(workers...) - - scheduler, ok := got.(*randomScheduler) - if !ok { - t.Fatalf("got %T is not *randomScheduler", got) - } + handlePanic := func(r any) {} + WithHandlePanic(handlePanic)(conf) - gotCap := cap(scheduler.workers) - if gotCap != workerNum { - t.Fatalf("gotCap %d != workerNum %d", gotCap, workerNum) + got := fmt.Sprintf("%p", conf.handlePanic) + want := fmt.Sprintf("%p", handlePanic) + if got != want { + t.Fatalf("got %s != want %s", got, want) } } diff --git a/pkg/spinlock/spin_lock.go b/pkg/spinlock/spin_lock.go deleted file mode 100644 index 26ca2bb..0000000 --- a/pkg/spinlock/spin_lock.go +++ /dev/null @@ -1,36 +0,0 @@ -package spinlock - -import ( - "runtime" - "sync" - "sync/atomic" -) - -const maxBackoff = 16 - -type Lock uint32 - -// New creates a new spin lock. -func New() sync.Locker { - return new(Lock) -} - -// Lock locks with the spin lock. -func (l *Lock) Lock() { - backoff := 1 - - for !atomic.CompareAndSwapUint32((*uint32)(l), 0, 1) { - for i := 0; i < backoff; i++ { - runtime.Gosched() - } - - if backoff < maxBackoff { - backoff <<= 1 - } - } -} - -// Unlock unlocks with the spin lock. -func (l *Lock) Unlock() { - atomic.StoreUint32((*uint32)(l), 0) -} diff --git a/pkg/spinlock/spin_lock_test.go b/pkg/spinlock/spin_lock_test.go deleted file mode 100644 index 0a4eb4d..0000000 --- a/pkg/spinlock/spin_lock_test.go +++ /dev/null @@ -1,59 +0,0 @@ -package spinlock - -import ( - "sync" - "testing" - "time" -) - -// go test -v -cover -run=^TestSpinLock$ -func TestSpinLock(t *testing.T) { - lock := New() - if _, ok := lock.(*Lock); !ok { - t.Fatalf("lock %T is not *Lock", lock) - } - - got := 0 - want := 10000 - - var wg sync.WaitGroup - for range want { - wg.Add(1) - go func() { - defer wg.Done() - - lock.Lock() - got++ - lock.Unlock() - }() - } - - wg.Wait() - if got != want { - t.Fatalf("got %d != want %d", got, want) - } -} - -func benchmarkLock(b *testing.B, lock sync.Locker) { - b.SetParallelism(1024) - - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - lock.Lock() - time.Sleep(50 * time.Microsecond) - lock.Unlock() - } - }) -} - -// go test -v -run=none -bench=^BenchmarkMutex$ -benchmem -benchtime=1s -func BenchmarkMutex(b *testing.B) { - var mu sync.Mutex - benchmarkLock(b, &mu) -} - -// go test -v -run=none -bench=^BenchmarkSpinLock$ -benchmem -benchtime=1s -func BenchmarkSpinLock(b *testing.B) { - var spin Lock - benchmarkLock(b, &spin) -} diff --git a/random.go b/random.go deleted file mode 100644 index ae8ec3f..0000000 --- a/random.go +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright 2025 FishGoddess. All rights reserved. -// Use of this source code is governed by a MIT style -// license that can be found in the LICENSE file. - -package goes - -import ( - "math/rand" - "time" -) - -type randomScheduler struct { - workers []*worker - random *rand.Rand -} - -func newRandomScheduler(workers ...*worker) *randomScheduler { - scheduler := &randomScheduler{ - workers: workers, - random: rand.New(rand.NewSource(time.Now().Unix())), - } - - return scheduler -} - -// Set sets the workers to scheduler. -func (rs *randomScheduler) Set(workers []*worker) { - rs.workers = workers -} - -// Get gets a worker from scheduler. -func (rs *randomScheduler) Get() *worker { - if len(rs.workers) <= 0 { - return nil - } - - index := rs.random.Intn(len(rs.workers)) - worker := rs.workers[index] - return worker -} diff --git a/random_test.go b/random_test.go deleted file mode 100644 index 56f2fe0..0000000 --- a/random_test.go +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright 2025 FishGoddess. All rights reserved. -// Use of this source code is governed by a MIT style -// license that can be found in the LICENSE file. - -package goes - -import ( - "fmt" - "testing" -) - -// go test -v -cover -run=^TestRandomScheduler$ -func TestRandomScheduler(t *testing.T) { - workerNum := 16 - workers := make([]*worker, 0, workerNum) - for range workerNum { - workers = append(workers, new(worker)) - } - - scheduler := newRandomScheduler(workers...) - if fmt.Sprintf("%p", scheduler.workers) != fmt.Sprintf("%p", workers) { - t.Fatalf("scheduler.workers %p != workers %p", scheduler.workers, workers) - } - - got := len(scheduler.workers) - want := len(workers) - if got != want { - t.Fatalf("got %d != want %d", got, want) - } - - scheduler.Set(workers) - if fmt.Sprintf("%p", scheduler.workers) != fmt.Sprintf("%p", workers) { - t.Fatalf("scheduler.workers %p != workers %p", scheduler.workers, workers) - } - - got = len(scheduler.workers) - want = len(workers) - if got != want { - t.Fatalf("got %d != want %d", got, want) - } - - for i, worker := range workers { - got := scheduler.workers[i] - if got != worker { - t.Fatalf("got %p != worker %p", got, worker) - } - } - - for range workers { - gotNext := scheduler.Get() - if gotNext == nil { - t.Fatal("gotNext is nil") - } - } -} diff --git a/round_robin.go b/round_robin.go deleted file mode 100644 index 333727c..0000000 --- a/round_robin.go +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright 2025 FishGoddess. All rights reserved. -// Use of this source code is governed by a MIT style -// license that can be found in the LICENSE file. - -package goes - -type roundRobinScheduler struct { - workers []*worker - index int -} - -func newRoundRobinScheduler(workers ...*worker) *roundRobinScheduler { - scheduler := &roundRobinScheduler{ - workers: workers, - index: -1, - } - - return scheduler -} - -// Set sets the workers to scheduler. -func (rrs *roundRobinScheduler) Set(workers []*worker) { - rrs.workers = workers -} - -// Get gets a worker from scheduler. -func (rrs *roundRobinScheduler) Get() *worker { - if len(rrs.workers) <= 0 { - return nil - } - - if rrs.index++; rrs.index >= len(rrs.workers) { - rrs.index = 0 - } - - worker := rrs.workers[rrs.index] - return worker -} diff --git a/round_robin_test.go b/round_robin_test.go deleted file mode 100644 index 5e2363b..0000000 --- a/round_robin_test.go +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright 2025 FishGoddess. All rights reserved. -// Use of this source code is governed by a MIT style -// license that can be found in the LICENSE file. - -package goes - -import ( - "fmt" - "testing" -) - -// go test -v -cover -run=^TestRoundRobinScheduler$ -func TestRoundRobinScheduler(t *testing.T) { - workerNum := 16 - workers := make([]*worker, 0, workerNum) - for range workerNum { - workers = append(workers, new(worker)) - } - - scheduler := newRoundRobinScheduler(workers...) - if fmt.Sprintf("%p", scheduler.workers) != fmt.Sprintf("%p", workers) { - t.Fatalf("scheduler.workers %p != workers %p", scheduler.workers, workers) - } - - got := len(scheduler.workers) - want := len(workers) - if got != want { - t.Fatalf("got %d != want %d", got, want) - } - - scheduler.Set(workers) - if fmt.Sprintf("%p", scheduler.workers) != fmt.Sprintf("%p", workers) { - t.Fatalf("scheduler.workers %p != workers %p", scheduler.workers, workers) - } - - got = len(scheduler.workers) - want = len(workers) - if got != want { - t.Fatalf("got %d != want %d", got, want) - } - - for i, worker := range workers { - got := scheduler.workers[i] - if got != worker { - t.Fatalf("got %p != worker %p", got, worker) - } - } - - for _, worker := range workers { - gotNext := scheduler.Get() - if gotNext != worker { - t.Fatalf("gotNext %p != worker %p", gotNext, worker) - } - } -} diff --git a/worker.go b/worker.go deleted file mode 100644 index 9c42bdb..0000000 --- a/worker.go +++ /dev/null @@ -1,88 +0,0 @@ -// Copyright 2025 FishGoddess. All rights reserved. -// Use of this source code is governed by a MIT style -// license that can be found in the LICENSE file. - -package goes - -import "time" - -type scheduler interface { - // Set sets the workers to scheduler. - Set(workers []*worker) - - // Get gets a worker from scheduler. - Get() *worker -} - -type worker struct { - executor *Executor - taskQueue chan Task - acceptTime time.Time -} - -func newWorker(executor *Executor) *worker { - w := &worker{ - executor: executor, - taskQueue: make(chan Task, executor.conf.workerQueueSize), - } - - w.work() - return w -} - -// WaitingTasks returns the number of tasks waiting. -func (w *worker) WaitingTasks() int { - return len(w.taskQueue) -} - -// AcceptTime returns the accept time of worker. -func (w *worker) AcceptTime() time.Time { - return w.acceptTime -} - -// SetAcceptTime sets the accept time of worker. -func (w *worker) SetAcceptTime(t time.Time) { - w.acceptTime = t -} - -func (w *worker) handle(task Task) { - if w.executor.conf.recoverable() { - defer func() { - if r := recover(); r != nil { - w.executor.conf.recover(r) - } - }() - } - - task() -} - -func (w *worker) work() { - w.executor.wg.Add(1) - go func() { - defer w.executor.wg.Done() - defer close(w.taskQueue) - - for task := range w.taskQueue { - if task == nil { - break - } - - w.handle(task) - } - }() -} - -// Accept accepts a task to be handled. -func (w *worker) Accept(task Task) { - if task == nil { - return - } - - w.taskQueue <- task -} - -// Done signals the worker to stop working. -func (w *worker) Done() { - w.taskQueue <- nil -} diff --git a/worker_test.go b/worker_test.go deleted file mode 100644 index 4a710bd..0000000 --- a/worker_test.go +++ /dev/null @@ -1,108 +0,0 @@ -// Copyright 2025 FishGoddess. All rights reserved. -// Use of this source code is governed by a MIT style -// license that can be found in the LICENSE file. - -package goes - -import ( - "testing" - "time" -) - -// go test -v -cover -run=^TestWorkerHandle$ -func TestWorkerHandle(t *testing.T) { - got := 0 - want := 666 - - executor := &Executor{ - conf: &config{ - recoverFunc: func(r any) { - got = r.(int) - }, - }, - } - - worker := &worker{executor: executor} - worker.handle(func() { - panic(want) - }) - - if got != want { - t.Fatalf("got %d != want %d", got, want) - } - - want = 123 - worker.handle(func() { - got = 123 - }) - - if got != want { - t.Fatalf("got %d != want %d", got, want) - } - - defer func() { - if r := recover(); r == nil { - t.Fatal("worker.handle should panic") - } - }() - - worker.executor.conf.recoverFunc = nil - worker.handle(func() { - panic(want) - }) -} - -// go test -v -cover -run=^TestWorkerWaitingTasks$ -func TestWorkerWaitingTasks(t *testing.T) { - taskQueue := make(chan Task, 4) - worker := &worker{taskQueue: taskQueue} - - got := worker.WaitingTasks() - want := len(taskQueue) - if got != want { - t.Fatalf("got %d != want %d", got, want) - } - - got = worker.WaitingTasks() - if got != 0 { - t.Fatalf("got %d != 0", got) - } - - taskQueue <- nil - taskQueue <- nil - - got = worker.WaitingTasks() - want = len(taskQueue) - if got != want { - t.Fatalf("got %d != want %d", got, want) - } - - got = worker.WaitingTasks() - if got != 2 { - t.Fatalf("got %d != 2", got) - } -} - -// go test -v -cover -run=^TestWorkerAcceptTime$ -func TestWorkerAcceptTime(t *testing.T) { - acceptTime := time.Now() - worker := &worker{acceptTime: acceptTime} - - got := worker.AcceptTime() - if got != acceptTime { - t.Fatalf("got %v != acceptTime %v", got, acceptTime) - } - - acceptTime = time.Unix(123456789, 0) - worker.SetAcceptTime(acceptTime) - - got = worker.acceptTime - if got != acceptTime { - t.Fatalf("got %v != acceptTime %v", got, acceptTime) - } - - got = worker.AcceptTime() - if got != acceptTime { - t.Fatalf("got %v != acceptTime %v", got, acceptTime) - } -} From cb316cccc14e51d0f77740c93571b7fc231d20ec Mon Sep 17 00:00:00 2001 From: FishGoddess Date: Thu, 1 Jan 2026 16:05:54 +0800 Subject: [PATCH 2/7] =?UTF-8?q?=E5=88=9D=E5=A7=8B=E5=8C=96=E5=BC=80?= =?UTF-8?q?=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- _examples/basic.go | 7 ++++-- executor.go | 58 ++++++++++++++++++++++++++++++++++++++++++++++ go.mod | 2 ++ go.sum | 2 ++ option.go | 43 ++++++++++++++-------------------- option_test.go | 43 +++++++++++----------------------- worker.go | 51 ++++++++++++++++++++++++++++++++++++++++ 7 files changed, 150 insertions(+), 56 deletions(-) create mode 100644 executor.go create mode 100644 go.sum create mode 100644 worker.go diff --git a/_examples/basic.go b/_examples/basic.go index 4885d57..69d24e1 100644 --- a/_examples/basic.go +++ b/_examples/basic.go @@ -5,6 +5,7 @@ package main import ( + "context" "fmt" "time" @@ -12,6 +13,8 @@ import ( ) func main() { + ctx := context.Background() + // Limits the number of simultaneous goroutines and not reuses them. limiter := goes.NewLimiter(4) @@ -26,10 +29,10 @@ func main() { // Limits the number of simultaneous goroutines and reuses them. executor := goes.NewExecutor(4) - defer executor.Close() + defer executor.Close(ctx) for i := 0; i < 20; i++ { - executor.Submit(func() { + executor.Submit(ctx, func() { fmt.Printf("executor --> %s\n", time.Now()) time.Sleep(time.Second) }) diff --git a/executor.go b/executor.go new file mode 100644 index 0000000..b041884 --- /dev/null +++ b/executor.go @@ -0,0 +1,58 @@ +// Copyright 2025 FishGoddess. All rights reserved. +// Use of this source code is governed by a MIT style +// license that can be found in the LICENSE file. + +package goes + +import ( + "context" + "errors" + + "github.com/FishGoddess/rego" +) + +var ErrExecutorClosed = errors.New("goes: executor is closed") + +func newExecutorClosedErr(ctx context.Context) error { + return ErrExecutorClosed +} + +type Executor struct { + conf *config + pool *rego.Pool[*worker] +} + +func NewExecutor(workerNum uint, opts ...Option) *Executor { + conf := newConfig().apply(opts...) + + executor := new(Executor) + executor.conf = conf + executor.pool = rego.New(uint64(workerNum), executor.acquire, executor.release) + executor.pool.WithPoolClosedErrFunc(newExecutorClosedErr) + return executor +} + +func (e *Executor) acquire(ctx context.Context) (*worker, error) { + worker := newWorker(e.conf.queueSize, e.conf.recovery) + worker.start() + return worker, nil +} + +func (e *Executor) release(ctx context.Context, worker *worker) error { + worker.stop() + return nil +} + +func (e *Executor) Submit(ctx context.Context, f func()) error { + worker, err := e.pool.Acquire(ctx) + if err != nil { + return err + } + + worker.submit(f) + return nil +} + +func (e *Executor) Close(ctx context.Context) error { + return e.pool.Close(ctx) +} diff --git a/go.mod b/go.mod index 828f3cf..4dfd6ec 100755 --- a/go.mod +++ b/go.mod @@ -1,3 +1,5 @@ module github.com/FishGoddess/goes go 1.25 + +require github.com/FishGoddess/rego v0.4.3 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..a15dca9 --- /dev/null +++ b/go.sum @@ -0,0 +1,2 @@ +github.com/FishGoddess/rego v0.4.3 h1:apa1EpI+rSHY+77LvOSfl/Rgbj8FE0aJ/leZYqQ1UUg= +github.com/FishGoddess/rego v0.4.3/go.mod h1:lGyw0kAJy+aTEfRV35gJxV5/nOFMmiiJWyxJ/JIQBuo= diff --git a/option.go b/option.go index f4fcb6d..0501031 100644 --- a/option.go +++ b/option.go @@ -4,45 +4,38 @@ package goes -import ( - "time" -) - type config struct { - workerNum int - workerQueueSize int - now func() time.Time - handlePanic func(r any) + queueSize uint + recovery func(r any) } -func newConfig(workerNum int) *config { +func newConfig() *config { return &config{ - workerNum: workerNum, - workerQueueSize: 256, - now: time.Now, - handlePanic: nil, + queueSize: 64, + recovery: nil, } } -type Option func(conf *config) - -// WithWorkerQueueSize sets the queue size of worker. -func WithWorkerQueueSize(size int) Option { - return func(conf *config) { - conf.workerQueueSize = size +func (c *config) apply(opts ...Option) *config { + for _, opt := range opts { + opt(c) } + + return c } -// WithNow sets the now function. -func WithNow(now func() time.Time) Option { +type Option func(conf *config) + +// WithQueueSize sets the queue size of worker. +func WithQueueSize(queueSize uint) Option { return func(conf *config) { - conf.now = now + conf.queueSize = queueSize } } -// WithHandlePanic sets the handle panic function. -func WithHandlePanic(handlePanic func(r any)) Option { +// WithRecovery sets the recovery function. +func WithRecovery(recovery func(r any)) Option { return func(conf *config) { - conf.handlePanic = handlePanic + conf.recovery = recovery } } diff --git a/option_test.go b/option_test.go index 433d11c..f981cc4 100644 --- a/option_test.go +++ b/option_test.go @@ -7,44 +7,29 @@ package goes import ( "fmt" "testing" - "time" ) -// go test -v -cover -run=^TestWithWorkerQueueSize$ -func TestWithWorkerQueueSize(t *testing.T) { - conf := &config{workerQueueSize: 0} +// go test -v -cover -run=^TestWithQueueSize$ +func TestWithQueueSize(t *testing.T) { + conf := &config{queueSize: 0} - workerQueueSize := 1024 - WithWorkerQueueSize(workerQueueSize)(conf) + queueSize := uint(1024) + WithQueueSize(queueSize)(conf) - if conf.workerQueueSize != workerQueueSize { - t.Fatalf("got %d != want %d", conf.workerQueueSize, workerQueueSize) + if conf.queueSize != queueSize { + t.Fatalf("got %d != want %d", conf.queueSize, queueSize) } } -// go test -v -cover -run=^TestWithNow$ -func TestWithNow(t *testing.T) { - conf := &config{now: nil} +// go test -v -cover -run=^WithRecovery$ +func TestWithRecovery(t *testing.T) { + conf := &config{recovery: nil} - now := func() time.Time { return time.Now() } - WithNow(now)(conf) + recovery := func(r any) {} + WithRecovery(recovery)(conf) - got := fmt.Sprintf("%p", conf.now) - want := fmt.Sprintf("%p", now) - if got != want { - t.Fatalf("got %s != want %s", got, want) - } -} - -// go test -v -cover -run=^TestWithHandlePanic$ -func TestWithHandlePanic(t *testing.T) { - conf := &config{handlePanic: nil} - - handlePanic := func(r any) {} - WithHandlePanic(handlePanic)(conf) - - got := fmt.Sprintf("%p", conf.handlePanic) - want := fmt.Sprintf("%p", handlePanic) + got := fmt.Sprintf("%p", conf.recovery) + want := fmt.Sprintf("%p", recovery) if got != want { t.Fatalf("got %s != want %s", got, want) } diff --git a/worker.go b/worker.go new file mode 100644 index 0000000..b1232c7 --- /dev/null +++ b/worker.go @@ -0,0 +1,51 @@ +// Copyright 2025 FishGoddess. All rights reserved. +// Use of this source code is governed by a MIT style +// license that can be found in the LICENSE file. + +package goes + +type worker struct { + funcs chan func() + recovery func(r any) +} + +func newWorker(queueSize uint, recovery func(r any)) *worker { + worker := &worker{ + funcs: make(chan func(), queueSize), + recovery: recovery, + } + + return worker +} + +func (w *worker) do(f func()) { + if w.recovery != nil { + defer func() { + if r := recover(); r != nil { + w.recovery(r) + } + }() + } + + f() +} + +func (w *worker) start() { + go func() { + for f := range w.funcs { + if f == nil { + return + } + + w.do(f) + } + }() +} + +func (w *worker) stop() { + w.funcs <- nil +} + +func (w *worker) submit(f func()) { + w.funcs <- f +} From 2ade0958aafe0e070a938a9fe2218b02c4690cbe Mon Sep 17 00:00:00 2001 From: FishGoddess Date: Thu, 1 Jan 2026 17:31:18 +0800 Subject: [PATCH 3/7] =?UTF-8?q?=E5=88=9D=E5=A7=8B=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- _examples/basic.go | 22 +++++++++++----------- executor.go | 16 ++++++++++++---- worker.go | 14 ++++++-------- 3 files changed, 29 insertions(+), 23 deletions(-) diff --git a/_examples/basic.go b/_examples/basic.go index 69d24e1..1bb5528 100644 --- a/_examples/basic.go +++ b/_examples/basic.go @@ -15,17 +15,17 @@ import ( func main() { ctx := context.Background() - // Limits the number of simultaneous goroutines and not reuses them. - limiter := goes.NewLimiter(4) - - for i := 0; i < 20; i++ { - limiter.Go(func() { - fmt.Printf("limiter --> %s\n", time.Now()) - time.Sleep(time.Second) - }) - } - - limiter.Wait() + // // Limits the number of simultaneous goroutines and not reuses them. + // limiter := goes.NewLimiter(4) + // + // for i := 0; i < 20; i++ { + // limiter.Go(func() { + // fmt.Printf("limiter --> %s\n", time.Now()) + // time.Sleep(time.Second) + // }) + // } + // + // limiter.Wait() // Limits the number of simultaneous goroutines and reuses them. executor := goes.NewExecutor(4) diff --git a/executor.go b/executor.go index b041884..dc40a4a 100644 --- a/executor.go +++ b/executor.go @@ -7,6 +7,7 @@ package goes import ( "context" "errors" + "sync" "github.com/FishGoddess/rego" ) @@ -18,8 +19,9 @@ func newExecutorClosedErr(ctx context.Context) error { } type Executor struct { - conf *config - pool *rego.Pool[*worker] + conf *config + pool *rego.Pool[*worker] + group sync.WaitGroup } func NewExecutor(workerNum uint, opts ...Option) *Executor { @@ -34,7 +36,7 @@ func NewExecutor(workerNum uint, opts ...Option) *Executor { func (e *Executor) acquire(ctx context.Context) (*worker, error) { worker := newWorker(e.conf.queueSize, e.conf.recovery) - worker.start() + e.group.Go(worker.start) return worker, nil } @@ -50,9 +52,15 @@ func (e *Executor) Submit(ctx context.Context, f func()) error { } worker.submit(f) + e.pool.Release(ctx, worker) return nil } func (e *Executor) Close(ctx context.Context) error { - return e.pool.Close(ctx) + if err := e.pool.Close(ctx); err != nil { + return err + } + + e.group.Wait() + return nil } diff --git a/worker.go b/worker.go index b1232c7..f5e2b3e 100644 --- a/worker.go +++ b/worker.go @@ -31,15 +31,13 @@ func (w *worker) do(f func()) { } func (w *worker) start() { - go func() { - for f := range w.funcs { - if f == nil { - return - } - - w.do(f) + for f := range w.funcs { + if f == nil { + return } - }() + + w.do(f) + } } func (w *worker) stop() { From 0e638be194605f3ac27bb119a60f7e2585edc465 Mon Sep 17 00:00:00 2001 From: FishGoddess Date: Thu, 1 Jan 2026 18:11:55 +0800 Subject: [PATCH 4/7] =?UTF-8?q?=E6=80=A7=E8=83=BD=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Makefile | 2 +- _examples/basic_test.go | 11 +++++++---- limiter.go | 8 ++++---- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/Makefile b/Makefile index d37d13d..432ced9 100755 --- a/Makefile +++ b/Makefile @@ -6,7 +6,7 @@ test: go test -v -cover ./... bench: - go test -v ./_examples/performance_test.go -run=none -bench=. -benchmem -benchtime=1s + go test -v ./_examples/basic_test.go -run=none -bench=. -benchmem -benchtime=1s fmt: go fmt ./... diff --git a/_examples/basic_test.go b/_examples/basic_test.go index 6ea4521..e811d15 100644 --- a/_examples/basic_test.go +++ b/_examples/basic_test.go @@ -5,6 +5,7 @@ package main import ( + "context" "sync/atomic" "testing" "time" @@ -65,6 +66,7 @@ func BenchmarkLimiterTime(b *testing.B) { // go test -v -run=none -bench=^BenchmarkExecutor$ -benchmem -benchtime=1s func BenchmarkExecutor(b *testing.B) { + ctx := context.Background() executor := goes.NewExecutor(workerNum) num := uint32(0) @@ -74,16 +76,17 @@ func BenchmarkExecutor(b *testing.B) { b.RunParallel(func(pb *testing.PB) { for pb.Next() { - executor.Submit(task) + executor.Submit(ctx, task) } }) - executor.Close() + executor.Close(ctx) b.Logf("num is %d", num) } // go test -v -run=none -bench=^BenchmarkExecutorTime$ -benchmem -benchtime=1s func BenchmarkExecutorTime(b *testing.B) { + ctx := context.Background() executor := goes.NewExecutor(workerNum) num := uint32(0) @@ -93,10 +96,10 @@ func BenchmarkExecutorTime(b *testing.B) { beginTime := time.Now() for range timeLoop { - executor.Submit(task) + executor.Submit(ctx, task) } - executor.Close() + executor.Close(ctx) cost := time.Since(beginTime) b.Logf("num is %d, cost is %s", num, cost) diff --git a/limiter.go b/limiter.go index 2be40dc..279cd82 100644 --- a/limiter.go +++ b/limiter.go @@ -16,7 +16,7 @@ type token struct{} // Limiter limits the simultaneous number of goroutines. type Limiter struct { tokens chan token - wg sync.WaitGroup + group sync.WaitGroup } // NewLimiter creates a new limiter with limit. @@ -49,12 +49,12 @@ func (l *Limiter) releaseToken() { // Go starts a goroutine to run f(). func (l *Limiter) Go(f func()) { l.acquireToken() - l.wg.Add(1) + l.group.Add(1) go func() { defer func() { l.releaseToken() - l.wg.Done() + l.group.Done() }() f() @@ -63,5 +63,5 @@ func (l *Limiter) Go(f func()) { // Wait waits all goroutines to be finished. func (l *Limiter) Wait() { - l.wg.Wait() + l.group.Wait() } From 064e4744ae512625c186dfd84a231c025947baa0 Mon Sep 17 00:00:00 2001 From: FishGoddess Date: Wed, 7 Jan 2026 12:11:54 +0800 Subject: [PATCH 5/7] =?UTF-8?q?=E9=87=8D=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- FUTURE.md | 9 +++-- Makefile | 9 ++--- README.en.md | 25 ++++++++------ README.md | 25 ++++++++------ _examples/basic.go | 26 +++++++------- _examples/basic_test.go | 59 ++++++++++++++++++++++++++------ executor.go | 75 +++++++++++++++++++++++------------------ executor_test.go | 51 ++++++++++++++++++++++++++++ go.mod | 2 -- go.sum | 2 -- option.go | 4 +-- task.go | 23 +++++++++++++ task_test.go | 64 +++++++++++++++++++++++++++++++++++ worker.go | 49 --------------------------- 14 files changed, 287 insertions(+), 136 deletions(-) create mode 100644 executor_test.go create mode 100644 task.go create mode 100644 task_test.go delete mode 100644 worker.go diff --git a/FUTURE.md b/FUTURE.md index f61d3ac..a187639 100755 --- a/FUTURE.md +++ b/FUTURE.md @@ -1,12 +1,17 @@ ## ✒ 未来版本的新特性 (Features in future versions) +### v0.3.x + +* [x] 重构代码,精简 Executor 实现 +* [ ] 完善单元测试,将覆盖率提高到 90% 以上 + ### v0.2.x * [x] 增加异步任务执行器 * [x] 支持轮询调度策略 * [x] 支持随机调度策略 -* [ ] 支持任务数量优先调度策略 -* [ ] 支持任务时间优先调度策略 +* [ ] ~~支持任务数量优先调度策略~~ +* [ ] ~~支持任务时间优先调度策略~~ * [x] 支持 worker 动态扩缩容 * [x] 支持查询可用的 worker 数量 * [x] 支持设置 worker 任务队列大小 diff --git a/Makefile b/Makefile index 432ced9..a575d1d 100755 --- a/Makefile +++ b/Makefile @@ -1,6 +1,9 @@ -.PHONY: test fmt +.PHONY: fmt test -all: test +all: fmt test + +fmt: + go fmt ./... test: go test -v -cover ./... @@ -8,5 +11,3 @@ test: bench: go test -v ./_examples/basic_test.go -run=none -bench=. -benchmem -benchtime=1s -fmt: - go fmt ./... diff --git a/README.en.md b/README.en.md index 2fc4abc..134d9f3 100755 --- a/README.en.md +++ b/README.en.md @@ -30,6 +30,7 @@ $ go get -u github.com/FishGoddess/goes package main import ( + "context" "fmt" "time" @@ -37,6 +38,8 @@ import ( ) func main() { + ctx := context.Background() + // Limits the number of simultaneous goroutines and not reuses them. limiter := goes.NewLimiter(4) @@ -54,7 +57,7 @@ func main() { defer executor.Close() for i := 0; i < 20; i++ { - executor.Submit(func() { + executor.Submit(ctx, func() { fmt.Printf("executor --> %s\n", time.Now()) time.Sleep(time.Second) }) @@ -73,20 +76,22 @@ $ make bench ```bash goos: linux goarch: amd64 -cpu: AMD EPYC 7K62 48-Core Processor +cpu: Intel(R) Xeon(R) CPU E5-26xx v4 -BenchmarkLimiter-2 2417040 498.5 ns/op 24 B/op 1 allocs/op -BenchmarkExecutor-2 20458502 58.3 ns/op 0 B/op 0 allocs/op -BenchmarkAntsPool-2 4295964 271.7 ns/op 0 B/op 0 allocs/op +BenchmarkLimiter-2 1256862 870.5 ns/op 24 B/op 1 allocs/op +BenchmarkExecutor-2 3916312 286.8 ns/op 0 B/op 0 allocs/op +BenchmarkAntsPool-2 1396972 846.6 ns/op 0 B/op 0 allocs/op +BenchmarkConcPool-2 1473289 843.4 ns/op 0 B/op 0 allocs/op -BenchmarkLimiterTime-2: num is 1000000, cost is 300.936441ms -BenchmarkExecutorTime-2: num is 1000000, cost is 63.026947ms -BenchmarkAntsPoolTime-2: num is 999744, cost is 346.972287ms +BenchmarkLimiterTime-2: num is 500000, cost is 391.462505ms +BenchmarkExecutorTime-2: num is 500000, cost is 180.279155ms +BenchmarkAntsPoolTime-2: num is 500000, cost is 547.328528ms +BenchmarkConcPoolTime-2: num is 500000, cost is 390.354196ms ``` -> Obviously, goes.Executor is 5x faster than ants.Pool which has more features, so try goes if you prefer a lightweight and faster executor. +> Obviously, goes.Executor is faster than other concurrent libraries, so try goes.Executor if you prefer a light-weight and faster executor. -> Benchmarks: [_examples/performance_test.go](./_examples/performance_test.go). +> Benchmarks: [_examples/basic_test.go](./_examples/basic_test.go). ### 👥 Contributing diff --git a/README.md b/README.md index 3240461..d13523e 100755 --- a/README.md +++ b/README.md @@ -30,6 +30,7 @@ $ go get -u github.com/FishGoddess/goes package main import ( + "context" "fmt" "time" @@ -37,6 +38,8 @@ import ( ) func main() { + ctx := context.Background() + // Limits the number of simultaneous goroutines and not reuses them. limiter := goes.NewLimiter(4) @@ -54,7 +57,7 @@ func main() { defer executor.Close() for i := 0; i < 20; i++ { - executor.Submit(func() { + executor.Submit(ctx, func() { fmt.Printf("executor --> %s\n", time.Now()) time.Sleep(time.Second) }) @@ -73,20 +76,22 @@ $ make bench ```bash goos: linux goarch: amd64 -cpu: AMD EPYC 7K62 48-Core Processor +cpu: Intel(R) Xeon(R) CPU E5-26xx v4 -BenchmarkLimiter-2 2417040 498.5 ns/op 24 B/op 1 allocs/op -BenchmarkExecutor-2 20458502 58.3 ns/op 0 B/op 0 allocs/op -BenchmarkAntsPool-2 4295964 271.7 ns/op 0 B/op 0 allocs/op +BenchmarkLimiter-2 1256862 870.5 ns/op 24 B/op 1 allocs/op +BenchmarkExecutor-2 3916312 286.8 ns/op 0 B/op 0 allocs/op +BenchmarkAntsPool-2 1396972 846.6 ns/op 0 B/op 0 allocs/op +BenchmarkConcPool-2 1473289 843.4 ns/op 0 B/op 0 allocs/op -BenchmarkLimiterTime-2: num is 1000000, cost is 300.936441ms -BenchmarkExecutorTime-2: num is 1000000, cost is 63.026947ms -BenchmarkAntsPoolTime-2: num is 999744, cost is 346.972287ms +BenchmarkLimiterTime-2: num is 500000, cost is 391.462505ms +BenchmarkExecutorTime-2: num is 500000, cost is 180.279155ms +BenchmarkAntsPoolTime-2: num is 500000, cost is 547.328528ms +BenchmarkConcPoolTime-2: num is 500000, cost is 390.354196ms ``` -> 很明显,goes.Executor 的性能比功能更丰富的 ants.Pool 要高出 5 倍左右,所以当你需要一个很轻量且高性能的异步任务执行器时,可以尝试下 goes。 +> 很明显,goes.Executor 的性能比其他的并发执行库高很多,所以当你需要一个轻量且高性能的并发执行器时,可以尝试下 goes.Executor。 -> 测试文件:[_examples/performance_test.go](./_examples/performance_test.go)。 +> 测试文件:[_examples/basic_test.go](./_examples/basic_test.go)。 ### 👥 贡献者 diff --git a/_examples/basic.go b/_examples/basic.go index 1bb5528..58666b0 100644 --- a/_examples/basic.go +++ b/_examples/basic.go @@ -1,4 +1,4 @@ -// Copyright 2025s FishGoddess. All rights reserved. +// Copyright 2025 FishGoddess. All rights reserved. // Use of this source code is governed by a MIT style // license that can be found in the LICENSE file. @@ -15,21 +15,21 @@ import ( func main() { ctx := context.Background() - // // Limits the number of simultaneous goroutines and not reuses them. - // limiter := goes.NewLimiter(4) - // - // for i := 0; i < 20; i++ { - // limiter.Go(func() { - // fmt.Printf("limiter --> %s\n", time.Now()) - // time.Sleep(time.Second) - // }) - // } - // - // limiter.Wait() + // Limits the number of simultaneous goroutines and not reuses them. + limiter := goes.NewLimiter(4) + + for i := 0; i < 20; i++ { + limiter.Go(func() { + fmt.Printf("limiter --> %s\n", time.Now()) + time.Sleep(time.Second) + }) + } + + limiter.Wait() // Limits the number of simultaneous goroutines and reuses them. executor := goes.NewExecutor(4) - defer executor.Close(ctx) + defer executor.Close() for i := 0; i < 20; i++ { executor.Submit(ctx, func() { diff --git a/_examples/basic_test.go b/_examples/basic_test.go index e811d15..e49a8d2 100644 --- a/_examples/basic_test.go +++ b/_examples/basic_test.go @@ -12,13 +12,13 @@ import ( "github.com/FishGoddess/goes" //"github.com/panjf2000/ants/v2" + //"github.com/sourcegraph/conc/pool" ) const ( - limit = 256 - workerNum = limit - size = limit - timeLoop = 100_0000 + limit = 64 + workers = limit + timeLoop = 50_0000 ) func bench(num *uint32) { @@ -67,7 +67,7 @@ func BenchmarkLimiterTime(b *testing.B) { // go test -v -run=none -bench=^BenchmarkExecutor$ -benchmem -benchtime=1s func BenchmarkExecutor(b *testing.B) { ctx := context.Background() - executor := goes.NewExecutor(workerNum) + executor := goes.NewExecutor(workers) num := uint32(0) task := func() { @@ -80,14 +80,14 @@ func BenchmarkExecutor(b *testing.B) { } }) - executor.Close(ctx) + executor.Close() b.Logf("num is %d", num) } // go test -v -run=none -bench=^BenchmarkExecutorTime$ -benchmem -benchtime=1s func BenchmarkExecutorTime(b *testing.B) { ctx := context.Background() - executor := goes.NewExecutor(workerNum) + executor := goes.NewExecutor(workers) num := uint32(0) task := func() { @@ -99,7 +99,7 @@ func BenchmarkExecutorTime(b *testing.B) { executor.Submit(ctx, task) } - executor.Close(ctx) + executor.Close() cost := time.Since(beginTime) b.Logf("num is %d, cost is %s", num, cost) @@ -107,7 +107,7 @@ func BenchmarkExecutorTime(b *testing.B) { // // go test -v -run=none -bench=^BenchmarkAntsPool$ -benchmem -benchtime=1s // func BenchmarkAntsPool(b *testing.B) { -// pool, _ := ants.NewPool(workerNum) +// pool, _ := ants.NewPool(workers) // // num := uint32(0) // task := func() { @@ -126,7 +126,7 @@ func BenchmarkExecutorTime(b *testing.B) { // // // go test -v -run=none -bench=^BenchmarkAntsPoolTime$ -benchmem -benchtime=1s // func BenchmarkAntsPoolTime(b *testing.B) { -// pool, _ := ants.NewPool(workerNum) +// pool, _ := ants.NewPool(workers) // // num := uint32(0) // task := func() { @@ -143,3 +143,42 @@ func BenchmarkExecutorTime(b *testing.B) { // cost := time.Since(beginTime) // b.Logf("num is %d, cost is %s", num, cost) // } +// +// // // go test -v -run=none -bench=^BenchmarkConcPool$ -benchmem -benchtime=1s +// func BenchmarkConcPool(b *testing.B) { +// pool := pool.New().WithMaxGoroutines(workers) +// +// num := uint32(0) +// task := func() { +// bench(&num) +// } +// +// b.RunParallel(func(pb *testing.PB) { +// for pb.Next() { +// pool.Go(task) +// } +// }) +// +// pool.Wait() +// b.Logf("num is %d", num) +// } +// +// // go test -v -run=none -bench=^BenchmarkConcPoolTime$ -benchmem -benchtime=1s +// func BenchmarkConcPoolTime(b *testing.B) { +// pool := pool.New().WithMaxGoroutines(workers) +// +// num := uint32(0) +// task := func() { +// bench(&num) +// } +// +// beginTime := time.Now() +// for range timeLoop { +// pool.Go(task) +// } +// +// pool.Wait() +// +// cost := time.Since(beginTime) +// b.Logf("num is %d, cost is %s", num, cost) +// } diff --git a/executor.go b/executor.go index dc40a4a..30c06ed 100644 --- a/executor.go +++ b/executor.go @@ -8,59 +8,70 @@ import ( "context" "errors" "sync" - - "github.com/FishGoddess/rego" + "sync/atomic" ) var ErrExecutorClosed = errors.New("goes: executor is closed") -func newExecutorClosedErr(ctx context.Context) error { - return ErrExecutorClosed -} - type Executor struct { - conf *config - pool *rego.Pool[*worker] + conf *config + + tasks chan Task + done chan struct{} + closed atomic.Bool + group sync.WaitGroup } -func NewExecutor(workerNum uint, opts ...Option) *Executor { +func NewExecutor(workers uint, opts ...Option) *Executor { conf := newConfig().apply(opts...) - executor := new(Executor) - executor.conf = conf - executor.pool = rego.New(uint64(workerNum), executor.acquire, executor.release) - executor.pool.WithPoolClosedErrFunc(newExecutorClosedErr) - return executor -} + if workers < 1 { + panic("goes: workers < 1") + } + + executor := &Executor{ + conf: conf, + tasks: make(chan Task, conf.queueSize), + done: make(chan struct{}), + } + + for range workers { + executor.group.Go(executor.worker) + } -func (e *Executor) acquire(ctx context.Context) (*worker, error) { - worker := newWorker(e.conf.queueSize, e.conf.recovery) - e.group.Go(worker.start) - return worker, nil + return executor } -func (e *Executor) release(ctx context.Context, worker *worker) error { - worker.stop() - return nil +func (e *Executor) worker() { + for task := range e.tasks { + task.Do(e.conf.recovery) + } } -func (e *Executor) Submit(ctx context.Context, f func()) error { - worker, err := e.pool.Acquire(ctx) - if err != nil { - return err +func (e *Executor) Submit(ctx context.Context, task Task) error { + if e.closed.Load() { + return ErrExecutorClosed } - worker.submit(f) - e.pool.Release(ctx, worker) - return nil + select { + case e.tasks <- task: + return nil + case <-ctx.Done(): + return ctx.Err() + case <-e.done: + return ErrExecutorClosed + } } -func (e *Executor) Close(ctx context.Context) error { - if err := e.pool.Close(ctx); err != nil { - return err +func (e *Executor) Close() error { + if !e.closed.CompareAndSwap(false, true) { + return nil } + close(e.done) + close(e.tasks) + e.group.Wait() return nil } diff --git a/executor_test.go b/executor_test.go new file mode 100644 index 0000000..74dce2e --- /dev/null +++ b/executor_test.go @@ -0,0 +1,51 @@ +// Copyright 2025 FishGoddess. All rights reserved. +// Use of this source code is governed by a MIT style +// license that can be found in the LICENSE file. + +package goes + +import ( + "context" + "sync" + "testing" + "time" +) + +// go test -v -cover -run=^TestExecutor$ +func TestExecutor(t *testing.T) { + workers := 16 + executor := NewExecutor(uint(workers)) + defer executor.Close() + + var countMap = make(map[int64]int, 16) + var lock sync.Mutex + + ctx := context.Background() + totalCount := 10 * workers + for i := 0; i < totalCount; i++ { + executor.Submit(ctx, func() { + now := time.Now().UnixMilli() / 10 + + lock.Lock() + countMap[now] = countMap[now] + 1 + lock.Unlock() + + time.Sleep(10 * time.Millisecond) + }) + } + + executor.Close() + + gotTotalCount := 0 + for now, count := range countMap { + gotTotalCount = gotTotalCount + count + + if count != workers { + t.Fatalf("now %d: count %d != workers %d", now, count, workers) + } + } + + if gotTotalCount != totalCount { + t.Fatalf("gotTotalCount %d != totalCount %d", gotTotalCount, totalCount) + } +} diff --git a/go.mod b/go.mod index 4dfd6ec..828f3cf 100755 --- a/go.mod +++ b/go.mod @@ -1,5 +1,3 @@ module github.com/FishGoddess/goes go 1.25 - -require github.com/FishGoddess/rego v0.4.3 diff --git a/go.sum b/go.sum index a15dca9..e69de29 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +0,0 @@ -github.com/FishGoddess/rego v0.4.3 h1:apa1EpI+rSHY+77LvOSfl/Rgbj8FE0aJ/leZYqQ1UUg= -github.com/FishGoddess/rego v0.4.3/go.mod h1:lGyw0kAJy+aTEfRV35gJxV5/nOFMmiiJWyxJ/JIQBuo= diff --git a/option.go b/option.go index 0501031..f3c6c66 100644 --- a/option.go +++ b/option.go @@ -11,7 +11,7 @@ type config struct { func newConfig() *config { return &config{ - queueSize: 64, + queueSize: 1024, recovery: nil, } } @@ -26,7 +26,7 @@ func (c *config) apply(opts ...Option) *config { type Option func(conf *config) -// WithQueueSize sets the queue size of worker. +// WithQueueSize sets the queue size. func WithQueueSize(queueSize uint) Option { return func(conf *config) { conf.queueSize = queueSize diff --git a/task.go b/task.go new file mode 100644 index 0000000..0be3da7 --- /dev/null +++ b/task.go @@ -0,0 +1,23 @@ +// Copyright 2025 FishGoddess. All rights reserved. +// Use of this source code is governed by a MIT style +// license that can be found in the LICENSE file. + +package goes + +type Task func() + +func (t Task) Do(recovery func(r any)) { + if t == nil { + return + } + + if recovery != nil { + defer func() { + if r := recover(); r != nil { + recovery(r) + } + }() + } + + t() +} diff --git a/task_test.go b/task_test.go new file mode 100644 index 0000000..545426b --- /dev/null +++ b/task_test.go @@ -0,0 +1,64 @@ +// Copyright 2025 FishGoddess. All rights reserved. +// Use of this source code is governed by a MIT style +// license that can be found in the LICENSE file. + +package goes + +import "testing" + +// go test -v -cover -run=^TestTask$ +func TestTask(t *testing.T) { + defer func() { + if r := recover(); r != nil { + t.Fatal(r) + } + }() + + var done bool = false + var task Task = func() { done = true } + task.Do(nil) + + if !done { + t.Fatalf("%+v is wrong", done) + } +} + +// go test -v -cover -run=^TestTaskNil$ +func TestTaskNil(t *testing.T) { + defer func() { + if r := recover(); r != nil { + t.Fatal(r) + } + }() + + var task Task = nil + task.Do(nil) +} + +// go test -v -cover -run=^TestTaskPanic$ +func TestTaskPanic(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Fatal("task should panic") + } + }() + + var task Task = func() { panic("wow") } + task.Do(nil) +} + +// go test -v -cover -run=^TestTaskRecovery$ +func TestTaskRecovery(t *testing.T) { + defer func() { + if r := recover(); r != nil { + t.Fatal(r) + } + }() + + var task Task = func() { panic("wow") } + task.Do(func(r any) { + if r != "wow" { + t.Fatalf("r %+v is wrong", r) + } + }) +} diff --git a/worker.go b/worker.go deleted file mode 100644 index f5e2b3e..0000000 --- a/worker.go +++ /dev/null @@ -1,49 +0,0 @@ -// Copyright 2025 FishGoddess. All rights reserved. -// Use of this source code is governed by a MIT style -// license that can be found in the LICENSE file. - -package goes - -type worker struct { - funcs chan func() - recovery func(r any) -} - -func newWorker(queueSize uint, recovery func(r any)) *worker { - worker := &worker{ - funcs: make(chan func(), queueSize), - recovery: recovery, - } - - return worker -} - -func (w *worker) do(f func()) { - if w.recovery != nil { - defer func() { - if r := recover(); r != nil { - w.recovery(r) - } - }() - } - - f() -} - -func (w *worker) start() { - for f := range w.funcs { - if f == nil { - return - } - - w.do(f) - } -} - -func (w *worker) stop() { - w.funcs <- nil -} - -func (w *worker) submit(f func()) { - w.funcs <- f -} From bb6a1773569e90c0ea85f63bc3a5575ad7e63f08 Mon Sep 17 00:00:00 2001 From: FishGoddess Date: Wed, 7 Jan 2026 12:13:26 +0800 Subject: [PATCH 6/7] v0.3.x --- HISTORY.md | 6 ++++++ _icons/coverage.svg | 4 ++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 09a0ce7..2cac061 100755 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,5 +1,11 @@ ## ✒ 历史版本的特性介绍 (Features in old versions) +### v0.3.0-alpha + +> 此版本发布于 2026-01-07 + +* 重构代码,精简 Executor 实现 + ### v0.2.5-alpha > 此版本发布于 2025-06-30 diff --git a/_icons/coverage.svg b/_icons/coverage.svg index be8453f..a433fc7 100644 --- a/_icons/coverage.svg +++ b/_icons/coverage.svg @@ -10,7 +10,7 @@ coverage coverage - 98% - 98% + 87% + 87% \ No newline at end of file From 696f5b953ffa842f25995cf5a0f53139bddb92a4 Mon Sep 17 00:00:00 2001 From: FishGoddess Date: Wed, 7 Jan 2026 12:27:03 +0800 Subject: [PATCH 7/7] =?UTF-8?q?=E5=AE=8C=E5=96=84=E6=B3=A8=E9=87=8A?= =?UTF-8?q?=E5=92=8C=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- _icons/coverage.svg | 4 ++-- executor.go | 24 +++++++++++++++++++----- limiter.go | 15 ++++++++++----- limiter_test.go | 2 +- 4 files changed, 32 insertions(+), 13 deletions(-) diff --git a/_icons/coverage.svg b/_icons/coverage.svg index a433fc7..7f6c81e 100644 --- a/_icons/coverage.svg +++ b/_icons/coverage.svg @@ -10,7 +10,7 @@ coverage coverage - 87% - 87% + 86% + 86% \ No newline at end of file diff --git a/executor.go b/executor.go index 30c06ed..0d6e2da 100644 --- a/executor.go +++ b/executor.go @@ -11,23 +11,35 @@ import ( "sync/atomic" ) -var ErrExecutorClosed = errors.New("goes: executor is closed") +const ( + minWorkers = 1 + maxWorkers = 10000 +) + +var ( + ErrExecutorClosed = errors.New("goes: executor is closed") +) +// Executor starts some workers to do tasks concurrently. type Executor struct { conf *config tasks chan Task done chan struct{} closed atomic.Bool - - group sync.WaitGroup + group sync.WaitGroup } +// NewExecutor creates a executor with workers. func NewExecutor(workers uint, opts ...Option) *Executor { conf := newConfig().apply(opts...) - if workers < 1 { - panic("goes: workers < 1") + if workers < minWorkers { + workers = minWorkers + } + + if workers > maxWorkers { + workers = maxWorkers } executor := &Executor{ @@ -49,6 +61,7 @@ func (e *Executor) worker() { } } +// Submit submits a task to executor and returns an error if failed. func (e *Executor) Submit(ctx context.Context, task Task) error { if e.closed.Load() { return ErrExecutorClosed @@ -64,6 +77,7 @@ func (e *Executor) Submit(ctx context.Context, task Task) error { } } +// Close closes the executor and returns an error if failed. func (e *Executor) Close() error { if !e.closed.CompareAndSwap(false, true) { return nil diff --git a/limiter.go b/limiter.go index 279cd82..9ea3555 100644 --- a/limiter.go +++ b/limiter.go @@ -13,14 +13,18 @@ const ( type token struct{} -// Limiter limits the simultaneous number of goroutines. +// Limiter starts some goroutines to do tasks concurrently. type Limiter struct { + conf *config + tokens chan token group sync.WaitGroup } // NewLimiter creates a new limiter with limit. -func NewLimiter(limit int) *Limiter { +func NewLimiter(limit uint, opts ...Option) *Limiter { + conf := newConfig().apply(opts...) + if limit < minLimit { limit = minLimit } @@ -30,6 +34,7 @@ func NewLimiter(limit int) *Limiter { } return &Limiter{ + conf: conf, tokens: make(chan token, limit), } } @@ -46,8 +51,8 @@ func (l *Limiter) releaseToken() { } } -// Go starts a goroutine to run f(). -func (l *Limiter) Go(f func()) { +// Go starts a goroutine to run task. +func (l *Limiter) Go(task Task) { l.acquireToken() l.group.Add(1) @@ -57,7 +62,7 @@ func (l *Limiter) Go(f func()) { l.group.Done() }() - f() + task.Do(l.conf.recovery) }() } diff --git a/limiter_test.go b/limiter_test.go index a5f7c42..386be7e 100644 --- a/limiter_test.go +++ b/limiter_test.go @@ -13,7 +13,7 @@ import ( // go test -v -cover -run=^TestLimiter$ func TestLimiter(t *testing.T) { limit := 16 - limiter := NewLimiter(limit) + limiter := NewLimiter(uint(limit)) var countMap = make(map[int64]int, 16) var lock sync.Mutex