diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index caced66..6c44d3d 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -15,7 +15,7 @@ jobs: - name: Setup uses: actions/setup-go@v4 with: - go-version: "1.23" + go-version: "1.25" - run: go version diff --git a/FUTURE.md b/FUTURE.md index f61d3ac..a187639 100755 --- a/FUTURE.md +++ b/FUTURE.md @@ -1,12 +1,17 @@ ## ✒ 未来版本的新特性 (Features in future versions) +### v0.3.x + +* [x] 重构代码,精简 Executor 实现 +* [ ] 完善单元测试,将覆盖率提高到 90% 以上 + ### v0.2.x * [x] 增加异步任务执行器 * [x] 支持轮询调度策略 * [x] 支持随机调度策略 -* [ ] 支持任务数量优先调度策略 -* [ ] 支持任务时间优先调度策略 +* [ ] ~~支持任务数量优先调度策略~~ +* [ ] ~~支持任务时间优先调度策略~~ * [x] 支持 worker 动态扩缩容 * [x] 支持查询可用的 worker 数量 * [x] 支持设置 worker 任务队列大小 diff --git a/HISTORY.md b/HISTORY.md index 09a0ce7..2cac061 100755 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,5 +1,11 @@ ## ✒ 历史版本的特性介绍 (Features in old versions) +### v0.3.0-alpha + +> 此版本发布于 2026-01-07 + +* 重构代码,精简 Executor 实现 + ### v0.2.5-alpha > 此版本发布于 2025-06-30 diff --git a/LICENSE b/LICENSE index e69ceec..4becfb5 100755 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ The MIT License (MIT) -Copyright (c) 2023 FishGoddess +Copyright (c) 2025 FishGoddess Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), diff --git a/Makefile b/Makefile index d37d13d..a575d1d 100755 --- a/Makefile +++ b/Makefile @@ -1,12 +1,13 @@ -.PHONY: test fmt +.PHONY: fmt test -all: test +all: fmt test + +fmt: + go fmt ./... test: go test -v -cover ./... bench: - go test -v ./_examples/performance_test.go -run=none -bench=. -benchmem -benchtime=1s + go test -v ./_examples/basic_test.go -run=none -bench=. -benchmem -benchtime=1s -fmt: - go fmt ./... diff --git a/README.en.md b/README.en.md index 2fc4abc..134d9f3 100755 --- a/README.en.md +++ b/README.en.md @@ -30,6 +30,7 @@ $ go get -u github.com/FishGoddess/goes package main import ( + "context" "fmt" "time" @@ -37,6 +38,8 @@ import ( ) func main() { + ctx := context.Background() + // Limits the number of simultaneous goroutines and not reuses them. limiter := goes.NewLimiter(4) @@ -54,7 +57,7 @@ func main() { defer executor.Close() for i := 0; i < 20; i++ { - executor.Submit(func() { + executor.Submit(ctx, func() { fmt.Printf("executor --> %s\n", time.Now()) time.Sleep(time.Second) }) @@ -73,20 +76,22 @@ $ make bench ```bash goos: linux goarch: amd64 -cpu: AMD EPYC 7K62 48-Core Processor +cpu: Intel(R) Xeon(R) CPU E5-26xx v4 -BenchmarkLimiter-2 2417040 498.5 ns/op 24 B/op 1 allocs/op -BenchmarkExecutor-2 20458502 58.3 ns/op 0 B/op 0 allocs/op -BenchmarkAntsPool-2 4295964 271.7 ns/op 0 B/op 0 allocs/op +BenchmarkLimiter-2 1256862 870.5 ns/op 24 B/op 1 allocs/op +BenchmarkExecutor-2 3916312 286.8 ns/op 0 B/op 0 allocs/op +BenchmarkAntsPool-2 1396972 846.6 ns/op 0 B/op 0 allocs/op +BenchmarkConcPool-2 1473289 843.4 ns/op 0 B/op 0 allocs/op -BenchmarkLimiterTime-2: num is 1000000, cost is 300.936441ms -BenchmarkExecutorTime-2: num is 1000000, cost is 63.026947ms -BenchmarkAntsPoolTime-2: num is 999744, cost is 346.972287ms +BenchmarkLimiterTime-2: num is 500000, cost is 391.462505ms +BenchmarkExecutorTime-2: num is 500000, cost is 180.279155ms +BenchmarkAntsPoolTime-2: num is 500000, cost is 547.328528ms +BenchmarkConcPoolTime-2: num is 500000, cost is 390.354196ms ``` -> Obviously, goes.Executor is 5x faster than ants.Pool which has more features, so try goes if you prefer a lightweight and faster executor. +> Obviously, goes.Executor is faster than other concurrent libraries, so try goes.Executor if you prefer a light-weight and faster executor. -> Benchmarks: [_examples/performance_test.go](./_examples/performance_test.go). +> Benchmarks: [_examples/basic_test.go](./_examples/basic_test.go). ### 👥 Contributing diff --git a/README.md b/README.md index 3240461..d13523e 100755 --- a/README.md +++ b/README.md @@ -30,6 +30,7 @@ $ go get -u github.com/FishGoddess/goes package main import ( + "context" "fmt" "time" @@ -37,6 +38,8 @@ import ( ) func main() { + ctx := context.Background() + // Limits the number of simultaneous goroutines and not reuses them. limiter := goes.NewLimiter(4) @@ -54,7 +57,7 @@ func main() { defer executor.Close() for i := 0; i < 20; i++ { - executor.Submit(func() { + executor.Submit(ctx, func() { fmt.Printf("executor --> %s\n", time.Now()) time.Sleep(time.Second) }) @@ -73,20 +76,22 @@ $ make bench ```bash goos: linux goarch: amd64 -cpu: AMD EPYC 7K62 48-Core Processor +cpu: Intel(R) Xeon(R) CPU E5-26xx v4 -BenchmarkLimiter-2 2417040 498.5 ns/op 24 B/op 1 allocs/op -BenchmarkExecutor-2 20458502 58.3 ns/op 0 B/op 0 allocs/op -BenchmarkAntsPool-2 4295964 271.7 ns/op 0 B/op 0 allocs/op +BenchmarkLimiter-2 1256862 870.5 ns/op 24 B/op 1 allocs/op +BenchmarkExecutor-2 3916312 286.8 ns/op 0 B/op 0 allocs/op +BenchmarkAntsPool-2 1396972 846.6 ns/op 0 B/op 0 allocs/op +BenchmarkConcPool-2 1473289 843.4 ns/op 0 B/op 0 allocs/op -BenchmarkLimiterTime-2: num is 1000000, cost is 300.936441ms -BenchmarkExecutorTime-2: num is 1000000, cost is 63.026947ms -BenchmarkAntsPoolTime-2: num is 999744, cost is 346.972287ms +BenchmarkLimiterTime-2: num is 500000, cost is 391.462505ms +BenchmarkExecutorTime-2: num is 500000, cost is 180.279155ms +BenchmarkAntsPoolTime-2: num is 500000, cost is 547.328528ms +BenchmarkConcPoolTime-2: num is 500000, cost is 390.354196ms ``` -> 很明显,goes.Executor 的性能比功能更丰富的 ants.Pool 要高出 5 倍左右,所以当你需要一个很轻量且高性能的异步任务执行器时,可以尝试下 goes。 +> 很明显,goes.Executor 的性能比其他的并发执行库高很多,所以当你需要一个轻量且高性能的并发执行器时,可以尝试下 goes.Executor。 -> 测试文件:[_examples/performance_test.go](./_examples/performance_test.go)。 +> 测试文件:[_examples/basic_test.go](./_examples/basic_test.go)。 ### 👥 贡献者 diff --git a/_examples/basic.go b/_examples/basic.go index 4885d57..58666b0 100644 --- a/_examples/basic.go +++ b/_examples/basic.go @@ -1,10 +1,11 @@ -// Copyright 2025s FishGoddess. All rights reserved. +// Copyright 2025 FishGoddess. All rights reserved. // Use of this source code is governed by a MIT style // license that can be found in the LICENSE file. package main import ( + "context" "fmt" "time" @@ -12,6 +13,8 @@ import ( ) func main() { + ctx := context.Background() + // Limits the number of simultaneous goroutines and not reuses them. limiter := goes.NewLimiter(4) @@ -29,7 +32,7 @@ func main() { defer executor.Close() for i := 0; i < 20; i++ { - executor.Submit(func() { + executor.Submit(ctx, func() { fmt.Printf("executor --> %s\n", time.Now()) time.Sleep(time.Second) }) diff --git a/_examples/performance_test.go b/_examples/basic_test.go similarity index 66% rename from _examples/performance_test.go rename to _examples/basic_test.go index 6ea4521..e49a8d2 100644 --- a/_examples/performance_test.go +++ b/_examples/basic_test.go @@ -5,19 +5,20 @@ package main import ( + "context" "sync/atomic" "testing" "time" "github.com/FishGoddess/goes" //"github.com/panjf2000/ants/v2" + //"github.com/sourcegraph/conc/pool" ) const ( - limit = 256 - workerNum = limit - size = limit - timeLoop = 100_0000 + limit = 64 + workers = limit + timeLoop = 50_0000 ) func bench(num *uint32) { @@ -65,7 +66,8 @@ func BenchmarkLimiterTime(b *testing.B) { // go test -v -run=none -bench=^BenchmarkExecutor$ -benchmem -benchtime=1s func BenchmarkExecutor(b *testing.B) { - executor := goes.NewExecutor(workerNum) + ctx := context.Background() + executor := goes.NewExecutor(workers) num := uint32(0) task := func() { @@ -74,7 +76,7 @@ func BenchmarkExecutor(b *testing.B) { b.RunParallel(func(pb *testing.PB) { for pb.Next() { - executor.Submit(task) + executor.Submit(ctx, task) } }) @@ -84,7 +86,8 @@ func BenchmarkExecutor(b *testing.B) { // go test -v -run=none -bench=^BenchmarkExecutorTime$ -benchmem -benchtime=1s func BenchmarkExecutorTime(b *testing.B) { - executor := goes.NewExecutor(workerNum) + ctx := context.Background() + executor := goes.NewExecutor(workers) num := uint32(0) task := func() { @@ -93,7 +96,7 @@ func BenchmarkExecutorTime(b *testing.B) { beginTime := time.Now() for range timeLoop { - executor.Submit(task) + executor.Submit(ctx, task) } executor.Close() @@ -104,7 +107,7 @@ func BenchmarkExecutorTime(b *testing.B) { // // go test -v -run=none -bench=^BenchmarkAntsPool$ -benchmem -benchtime=1s // func BenchmarkAntsPool(b *testing.B) { -// pool, _ := ants.NewPool(workerNum) +// pool, _ := ants.NewPool(workers) // // num := uint32(0) // task := func() { @@ -123,7 +126,7 @@ func BenchmarkExecutorTime(b *testing.B) { // // // go test -v -run=none -bench=^BenchmarkAntsPoolTime$ -benchmem -benchtime=1s // func BenchmarkAntsPoolTime(b *testing.B) { -// pool, _ := ants.NewPool(workerNum) +// pool, _ := ants.NewPool(workers) // // num := uint32(0) // task := func() { @@ -140,3 +143,42 @@ func BenchmarkExecutorTime(b *testing.B) { // cost := time.Since(beginTime) // b.Logf("num is %d, cost is %s", num, cost) // } +// +// // // go test -v -run=none -bench=^BenchmarkConcPool$ -benchmem -benchtime=1s +// func BenchmarkConcPool(b *testing.B) { +// pool := pool.New().WithMaxGoroutines(workers) +// +// num := uint32(0) +// task := func() { +// bench(&num) +// } +// +// b.RunParallel(func(pb *testing.PB) { +// for pb.Next() { +// pool.Go(task) +// } +// }) +// +// pool.Wait() +// b.Logf("num is %d", num) +// } +// +// // go test -v -run=none -bench=^BenchmarkConcPoolTime$ -benchmem -benchtime=1s +// func BenchmarkConcPoolTime(b *testing.B) { +// pool := pool.New().WithMaxGoroutines(workers) +// +// num := uint32(0) +// task := func() { +// bench(&num) +// } +// +// beginTime := time.Now() +// for range timeLoop { +// pool.Go(task) +// } +// +// pool.Wait() +// +// cost := time.Since(beginTime) +// b.Logf("num is %d, cost is %s", num, cost) +// } diff --git a/_examples/purge.go b/_examples/purge.go deleted file mode 100644 index 76d3e87..0000000 --- a/_examples/purge.go +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright 2025 FishGoddess. All rights reserved. -// Use of this source code is governed by a MIT style -// license that can be found in the LICENSE file. - -package main - -import ( - "fmt" - "math/rand" - "time" - - "github.com/FishGoddess/goes" -) - -func watchExecutorWorkers(executor *goes.Executor) { - for { - fmt.Printf("workers --> %d\n", executor.AvailableWorkers()) - time.Sleep(time.Second) - } -} - -func main() { - // Creates a purge-active executor with 16 workers. - // The purge task will be executed every purge interval and purge workers not in lifetime. - purgeInterval := time.Minute - workerLifetime := time.Minute - - executor := goes.NewExecutor(16, goes.WithPurgeActive(purgeInterval, workerLifetime)) - defer executor.Close() - - go watchExecutorWorkers(executor) - - // Submit some tasks. - for i := 0; i < 200; i++ { - no := i - - executor.Submit(func() { - r := 3000 + rand.Intn(1000) - fmt.Printf("task %d --> %d\n", no, r) - time.Sleep(time.Duration(r) * time.Millisecond) - }) - } - - time.Sleep(time.Minute + 5*time.Second) - fmt.Println("Wait! You will see the workers are decreasing and increasing...") - - for i := 0; i < 10; i++ { - no := i - - executor.Submit(func() { - r := 10000 + rand.Intn(1000) - fmt.Printf("task %d --> %d\n", no, r) - time.Sleep(time.Duration(r) * time.Millisecond) - }) - } - - time.Sleep(2 * time.Minute) -} diff --git a/_examples/spin_lock.go b/_examples/spin_lock.go deleted file mode 100644 index bb4bbb3..0000000 --- a/_examples/spin_lock.go +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright 2025 FishGoddess. All rights reserved. -// Use of this source code is governed by a MIT style -// license that can be found in the LICENSE file. - -package main - -import ( - "fmt" - "sync" - - "github.com/FishGoddess/goes/pkg/spinlock" -) - -func main() { - // It's an implementation of sync.Locker, so you can use it as a mutex. - spin := spinlock.New() - - var total int - var wg sync.WaitGroup - for i := 0; i < 10000; i++ { - wg.Add(1) - go func() { - defer wg.Done() - - spin.Lock() - total++ - spin.Unlock() - }() - } - - wg.Wait() - fmt.Printf("total is %d\n", total) -} diff --git a/_icons/coverage.svg b/_icons/coverage.svg index be8453f..7f6c81e 100644 --- a/_icons/coverage.svg +++ b/_icons/coverage.svg @@ -10,7 +10,7 @@ coverage coverage - 98% - 98% + 86% + 86% \ No newline at end of file diff --git a/config.go b/config.go deleted file mode 100644 index 1750ebe..0000000 --- a/config.go +++ /dev/null @@ -1,68 +0,0 @@ -// Copyright 2025 FishGoddess. All rights reserved. -// Use of this source code is governed by a MIT style -// license that can be found in the LICENSE file. - -package goes - -import ( - "sync" - "time" - - "github.com/FishGoddess/goes/pkg/spinlock" -) - -type config struct { - workerNum int - workerQueueSize int - workerLifetime time.Duration - purgeInterval time.Duration - nowFunc func() time.Time - recoverFunc func(r any) - newLockerFunc func() sync.Locker - newSchedulerFunc func(workers ...*worker) scheduler -} - -func newDefaultConfig(workerNum int) *config { - return &config{ - workerNum: workerNum, - workerQueueSize: 64, - workerLifetime: 0, - purgeInterval: 0, - nowFunc: nil, - recoverFunc: nil, - newLockerFunc: nil, - newSchedulerFunc: nil, - } -} - -func (c *config) now() time.Time { - if c.nowFunc == nil { - return time.Now() - } - - return c.nowFunc() -} - -func (c *config) recoverable() bool { - return c.recoverFunc != nil -} - -func (c *config) recover(r any) { - c.recoverFunc(r) -} - -func (c *config) newLocker() sync.Locker { - if c.newLockerFunc == nil { - return spinlock.New() - } - - return c.newLockerFunc() -} - -func (c *config) newScheduler(workers ...*worker) scheduler { - if c.newSchedulerFunc == nil { - return newRoundRobinScheduler(workers...) - } - - return c.newSchedulerFunc(workers...) -} diff --git a/config_test.go b/config_test.go deleted file mode 100644 index 26ce362..0000000 --- a/config_test.go +++ /dev/null @@ -1,137 +0,0 @@ -// Copyright 2025 FishGoddess. All rights reserved. -// Use of this source code is governed by a MIT style -// license that can be found in the LICENSE file. - -package goes - -import ( - "fmt" - "sync" - "testing" - "time" - - "github.com/FishGoddess/goes/pkg/spinlock" -) - -// go test -v -cover -run=^TestNewDefaultConfig$ -func TestNewDefaultConfig(t *testing.T) { - workerNum := 16 - conf := newDefaultConfig(workerNum) - - if conf.workerNum != workerNum { - t.Fatalf("conf.workerNum %d != workerNum %d", conf.workerNum, workerNum) - } -} - -// go test -v -cover -run=^TestConfigNow$ -func TestConfigNow(t *testing.T) { - workerNum := 16 - conf := newDefaultConfig(workerNum) - - got := conf.now().Unix() - want := time.Now().Unix() - if got != want { - t.Fatalf("got %v != want %v", got, want) - } - - wantTime := time.Unix(123456789, 0) - conf.nowFunc = func() time.Time { - return wantTime - } - - got = conf.now().Unix() - want = wantTime.Unix() - if got != want { - t.Fatalf("got %v != want %v", got, want) - } -} - -// go test -v -cover -run=^TestConfigRecover$ -func TestConfigRecover(t *testing.T) { - workerNum := 16 - conf := newDefaultConfig(workerNum) - - if conf.recoverable() { - t.Fatalf("conf.recoverable() is wrong") - } - - got := 0 - conf.recoverFunc = func(r any) { - got = r.(int) - } - - if !conf.recoverable() { - t.Fatalf("conf.recoverable() is wrong") - } - - want := 1 - conf.recover(want) - - if got != want { - t.Fatalf("got %d != want %d", got, want) - } - - defer func() { - if r := recover(); r == nil { - t.Fatal("conf.recover should panic") - } - }() - - conf.recoverFunc = nil - conf.recover(0) -} - -// go test -v -cover -run=^TestConfigNewLocker$ -func TestConfigNewLocker(t *testing.T) { - workerNum := 16 - conf := newDefaultConfig(workerNum) - - got := conf.newLocker() - if _, ok := got.(*spinlock.Lock); !ok { - t.Fatalf("got %T is not *spinlock.Lock", got) - } - - want := &sync.Mutex{} - conf.newLockerFunc = func() sync.Locker { - return want - } - - got = conf.newLocker() - if fmt.Sprintf("%p", got) != fmt.Sprintf("%p", want) { - t.Fatalf("got %p != want %p", got, want) - } -} - -// go test -v -cover -run=^TestConfigNewScheduler$ -func TestConfigNewScheduler(t *testing.T) { - workerNum := 16 - conf := newDefaultConfig(workerNum) - - workers := make([]*worker, workerNum) - got := conf.newScheduler(workers...) - - if _, ok := got.(*roundRobinScheduler); !ok { - t.Fatalf("got %T is not *roundRobinScheduler", got) - } - - want := &roundRobinScheduler{} - conf.newSchedulerFunc = func(workers ...*worker) scheduler { - want.workers = workers - return want - } - - got = conf.newScheduler(workers...) - if fmt.Sprintf("%p", got) != fmt.Sprintf("%p", want) { - t.Fatalf("got %p != want %p", got, want) - } - - scheduler, ok := got.(*roundRobinScheduler) - if !ok { - t.Fatalf("got %T is not *roundRobinScheduler", got) - } - - gotLen := len(scheduler.workers) - if gotLen != workerNum { - t.Fatalf("gotLen %d != workerNum %d", gotLen, workerNum) - } -} diff --git a/executor.go b/executor.go index bfbab7e..0d6e2da 100644 --- a/executor.go +++ b/executor.go @@ -5,202 +5,87 @@ package goes import ( + "context" "errors" "sync" - "time" + "sync/atomic" ) -var ( - ErrExecutorIsClosed = errors.New("goes: executor is closed") - ErrWorkerIsNil = errors.New("goes: worker is nil") +const ( + minWorkers = 1 + maxWorkers = 10000 ) -// Task is a function can be executed by executor. -type Task = func() +var ( + ErrExecutorClosed = errors.New("goes: executor is closed") +) -// Executor executes tasks concurrently using limited goroutines. -// You can specify the number of workers and the queue size of each worker. +// Executor starts some workers to do tasks concurrently. type Executor struct { conf *config - workers []*worker - scheduler scheduler - closeCh chan struct{} - closed bool - - wg sync.WaitGroup - lock sync.Locker + tasks chan Task + done chan struct{} + closed atomic.Bool + group sync.WaitGroup } -// NewExecutor creates a new executor with given worker number and options. -func NewExecutor(workerNum int, opts ...Option) *Executor { - conf := newDefaultConfig(workerNum) - for _, opt := range opts { - opt.applyTo(conf) - } +// NewExecutor creates a executor with workers. +func NewExecutor(workers uint, opts ...Option) *Executor { + conf := newConfig().apply(opts...) - if conf.workerNum <= 0 { - panic("goes: executor's worker num <= 0") + if workers < minWorkers { + workers = minWorkers } - if conf.workerQueueSize <= 0 { - panic("goes: worker's queue size <= 0") + if workers > maxWorkers { + workers = maxWorkers } - if conf.purgeInterval > 0 && conf.purgeInterval < time.Minute { - panic("goes: executor's purge interval < 1 minute") - } - - if conf.workerLifetime > 0 && conf.workerLifetime < time.Minute { - panic("goes: executor's worker lifetime < 1 minute") - } - - workers := make([]*worker, 0, conf.workerNum) executor := &Executor{ - conf: conf, - workers: workers, - scheduler: conf.newScheduler(), - closeCh: make(chan struct{}, 1), - closed: false, - lock: conf.newLocker(), - } - - executor.spawnWorker() - executor.runPurgeTask() - return executor -} - -func (e *Executor) spawnWorker() *worker { - worker := newWorker(e) - - e.workers = append(e.workers, worker) - e.scheduler.Set(e.workers) - return worker -} - -func (e *Executor) purgeActive() bool { - return e.conf.purgeInterval > 0 && e.conf.workerLifetime > 0 -} - -func (e *Executor) purgeWorkers() { - e.lock.Lock() - defer e.lock.Unlock() - - now := e.conf.now() - purgeable := false - - isPurgeable := func(worker *worker) bool { - return worker.WaitingTasks() <= 0 && now.Sub(worker.AcceptTime()) >= e.conf.workerLifetime + conf: conf, + tasks: make(chan Task, conf.queueSize), + done: make(chan struct{}), } - // Check if we need to purge workers. - for _, worker := range e.workers { - if isPurgeable(worker) { - purgeable = true - break - } + for range workers { + executor.group.Go(executor.worker) } - if !purgeable { - return - } - - // Purge workers and we will keep one worker at least. - newWorkers := make([]*worker, 0, len(e.workers)) - for _, worker := range e.workers { - if len(newWorkers) <= 0 { - newWorkers = append(newWorkers, worker) - continue - } - - if isPurgeable(worker) { - worker.Done() - } else { - newWorkers = append(newWorkers, worker) - } - } - - e.workers = newWorkers - e.scheduler.Set(e.workers) + return executor } -func (e *Executor) runPurgeTask() { - if !e.purgeActive() { - return +func (e *Executor) worker() { + for task := range e.tasks { + task.Do(e.conf.recovery) } - - go func() { - ticker := time.NewTicker(e.conf.purgeInterval) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - e.purgeWorkers() - case <-e.closeCh: - return - } - } - }() } -// AvailableWorkers returns the number of workers available. -func (e *Executor) AvailableWorkers() int { - e.lock.Lock() - defer e.lock.Unlock() - - return len(e.workers) -} - -// Submit submits a task to be handled by workers. -func (e *Executor) Submit(task Task) error { - e.lock.Lock() - - if e.closed { - e.lock.Unlock() - - return ErrExecutorIsClosed +// Submit submits a task to executor and returns an error if failed. +func (e *Executor) Submit(ctx context.Context, task Task) error { + if e.closed.Load() { + return ErrExecutorClosed } - worker := e.scheduler.Get() - if worker == nil { - e.lock.Unlock() - - return ErrWorkerIsNil + select { + case e.tasks <- task: + return nil + case <-ctx.Done(): + return ctx.Err() + case <-e.done: + return ErrExecutorClosed } +} - // 1. We don't need to create a new worker if we got a worker with no tasks. - // 2. The number of workers has reached the limit, so we can only use the worker we got. - if worker.WaitingTasks() > 0 && len(e.workers) < e.conf.workerNum { - worker = e.spawnWorker() +// Close closes the executor and returns an error if failed. +func (e *Executor) Close() error { + if !e.closed.CompareAndSwap(false, true) { + return nil } - if e.purgeActive() { - worker.SetAcceptTime(e.conf.now()) - } + close(e.done) + close(e.tasks) - e.lock.Unlock() - worker.Accept(task) + e.group.Wait() return nil } - -// Wait waits all tasks to be handled. -func (e *Executor) Wait() { - e.wg.Wait() -} - -// Close closes the executor after handling all tasks. -func (e *Executor) Close() { - e.lock.Lock() - defer e.lock.Unlock() - - for _, worker := range e.workers { - worker.Done() - } - - close(e.closeCh) - e.closed = true - e.workers = nil - e.scheduler.Set(nil) - e.Wait() -} diff --git a/executor_test.go b/executor_test.go index f3cbb50..74dce2e 100644 --- a/executor_test.go +++ b/executor_test.go @@ -5,70 +5,25 @@ package goes import ( - "fmt" + "context" "sync" "testing" "time" ) -// go test -v -cover -run=^TestNewExecutor$ -func TestNewExecutor(t *testing.T) { - workerNum := 16 - - t.Run("ok", func(t *testing.T) { - defer func() { - if r := recover(); r != nil { - t.Fatalf("r should not panic") - } - }() - - executor := NewExecutor(workerNum, WithWorkerQueueSize(256), WithPurgeActive(time.Minute, time.Minute)) - defer executor.Close() - }) - - testCase := func(t *testing.T, workerNum int, opts ...Option) { - defer func() { - if r := recover(); r == nil { - t.Fatalf("r should panic") - } - }() - - executor := NewExecutor(workerNum, opts...) - defer executor.Close() - } - - t.Run("worker num", func(t *testing.T) { - testCase(t, 0) - }) - - t.Run("worker queue size", func(t *testing.T) { - testCase(t, workerNum, WithWorkerQueueSize(0)) - }) - - t.Run("purge task 1", func(t *testing.T) { - testCase(t, workerNum, WithPurgeActive(time.Millisecond, 0)) - }) - - t.Run("purge task 2", func(t *testing.T) { - testCase(t, workerNum, WithPurgeActive(0, time.Millisecond)) - }) - - t.Run("purge task 3", func(t *testing.T) { - testCase(t, workerNum, WithPurgeActive(time.Millisecond, time.Millisecond)) - }) -} - // go test -v -cover -run=^TestExecutor$ func TestExecutor(t *testing.T) { - workerNum := 16 - executor := NewExecutor(workerNum) + workers := 16 + executor := NewExecutor(uint(workers)) + defer executor.Close() var countMap = make(map[int64]int, 16) var lock sync.Mutex - totalCount := 10 * workerNum + ctx := context.Background() + totalCount := 10 * workers for i := 0; i < totalCount; i++ { - executor.Submit(func() { + executor.Submit(ctx, func() { now := time.Now().UnixMilli() / 10 lock.Lock() @@ -80,14 +35,13 @@ func TestExecutor(t *testing.T) { } executor.Close() - executor.Wait() gotTotalCount := 0 for now, count := range countMap { gotTotalCount = gotTotalCount + count - if count != workerNum { - t.Fatalf("now %d: count %d != workerNum %d", now, count, workerNum) + if count != workers { + t.Fatalf("now %d: count %d != workers %d", now, count, workers) } } @@ -95,144 +49,3 @@ func TestExecutor(t *testing.T) { t.Fatalf("gotTotalCount %d != totalCount %d", gotTotalCount, totalCount) } } - -// go test -v -cover -run=^TestExecutorError$ -func TestExecutorError(t *testing.T) { - workerNum := 16 - executor := NewExecutor(workerNum) - executor.Close() - - err := executor.Submit(func() {}) - if err != ErrExecutorIsClosed { - t.Fatalf("err %v != ErrExecutorIsClosed %v", err, ErrExecutorIsClosed) - } - - executor = NewExecutor(workerNum) - executor.scheduler = &roundRobinScheduler{} - - err = executor.Submit(func() {}) - if err != ErrWorkerIsNil { - t.Fatalf("err %v != ErrWorkerIsNil %v", err, ErrWorkerIsNil) - } -} - -// go test -v -cover -run=^TestExecutorAvailableWorkers$ -func TestExecutorAvailableWorkers(t *testing.T) { - workerNum := 16 - executor := NewExecutor(workerNum) - - if len(executor.workers) != 1 { - t.Fatalf("len(executor.workers) %d != 1", len(executor.workers)) - } - - got := executor.AvailableWorkers() - if got != 1 { - t.Fatalf("got %d != 1", got) - } - - executor.workers = make([]*worker, workerNum) - - got = executor.AvailableWorkers() - if got != workerNum { - t.Fatalf("got %d != workerNum %d", got, workerNum) - } -} - -// go test -v -cover -run=^TestExecutorSpawnWorker$ -func TestExecutorSpawnWorker(t *testing.T) { - workerNum := 16 - executor := NewExecutor(workerNum) - - got := executor.AvailableWorkers() - if got != 1 { - t.Fatalf("got %d != 1", got) - } - - for range workerNum { - executor.Submit(func() {}) - time.Sleep(time.Millisecond) - } - - got = executor.AvailableWorkers() - if got != 1 { - t.Fatalf("got %d != 1", got) - } - - for range workerNum * 2 { - executor.Submit(func() { - time.Sleep(time.Millisecond) - }) - } - - got = executor.AvailableWorkers() - if got != workerNum { - t.Fatalf("got %d != workerNum %d", got, workerNum) - } -} - -// go test -v -cover -run=^TestExecutorDynamicScaling$ -func TestExecutorDynamicScaling(t *testing.T) { - testCase := func(t *testing.T, workerNum int) { - executor := NewExecutor(workerNum) - executor.conf.purgeInterval = time.Millisecond - executor.conf.workerLifetime = 2 * time.Millisecond - executor.runPurgeTask() - defer executor.Close() - - got := executor.AvailableWorkers() - if got != 1 { - t.Fatalf("got %d != 1", got) - } - - for range workerNum * 2 { - executor.Submit(func() { - time.Sleep(time.Millisecond) - }) - } - - got = executor.AvailableWorkers() - if got != workerNum { - t.Fatalf("got %d != workerNum %d", got, workerNum) - } - - time.Sleep(500 * time.Microsecond) - - got = executor.AvailableWorkers() - if got != workerNum { - t.Fatalf("got %d != workerNum %d", got, workerNum) - } - - time.Sleep(5 * time.Millisecond) - - got = executor.AvailableWorkers() - if got != 1 { - t.Fatalf("got %d != workerNum %d", got, workerNum) - } - - for range workerNum * 2 { - executor.Submit(func() { - time.Sleep(time.Millisecond) - }) - } - - got = executor.AvailableWorkers() - if got != workerNum { - t.Fatalf("got %d != workerNum %d", got, workerNum) - } - - time.Sleep(5 * time.Millisecond) - - got = executor.AvailableWorkers() - if got != 1 { - t.Fatalf("got %d != workerNum %d", got, workerNum) - } - } - - workerNums := []int{1, 2, 4, 16, 64, 256, 1024} - for _, workerNum := range workerNums { - name := fmt.Sprintf("worker num %d", workerNum) - t.Run(name, func(t *testing.T) { - testCase(t, workerNum) - }) - } -} diff --git a/go.mod b/go.mod index bd0e63a..828f3cf 100755 --- a/go.mod +++ b/go.mod @@ -1,3 +1,3 @@ module github.com/FishGoddess/goes -go 1.23 +go 1.25 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..e69de29 diff --git a/limiter.go b/limiter.go index 8eb3fb3..9ea3555 100644 --- a/limiter.go +++ b/limiter.go @@ -1,4 +1,4 @@ -// Copyright 2023 FishGoddess. All rights reserved. +// Copyright 2025 FishGoddess. All rights reserved. // Use of this source code is governed by a MIT style // license that can be found in the LICENSE file. @@ -13,14 +13,18 @@ const ( type token struct{} -// Limiter limits the simultaneous number of goroutines. +// Limiter starts some goroutines to do tasks concurrently. type Limiter struct { + conf *config + tokens chan token - wg sync.WaitGroup + group sync.WaitGroup } // NewLimiter creates a new limiter with limit. -func NewLimiter(limit int) *Limiter { +func NewLimiter(limit uint, opts ...Option) *Limiter { + conf := newConfig().apply(opts...) + if limit < minLimit { limit = minLimit } @@ -30,6 +34,7 @@ func NewLimiter(limit int) *Limiter { } return &Limiter{ + conf: conf, tokens: make(chan token, limit), } } @@ -46,22 +51,22 @@ func (l *Limiter) releaseToken() { } } -// Go starts a goroutine to run f(). -func (l *Limiter) Go(f func()) { +// Go starts a goroutine to run task. +func (l *Limiter) Go(task Task) { l.acquireToken() - l.wg.Add(1) + l.group.Add(1) go func() { defer func() { l.releaseToken() - l.wg.Done() + l.group.Done() }() - f() + task.Do(l.conf.recovery) }() } // Wait waits all goroutines to be finished. func (l *Limiter) Wait() { - l.wg.Wait() + l.group.Wait() } diff --git a/limiter_test.go b/limiter_test.go index 3096605..386be7e 100644 --- a/limiter_test.go +++ b/limiter_test.go @@ -1,4 +1,4 @@ -// Copyright 2023 FishGoddess. All rights reserved. +// Copyright 2025 FishGoddess. All rights reserved. // Use of this source code is governed by a MIT style // license that can be found in the LICENSE file. @@ -13,7 +13,7 @@ import ( // go test -v -cover -run=^TestLimiter$ func TestLimiter(t *testing.T) { limit := 16 - limiter := NewLimiter(limit) + limiter := NewLimiter(uint(limit)) var countMap = make(map[int64]int, 16) var lock sync.Mutex diff --git a/option.go b/option.go index 2924716..f3c6c66 100644 --- a/option.go +++ b/option.go @@ -4,96 +4,38 @@ package goes -import ( - "sync" - "time" - - "github.com/FishGoddess/goes/pkg/spinlock" -) - -// Option is for setting config. -type Option func(conf *config) - -func (o Option) applyTo(conf *config) { - o(conf) +type config struct { + queueSize uint + recovery func(r any) } -// WithWorkerQueueSize sets the queue size of worker. -func WithWorkerQueueSize(size int) Option { - return func(conf *config) { - conf.workerQueueSize = size +func newConfig() *config { + return &config{ + queueSize: 1024, + recovery: nil, } } -// WithPurgeActive sets the purge interval of executor and the lifetime of worker. -func WithPurgeActive(purgeInterval time.Duration, workerLifetime time.Duration) Option { - return func(conf *config) { - conf.purgeInterval = purgeInterval - conf.workerLifetime = workerLifetime +func (c *config) apply(opts ...Option) *config { + for _, opt := range opts { + opt(c) } -} -// WithNowFunc sets the now function. -func WithNowFunc(nowFunc func() time.Time) Option { - return func(conf *config) { - conf.nowFunc = nowFunc - } -} - -// WithRecoverFunc sets the recover function. -func WithRecoverFunc(recoverFunc func(r any)) Option { - return func(conf *config) { - conf.recoverFunc = recoverFunc - } -} - -// WithNewLockerFunc sets the new locker function. -func WithNewLockerFunc(newLockerFunc func() sync.Locker) Option { - return func(conf *config) { - conf.newLockerFunc = newLockerFunc - } + return c } -// WithSpinLock sets the new locker function returns spin lock. -func WithSpinLock() Option { - newLockerFunc := func() sync.Locker { - return spinlock.New() - } - - return func(conf *config) { - conf.newLockerFunc = newLockerFunc - } -} - -// WithSyncMutex sets the new locker function returns sync mutex. -func WithSyncMutex() Option { - newLockerFunc := func() sync.Locker { - return new(sync.Mutex) - } - - return func(conf *config) { - conf.newLockerFunc = newLockerFunc - } -} - -// WithRoundRobinScheduler sets the new scheduler function using round robin strategy. -func WithRoundRobinScheduler() Option { - newSchedulerFunc := func(workers ...*worker) scheduler { - return newRoundRobinScheduler(workers...) - } +type Option func(conf *config) +// WithQueueSize sets the queue size. +func WithQueueSize(queueSize uint) Option { return func(conf *config) { - conf.newSchedulerFunc = newSchedulerFunc + conf.queueSize = queueSize } } -// WithRandomScheduler sets the new scheduler function using random strategy. -func WithRandomScheduler() Option { - newSchedulerFunc := func(workers ...*worker) scheduler { - return newRandomScheduler(workers...) - } - +// WithRecovery sets the recovery function. +func WithRecovery(recovery func(r any)) Option { return func(conf *config) { - conf.newSchedulerFunc = newSchedulerFunc + conf.recovery = recovery } } diff --git a/option_test.go b/option_test.go index de3c571..f981cc4 100644 --- a/option_test.go +++ b/option_test.go @@ -6,138 +6,31 @@ package goes import ( "fmt" - "sync" "testing" - "time" - - "github.com/FishGoddess/goes/pkg/spinlock" ) -// go test -v -cover -run=^TestWithWorkerQueueSize$ -func TestWithWorkerQueueSize(t *testing.T) { - workerNum := 16 - workerQueueSize := 256 - conf := newDefaultConfig(workerNum) - WithWorkerQueueSize(workerQueueSize)(conf) - - if conf.workerQueueSize != workerQueueSize { - t.Fatalf("conf.workerQueueSize %d != workerQueueSize %d", conf.workerQueueSize, workerQueueSize) - } -} - -// go test -v -cover -run=^TestWithPurgeActive$ -func TestWithPurgeActive(t *testing.T) { - workerNum := 16 - purgeInterval := time.Minute - workerLifetime := 3 * time.Minute - conf := newDefaultConfig(workerNum) - WithPurgeActive(purgeInterval, workerLifetime)(conf) - - if conf.purgeInterval != purgeInterval { - t.Fatalf("conf.purgeInterval %d != purgeInterval %d", conf.purgeInterval, purgeInterval) - } - - if conf.workerLifetime != workerLifetime { - t.Fatalf("conf.workerLifetime %d != workerLifetime %d", conf.workerLifetime, workerLifetime) - } -} - -// go test -v -cover -run=^TestWithNowFunc$ -func TestWithNowFunc(t *testing.T) { - workerNum := 16 - nowFunc := func() time.Time { return time.Now() } - conf := newDefaultConfig(workerNum) - WithNowFunc(nowFunc)(conf) - - if fmt.Sprintf("%p", conf.nowFunc) != fmt.Sprintf("%p", nowFunc) { - t.Fatalf("conf.nowFunc %p != nowFunc %p", conf.nowFunc, nowFunc) - } -} - -// go test -v -cover -run=^TestWithRecoverFunc$ -func TestWithRecoverFunc(t *testing.T) { - workerNum := 16 - recoverFunc := func(r any) {} - conf := newDefaultConfig(workerNum) - WithRecoverFunc(recoverFunc)(conf) - - if fmt.Sprintf("%p", conf.recoverFunc) != fmt.Sprintf("%p", recoverFunc) { - t.Fatalf("conf.recoverFunc %p != recoverFunc %p", conf.recoverFunc, recoverFunc) - } -} +// go test -v -cover -run=^TestWithQueueSize$ +func TestWithQueueSize(t *testing.T) { + conf := &config{queueSize: 0} -// go test -v -cover -run=^TestWithNewLockerFunc$ -func TestWithNewLockerFunc(t *testing.T) { - workerNum := 16 - newLockerFunc := func() sync.Locker { return nil } - conf := newDefaultConfig(workerNum) - WithNewLockerFunc(newLockerFunc)(conf) + queueSize := uint(1024) + WithQueueSize(queueSize)(conf) - if fmt.Sprintf("%p", conf.newLockerFunc) != fmt.Sprintf("%p", newLockerFunc) { - t.Fatalf("conf.newLockerFunc %p != newLockerFunc %p", conf.newLockerFunc, newLockerFunc) + if conf.queueSize != queueSize { + t.Fatalf("got %d != want %d", conf.queueSize, queueSize) } } -// go test -v -cover -run=^TestWithSpinLock$ -func TestWithSpinLock(t *testing.T) { - workerNum := 16 - conf := newDefaultConfig(workerNum) - WithSpinLock()(conf) +// go test -v -cover -run=^WithRecovery$ +func TestWithRecovery(t *testing.T) { + conf := &config{recovery: nil} - got := conf.newLockerFunc() - if _, ok := got.(*spinlock.Lock); !ok { - t.Fatalf("got %T is not *spinlock.Lock", got) - } -} - -// go test -v -cover -run=^TestWithSyncMutex$ -func TestWithSyncMutex(t *testing.T) { - workerNum := 16 - conf := newDefaultConfig(workerNum) - WithSyncMutex()(conf) - - got := conf.newLockerFunc() - if _, ok := got.(*sync.Mutex); !ok { - t.Fatalf("got %T is not *sync.Mutex", got) - } -} - -// go test -v -cover -run=^TestWithRoundRobinScheduler$ -func TestWithRoundRobinScheduler(t *testing.T) { - workerNum := 16 - conf := newDefaultConfig(workerNum) - WithRoundRobinScheduler()(conf) - - workers := make([]*worker, 0, workerNum) - got := conf.newSchedulerFunc(workers...) - - scheduler, ok := got.(*roundRobinScheduler) - if !ok { - t.Fatalf("got %T is not *roundRobinScheduler", got) - } - - gotCap := cap(scheduler.workers) - if gotCap != workerNum { - t.Fatalf("gotCap %d != workerNum %d", gotCap, workerNum) - } -} - -// go test -v -cover -run=^TestWithRandomScheduler$ -func TestWithRandomScheduler(t *testing.T) { - workerNum := 16 - conf := newDefaultConfig(workerNum) - WithRandomScheduler()(conf) - - workers := make([]*worker, 0, workerNum) - got := conf.newSchedulerFunc(workers...) - - scheduler, ok := got.(*randomScheduler) - if !ok { - t.Fatalf("got %T is not *randomScheduler", got) - } + recovery := func(r any) {} + WithRecovery(recovery)(conf) - gotCap := cap(scheduler.workers) - if gotCap != workerNum { - t.Fatalf("gotCap %d != workerNum %d", gotCap, workerNum) + got := fmt.Sprintf("%p", conf.recovery) + want := fmt.Sprintf("%p", recovery) + if got != want { + t.Fatalf("got %s != want %s", got, want) } } diff --git a/pkg/spinlock/spin_lock.go b/pkg/spinlock/spin_lock.go deleted file mode 100644 index 26ca2bb..0000000 --- a/pkg/spinlock/spin_lock.go +++ /dev/null @@ -1,36 +0,0 @@ -package spinlock - -import ( - "runtime" - "sync" - "sync/atomic" -) - -const maxBackoff = 16 - -type Lock uint32 - -// New creates a new spin lock. -func New() sync.Locker { - return new(Lock) -} - -// Lock locks with the spin lock. -func (l *Lock) Lock() { - backoff := 1 - - for !atomic.CompareAndSwapUint32((*uint32)(l), 0, 1) { - for i := 0; i < backoff; i++ { - runtime.Gosched() - } - - if backoff < maxBackoff { - backoff <<= 1 - } - } -} - -// Unlock unlocks with the spin lock. -func (l *Lock) Unlock() { - atomic.StoreUint32((*uint32)(l), 0) -} diff --git a/pkg/spinlock/spin_lock_test.go b/pkg/spinlock/spin_lock_test.go deleted file mode 100644 index 0a4eb4d..0000000 --- a/pkg/spinlock/spin_lock_test.go +++ /dev/null @@ -1,59 +0,0 @@ -package spinlock - -import ( - "sync" - "testing" - "time" -) - -// go test -v -cover -run=^TestSpinLock$ -func TestSpinLock(t *testing.T) { - lock := New() - if _, ok := lock.(*Lock); !ok { - t.Fatalf("lock %T is not *Lock", lock) - } - - got := 0 - want := 10000 - - var wg sync.WaitGroup - for range want { - wg.Add(1) - go func() { - defer wg.Done() - - lock.Lock() - got++ - lock.Unlock() - }() - } - - wg.Wait() - if got != want { - t.Fatalf("got %d != want %d", got, want) - } -} - -func benchmarkLock(b *testing.B, lock sync.Locker) { - b.SetParallelism(1024) - - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - lock.Lock() - time.Sleep(50 * time.Microsecond) - lock.Unlock() - } - }) -} - -// go test -v -run=none -bench=^BenchmarkMutex$ -benchmem -benchtime=1s -func BenchmarkMutex(b *testing.B) { - var mu sync.Mutex - benchmarkLock(b, &mu) -} - -// go test -v -run=none -bench=^BenchmarkSpinLock$ -benchmem -benchtime=1s -func BenchmarkSpinLock(b *testing.B) { - var spin Lock - benchmarkLock(b, &spin) -} diff --git a/random.go b/random.go deleted file mode 100644 index ae8ec3f..0000000 --- a/random.go +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright 2025 FishGoddess. All rights reserved. -// Use of this source code is governed by a MIT style -// license that can be found in the LICENSE file. - -package goes - -import ( - "math/rand" - "time" -) - -type randomScheduler struct { - workers []*worker - random *rand.Rand -} - -func newRandomScheduler(workers ...*worker) *randomScheduler { - scheduler := &randomScheduler{ - workers: workers, - random: rand.New(rand.NewSource(time.Now().Unix())), - } - - return scheduler -} - -// Set sets the workers to scheduler. -func (rs *randomScheduler) Set(workers []*worker) { - rs.workers = workers -} - -// Get gets a worker from scheduler. -func (rs *randomScheduler) Get() *worker { - if len(rs.workers) <= 0 { - return nil - } - - index := rs.random.Intn(len(rs.workers)) - worker := rs.workers[index] - return worker -} diff --git a/random_test.go b/random_test.go deleted file mode 100644 index 56f2fe0..0000000 --- a/random_test.go +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright 2025 FishGoddess. All rights reserved. -// Use of this source code is governed by a MIT style -// license that can be found in the LICENSE file. - -package goes - -import ( - "fmt" - "testing" -) - -// go test -v -cover -run=^TestRandomScheduler$ -func TestRandomScheduler(t *testing.T) { - workerNum := 16 - workers := make([]*worker, 0, workerNum) - for range workerNum { - workers = append(workers, new(worker)) - } - - scheduler := newRandomScheduler(workers...) - if fmt.Sprintf("%p", scheduler.workers) != fmt.Sprintf("%p", workers) { - t.Fatalf("scheduler.workers %p != workers %p", scheduler.workers, workers) - } - - got := len(scheduler.workers) - want := len(workers) - if got != want { - t.Fatalf("got %d != want %d", got, want) - } - - scheduler.Set(workers) - if fmt.Sprintf("%p", scheduler.workers) != fmt.Sprintf("%p", workers) { - t.Fatalf("scheduler.workers %p != workers %p", scheduler.workers, workers) - } - - got = len(scheduler.workers) - want = len(workers) - if got != want { - t.Fatalf("got %d != want %d", got, want) - } - - for i, worker := range workers { - got := scheduler.workers[i] - if got != worker { - t.Fatalf("got %p != worker %p", got, worker) - } - } - - for range workers { - gotNext := scheduler.Get() - if gotNext == nil { - t.Fatal("gotNext is nil") - } - } -} diff --git a/round_robin.go b/round_robin.go deleted file mode 100644 index 333727c..0000000 --- a/round_robin.go +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright 2025 FishGoddess. All rights reserved. -// Use of this source code is governed by a MIT style -// license that can be found in the LICENSE file. - -package goes - -type roundRobinScheduler struct { - workers []*worker - index int -} - -func newRoundRobinScheduler(workers ...*worker) *roundRobinScheduler { - scheduler := &roundRobinScheduler{ - workers: workers, - index: -1, - } - - return scheduler -} - -// Set sets the workers to scheduler. -func (rrs *roundRobinScheduler) Set(workers []*worker) { - rrs.workers = workers -} - -// Get gets a worker from scheduler. -func (rrs *roundRobinScheduler) Get() *worker { - if len(rrs.workers) <= 0 { - return nil - } - - if rrs.index++; rrs.index >= len(rrs.workers) { - rrs.index = 0 - } - - worker := rrs.workers[rrs.index] - return worker -} diff --git a/round_robin_test.go b/round_robin_test.go deleted file mode 100644 index 5e2363b..0000000 --- a/round_robin_test.go +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright 2025 FishGoddess. All rights reserved. -// Use of this source code is governed by a MIT style -// license that can be found in the LICENSE file. - -package goes - -import ( - "fmt" - "testing" -) - -// go test -v -cover -run=^TestRoundRobinScheduler$ -func TestRoundRobinScheduler(t *testing.T) { - workerNum := 16 - workers := make([]*worker, 0, workerNum) - for range workerNum { - workers = append(workers, new(worker)) - } - - scheduler := newRoundRobinScheduler(workers...) - if fmt.Sprintf("%p", scheduler.workers) != fmt.Sprintf("%p", workers) { - t.Fatalf("scheduler.workers %p != workers %p", scheduler.workers, workers) - } - - got := len(scheduler.workers) - want := len(workers) - if got != want { - t.Fatalf("got %d != want %d", got, want) - } - - scheduler.Set(workers) - if fmt.Sprintf("%p", scheduler.workers) != fmt.Sprintf("%p", workers) { - t.Fatalf("scheduler.workers %p != workers %p", scheduler.workers, workers) - } - - got = len(scheduler.workers) - want = len(workers) - if got != want { - t.Fatalf("got %d != want %d", got, want) - } - - for i, worker := range workers { - got := scheduler.workers[i] - if got != worker { - t.Fatalf("got %p != worker %p", got, worker) - } - } - - for _, worker := range workers { - gotNext := scheduler.Get() - if gotNext != worker { - t.Fatalf("gotNext %p != worker %p", gotNext, worker) - } - } -} diff --git a/task.go b/task.go new file mode 100644 index 0000000..0be3da7 --- /dev/null +++ b/task.go @@ -0,0 +1,23 @@ +// Copyright 2025 FishGoddess. All rights reserved. +// Use of this source code is governed by a MIT style +// license that can be found in the LICENSE file. + +package goes + +type Task func() + +func (t Task) Do(recovery func(r any)) { + if t == nil { + return + } + + if recovery != nil { + defer func() { + if r := recover(); r != nil { + recovery(r) + } + }() + } + + t() +} diff --git a/task_test.go b/task_test.go new file mode 100644 index 0000000..545426b --- /dev/null +++ b/task_test.go @@ -0,0 +1,64 @@ +// Copyright 2025 FishGoddess. All rights reserved. +// Use of this source code is governed by a MIT style +// license that can be found in the LICENSE file. + +package goes + +import "testing" + +// go test -v -cover -run=^TestTask$ +func TestTask(t *testing.T) { + defer func() { + if r := recover(); r != nil { + t.Fatal(r) + } + }() + + var done bool = false + var task Task = func() { done = true } + task.Do(nil) + + if !done { + t.Fatalf("%+v is wrong", done) + } +} + +// go test -v -cover -run=^TestTaskNil$ +func TestTaskNil(t *testing.T) { + defer func() { + if r := recover(); r != nil { + t.Fatal(r) + } + }() + + var task Task = nil + task.Do(nil) +} + +// go test -v -cover -run=^TestTaskPanic$ +func TestTaskPanic(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Fatal("task should panic") + } + }() + + var task Task = func() { panic("wow") } + task.Do(nil) +} + +// go test -v -cover -run=^TestTaskRecovery$ +func TestTaskRecovery(t *testing.T) { + defer func() { + if r := recover(); r != nil { + t.Fatal(r) + } + }() + + var task Task = func() { panic("wow") } + task.Do(func(r any) { + if r != "wow" { + t.Fatalf("r %+v is wrong", r) + } + }) +} diff --git a/worker.go b/worker.go deleted file mode 100644 index 9c42bdb..0000000 --- a/worker.go +++ /dev/null @@ -1,88 +0,0 @@ -// Copyright 2025 FishGoddess. All rights reserved. -// Use of this source code is governed by a MIT style -// license that can be found in the LICENSE file. - -package goes - -import "time" - -type scheduler interface { - // Set sets the workers to scheduler. - Set(workers []*worker) - - // Get gets a worker from scheduler. - Get() *worker -} - -type worker struct { - executor *Executor - taskQueue chan Task - acceptTime time.Time -} - -func newWorker(executor *Executor) *worker { - w := &worker{ - executor: executor, - taskQueue: make(chan Task, executor.conf.workerQueueSize), - } - - w.work() - return w -} - -// WaitingTasks returns the number of tasks waiting. -func (w *worker) WaitingTasks() int { - return len(w.taskQueue) -} - -// AcceptTime returns the accept time of worker. -func (w *worker) AcceptTime() time.Time { - return w.acceptTime -} - -// SetAcceptTime sets the accept time of worker. -func (w *worker) SetAcceptTime(t time.Time) { - w.acceptTime = t -} - -func (w *worker) handle(task Task) { - if w.executor.conf.recoverable() { - defer func() { - if r := recover(); r != nil { - w.executor.conf.recover(r) - } - }() - } - - task() -} - -func (w *worker) work() { - w.executor.wg.Add(1) - go func() { - defer w.executor.wg.Done() - defer close(w.taskQueue) - - for task := range w.taskQueue { - if task == nil { - break - } - - w.handle(task) - } - }() -} - -// Accept accepts a task to be handled. -func (w *worker) Accept(task Task) { - if task == nil { - return - } - - w.taskQueue <- task -} - -// Done signals the worker to stop working. -func (w *worker) Done() { - w.taskQueue <- nil -} diff --git a/worker_test.go b/worker_test.go deleted file mode 100644 index 4a710bd..0000000 --- a/worker_test.go +++ /dev/null @@ -1,108 +0,0 @@ -// Copyright 2025 FishGoddess. All rights reserved. -// Use of this source code is governed by a MIT style -// license that can be found in the LICENSE file. - -package goes - -import ( - "testing" - "time" -) - -// go test -v -cover -run=^TestWorkerHandle$ -func TestWorkerHandle(t *testing.T) { - got := 0 - want := 666 - - executor := &Executor{ - conf: &config{ - recoverFunc: func(r any) { - got = r.(int) - }, - }, - } - - worker := &worker{executor: executor} - worker.handle(func() { - panic(want) - }) - - if got != want { - t.Fatalf("got %d != want %d", got, want) - } - - want = 123 - worker.handle(func() { - got = 123 - }) - - if got != want { - t.Fatalf("got %d != want %d", got, want) - } - - defer func() { - if r := recover(); r == nil { - t.Fatal("worker.handle should panic") - } - }() - - worker.executor.conf.recoverFunc = nil - worker.handle(func() { - panic(want) - }) -} - -// go test -v -cover -run=^TestWorkerWaitingTasks$ -func TestWorkerWaitingTasks(t *testing.T) { - taskQueue := make(chan Task, 4) - worker := &worker{taskQueue: taskQueue} - - got := worker.WaitingTasks() - want := len(taskQueue) - if got != want { - t.Fatalf("got %d != want %d", got, want) - } - - got = worker.WaitingTasks() - if got != 0 { - t.Fatalf("got %d != 0", got) - } - - taskQueue <- nil - taskQueue <- nil - - got = worker.WaitingTasks() - want = len(taskQueue) - if got != want { - t.Fatalf("got %d != want %d", got, want) - } - - got = worker.WaitingTasks() - if got != 2 { - t.Fatalf("got %d != 2", got) - } -} - -// go test -v -cover -run=^TestWorkerAcceptTime$ -func TestWorkerAcceptTime(t *testing.T) { - acceptTime := time.Now() - worker := &worker{acceptTime: acceptTime} - - got := worker.AcceptTime() - if got != acceptTime { - t.Fatalf("got %v != acceptTime %v", got, acceptTime) - } - - acceptTime = time.Unix(123456789, 0) - worker.SetAcceptTime(acceptTime) - - got = worker.acceptTime - if got != acceptTime { - t.Fatalf("got %v != acceptTime %v", got, acceptTime) - } - - got = worker.AcceptTime() - if got != acceptTime { - t.Fatalf("got %v != acceptTime %v", got, acceptTime) - } -}