原子操作

原子操作

所谓的原子操作就是并发编程中“最小的且不可并行化”的操作。通常,如果多个并发体对同一个共享资源进行的操作是原子的话,那么同一时刻最多只能有一个并发体对该资源进行操作。从线程角度看,在当前线程修改共享资源期间,其它的线程是不能访问该资源的。原子操作对于多线程并发编程模型来说,不会发生有别于单线程的意外情况,共享资源的完整性可以得到保证。

Mutex

一般情况下,原子操作都是通过“互斥”访问来保证的,通常由特殊的 CPU 指令提供保护。当然,如果仅仅是想模拟下粗粒度的原子操作,我们可以借助于 sync.Mutex 来实现:

import (
	"sync"
)

var total struct {
	sync.Mutex
	value int
}

func worker(wg *sync.WaitGroup) {
	defer wg.Done()

	for i := 0; i <= 100; i++ {
		total.Lock()
		total.value += i
		total.Unlock()
	}
}

func main() {
	var wg sync.WaitGroup
	wg.Add(2)
	go worker(&wg)
	go worker(&wg)
	wg.Wait()

	fmt.Println(total.value)
}

sync/atomic

用互斥锁来保护一个数值型的共享资源,麻烦且效率低下。标准库的 sync/atomic 包对原子操作提供了丰富的支持。我们可以重新实现上面的例子:

import (
	"sync"
	"sync/atomic"
)

var total uint64

func worker(wg *sync.WaitGroup) {
	defer wg.Done()

	var i uint64
	for i = 0; i <= 100; i++ {
		atomic.AddUint64(&total, i)
	}
}

func main() {
	var wg sync.WaitGroup
	wg.Add(2)

	go worker(&wg)
	go worker(&wg)
	wg.Wait()
}

atomic.AddUint64 函数调用保证了 total 的读取、更新和保存是一个原子操作,因此在多线程中访问也是安全的。sync/atomic 包对基本的数值类型及复杂对象的读写都提供了原子操作的支持。atomic.Value 原子对象提供了 Load 和 Store 两个原子方法,分别用于加载和保存数据,返回值和参数都是 interface{}类型,因此可以用于任意的自定义复杂类型。

var config atomic.Value // 保存当前配置信息

// 初始化配置信息
config.Store(loadConfig())

// 启动一个后台线程, 加载更新后的配置信息
go func() {
	for {
		time.Sleep(time.Second)
		config.Store(loadConfig())
	}
}()

// 用于处理请求的工作者线程始终采用最新的配置信息
for i := 0; i < 10; i++ {
	go func() {
		for r := range requests() {
			c := config.Load()
			// ...
		}
	}()
}

这是一个简化的生产者消费者模型:后台线程生成最新的配置信息;前台多个工作者线程获取最新的配置信息。所有线程共享配置信息资源。

Once

原子操作配合互斥锁可以实现非常高效的单件模式。互斥锁的代价比普通整数的原子读写高很多,在性能敏感的地方可以增加一个数字型的标志位,通过原子检测标志位状态降低互斥锁的使用次数来提高性能。

type singleton struct {}

var (
	instance    *singleton
	initialized uint32
	mu          sync.Mutex
)

func Instance() *singleton {
	if atomic.LoadUint32(&initialized) == 1 {
		return instance
	}

	mu.Lock()
	defer mu.Unlock()

	if instance == nil {
		defer atomic.StoreUint32(&initialized, 1)
		instance = &singleton{}
	}
	return instance
}

我们可以将通用的代码提取出来,就成了标准库中 sync.Once 的实现:

type Once struct {
	m    Mutex
	done uint32
}

func (o *Once) Do(f func()) {
	if atomic.LoadUint32(&o.done) == 1 {
		return
	}

	o.m.Lock()
	defer o.m.Unlock()

	if o.done == 0 {
		defer atomic.StoreUint32(&o.done, 1)
		f()
	}
}

基于 sync.Once 重新实现单例模式:

var (
	instance *singleton
	once     sync.Once
)

func Instance() *singleton {
	once.Do(func() {
		instance = &singleton{}
	})
	return instance
}