Bootstrap

手摸手Go 深入剖析sync.Pool

如果能够将所有内存都分配到栈上无疑性能是最佳的,但不幸的是我们不可避免需要使用堆上分配的内存。我们可以优化使用堆内存时的性能损耗吗?答案是肯定的。Go同步包中,提供了保存和访问一组临时对象并复用它们的能力。

对于一些创建成本昂贵、频繁使用的临时对象,使用可以减少内存分配,降低GC压力。因为的gc算法是根据标记清除改进的三色标记法,如果频繁创建大量临时对象,势必给GC标记带来负担,CPU也很容易出现毛刺现象。当然需要注意的是:**存储在中的对象随时都可能在不被通知的情况下被移除。 所以并不是所有频繁使用、创建昂贵的对象都适用,比如DB连接、线程池**。

Talk is cheap,Show me your code

因为版本后对做了优化,放弃了利用加锁的方式该用CAS加带环形数组的双向链表的方式来实现,本文基于最新稳定版本分析。

基本使用

package main

import "sync"

type Person struct {
	Age int
}

// 初始化pool
var personPool = sync.Pool{
	New: func() interface{} {
		return new(Person)
	},
}

func main() {
	// 获取一个实例
	newPerson := personPool.Get().(*Person)
	// 回收对象 以备其他协程使用
	defer personPool.Put(newPerson)

	newPerson.Age = 25
}

使用起来比较简单大概分三步:

提供一个函数,当Pool中未缓存该对象时调用

需要注意的是:跟一样第一次使用之后是不允许被拷贝的。

那对性能优化真的有这么大魔力吗?Benchmark之

import (
	"testing"
)

func BenchmarkWithoutPool(b *testing.B) {
	var p *Person
	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		for j := 0; j < 10000; j++ {
			p = new(Person)
			p.Age = 30
		}
	}
}

func BenchmarkWithPool(b *testing.B) {
	var p *Person
	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		for j := 0; j < 10000; j++ {
			p = personPool.Get().(*Person)
			p.Age = 30
			personPool.Put(p)
		}
	}
}

基准测试结果:

BenchmarkWithoutPool
BenchmarkWithoutPool-8   	    7630	    135523 ns/op	   80000 B/op	   10000 allocs/op
BenchmarkWithPool
BenchmarkWithPool-8   	    9865	    126072 ns/op	       0 B/op	       0 allocs/op

工作原理

没有啥一张图搞不定的

如果不行 那就再来一张

sync.Pool 数据结构

type Pool struct {
	noCopy noCopy
	// 实际指向[]poolLocal 每个P对应一个poolLocal 数组大小取决于P的数量 runtime.GOMAXPROCS(0)
	local     unsafe.Pointer 
	localSize uintptr        // []poolLocal的大小

	victim     unsafe.Pointer // local from previous cycle
	victimSize uintptr        // size of victims array
  
  //当缓存池无对应对象时调用
	New func() interface{}
}

相较于之前版本,的结构体中新增了、字段

主要维护了一个的数组,数组大小由决定。

type poolLocal struct {
	poolLocalInternal
	// Prevents false sharing on widespread platforms with
	// 128 mod (cache line size) = 0 .
	pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

// Local per-P Pool appendix.
type poolLocalInternal struct {
	private interface{} // 只能被对应的P使用
	shared  poolChain   // 本地的P可以从Head 进行pushHead/popHead 其他的P可以popTail.
}

内部又由P私有空间和共享空间。共享空间是一个双端队列,双端队列每个节点又对应着一个环形数组,听着貌似有点儿绕,老规矩上图:

算是个逻辑上的环形数组,字段存储着实际的值,出于操作原子性的考虑,字段将首尾索引融合在一起,高32位为head的索引下标,低32位为tail的索引下标,head和tail指向同一位置则表示环形数组为空。

代码佐证:

func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) {
	const mask = 1<> dequeueBits) & mask)
	tail = uint32(ptrs & mask)
	return
}
func (d *poolDequeue) pack(head, tail uint32) uint64 {
	const mask = 1<

实际使用过程中又将进行了包装,因为数组大小是固定,所以为了让他大小可变,将其包装成了双向链表。

操作方法

接下来我们来剖析一下几个核心流程

获取对象 p.Get

获取对象,大体流程:

func (p *Pool) Get() interface{} {
  // 将当前goroutine与P进行绑定 runtime_procPin禁用抢占
  // 返回poolLocal与P的id
	l, pid := p.pin()
	x := l.private //尝试直接从私有空间拿
	l.private = nil
	if x == nil {
    //从共享区域头部拿
		x, _ = l.shared.popHead()
		if x == nil {
      //直接实在没有 尝试去别人那边看看能不能偷个
			x = p.getSlow(pid)
		}
	}
  // 解除抢占禁用
	runtime_procUnpin()
  // 都没有 那只好自己New一个
	if x == nil && p.New != nil {
		x = p.New()
	}
	return x
}

那么我们来看看goroutine 是怎么跟P绑定的

func (p *Pool) pin() (*poolLocal, int) {
	pid := runtime_procPin()
  // pinSlow中我们先存储local再存储localSize,这里我们以相反顺序加载
  // 因为我们已经禁用了抢占 GC这期间不会发生 因此我们需要观察local的大小至少跟localSize一样
	s := atomic.LoadUintptr(&p.localSize) // load-acquire
	l := p.local                          // load-consume
	if uintptr(pid) < s {
		return indexLocal(l, pid), pid
	}
  // 运行过程中可能会存在调整P的情况 或者GC了
	return p.pinSlow()
}

这里我们先调用,为啥它这么牛逼,不仅让P不会被抢占,还让GC为之折腰?

番外:禁止抢占

func runtime_procPin() int
//go:linkname sync_runtime_procPin sync.runtime_procPin
//go:nosplit
func sync_runtime_procPin() int {
	return procPin()
}
//go:nosplit
func procPin() int {
	_g_ := getg()
	mp := _g_.m

	mp.locks++
	return int(mp.p.ptr().id)
}

正如所见,兜兜转转实际绑定goroutine和P、禁用抢占交给了。首先从或专用寄存器拿到当前的,然后获取当前绑定的物理线程,并对物理线程的属性自增操作。这意味什么呢?

这里可能涉及到一些调度的内容,Go runtime调度是一个GPM模型。G为调度的基本单元,P可以理解为运行G的逻辑CPU M为系统线程。何为抢占?

即,将绑定的给占用,因为中99.9%的任务都需要才能执行任务。Go运行时调度主要存在两种抢占的情况:

  • 第一种情况,进行系统调用的,因为存在阻塞,傻傻等在那里会比较浪费计算资源,为了让其他不被饿死

  • 第二种情况,如果一个运行时间太长,中其他得不到执行也会饿死

抢占实现

Go中的抢占是实现的。对 没错就是里的那个也是唯一一个脱离模型只需即可运行的特例。中包含了、、、,这里抢占我们需要关注下。

//go:nowritebarrierrec
func sysmon() {
  ...
	// retake P's blocked in syscalls
		// and preempt long running G's
		if retake(now) != 0 {
			idle = 0
		} else {
			idle++
		}
  ...
}
func retake(now int64) uint32 {
 ... 
if s == _Prunning || s == _Psyscall {
			// Preempt G if it's running for too long.
			t := int64(_p_.schedtick)
			if int64(pd.schedtick) != t {
				pd.schedtick = uint32(t)
				pd.schedwhen = now
			} else if pd.schedwhen+forcePreemptNS <= now {//G运行时间超过forcePreemptNS
				preemptone(_p_)
				// In case of syscall, preemptone() doesn't
				// work, because there is no M wired to P.
				sysretake = true
			}
  ...
}

P处于运行中或系统调用,检查运行时间是否超过,超过则调用抢占这个

func preemptone(_p_ *p) bool {
	mp := _p_.m.ptr()
	if mp == nil || mp == getg().m {
		return false
	}
	gp := mp.curg
	if gp == nil || gp == mp.g0 {
		return false
	}

	gp.preempt = true

	// Every call in a go routine checks for stack overflow by
	// comparing the current stack pointer to gp->stackguard0.
	// Setting gp->stackguard0 to StackPreempt folds
	// preemption into the normal stack overflow check.
	gp.stackguard0 = stackPreempt

	// Request an async preemption of this P.
	if preemptMSupported && debug.asyncpreemptoff == 0 {
		_p_.preempt = true
		preemptM(mp)
	}

	return true
}

主要是设置两个标志位和 主要起作用的是后者。通过将的设置为,导致在执行下一次的函数调用时,栈空间检查失败(与SP寄存器比较),进而触发编译器安插的指令。

//以asm_amd64.s为例
TEXT runtime·morestack(SB),NOSPLIT,$0-0
	... ...
	// Call newstack on m->g0's stack.
	MOVQ	m_g0(BX), BX
	MOVQ	BX, g(CX)
	MOVQ	(g_sched+gobuf_sp)(BX), SP
	CALL	runtime·newstack(SB)
	CALL	runtime·abort(SB)	// crash if newstack returns
	RET

会调用尝试栈扩容

//go:nowritebarrierrec
func newstack() {
  ... ...
	if preempt {
		if !canPreemptM(thisg.m) {
			// Let the goroutine keep running for now.
			// gp->preempt is set, so it will be preempted next time.
			gp.stackguard0 = gp.stack.lo + _StackGuard
			gogo(&gp.sched) // never return
		}
	}
 ... ... 
}
//go:nosplit
func canPreemptM(mp *m) bool {
	return mp.locks == 0 && mp.mallocing == 0 && mp.preemptoff == "" && mp.p.ptr().status == _Prunning
}

在栈扩容前会检查抢占标志位则不抢占。

如果抢占成功,则会继续调用进而调用将与当前接触关联,设置状态,然后将插入Global runnable queue 等待下次调度。

至此,应该能彻底明白为啥能够通过修改绑定的的属性就能禁用抢占了。

但是还有个问题,为啥GC也拿它没办法?

关于的,大致有三种触发方式:

  • gcTriggerCycle 后台定时检查触发,如

  • gcTriggerTimer 自上个GC周期超过forcegcperiod纳秒则触发 如

  • g cTriggerHeap 申请的堆内存大小达到触发阈值 如

最终都会调用,进而我们在GC的STW阶段执行中可以看到

func stopTheWorldWithSema() {
	_g_ := getg()

	// If we hold a lock, then we won't be able to stop another M
	// that is blocked trying to acquire the lock.
	if _g_.m.locks > 0 {
		throw("stopTheWorld: holding locks")
	}
	lock(&sched.lock)
	sched.stopwait = gomaxprocs
	atomic.Store(&sched.gcwaiting, 1)
	preemptall()
	// stop current P
	_g_.m.p.ptr().status = _Pgcstop // Pgcstop is only diagnostic.
	sched.stopwait--
	// try to retake all P's in Psyscall status
	for _, p := range allp {
		s := p.status
		if s == _Psyscall && atomic.Cas(&p.status, s, _Pgcstop) {
			if trace.enabled {
				traceGoSysBlock(p)
				traceProcStop(p)
			}
			p.syscalltick++
			sched.stopwait--
		}
	}
	// stop idle P's
	for {
		p := pidleget()
		if p == nil {
			break
		}
		p.status = _Pgcstop
		sched.stopwait--
	}
	wait := sched.stopwait > 0
	unlock(&sched.lock)

	// wait for remaining P's to stop voluntarily
	if wait {
		for {
			// wait for 100us, then try to re-preempt in case of any races
			if notetsleep(&sched.stopnote, 100*1000) {
				noteclear(&sched.stopnote)
				break
			}
			preemptall()
		}
	}

	// sanity checks
	bad := ""
	if sched.stopwait != 0 {
		bad = "stopTheWorld: not stopped (stopwait != 0)"
	} else {
		for _, p := range allp {
			if p.status != _Pgcstop {
				bad = "stopTheWorld: not stopped (status != _Pgcstop)"
			}
		}
	}
	if atomic.Load(&freezing) != 0 {
		// Some other thread is panicking. This can cause the
		// sanity checks above to fail if the panic happens in
		// the signal handler on a stopped thread. Either way,
		// we should halt this thread.
		lock(&deadlock)
		lock(&deadlock)
	}
	if bad != "" {
		throw(bad)
	}
}

大致逻辑先调用尝试抢占所有的,然后停掉当前,遍历所有的,如果处于系统调用则直接掉;然后处理空闲的;最后检查是否存在需要等待处理的,如果有则循环等待,并尝试调用

func preemptall() bool {
	res := false
	for _, _p_ := range allp {
		if _p_.status != _Prunning {
			continue
		}
		if preemptone(_p_) {
			res = true
		}
	}
	return res
}

到这里就很清晰了,我们又看到老朋友,显然会在阶段等下去,自然也无法执行下去。

好了 刚刚两个问题我们已经搞清楚了。书归正传 能禁用被抢占,那么自然能解除禁用。完成与的绑定,返回了当前的id,如果则说明当前poolLocal已经存在 直接利用地址偏移拿到

func indexLocal(l unsafe.Pointer, i int) *poolLocal {
	lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
	return (*poolLocal)(lp)
}

如果运行时被调整了呢?那么尝试下,正如其名这个过程会有点儿慢

func (p *Pool) pinSlow() (*poolLocal, int) {
	// Retry under the mutex.
	// Can not lock the mutex while pinned.
	runtime_procUnpin()
	allPoolsMu.Lock()
	defer allPoolsMu.Unlock()
	pid := runtime_procPin()
	// poolCleanup won't be called while we are pinned.
	s := p.localSize
	l := p.local
	if uintptr(pid) < s {
		return indexLocal(l, pid), pid
	}
	if p.local == nil {
		allPools = append(allPools, p)
	}
	// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
	size := runtime.GOMAXPROCS(0)
	local := make([]poolLocal, size)
	atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
	atomic.StoreUintptr(&p.localSize, uintptr(size))         // store-release
	return &local[pid], pid
}

上来第一件事儿 将我们之前设置的P禁用抢占给释放了。然后尝试获取全局排他锁。这也能解释它为啥上来就释放掉之前的禁止占用,因为获取当前全局排他锁不一定能立马拿到啊。拿到锁之后又开启了禁止抢占P,接着又判断了下因为拿到锁之前可能已经变化了。如果当前则将放到全局的池子里,也是为啥刚才需要等待全局排他锁的原因。因为时会将原有的pool清理掉所以这里进行重建,原有pool真的没了吗?这个就跟之前提到的有点儿关系了 等会儿一起看。

至此,我们拿到了,接着获取对象的顺序为

func (c *poolChain) popHead() (interface{}, bool) {
	d := c.head
	for d != nil {
		if val, ok := d.popHead(); ok {
			return val, ok
		}
		// There may still be unconsumed elements in the
		// previous dequeue, so try backing up.
		d = loadPoolChainElt(&d.prev)
	}
	return nil, false
}

共享空间是以为节点的双向链表,首先我们尝试沿着双向链表的方向依次调用尝试从头部拿数据

func (d *poolDequeue) popHead() (interface{}, bool) {
	var slot *eface
	for {
		ptrs := atomic.LoadUint64(&d.headTail)
		head, tail := d.unpack(ptrs)
		if tail == head {
			// Queue is empty.
			return nil, false
		}

		// Confirm tail and decrement head. We do this before
		// reading the value to take back ownership of this
		// slot.
		head--
		ptrs2 := d.pack(head, tail)
		if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
			// We successfully took back slot.
			slot = &d.vals[head&uint32(len(d.vals)-1)]
			break
		}
	}

	val := *(*interface{})(unsafe.Pointer(slot))
	if val == dequeueNil(nil) {
		val = nil
	}
	// Zero the slot. Unlike popTail, this isn't racing with
	// pushHead, so we don't need to be careful here.
	*slot = eface{}
	return val, true
}

逻辑也比较简单

2.1 将拆封 如果head==tail表明当前环形数组为空,直接返回

2.2 接着将head索引减1,然后将head、tail再打包回去,通过CAS判断当前没有并发修改就拿到数据 跳出循环 否则循环等待

2.3 将slot转为interface{}类型

2.4 将slot赋值为eface{}

func (p *Pool) getSlow(pid int) interface{} {
	// See the comment in pin regarding ordering of the loads.
	size := atomic.LoadUintptr(&p.localSize) // load-acquire
	locals := p.local                        // load-consume
	// Try to steal one element from other procs.
	for i := 0; i < int(size); i++ {
		l := indexLocal(locals, (pid+i+1)%int(size))
		if x, _ := l.shared.popTail(); x != nil {
			return x
		}
	}

	// Try the victim cache. We do this after attempting to steal
	// from all primary caches because we want objects in the
	// victim cache to age out if at all possible.
	size = atomic.LoadUintptr(&p.victimSize)
	if uintptr(pid) >= size {
		return nil
	}
	locals = p.victim
	l := indexLocal(locals, pid)
	if x := l.private; x != nil {
		l.private = nil
		return x
	}
	for i := 0; i < int(size); i++ {
		l := indexLocal(locals, (pid+i)%int(size))
		if x, _ := l.shared.popTail(); x != nil {
			return x
		}
	}

	// Mark the victim cache as empty for future gets don't bother
	// with it.
	atomic.StoreUintptr(&p.victimSize, 0)

	return nil
}

3.1 拿到[]poolLocal数组,遍历每个poolLocal,并调用 从其共享空间的尾部拿数据

func (c *poolChain) popTail() (interface{}, bool) {
	d := loadPoolChainElt(&c.tail)
	if d == nil {
		return nil, false
	}

	for {
		// It's important that we load the next pointer
		// *before* popping the tail. In general, d may be
		// transiently empty, but if next is non-nil before
		// the pop and the pop fails, then d is permanently
		// empty, which is the only condition under which it's
		// safe to drop d from the chain.
		d2 := loadPoolChainElt(&d.next)

		if val, ok := d.popTail(); ok {
			return val, ok
		}

		if d2 == nil {
			// This is the only dequeue. It's empty right
			// now, but could be pushed to in the future.
			return nil, false
		}

		// The tail of the chain has been drained, so move on
		// to the next dequeue. Try to drop it from the chain
		// so the next pop doesn't have to look at the empty
		// dequeue again.
		if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
			// We won the race. Clear the prev pointer so
			// the garbage collector can collect the empty
			// dequeue and so popHead doesn't back up
			// further than necessary.
			storePoolChainElt(&d2.prev, nil)
		}
		d = d2
	}
}

首先拿到尾节点,然后在死循环中沿着双向链表的方向不断获取节点,尝试调用获取数据

func (d *poolDequeue) popTail() (interface{}, bool) {
	var slot *eface
	for {
		ptrs := atomic.LoadUint64(&d.headTail)
		head, tail := d.unpack(ptrs)
		if tail == head {
			// Queue is empty.
			return nil, false
		}
		ptrs2 := d.pack(head, tail+1)
		if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
			slot = &d.vals[tail&uint32(len(d.vals)-1)]
			break
		}
	}
	val := *(*interface{})(unsafe.Pointer(slot))
	if val == dequeueNil(nil) {
		val = nil
	}
	slot.val = nil
	atomic.StorePointer(&slot.typ, nil)
	return val, true
}

与比较像,不同在于一个从头部拿数据一个从尾部拿。首先依然是在死循环中先将拆封,如果tai l==head表示环形数组为空,直接返回。否则将tail+1再封装好,同CAS规避并发问题 拿到数据则跳出循环,否则循环等待。

这里有一个跟不同的是 先将value置为nil然后利用CAS来将typ置空操作,原因很简单,和一个从头放一个从尾拿数据,一旦碰头就会出现竞争。

3.2 那如果偷都偷不到,会进行以下操作

size = atomic.LoadUintptr(&p.victimSize)
	if uintptr(pid) >= size {
		return nil
	}
	locals = p.victim
	l := indexLocal(locals, pid)
	if x := l.private; x != nil {
		l.private = nil
		return x
	}
	for i := 0; i < int(size); i++ {
		l := indexLocal(locals, (pid+i)%int(size))
		if x, _ := l.shared.popTail(); x != nil {
			return x
		}
	}

	// Mark the victim cache as empty for future gets don't bother
	// with it.
	atomic.StoreUintptr(&p.victimSize, 0)

翻译过来叫“受害者缓存”

受害者缓存是由提出的一种提高缓存性能的硬件技术。如他的论文所述

>

Miss caching places a fully-associative cache between cache and its re-fill path. Misses in the cache that hit in the miss cache have a one cycle penalty, as opposed to a many cycle miss penalty without the miss cache. Victim Caching is an improvement to miss caching that loads the small fully-associative cache with victim of a miss and not the requested cache line.

大概意思就是在旧缓存和缓解重建的过程中,添加一个全关联的缓存(保存旧缓存数据)。也就是说当一级缓存踢出的数据,放到受害者缓存中。当我们在一级缓存未命中,则可以继续尝试从受害者缓存中查询。

如代码:

size = atomic.LoadUintptr(&p.victimSize)
	if uintptr(pid) >= size {
		return nil
	}
	locals = p.victim
	l := indexLocal(locals, pid)
	if x := l.private; x != nil {
		l.private = nil
		return x
	}
	for i := 0; i < int(size); i++ {
		l := indexLocal(locals, (pid+i)%int(size))
		if x, _ := l.shared.popTail(); x != nil {
			return x
		}
	}

	// Mark the victim cache as empty for future gets don't bother
	// with it.
	atomic.StoreUintptr(&p.victimSize, 0)

如果能理解,其实还是挺简单的,也就是

local1 ->GC ->local2 victim->local1

>

Local2 ->GC ->local3 victim->local2

if x == nil && p.New != nil {
		x = p.New()
	}

用完返回Pool p.Put

看完 ,接着看下

func (p *Pool) Put(x interface{}) {
	if x == nil {
		return
	}
  // 将goroutine与P绑定 runtime_procPin禁用抢占 返回poolLocal
	l, _ := p.pin()
	if l.private == nil {//优先放到私有空间
		l.private = x
		x = nil
	}
	if x != nil { //放回共享空间
		l.shared.pushHead(x)
	}
  // 解除抢占禁用
	runtime_procUnpin()
}

基本逻辑:

func (c *poolChain) pushHead(val interface{}) {
	d := c.head
	if d == nil {
		// Initialize the chain.
		const initSize = 8 // Must be a power of 2
		d = new(poolChainElt)
		d.vals = make([]eface, initSize)
		c.head = d
		storePoolChainElt(&c.tail, d)
	}
	if d.pushHead(val) {
		return
	}

	newSize := len(d.vals) * 2
	if newSize >= dequeueLimit {
		// Can't make it any bigger.
		newSize = dequeueLimit
	}

	d2 := &poolChainElt{prev: d}
	d2.vals = make([]eface, newSize)
	c.head = d2
	storePoolChainElt(&d.next, d2)
	d2.pushHead(val)
}

逻辑主要是将对象放到双向链表的对应节点的环形数组中。

func (d *poolDequeue) pushHead(val interface{}) bool {
	ptrs := atomic.LoadUint64(&d.headTail)
	head, tail := d.unpack(ptrs)
	if (tail+uint32(len(d.vals)))&(1<

跟是相反的操作,大体也比较简单。先判断环形数组是否满了,满了则直接返回。因为跟存在竞争关系,不为空可能是还没处理完。

关于GC清除数据问题

中的函数组册了GC发生时如何清理Pool的函数,调用链如下

->->->

func init() {
	runtime_registerPoolCleanup(poolCleanup)
}
//go:linkname sync_runtime_registerPoolCleanup sync.runtime_registerPoolCleanup
func sync_runtime_registerPoolCleanup(f func()) {
	poolcleanup = f
}
func poolCleanup() {
	for _, p := range oldPools {
		p.victim = nil
		p.victimSize = 0
	}

	for _, p := range allPools {
		p.victim = p.local
		p.victimSize = p.localSize
		p.local = nil
		p.localSize = 0
	}

	oldPools, allPools = allPools, nil
}

逻辑很简单 正如上面讲说的那样。

最后的最后,细心的你可能发现 还遗漏了两个细节

noCopy

结构体中其实是为了防止使用过程中被拷贝。至于原因应该不用多说,因为并没有提供原生的强制不能拷贝的方法。所以采用这种方式,让检测报错来实现。

举个例子

type noCopy struct{}

// Lock is a no-op used by -copylocks checker from `go vet`.
func (*noCopy) Lock()   {}
func (*noCopy) Unlock() {}
type People struct {
	noCopy noCopy
}

func say(p People) {

}

func main() {
	var p People
	say(p)
}

go vet demo.go

输出:

# command-line-arguments
./demo.go:12:12: say passes lock by value: command-line-arguments.People contains command-line-arguments.noCopy
./demo.go:18:6: call of say copies lock value: command-line-arguments.People contains command-line-arguments.noCopy

当然直接执行不会报任何错

pad

type poolLocal struct {
	poolLocalInternal

	// Prevents false sharing on widespread platforms with
	// 128 mod (cache line size) = 0 .
	pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

字段在这里没有啥业务意思,目的就是为了避免问题。因为我们为了缓解计算机CPU计算速度和内存的读取速度不匹配的矛盾,在他们之间增加了L1 L2 L3 高速缓存,他们比内存小很多但是速度却是内存无法比拟的。

缓存系统中我们是以缓存行(cache line)为单位,通常大小为64字节。上面这张图,我们可以看到L1、L2、L3三级缓存他们和内存的读取速度当然取决于他们与CPU紧密程度。L1>L2>L3>内存

但是!我们现在使用的都是多核CPU的计算机,如何保证多核看到的数据的一致性呢?这里我们需要谈到一个协议-MESI协议,M、E、S、I分别表示缓存行的4个状态

M(修改,Modified):本地处理器已经修改缓存行,即是脏行,它的内容与内存中的内容不一样,并且此 cache 只有本地一个拷贝(专有);

>

E(专有,Exclusive):缓存行内容和内存中的一样,而且其它处理器都没有这行数据;

>

S(共享,Shared):缓存行内容和内存中的一样, 有可能其它处理器也存在此缓存行的拷贝;

>

I(无效,Invalid):缓存行失效, 不能使用。

他们转换关系如下:

现在假设我们有以下场景

有两个变量X、Y共享在了一个中。如果core1想要更新X,core2想要更新Y,更新完他们的缓存行都变成了I状态,即L1 L2上的缓存均不可用,这时如果其他线程再要访问X Y就只能从L3甚至从内存拿数据,其性能可想而知。

怎么解决呢?

解决伪共享的问题 业界大多采用pad填充的方式来解决,让数据独占一个cacheline 降低数据关联共享的影响。比如Java8还提供了语法糖,通过添加注解自动进行缓存行填充。

总结

实现总体比较小巧,具体思想其实其他语言也都有影子,比如Java中的。但是往往简单设计的细节往往很值得我们去考究学习一下的。总结下知识点还真不少:

  • work stealing算法

  • CAS如何做到lock-free

  • 设置抢占标志 禁止P被占用 并制止GC

  • Victim cache 受害者缓存是怎么回事儿

  • noCopy是干啥的 怎么实现禁止拷贝

  • 伪共享(false share)

  • Pool GC的机制

不过这也符合Go“少即是多”的设计理念。