Golang 并发监控值变化
5 min read
轮询
使用 sync/atomic 包对值进行存储和读取,当轮询发现值变化时返回。
简单、无锁,但是非实时,有轮询的延迟。
package polling
import (
"log"
"sync/atomic"
"time"
)
var val int64
func get(oldAddr *int64) int64 {
log.Println("start get")
// 保存值用来比较
oldVal := *oldAddr
for {
log.Println("start for")
newVal := atomic.LoadInt64(oldAddr)
log.Println("newVal =", newVal)
log.Println("oldVal =", oldVal)
if newVal == oldVal {
log.Println("start polling")
time.Sleep(100 * time.Millisecond)
} else {
log.Println("end polling")
log.Println("end for")
log.Println("end get")
return newVal
}
}
}
func set(newVal int64) {
log.Println("start set")
log.Println("start change val")
atomic.StoreInt64(&val, newVal)
log.Println("end change val")
log.Println("end set")
}
package polling
import (
"log"
"testing"
"time"
)
func TestPolling(t *testing.T) {
go func() {
time.Sleep(2 * time.Second)
set(1)
}()
go func() {
newVal := get(&val)
log.Println(newVal)
}()
time.Sleep(3 * time.Second)
}
日志如下,已删除部分重复数据:
2025/05/07 18:58:42 start get
2025/05/07 18:58:42 start for
2025/05/07 18:58:42 newVal = 0
2025/05/07 18:58:42 oldVal = 0
2025/05/07 18:58:42 start polling
2025/05/07 18:58:42 start for
2025/05/07 18:58:42 newVal = 0
2025/05/07 18:58:42 oldVal = 0
2025/05/07 18:58:44 start polling
2025/05/07 18:58:44 start set
2025/05/07 18:58:44 start change val
2025/05/07 18:58:44 end change val
2025/05/07 18:58:44 end set
2025/05/07 18:58:44 start for
2025/05/07 18:58:44 newVal = 1
2025/05/07 18:58:44 oldVal = 0
2025/05/07 18:58:44 end polling
2025/05/07 18:58:44 end for
2025/05/07 18:58:44 end get
2025/05/07 18:58:44 1
条件变量
先新建一个互斥锁,把锁放进 sync.Cond,通过 Wait()
阻塞,值修改后通过 Broadcast()
广播。
sync.Cond 容易出错,不建议使用,而且需要用户自己完成部分同步工作,详见:Go advanced concurrency patterns: part 3 (channels)
package cond
import (
"log"
"sync"
)
var val int
var mu = new(sync.Mutex)
// 赋值 mu 给 cond 内部的锁
var cond = sync.NewCond(mu)
func get(old int) int {
log.Println("start get")
mu.Lock()
defer mu.Unlock()
// val 和 old 相同时,即未改变时,直接进入 for 等待改变
// val 和 old 不相同时,直接返回 val
for val == old {
log.Println("start for")
// 阻塞,接收到广播才会停止阻塞
// cond.Wait() 之前必须获取到 mu 锁
log.Println("start wait")
cond.Wait()
log.Println("end wait")
}
log.Println("end for")
log.Println("end get")
return val
}
func set(newVal int) {
log.Println("start set")
// 这里不会死锁吗?
// 不会,cond.Wait() 里面每隔一段系统内置的时间,就会解除 mu 的锁
mu.Lock()
defer mu.Unlock()
log.Println("start change val")
val = newVal
log.Println("end change val")
// 发生消息到全部,即 cond.Wait()
log.Println("start broadcast")
cond.Broadcast()
log.Println("end broadcast")
log.Println("end set")
}
package cond
import (
"log"
"testing"
"time"
)
func TestCond(t *testing.T) {
go func() {
time.Sleep(2 * time.Second)
set(1)
}()
go func() {
newVal := get(val)
log.Println(newVal)
}()
time.Sleep(3 * time.Second)
}
日志如下:
2025/05/07 15:44:15 start get
2025/05/07 15:44:15 start for
2025/05/07 15:44:15 start wait
2025/05/07 15:44:17 start set
2025/05/07 15:44:17 start change val
2025/05/07 15:44:17 end change val
2025/05/07 15:44:17 start broadcast
2025/05/07 15:44:17 end broadcast
2025/05/07 15:44:17 end set
2025/05/07 15:44:17 end wait
2025/05/07 15:44:17 end for
2025/05/07 15:44:17 end get
2025/05/07 15:44:17 1
channel
类似 sync.Cond,但通过 select
阻塞,值修改后通过 close channel
广播。
package channel
import (
"context"
"log"
"sync"
)
var val int
var mu sync.Mutex
var done = make(chan struct{})
func get(ctx context.Context, old int) (int, bool) {
log.Println("start get")
mu.Lock()
// 获取值的复制
v, ch := val, done
mu.Unlock()
// val 和 old 相同时,即未改变时,直接进入 for
// val 和 old 不相同时,直接返回 val
for v == old {
log.Println("start for")
log.Println("start wait")
select {
case <-ctx.Done():
return 0, false // false = timeout
case <-ch:
mu.Lock()
log.Println("start get val")
v, ch = val, done
log.Println("end get val")
mu.Unlock()
}
log.Println("end wait")
}
log.Println("end for")
log.Println("end get")
return v, true
}
func set(newVal int) {
log.Println("start set")
mu.Lock()
defer mu.Unlock()
log.Println("start change val")
val = newVal
log.Println("end change val")
log.Println("start close chan")
close(done)
log.Println("end close chan")
// 因为 close 了的通道无法再次使用,这里重新 make 一个
done = make(chan struct{})
}
package channel
import (
"context"
"log"
"testing"
"time"
)
func TestChannel(t *testing.T) {
go func() {
time.Sleep(2 * time.Second)
set(1)
}()
go func() {
ctx := context.Background()
newVal, ok := get(ctx, val)
log.Println(newVal)
log.Println(ok)
}()
time.Sleep(3 * time.Second)
}
日志如下:
2025/05/07 15:54:08 start get
2025/05/07 15:54:08 start for
2025/05/07 15:54:08 start wait
2025/05/07 15:54:10 start set
2025/05/07 15:54:10 start change val
2025/05/07 15:54:10 end change val
2025/05/07 15:54:10 start close chan
2025/05/07 15:54:10 end close chan
2025/05/07 15:54:10 start get val
2025/05/07 15:54:10 end get val
2025/05/07 15:54:10 end wait
2025/05/07 15:54:10 end for
2025/05/07 15:54:10 end get
2025/05/07 15:54:10 1
2025/05/07 15:54:10 true