Go Concurrency Primitives and Their Use in Tendermint/Cosmos-SDK

This article first gives a brief introduction to concurrency programming models and primitives, and then, walking through the Tendermint/Cosmos-SDK source code, covers in detail the primitives offered by the two concurrency models that Go supports. The article does not distinguish between concurrency and parallelism and uses whichever word fits the context.

  1. Concurrency Programming Models
  2. Concurrency Primitives
  3. Shared-Memory Primitives
    • 3.1 sync.Mutex
    • 3.2 sync.RWMutex
    • 3.3 sync.WaitGroup
    • 3.4 sync.Cond
    • 3.5 sync.Once
    • 3.6 sync.Pool
  4. CSP Primitives
    • 4.1 Basic Channel Usage
    • 4.2 Channel Capacity
    • 4.3 Channel Direction
    • 4.4 The select Statement

1. Concurrency Programming Models

Concurrency programming models can be classified along two axes: by how processes interact, or by how the problem is decomposed. By process interaction, models further divide into shared memory, message passing, and implicit interaction. The thread model provided by many languages (such as Java) belongs to the shared-memory family: multiple threads running in parallel interact through shared memory. Message-passing models split further into synchronous and asynchronous styles; a widely used synchronous model is Communicating Sequential Processes (CSP), and a widely used asynchronous one is the Actor model. The Actor model has been adopted by languages such as Scala, while CSP was adopted by Go, where it truly shines. This article discusses only the shared-memory and CSP models, in the context of Go; the classification below is given for reference.

Parallel programming model:

  • Process interaction

    • Shared memory
    • Message passing
      • synchronous
        • CSP
      • asynchronous
        • Actor
    • Implicit interaction
  • Problem decomposition

    • Task parallelism
    • Data parallelism
    • Implicit parallelism

2. Concurrency Primitives

Whatever concurrency model a language adopts, it has to provide some basic syntax or APIs for programming in that model; these basics are called concurrency primitives. As most Go programmers know, a goroutine can be started very conveniently with the go keyword (a minimal sketch of this follows the list below), and Go supports both the shared-memory and CSP models. The shared-memory primitives are provided mainly by the sync package and include Mutex, Cond, and others; the CSP primitives are built into the language itself, chiefly the channel types and the select statement. The concurrency primitives Go provides, and that this article covers, are listed below.

Go concurrency primitives:

  • Shared memory
    • sync.Locker
      • sync.Mutex
      • sync.RWMutex
    • sync.WaitGroup
    • sync.Cond
    • sync.Once
    • sync.Pool
  • CSP
    • channels
    • select
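
For illustration, here is a minimal, self-contained sketch of the go keyword itself (nothing Tendermint-specific); the primitives listed above are what we use to coordinate goroutines properly:

package main

import (
    "fmt"
    "time"
)

func main() {
    // The go keyword starts a new goroutine running the given function.
    go func() {
        fmt.Println("hello from a goroutine")
    }()

    // Crude way to let the goroutine run before main exits; the sync and
    // channel primitives covered below are the proper way to coordinate this.
    time.Sleep(100 * time.Millisecond)
}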

It should be pointed out that although Go supports both the shared-memory and CSP models, CSP is the generally encouraged one; reach for the sync package only when it is truly necessary. As Go's slogan puts it:

Do not communicate by sharing memory; instead, share memory by communicating.

3. Shared-Memory Primitives

Go's shared-memory programming model is supported mainly by the standard library's sync package; this section introduces six primitives the package provides.

3.1 sync.Mutex

Mutex is Go's mutual-exclusion lock. It is very simple to use, with only two methods: Lock() and Unlock(). These are also the only two methods of the Locker interface, so Mutex implements Locker:

// A Locker represents an object that can be locked and unlocked.
type Locker interface {
    Lock()
    Unlock()
}

In Go, the usual way to use a Mutex is: when about to enter a critical section, first call Lock(), then schedule Unlock() with a defer statement, and then access the critical section. Take the cachekv.Store provided by Cosmos-SDK as an example:

// Store wraps an in-memory cache around an underlying types.KVStore.
type Store struct {
    mtx           sync.Mutex
    cache         map[string]*cValue
    unsortedCache map[string]struct{}
    sortedCache   *list.List // always ascending sorted
    parent        types.KVStore
}

The Store methods Get(), Set(), Delete(), and so on all need to be safe for concurrent use, so they use Mutex following the pattern described above. Take Get() as an example:

// Implements types.KVStore.
func (store *Store) Get(key []byte) (value []byte) {
    store.mtx.Lock()
    defer store.mtx.Unlock()
    types.AssertValidKey(key)

    cacheValue, ok := store.cache[string(key)]
    if !ok {
        value = store.parent.Get(key)
        store.setCacheValue(key, value, false, false)
    } else {
        value = cacheValue.value
    }

    return value
}
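
For readers who want the same pattern in a runnable, self-contained form, here is a minimal sketch built around a hypothetical Counter type (not taken from Cosmos-SDK); the sync.WaitGroup used to wait for the goroutines is introduced in section 3.3:

package main

import (
    "fmt"
    "sync"
)

// Counter is a hypothetical type used only for illustration.
type Counter struct {
    mtx sync.Mutex
    n   int
}

func (c *Counter) Inc() {
    c.mtx.Lock()         // enter the critical section
    defer c.mtx.Unlock() // released even on panic or early return
    c.n++
}

func main() {
    var c Counter
    var wg sync.WaitGroup // see section 3.3; only used to wait for the goroutines
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            c.Inc()
        }()
    }
    wg.Wait()
    fmt.Println(c.n) // always 100, thanks to the mutex
}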

3.2 sync.RWMutex

RWMutex is Go's reader/writer lock. Like Mutex, RWMutex implements the Locker interface, which operates on the write lock; it adds three more methods, RLock(), RUnlock(), and RLocker(), which operate on the read lock. Its complete API is:

type RWMutex struct { /* fields */ }
func (rw *RWMutex) Lock()
func (rw *RWMutex) Unlock()
func (rw *RWMutex) RLock()
func (rw *RWMutex) RUnlock()
func (rw *RWMutex) RLocker() Locker

RWMutex is also simple to use: take the write lock when the critical section performs writes, and the read lock when it only reads. Take the BlockStore in Tendermint as an example:

// BlockStore is a simple low level store for blocks.
type BlockStore struct {
    db dbm.DB

    mtx    sync.RWMutex
    height int64
}

The Height() method only reads the block height, so the read lock is enough:

// Height returns the last known contiguous block height.
func (bs *BlockStore) Height() int64 {
    bs.mtx.RLock()
    defer bs.mtx.RUnlock()
    return bs.height
}

The SaveBlock() method writes the height, so it takes the write lock:

func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) {
    ...

    // Done!
    bs.mtx.Lock()
    bs.height = height
    bs.mtx.Unlock()

    // Flush
    bs.db.SetSync(nil, nil)
}

3.3 sync.WaitGroup

To wait for n goroutines to finish, use a WaitGroup (similar to Java's CountDownLatch). A WaitGroup maintains an internal counter: Add() adds an arbitrary value (possibly negative) to the counter, Done() decrements it by one, and Wait() blocks the calling goroutine until the counter reaches zero. Its complete API is:

type WaitGroup struct { /* fields */ }
func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait()

In fact, Done() is just a thin wrapper around Add():

// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
    wg.Add(-1)
}

The usual pattern for WaitGroup is: first, create a WaitGroup; second, call Add() with the number of goroutines to wait for; third, start the goroutines and have each call Done() inside; fourth, call Wait() to block until they have all finished. Take the p2p.Switch#Broadcast() method in Tendermint as an example:

// Broadcast runs a go routine for each attempted send, which will block trying
// to send for defaultSendTimeoutSeconds. Returns a channel which receives
// success values for each attempted send (false if times out). Channel will be
// closed once msg bytes are sent to all peers (or time out).
//
// NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
func (sw *Switch) Broadcast(chID byte, msgBytes []byte) chan bool {
    sw.Logger.Debug("Broadcast", "channel", chID, "msgBytes", fmt.Sprintf("%X", msgBytes))

    peers := sw.peers.List()
    var wg sync.WaitGroup // step#1
    wg.Add(len(peers))    // step#2
    successChan := make(chan bool, len(peers))

    for _, peer := range peers {
        go func(p Peer) {
            defer wg.Done() // step#3
            success := p.Send(chID, msgBytes)
            successChan <- success
        }(peer)
    }

    go func() {
        wg.Wait() // step#4
        close(successChan)
    }()

    return successChan
}

3.4 sync.Cond

To wait for some condition to become true, or for some signal to be raised, use Cond. A Cond is always associated with a Locker; the Locker must be held before calling Wait(), and Wait() unlocks it internally. Every goroutine that calls Wait() joins a wait queue and is suspended (releasing the lock); calling Signal() wakes one goroutine in the queue, while Broadcast() wakes all of them. When Wait() returns, the goroutine holds the lock again. The complete API of Cond is:

type Cond struct { 
    L Locker // L is held while observing or changing the condition
    // other fields
}
func NewCond(l Locker) *Cond
func (c *Cond) Wait()
func (c *Cond) Signal()
func (c *Cond) Broadcast()

Since Tendermint/Cosmos-SDK does not use Cond, let us look instead at gRPC, which Tendermint depends on. gRPC defines a Server struct:

// Server is a gRPC server to serve RPC requests.
type Server struct {
    opts options

    mu     sync.Mutex // guards following
    lis    map[net.Listener]bool
    conns  map[io.Closer]bool
    serve  bool
    drain  bool
    cv     *sync.Cond          // signaled when connections close for GracefulStop
    m      map[string]*service // service name -> service info
    events trace.EventLog

    ... // other fields
}

Server instances are created by the NewServer() function, which calls NewCond() to initialize the cv field:

// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
    opts := defaultServerOptions
    for _, o := range opt {
        o(&opts)
    }
    s := &Server{ ... }
    s.cv = sync.NewCond(&s.mu) // <---
    if EnableTracing { ... }
    if channelz.IsOn() { ... }
    return s
}

The usual pattern around Wait() is: take the lock, check the condition in a loop and call Wait() inside the loop, act on the condition once the loop exits, and finally unlock. In pseudocode:

c.L.Lock()
for !condition() {
    c.Wait()
}
... make use of condition ...
c.L.Unlock()
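
The pseudocode above can be turned into a minimal runnable sketch; the queue and the "non-empty" condition below are invented for illustration and are not taken from gRPC or Tendermint:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var mu sync.Mutex
    cond := sync.NewCond(&mu)
    queue := []int{}
    done := make(chan struct{})

    // Consumer: wait until the condition "queue is non-empty" holds.
    go func() {
        cond.L.Lock()
        for len(queue) == 0 { // always re-check the condition in a loop
            cond.Wait() // releases the lock while waiting, re-acquires on return
        }
        fmt.Println("got:", queue[0]) // make use of the condition
        cond.L.Unlock()
        close(done)
    }()

    // Producer: change the shared state under the lock, then signal.
    cond.L.Lock()
    queue = append(queue, 42)
    cond.L.Unlock()
    cond.Signal() // holding the lock here is allowed but not required

    <-done
}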

The server's GracefulStop() method needs to shut the service down gracefully, so it must wait for all connections to close. It follows the pattern to the letter:

// GracefulStop stops the gRPC server gracefully. It stops the server from
// accepting new connections and RPCs and blocks until all the pending RPCs are
// finished.
func (s *Server) GracefulStop() {
    ...
    s.mu.Lock()

    for len(s.conns) != 0 {
        s.cv.Wait() // wait in a loop
    }
    s.conns = nil         // make use of condition
    if s.events != nil {  //
        s.events.Finish() //
        s.events = nil    //
    }                     //
    s.mu.Unlock()
}

The server's removeConn() method calls Broadcast() after a connection has been closed:

func (s *Server) removeConn(c io.Closer) {
    s.mu.Lock()
    defer s.mu.Unlock()
    if s.conns != nil {
        delete(s.conns, c)
        s.cv.Broadcast() // <---
    }
}

Note that it is not required to hold the lock when calling Signal() or Broadcast(). The removeConn() method above holds the lock only because it also mutates shared state (the conns field).

3.5 sync.Once

To guarantee that an operation is performed only once even under concurrency, use Once. Once is extremely simple: it has a single method, Do(), to which the operation is submitted. Its complete API is:

type Once struct { /* fields */ }
func (o *Once) Do(f func())

Tendermint/Cosmos-SDK does not use Once either. The gRPC Server struct shown earlier had some fields omitted; here is its full definition:

// Server is a gRPC server to serve RPC requests.
type Server struct {
    opts options

    mu     sync.Mutex // guards following
    lis    map[net.Listener]bool
    conns  map[io.Closer]bool
    serve  bool
    drain  bool
    cv     *sync.Cond          // signaled when connections close for GracefulStop
    m      map[string]*service // service name -> service info
    events trace.EventLog

    quit               chan struct{}
    done               chan struct{}
    quitOnce           sync.Once
    doneOnce           sync.Once
    channelzRemoveOnce sync.Once
    serveWG            sync.WaitGroup // counts active Serve goroutines for GracefulStop

    channelzID int64 // channelz unique identification number
    czData     *channelzData
}

As you can see, the Server struct contains three fields of type Once. Take the Stop() method as an example:

func (s *Server) Stop() {
    s.quitOnce.Do(func() { // <---
        close(s.quit)
    })

    defer func() {
        s.serveWG.Wait()
        s.doneOnce.Do(func() { // <---
            close(s.done)
        })
    }()

    ...
}

The use of Once guarantees that the quit and done channels are closed no more than once.
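
As a minimal, self-contained illustration of the same idea (hypothetical names, not from gRPC):

package main

import (
    "fmt"
    "sync"
)

func main() {
    var once sync.Once
    quit := make(chan struct{})

    closeQuit := func() {
        once.Do(func() { close(quit) }) // only the first call actually closes
    }

    closeQuit()
    closeQuit() // safe: the function passed to Do is not executed again

    <-quit // returns immediately because quit is closed
    fmt.Println("quit closed exactly once")
}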

3.6 sync.Pool

Although Go has automatic garbage collection (GC), when certain objects are expensive to create and destroy it can still be a good idea to reuse them through an object pool. Pool does exactly that, and it is safe for concurrent use. It has just two methods: Get() takes an object out of the pool and Put() returns one to it. Pool also has an optional New field of function type; when the pool is empty, Get() calls New() to create a fresh object. Its complete API is:

type Pool struct {
    ... // other fields

    // New optionally specifies a function to generate
    // a value when Get would otherwise return nil.
    // It may not be changed concurrently with calls to Get.
    New func() interface{}
}
func (p *Pool) Put(x interface{})
func (p *Pool) Get() interface{}

Tendermint's tmfmtLogger uses a Pool to cache tmfmtEncoder instances:

var tmfmtEncoderPool = sync.Pool{
    New: func() interface{} {
        var enc tmfmtEncoder
        enc.Encoder = logfmt.NewEncoder(&enc.buf)
        return &enc
    },
}

The Log() method first calls Get() to take an encoder from the pool, arranges with a defer statement to Put() it back, and then uses the encoder. This is the general pattern for Pool:

func (l tmfmtLogger) Log(keyvals ...interface{}) error {
    enc := tmfmtEncoderPool.Get().(*tmfmtEncoder)
    enc.Reset()
    defer tmfmtEncoderPool.Put(enc)
    ... // use enc
}
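
The same Get / Reset / defer Put pattern in a minimal, self-contained sketch, here pooling bytes.Buffer instances instead of encoders (not taken from Tendermint):

package main

import (
    "bytes"
    "fmt"
    "sync"
)

var bufPool = sync.Pool{
    New: func() interface{} {
        return new(bytes.Buffer) // called only when the pool is empty
    },
}

func render(name string) string {
    buf := bufPool.Get().(*bytes.Buffer) // take a buffer from the pool
    buf.Reset()                          // a reused buffer may hold old data
    defer bufPool.Put(buf)               // return it when done

    fmt.Fprintf(buf, "hello, %s", name)
    return buf.String() // copies the bytes, so returning the buffer is safe
}

func main() {
    fmt.Println(render("pool"))
}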

4. CSP Primitives

This section introduces the CSP model through Go's chan types and the related syntax.

4.1 Basic Channel Usage

Unlike the shared-memory model, the CSP model is supported directly by Go's syntax, which already hints that CSP is the more natural style to use in Go. The most important concept in CSP is message passing, and for this Go provides the special channel types. As the name suggests, a channel is a conduit through which messages flow between goroutines, and the messages flowing through a channel can be of any type (including channels themselves). Channel types are declared with the chan keyword; since channels are strongly typed, chan is followed by the type of message allowed to flow through the channel, for example:

var mychan chan string // channel of string

Channel instances are created with the built-in make() function, for example:

mychan = make(chan string)

To send a message into a channel, or receive a message from one, use the <- operator. With <- on the left of the channel it is a receive; on the right, a send:

s := <-mychan // read from channel
mychan <- s   // write to channel

You can also loop over the messages in a channel with a for range statement:

for s := range mychan {
    fmt.Println(s)
}

When a channel is no longer needed, it can be closed with the built-in close() function:

close(mychan)
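
Putting these pieces together, a minimal runnable sketch (not taken from Tendermint) might look like this:

package main

import "fmt"

func main() {
    mychan := make(chan string) // unbuffered channel of string

    // Producer goroutine: send a few messages, then close the channel.
    go func() {
        for _, s := range []string{"a", "b", "c"} {
            mychan <- s // blocks until the receiver is ready
        }
        close(mychan) // closing ends the receiver's range loop
    }()

    // Receive until the channel is closed.
    for s := range mychan {
        fmt.Println(s)
    }
}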

A channel is very much like a blocking queue in other languages (such as Java), except that its capacity is zero (explained further below). A goroutine that tries to receive from a channel blocks until another goroutine sends a message into it; conversely, a goroutine that tries to send into a channel blocks until another goroutine takes that message out. Tendermint/Cosmos-SDK uses channels heavily; take the iavl package provided by Cosmos-SDK as an example:

// Implements types.Iterator.
type iavlIterator struct {
    tree *iavl.ImmutableTree // Underlying store
    start, end []byte        // Domain
    ascending bool           // Iteration order
    iterCh chan cmn.KVPair   // Channel to push iteration values.
    quitCh chan struct{}     // Close this to release goroutine.
    initCh chan struct{}     // Close this to signal that state is initialized.
    ... // other fields
}

The iavlIterator struct above contains three fields of chan type. The newIAVLIterator() function below shows the make() function in use:

// newIAVLIterator will create a new iavlIterator.
// CONTRACT: Caller must release the iavlIterator, as each one creates a new
// goroutine.
func newIAVLIterator(tree *iavl.ImmutableTree, start, end []byte, ascending bool) *iavlIterator {
	iter := &iavlIterator{
		tree:      tree,
		start:     types.Cp(start),
		end:       types.Cp(end),
		ascending: ascending,
		iterCh:    make(chan cmn.KVPair), // Set capacity > 0?
		quitCh:    make(chan struct{}),
		initCh:    make(chan struct{}),
	}
	go iter.iterateRoutine()
	go iter.initRoutine()
	return iter
}

The iterateRoutine() function shows the <- operator and the close() function in action:

// Run this to funnel items from the tree to iterCh.
func (iter *iavlIterator) iterateRoutine() {
	iter.tree.IterateRange(
		iter.start, iter.end, iter.ascending,
		func(key, value []byte) bool {
			select {
			case <-iter.quitCh:
				return true // done with iteration.
			case iter.iterCh <- cmn.KVPair{Key: key, Value: value}:
				return false // yay.
			}
		},
	)
	close(iter.iterCh) // done.
}

4.2 Channel Capacity

As mentioned above, a channel resembles a blocking queue from other languages whose capacity is zero. In fact a channel's capacity can be of any size; just pass it as the second argument to make() (note that capacity is not part of the channel's type, so it cannot be specified in the declaration):

mychan = make(chan string, 100)
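
For illustration, here is a minimal sketch (capacity 2, not taken from Tendermint) of how a buffered channel behaves:

package main

import "fmt"

func main() {
    mychan := make(chan string, 2) // buffered channel with capacity 2

    mychan <- "a" // does not block: the buffer has room
    mychan <- "b" // does not block: the buffer is now full
    // a third send here would block until someone receives

    fmt.Println(<-mychan) // "a"
    fmt.Println(<-mychan) // "b"
}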

A channel created without a capacity is an unbuffered channel; otherwise it is a buffered channel. Sending into a buffered channel blocks the goroutine only when the buffer is full; likewise, receiving from a buffered channel blocks only when the buffer is empty. Tendermint also makes heavy use of buffered channels; rather than walking through them one by one, here are some of the places they appear, for reference:

$ cd tendermint
$ grep -nr make\(chan.*,.*\) . | grep -v _test | grep -v \.md
./consensus/state.go:156:		peerMsgQueue:     make(chan msgInfo, msgQueueSize),
./consensus/state.go:157:		internalMsgQueue: make(chan msgInfo, msgQueueSize),
./consensus/state.go:159:		statsMsgQueue:    make(chan msgInfo, msgQueueSize),
./consensus/ticker.go:43:		tickChan: make(chan timeoutInfo, tickTockBufferSize),
./consensus/ticker.go:44:		tockChan: make(chan timeoutInfo, tickTockBufferSize),
./abci/server/socket_server.go:115:		closeConn := make(chan error, 2)              // Push to signal connection closed
./abci/server/socket_server.go:116:		responses := make(chan *types.Response, 1000) // A channel to buffer responses
./abci/client/socket_client.go:45:		reqQueue:    make(chan *ReqRes, reqQueueSize),
./tools/tm-signer-harness/internal/test_harness.go:131:	c := make(chan os.Signal, 1)
./tools/tm-monitor/monitor/monitor.go:85:	blockCh := make(chan tmtypes.Header, 10)
./tools/tm-monitor/monitor/monitor.go:87:	blockLatencyCh := make(chan float64, 10)
./tools/tm-monitor/monitor/monitor.go:89:	disconnectCh := make(chan bool, 10)
./libs/autofile/autofile.go:72:	af.hupc = make(chan os.Signal, 1)
./libs/common/os.go:18:	c := make(chan os.Signal, 1)
./libs/common/async.go:127:	var taskDoneCh = make(chan bool, len(tasks))         // A "wait group" channel, early abort if any true received.
./libs/common/async.go:135:		var taskResultCh = make(chan TaskResult, 1) // Capacity for 1 result.
./libs/pubsub/subscription.go:33:		out:       make(chan Message, outCapacity),
./blockchain/v1/reactor.go:88:	eventsFromFSMCh := make(chan bcFsmMessage, capacity)
./blockchain/v1/reactor.go:89:	messagesForFSMCh := make(chan bcReactorMessage, capacity)
./blockchain/v1/reactor.go:90:	errorsForFSMCh := make(chan bcReactorMessage, capacity)
./blockchain/v1/reactor.go:280:	doProcessBlockCh := make(chan struct{}, 1)
./blockchain/v1/reactor.go:336:	stopProcessing := make(chan struct{}, 1)
./blockchain/v0/reactor.go:81:	requestsCh := make(chan BlockRequest, maxTotalRequesters)
./blockchain/v0/reactor.go:84:	errorsCh := make(chan peerError, capacity) // so we don't block in #Receive#pool.AddBlock
./blockchain/v0/reactor.go:225:	didProcessCh := make(chan struct{}, 1)
./blockchain/v0/pool.go:512:		gotBlockCh: make(chan struct{}, 1),
./blockchain/v0/pool.go:513:		redoCh:     make(chan p2p.ID, 1),
./p2p/transport.go:332:	errc := make(chan error, len(mt.connFilters))
./p2p/transport.go:495:		errc = make(chan error, 2)
./p2p/switch.go:264:	successChan := make(chan bool, len(peers))
./p2p/switch.go:719:	errc := make(chan error, len(sw.peerFilters))
./p2p/conn/connection.go:166:		send:          make(chan struct{}, 1),
./p2p/conn/connection.go:167:		pong:          make(chan struct{}, 1),
./p2p/conn/connection.go:208:	c.pongTimeoutCh = make(chan bool, 1)
./p2p/conn/connection.go:739:		sendQueue:               make(chan []byte, desc.SendQueueCapacity),
./mempool/clist_mempool.go:108:	mem.txsAvailable = make(chan struct{}, 1)
./rpc/core/mempool.go:137:	resCh := make(chan *abci.Response, 1)
./rpc/core/mempool.go:237:	checkTxResCh := make(chan *abci.Response, 1)
./rpc/lib/server/handlers.go:543:	wsc.writeChan = make(chan types.RPCResponse, wsc.writeChanCapacity)
./rpc/lib/server/handlers.go:712:	pongs := make(chan string, 1)
./rpc/lib/client/ws_client.go:166:	c.reconnectAfter = make(chan error, 1)
./rpc/lib/client/ws_client.go:169:	c.backlog = make(chan types.RPCRequest, 1)
./rpc/client/httpclient.go:410:	outc := make(chan ctypes.ResultEvent, outCap)
./rpc/client/localclient.go:179:	outc := make(chan ctypes.ResultEvent, outCap)

4.3 Channel Direction

Although a channel's capacity cannot be specified in its declaration, its direction can. Channels declared as above are bidirectional; unidirectional channels are declared by adding <- next to the chan keyword: <- on the left of chan means a receive-only channel, and on the right a send-only channel, for example:

var mychan <-chan string // readonly
var mychan chan<- string // write-only
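
A minimal sketch with hypothetical produce/consume functions (not taken from Tendermint) shows how directional channel parameters are typically declared and used:

package main

import "fmt"

// produce may only send on out; receiving from it would not compile.
func produce(out chan<- string) {
    out <- "ping"
    close(out)
}

// consume may only receive from in; sending on it would not compile.
func consume(in <-chan string) {
    for s := range in {
        fmt.Println(s)
    }
}

func main() {
    c := make(chan string) // a bidirectional channel...
    go produce(c)          // ...converts implicitly to chan<- string
    consume(c)             // ...and to <-chan string
}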

Tendermint also makes heavy use of unidirectional channels; again, rather than walking through them, here are some of the places receive-only channels appear:

$ grep -nr \<\-chan . | grep -v _test | grep -v \.md
./consensus/state.go:62:	TxsAvailable() <-chan struct{}
./consensus/ticker.go:20:	Chan() <-chan timeoutInfo       // on which to receive a timeout
./consensus/ticker.go:66:func (t *timeoutTicker) Chan() <-chan timeoutInfo {
./abci/server/socket_server.go:218:func (s *SocketServer) handleResponses(closeConn chan error, conn net.Conn, responses <-chan *types.Response) {
./tools/tm-monitor/monitor/monitor.go:168:func (m *Monitor) listen(nodeName string, blockCh <-chan tmtypes.Header, blockLatencyCh <-chan float64, disconnectCh <-chan bool, quit <-chan struct{}) {
./types/event_bus.go:25:	Out() <-chan tmpubsub.Message
./types/event_bus.go:26:	Cancelled() <-chan struct{}
./evidence/pool.go:47:func (evpool *EvidencePool) EvidenceWaitChan() <-chan struct{} {
./mock/mempool.go:38:func (Mempool) TxsAvailable() <-chan struct{} { return make(chan struct{}) }
./mock/mempool.go:43:func (Mempool) TxsWaitChan() <-chan struct{} { return nil }
./libs/clist/clist.go:97:func (e *CElement) PrevWaitChan() <-chan struct{} {
./libs/clist/clist.go:106:func (e *CElement) NextWaitChan() <-chan struct{} {
./libs/clist/clist.go:308:func (l *CList) WaitChan() <-chan struct{} {
./libs/common/service.go:46:	Quit() <-chan struct{}
./libs/common/service.go:217:func (bs *BaseService) Quit() <-chan struct{} {
./libs/common/async.go:20:type TaskResultCh <-chan TaskResult
./libs/pubsub/subscription.go:41:func (s *Subscription) Out() <-chan Message {
./libs/pubsub/subscription.go:47:func (s *Subscription) Cancelled() <-chan struct{} {
./blockchain/v0/reactor.go:68:	requestsCh <-chan BlockRequest
./blockchain/v0/reactor.go:69:	errorsCh   <-chan peerError
./p2p/trust/ticker.go:13:	GetChannel() <-chan time.Time
./p2p/trust/ticker.go:33:func (t *TestTicker) GetChannel() <-chan time.Time {
./p2p/trust/ticker.go:60:func (t *Ticker) GetChannel() <-chan time.Time {
./p2p/fuzz.go:18:	start  <-chan time.Time
./p2p/fuzz.go:34:		start:  make(<-chan time.Time),
./mempool/clist_mempool.go:203:func (mem *CListMempool) TxsWaitChan() <-chan struct{} {
./mempool/clist_mempool.go:444:func (mem *CListMempool) TxsAvailable() <-chan struct{} {
./mempool/mempool.go:58:	TxsAvailable() <-chan struct{}
./rpc/lib/client/ws_client.go:367:		ticker = &time.Ticker{C: make(<-chan time.Time)}
./rpc/client/httpclient.go:399:	outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) {
./rpc/client/interface.go:103:	Subscribe(ctx context.Context, subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error)
./rpc/client/localclient.go:164:func (c *Local) Subscribe(ctx context.Context, subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) {

And here are some of the places send-only channels appear:

$ grep -nr chan\<\- . | grep -v _test | grep -v \.md
./consensus/wal_generator.go:151:	signalWhenStopsTo chan<- struct{}
./consensus/wal_generator.go:159:func newByteBufferWAL(logger log.Logger, enc *WALEncoder, nBlocks int64, signalStop chan<- struct{}) *byteBufferWAL {
./abci/server/socket_server.go:147:func (s *SocketServer) handleRequests(closeConn chan error, conn net.Conn, responses chan<- *types.Response) {
./abci/server/socket_server.go:179:func (s *SocketServer) handleRequest(req *types.Request, responses chan<- *types.Response) {
./tools/tm-monitor/monitor/node.go:38:	blockCh        chan<- tmtypes.Header
./tools/tm-monitor/monitor/node.go:39:	blockLatencyCh chan<- float64
./tools/tm-monitor/monitor/node.go:40:	disconnectCh   chan<- bool
./tools/tm-monitor/monitor/node.go:82:func (n *Node) SendBlocksTo(ch chan<- tmtypes.Header) {
./tools/tm-monitor/monitor/node.go:86:func (n *Node) SendBlockLatenciesTo(ch chan<- float64) {
./tools/tm-monitor/monitor/node.go:90:func (n *Node) NotifyAboutDisconnects(ch chan<- bool) {
./types/event_bus.go:218:func (NopEventBus) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error {
./blockchain/v0/pool.go:78:	requestsCh chan<- BlockRequest
./blockchain/v0/pool.go:79:	errorsCh   chan<- peerError
./blockchain/v0/pool.go:84:func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- peerError) *BlockPool {
./p2p/transport.go:335:		go func(f ConnFilterFunc, c net.Conn, ips []net.IP, errc chan<- error) {
./p2p/transport.go:501:	go func(errc chan<- error, c net.Conn) {
./p2p/transport.go:505:	go func(errc chan<- error, c net.Conn) {
./p2p/switch.go:722:		go func(f PeerFilterFunc, p Peer, errc chan<- error) {
./rpc/client/localclient.go:185:func (c *Local) eventsRoutine(sub types.Subscription, subscriber string, q tmpubsub.Query, outc chan<- ctypes.ResultEvent) {

4.4 The select Statement

To operate on multiple channels at once, use the select statement (whose syntax looks a bit like switch). If several channels are ready, select picks one of the ready cases at random to receive from or send to. If no channel is ready and there is a default branch, that branch runs; if no channel is ready and there is no default branch, the whole select statement blocks. The general form of select is:

select {
case <-c1:
    // do something
case <-c2:
    // do something
case c3 <- x:
    // do something
case c4 <- y:
    // do something
default:
    // do something
}
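
The form above is not runnable as written; here is a minimal, self-contained sketch (not taken from Tendermint) showing a ready case being chosen, a non-blocking default branch, and the common timeout idiom built on time.After:

package main

import (
    "fmt"
    "time"
)

func main() {
    c1 := make(chan string, 1)
    c2 := make(chan string, 1)
    c1 <- "message on c1"

    // c1 is ready, so its case runs; if several cases were ready,
    // select would pick one of them at random.
    select {
    case s := <-c1:
        fmt.Println(s)
    case s := <-c2:
        fmt.Println(s)
    }

    // Nothing is ready now, so the default branch runs instead of blocking.
    select {
    case s := <-c2:
        fmt.Println(s)
    default:
        fmt.Println("nothing ready, not blocking")
    }

    // A common idiom: bound the wait with a timeout channel.
    select {
    case s := <-c2:
        fmt.Println(s)
    case <-time.After(100 * time.Millisecond):
        fmt.Println("timed out")
    }
}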

Tendermint/Cosmos-SDK also uses select extensively; the iterateRoutine() method shown earlier is one example. Below is one more method, taken from the Tendermint p2p package, for reference:

// Accept implements Transport.
func (mt *MultiplexTransport) Accept(cfg peerConfig) (Peer, error) {
    select {
    // This case should never have any side-effectful/blocking operations to
    // ensure that quality peers are ready to be used.
    case a := <-mt.acceptc:
        if a.err != nil { return nil, a.err }
        cfg.outbound = false
        return mt.wrapPeer(a.conn, a.nodeInfo, cfg, a.netAddr), nil
    case <-mt.closec:
        return nil, ErrTransportClosed{}
    }
}

Since both the case and default branches of a select statement may be omitted, a common idiom is to block a goroutine "forever" with an empty select {}. Take the node command provided by Tendermint as an example:

// NewRunNodeCmd returns the command that allows the CLI to start a node.
// It can be used with a custom PrivValidator and in-process ABCI application.
func NewRunNodeCmd(nodeProvider nm.NodeProvider) *cobra.Command {
	cmd := &cobra.Command{
		Use:   "node",
		Short: "Run the tendermint node",
		RunE: func(cmd *cobra.Command, args []string) error {
			n, err := nodeProvider(config, logger)
			if err != nil {
				return fmt.Errorf("Failed to create node: %v", err)
			}

			// Stop upon receiving SIGTERM or CTRL-C.
			cmn.TrapSignal(logger, func() {
				if n.IsRunning() { n.Stop() }
			})

			if err := n.Start(); err != nil {
				return fmt.Errorf("Failed to start node: %v", err)
			}
			logger.Info("Started node", "nodeInfo", n.Switch().NodeInfo())

			select {} // Run forever.
		},
	}

	AddNodeFlags(cmd)
	return cmd
}

This article was written by Chase of the CoinEx Chain team. It may be reproduced without permission.