From 2394be094bc6d4c28b83baca6e9c8a49db56a8e1 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 12 Oct 2025 09:36:02 +0000 Subject: [PATCH 1/2] Initial plan From 7e909f9f53e21029c7593b84b0e80e46a46d1922 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 12 Oct 2025 09:43:39 +0000 Subject: [PATCH 2/2] Fix memory leaks, race conditions, and improve code robustness Co-authored-by: uzziahlin <120019273+uzziahlin@users.noreply.github.com> --- OPTIMIZATIONS.md | 187 ++++++++++++++++++++++++++++++++++++ main.go => examples/main.go | 9 +- redis/client.go | 44 +++++++-- redis/lock.go | 46 +++++++-- 4 files changed, 263 insertions(+), 23 deletions(-) create mode 100644 OPTIMIZATIONS.md rename main.go => examples/main.go (67%) diff --git a/OPTIMIZATIONS.md b/OPTIMIZATIONS.md new file mode 100644 index 0000000..06b5a4e --- /dev/null +++ b/OPTIMIZATIONS.md @@ -0,0 +1,187 @@ +# 代码优化说明 (Code Optimizations) + +本文档记录了对 go-lock 项目所做的优化改进。 + +## 优化清单 + +### 1. 修复导入循环问题 +**问题**: `main.go` 在根包 `lock` 中,与 `redis` 包中的测试文件形成导入循环。 +**解决方案**: +- 将 `main.go` 移至 `examples/` 目录 +- 修改包名为 `package main` +- 这样既避免了导入循环,又提供了使用示例 + +**影响**: 修复了测试无法运行的问题 + +--- + +### 2. 修复 time.Tick 内存泄漏 +**问题**: `startWatchDog` 函数使用 `time.Tick()` 创建 ticker,该 ticker 无法被垃圾回收,造成内存泄漏。 +**解决方案**: +- 使用 `time.NewTicker()` 替代 `time.Tick()` +- 添加 `defer ticker.Stop()` 确保资源释放 + +```go +// 修改前 +tick := time.Tick(expire / 3) + +// 修改后 +ticker := time.NewTicker(expire / 3) +defer ticker.Stop() +``` + +**影响**: 防止长时间运行时的内存泄漏 + +--- + +### 3. 修复 time.After 内存泄漏 +**问题**: `Lock()` 函数中使用 `time.After()` 在循环中可能导致大量 timer 堆积。 +**解决方案**: +- 使用 `time.NewTimer()` 替代 `time.After()` +- 在不再需要时调用 `timer.Stop()` 释放资源 + +```go +// 修改前 +case <-time.After(time.Duration(val) * time.Millisecond): + +// 修改后 +timer := time.NewTimer(time.Duration(val) * time.Millisecond) +select { +case v := <-sub: + timer.Stop() + // ... +case <-timer.C: +case <-ctx.Done(): + timer.Stop() + // ... +} +``` + +**影响**: 防止在高并发场景下的内存泄漏 + +--- + +### 4. 添加互斥锁保护 doneC +**问题**: `doneC` 的初始化和关闭存在并发访问,可能导致竞态条件。 +**解决方案**: +- 在 `Lock` 结构体中添加 `mu sync.Mutex` +- 使用互斥锁保护 `doneC` 的读写操作 + +```go +type Lock struct { + // ... + doneC chan struct{} + mu sync.Mutex // 新增 +} + +// 在访问 doneC 时加锁 +l.mu.Lock() +if l.doneC == nil { + l.doneC = make(chan struct{}) +} +l.mu.Unlock() +``` + +**影响**: 消除竞态条件,提高并发安全性 + +--- + +### 5. 添加类型断言安全检查 +**问题**: `Lock()` 函数中直接进行类型断言 `res.Val.(int64)`,如果类型不匹配会 panic。 +**解决方案**: +- 使用安全的类型断言 `val, ok := res.Val.(int64)` +- 检查断言结果,失败时返回错误 + +```go +// 修改前 +val := res.Val.(int64) + +// 修改后 +val, ok := res.Val.(int64) +if !ok { + return errors.New("unexpected return type from lock script") +} +``` + +**影响**: 提高代码健壮性,避免运行时 panic + +--- + +### 6. 优化 goroutine 和 channel 资源管理 + +#### 6.1 Subscribe 方法优化 +**问题**: goroutine 可能因为 channel 阻塞而泄漏。 +**解决方案**: +- 使用缓冲 channel `make(chan Sub, 1)` +- 添加 `defer close(c)` 确保 channel 关闭 +- 在发送时检查 channel 关闭状态 +- 在所有发送操作中添加 `ctx.Done()` 检查,防止阻塞 + +```go +func (l *Lock) Subscribe(ctx context.Context) <-chan Sub { + c := make(chan Sub, 1) // 使用缓冲 channel + go func() { + defer close(c) // 确保 channel 关闭 + // ... + select { + case c <- Sub{}: + case <-ctx.Done(): + return + } + }() + return c +} +``` + +#### 6.2 DefaultClient.Subscribe 方法优化 +**问题**: goroutine 和 Redis 订阅连接未正确清理。 +**解决方案**: +- 添加 `defer sub.Close()` 确保 Redis 订阅关闭 +- 添加 `defer close(res)` 确保 channel 关闭 +- 在所有发送操作中添加 `ctx.Done()` 检查 +- 使用缓冲 channel 防止阻塞 + +```go +func (d DefaultClient) Subscribe(ctx context.Context, channels ...string) (chan any, error) { + res := make(chan any, 1) // 使用缓冲 channel + sub := d.client.Subscribe(ctx, channels...) + + go func() { + defer close(res) // 确保 channel 关闭 + defer sub.Close() // 确保订阅关闭 + // ... + select { + case res <- msg: + case <-ctx.Done(): + return + } + }() + return res, nil +} +``` + +**影响**: 防止 goroutine 泄漏和资源泄漏 + +--- + +## 优化总结 + +这些优化主要解决了以下问题: +1. **内存泄漏**: 修复了 timer 和 goroutine 泄漏问题 +2. **并发安全**: 添加了互斥锁保护共享资源 +3. **资源管理**: 确保所有资源(channel、goroutine、订阅)正确清理 +4. **健壮性**: 添加了类型检查,避免运行时 panic +5. **可测试性**: 修复了导入循环,使测试可以正常运行 + +所有优化都经过了以下验证: +- `go build ./...` - 编译通过 +- `go vet ./...` - 静态分析通过 +- `go test -race` - 竞态检测通过 + +## 建议 + +为了进一步提高代码质量,建议: +1. 添加单元测试(不依赖真实 Redis) +2. 添加基准测试以验证性能 +3. 考虑添加配置项控制 watchdog 的刷新频率 +4. 考虑添加指标收集(metrics)以监控锁的使用情况 diff --git a/main.go b/examples/main.go similarity index 67% rename from main.go rename to examples/main.go index 70e7172..1577540 100644 --- a/main.go +++ b/examples/main.go @@ -1,20 +1,21 @@ -package lock +package main import ( "context" redisv9 "github.com/redis/go-redis/v9" + "github.com/uzziahlin/go-lock" "github.com/uzziahlin/go-lock/redis" ) func main() { - var lock Lock + var l lock.Lock client := redis.NewDefaultClient(&redisv9.Options{ Addr: "your redis address", Password: "you redis password", DB: 0, }) - lock = redis.NewLock("lockName", redis.WithClient(client)) - err := lock.Lock(context.TODO()) + l = redis.NewLock("lockName", redis.WithClient(client)) + err := l.Lock(context.TODO()) if err != nil { // todo 加锁失败 } diff --git a/redis/client.go b/redis/client.go index 6a85ea4..1ba6b1f 100644 --- a/redis/client.go +++ b/redis/client.go @@ -56,43 +56,69 @@ func (d DefaultClient) Eval(ctx context.Context, script string, keys []string, v } func (d DefaultClient) Subscribe(ctx context.Context, channels ...string) (chan any, error) { - res := make(chan any) + res := make(chan any, 1) sub := d.client.Subscribe(ctx, channels...) go func() { + defer close(res) + defer sub.Close() + for { receive, err := sub.Receive(ctx) if err != nil { - res <- err + select { + case res <- err: + case <-ctx.Done(): + } return } switch v := receive.(type) { case *redis.Subscription: - res <- &Subscription{ + select { + case res <- &Subscription{ Kind: v.Kind, Channel: v.Channel, Count: v.Count, + }: + case <-ctx.Done(): + return } case *redis.Message: - res <- &Message{ + select { + case res <- &Message{ Channel: v.Channel, Pattern: v.Pattern, Data: []byte(v.Payload), + }: + case <-ctx.Done(): + return } for _, p := range v.PayloadSlice { - res <- &Message{ + select { + case res <- &Message{ Channel: v.Channel, Pattern: v.Pattern, Data: []byte(p), + }: + case <-ctx.Done(): + return } } - case *Pong: - res <- &Pong{ - Data: v.Data, + case *redis.Pong: + select { + case res <- &Pong{ + Data: v.Payload, + }: + case <-ctx.Done(): + return } case error: - res <- v + select { + case res <- v: + case <-ctx.Done(): + } + return } } }() diff --git a/redis/lock.go b/redis/lock.go index a157f01..046aa3d 100644 --- a/redis/lock.go +++ b/redis/lock.go @@ -42,6 +42,8 @@ type Lock struct { hasDog uint32 // 通知看门狗退出 doneC chan struct{} + // 保护doneC的互斥锁 + mu sync.Mutex } type LockOption func(lock *Lock) @@ -100,7 +102,10 @@ func (l *Lock) Lock(ctx context.Context) error { return res.Err } - val := res.Val.(int64) + val, ok := res.Val.(int64) + if !ok { + return errors.New("unexpected return type from lock script") + } // 如果val == -1 说明已经拿到锁了 if val == -1 { @@ -123,13 +128,16 @@ func (l *Lock) Lock(ctx context.Context) error { // 有2种情况发生会重新进行抢锁:1.收到channel的释放锁通知 2.阻塞上边获取的redis里该锁的过期时间后 // 如果设置了超时时间,则可能会超时 + timer := time.NewTimer(time.Duration(val) * time.Millisecond) select { case v := <-sub: + timer.Stop() if v.Err != nil { return v.Err } - case <-time.After(time.Duration(val) * time.Millisecond): + case <-timer.C: case <-ctx.Done(): + timer.Stop() return ctx.Err() } } @@ -142,8 +150,9 @@ type Sub struct { // Subscribe 订阅redis特定的channel,当channel有特定消息写入时会通过chan返回来 func (l *Lock) Subscribe(ctx context.Context) <-chan Sub { - c := make(chan Sub) + c := make(chan Sub, 1) go func() { + defer close(c) sub, err := l.client.Subscribe(ctx, fmt.Sprintf("%s%s", LockChannelPrefix, l.Name)) if err != nil { c <- Sub{Err: err} @@ -151,18 +160,27 @@ func (l *Lock) Subscribe(ctx context.Context) <-chan Sub { } for { select { - case val := <-sub: + case val, ok := <-sub: + if !ok { + return + } switch v := val.(type) { case *Subscription: case *Message: - c <- Sub{} + select { + case c <- Sub{}: + case <-ctx.Done(): + return + } case *Pong: case error: - c <- Sub{Err: v} + select { + case c <- Sub{Err: v}: + case <-ctx.Done(): + } return } case <-ctx.Done(): - // c <- Sub{Err: ctx.Err()} return } } @@ -174,9 +192,11 @@ func (l *Lock) Unlock(ctx context.Context) error { defer func() { if atomic.LoadUint32(&l.hasDog) == 1 { atomic.CompareAndSwapUint32(&l.hasDog, 1, 0) + l.mu.Lock() if l.doneC != nil { close(l.doneC) } + l.mu.Unlock() } }() @@ -197,21 +217,27 @@ func (l *Lock) Unlock(ctx context.Context) error { // startWatchDog 当用户没有设置锁过期时间,会自动开启开门狗机制 // 默认情况下会给锁设置30秒的过期时间,并且三分之一过期时间时会定期检测锁是否还存在,如果存在则会进行锁续约 func (l *Lock) startWatchDog(ctx context.Context, expireTime time.Duration) { + l.mu.Lock() if l.doneC == nil { l.doneC = make(chan struct{}) } - go func(expire time.Duration) { + l.mu.Unlock() + go func(expire time.Duration) { defer func() { + l.mu.Lock() l.doneC = nil + l.mu.Unlock() }() - tick := time.Tick(expire / 3) + ticker := time.NewTicker(expire / 3) + defer ticker.Stop() + for { select { case <-l.doneC: return - case <-tick: + case <-ticker.C: if err := l.Expire(ctx, expire); err != nil { return }