• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Go Channel的基本使用和底层原理

武飞扬头像
xkzeee
帮助5

前言

如果说 goroutine 是Go程序并发的执行体,channel就是它们之间的连接。

Go语言采用的并发模型是:通信顺序进程(CSP),提倡通过通信共享内存而不是通过共享内存而实现通信。channel是可以让一个 goroutine 发送特定值到另一个 goroutine 的通信机制。

1. channel类型

channel类型:未初始化的通道类型变量默认零值为nil

var 变量名称 chan 元素类型
// 例如
var ch1 chan int   // 声明一个传递整型的通道
var ch2 chan bool  // 声明一个传递布尔型的通道
var ch3 chan []int // 声明一个传递int切片的通道

2. 初始化channel

声明一个通道类型的变量是需要使用make()函数初始化之后才能使用

make(chan 元素类型, [缓冲大小]) // 这里的缓冲大小是可选的
// 例如
flag := make(chan bool)
flag := make(chan bool, 1)

3. channel操作

通道一共有 发送、接收、关闭三种操作。

cha := make(chan int)
// 发送
cha <- 10   // 将10发送到cha种

// 接收
x := <- cha // 从cha种接收值并赋值给x变量
<- cha      // 从cha中接收值,忽略结果

// 关闭
close(cha)

4. 缓冲通道

4.1无缓冲通道

以下代码执行会报错 fatal error: all goroutines are asleep - deadlock!
deadlock表示程序中的 goroutine 都被挂起导致程序死锁了。

// make(chan int) 创建的就是无缓冲通道
func main() {
	ch := make(chan int)
	ch <- 10
	fmt.Println("发送成功")
}

造成死锁的原因: 无缓冲通道必须至少有一个接收方才能发送成功,同理至少有一个发送放才能接收成功。

思路: 因为此通道没有进行接收操作,程序执行到 ch <- 10 会阻塞,但是这时创建了一个goroutine,那么就会进入recv,recv函数中有接收操作,最后代码执行完毕结束。

// 解决以上问题
func recv(c chan int) {
	ret := <-c
	fmt.Println("接收成功", ret)
}

func main() {
	ch := make(chan int)
	go recv(ch) // 创建一个 goroutine 从通道接收值
	ch <- 10
	fmt.Println("发送成功")
}

使用无缓冲通道进行通信将导致发送和接收的 goroutine 同步化。因此,无缓冲通道也被称为同步通道

4.2 有缓冲通道

 func main() {
	ch := make(chan int, 1) // 创建一个容量为1的有缓冲区通道
	ch <- 10
	fmt.Println("发送成功")
}

如果当通道内已有元素数达到最大容量后,再向通道执行发送操作就会阻塞,除非有从通道执行接收操作。

4.3 多返回值模式

  • 当向通道发送完数据时,通过close函数来关闭通道。当一个通道关闭后在向其发送数据会引发panic
  • 取值操作会先取完通道中的值,取完之后在执行接受操作得到的都是对应元素的0值
// value:是从通道中取出的值
// ok:通道chan关闭时返回false,没有关闭返回true
value, ok := <- chan

// 例子
func test(ch chan int) {
    for {
        value, ok := <-ch
            if !ok {
                fmt.Println("通道已关闭")
                break
        }
      fmt.Printf("v:%#v ok:%#v\n", value, ok)
	}
}

func main() {
	chan1 := make(chan int, 2)
	chan1 <- 3
    chan1 <- 5
	close(ch)
	test(ch)
}
// 输出 
    v:1 ok:true
    v:5 ok:true
    通道已关闭
学新通

4.4 单向通道

单向通道就是规定channel是只能接收呢?还是只能读取呢?这样用于标识在操作同一个通道

// 只能接收(只能往外边取,只读)
<- chan int 
// 只能发送(只能往里写,只写)
chan <- int 

5. chan数据结构

从hchan数据结构来看channel由队列、类型信息、goroutine等待队列组成

type hchan struct {
    qcount   uint           // 当前队列中剩余元素个数
    dataqsiz uint           // 环形队列长度,即可以存放的元素个数
    buf      unsafe.Pointer // 环形队列指针
    elemsize uint16         // 每个元素的大小
    closed   uint32         // 标识关闭状态
    elemtype *_type         // 元素类型
    sendx    uint           // 队列下标,指示元素写入时存放到队列中的位置
    recvx    uint           // 队列下标,指示元素从队列的该位置读出
    recvq    waitq          // 等待读消息的goroutine队列,即等待接收队列
    sendq    waitq          // 等待写消息的goroutine队列,即等待发送队列
    lock     mutex          // 互斥锁,chan不允许并发读写
}

5.1 环形队列

hchan结构体中:qcount、dataqsiz、buf、sendx、recv五个字段构建底层的循环队列(环形队列)

学新通

channel的数据结构(以上图为例),就是一个环形队列,里面保存了:

  • 队列剩余元素个数:qcount
  • 队列容量:dataqsiz
  • 数据buf:它是指向队列的内存
  • sendx:指示后续写入的数据存储的位置,取值[0, 10)
  • recvx:指示从该位置读取数据, 取值[0, 10)

closed: 用来标识channel的状态,0:关闭,!0 标识已关闭,如果关闭,那就不能发送数据

5.2 等待队列

从channel 读数据时如果channel缓冲区为空,当前goroutine 会被阻塞写数据时,如果channel缓冲区为满,当前goroutine 会被阻塞,还有在进行读或者写操作时没有缓冲区也会使当前goroutine阻塞
被阻塞的goroutine会被挂在channel的等待队列中,唤醒条件:

  • 因为读阻塞的 goroutine :向 channel 写入数据的 goroutine 唤醒
  • 因为写阻塞的 goroutine :从 channel 读数据的 goroutine 唤醒

一般情况下 recvq 和 sendq 至少有一个为空;只有一个例外,就是同一个 goroutine 使用 select 语句向 channel 一边写数据一边读数据

5.3 类型信息

一个 channel 只能传递一种类型的值,类型的信息存储在 hchan 数据结构中

  • elemtype 表示类型,用于数据传递过程中的赋值
  • elemsize 表示大小,用于在 buf 中定位元素的位置

6. channel读写

6.1 向 channel 写数据

向 channel 写数据的过程如下

  1. 先判断等待接收队列 recvq 中是否为空,如果 recvq 不为空,说明之前在进行读操作时,缓冲区中没有数据或者没有缓冲区,所以造成了阻塞并挂在到等待接收队列中,此时直接从 recvq 取出 G,并把数据写入,最后把该 G 唤醒,结束写过程
  2. 如果等待接收队列为空,并且缓冲区中有空余位置,那就直接将数据写入缓冲区,结束发送过程;
  3. 如果等待接收队列为空,并且缓冲区中没有空余位置,将待发送数据写入G,将当前G加入sendq,进入睡眠,等待被读goroutine唤醒;

流程图如下:
学新通

6.2 向 channel 中读数据

向 channel 读数据的过程如下

此处 sendq 不为空的原因:说明之前在进行写操作时,缓冲区没有空余的位置即缓冲区满了,所以造成了阻塞并挂到等待发送队列中

  1. 先判断等待发送队列 sendq 中是否为空,如果 sendq 不为空,且没有缓冲区,直接从 sendq 中取出 G,然后把 G 的数据独出,最后把 G 唤醒,结束读取过程
  2. 如果等待发送队列 sendq 不为空,且有缓冲区,但是此时说明缓冲区已满,从缓冲区首部读出数据,然后把G中的数据写入缓冲区尾部,把G唤醒结束读取过程
  3. 如果等待发送队列 sendq 为空,缓冲区中有数据,取出,结束读取过程
  4. 如果等待发送队列 sendq 为空,缓冲区中没有数据,没有数据就没法读取发送,就会造成阻塞将当前goroutine挂到等待接收队列 recvq中,睡眠,等待被写goroutine唤醒
    流程图如下:
    学新通

7. channel的创建

7.1 makechan64()函数创建channel的过程

channel的创建是调用了以下两个函数之一,它的创建过程实际上是初始化hchan结构的。

  • makechan()
  • makechan64()

makechan64()函数是处理缓冲区大小大于 2 的 32 次方的情况,其实makechan64函数也是调用了makechan函数

func makechan64(t *chantype, size int64) *hchan {
	if int64(int(size)) != size {
		panic(plainError("makechan: size out of range"))
	}
	return makechan(t, int(size))
}

func makechan(t *chantype, size int) *hchan {}

7.2 makechan()函数创建channel的过程

7.2.1 合法性验证

  1. 类型大小大与 1<<16 时会法生异常(即大与65536)
  2. 内存对齐,当大与maxAlign(最大内存8字节数)时会发生异常
  3. 传入的size大小大与堆可分配的最大内存时会发成异常
    学新通
    学新通

7.2.2 分配地址空间

根据 channel 中 *chantype 和 size 初始化 hchan 数据结构和缓冲区

  1. 如果 channel 不存在缓冲区,分配hchan结构空间,即无缓存的 channel

  2. 如果当前 channel 中存储的类型不是指针类型,会为当前 channel 和 底层的数组分配一块连续的内存空间

  3. 默认情况包括指针,为 hchan 和 buf 单独分配数据地址空间
    学新通

7.2.3 最后更新 hchan 结构体的数据

学新通

7.2.4 打印 channel 信息

如果 debugChan 为true,则在创建channel时,会输出channel的元素大小,及可缓冲的元素个数等信息

学新通

7.3 关闭channel

关闭channel时候,会把 等待发送队列中的G全部唤醒,本该写入G的数据位置为nil;把sendq中的G全部唤醒,但这些G会panic。

  • 关闭值为nil的channel,会panic
  • 关闭已经被关闭的channel,会panic
  • 向已经关闭的channel写数据,会panic

8. select 使用及其实现原理

8.1 select使用

select是Go在语言层面提供的多路IO复用的机制,可以检测多个channel是否ready(即是否可读或可写)。又或者说使用select 可以监控channel,如果说有多个case都满足了,那么执行的顺序是随机的

如果检测到channel可以发送数据或者可以接收数据,就会执行case分支,如果阻塞就会执行default分支

// 只要ch 可以进行写操作时 就执行case分支
select {
	case ch <- 1:
		...
	default:
		...
}
// 只要ch可进行读操作时,就执行case分支
select {
	case result <- ch:
		...
	default:
		...
}
// 多路复用 
// 通过select监控两个channel:chan1 和 chan2
// 这里default分支是可选的
select {
	case result <- chan1 :
	...
	case result <- chan2 :
	...
	default:
        fmt.Printf("No element in chan1 and chan2.\n")
}
学新通

select的case语句读channel不会阻塞,尽管channel中没有数据。这是由于case语句编译后调用读channel时会明确传入不阻塞的参数,此时读不到数据时不会将当前goroutine加入到等待队列,而是直接返回

8.1 select实现原理

8.1.1 select数据结构

源码包 src/runtime/select.go 定义了表示case语句的数据结构scase

我的go版本是 1.8版本的,和以往版本的case数据结构有所不同,在以往的版本此结构中还有个 kind uint16 字段,它表示该case的类型,分为读channel、写channel和default,这三种类型由常量定义:

  1. caseRecv:case语句中尝试读取scase.c中的数据;
  2. caseSend:case语句中尝试向scase.c中写入数据;
  3. caseDefault: default语句
type scase struct {
	c    *hchan         // chan
	elem unsafe.Pointer // data element
}

scase.c为当前case语句所操作的channel指针,这也说明了一个case语句只能操作一个channel

scase.elem表示缓冲区地址,根据scase.kind不同,有不同的用途:

  • 当scase.kind == caseRecv : scase.elem表示读出channel的数据存放地址;
  • 当scase.kind == caseSend : scase.elem表示将要写入channel的数据存放地址;

8.1.2 select实现逻辑

在scase数据结构下边有一个 selectgo() 函数,它是一个定义了select选择case的函数

核心方法selectgo如下:

  • order0 为一个两倍 cas0 数组长度的 buffer,保存 scase 随机序列 pollorder 和 scase 中 channel 地址序列 lockorder
  • pollorder:每次selectgo执行都会把scase序列打乱,以达到随机检测case的目的
  • lockorder:所有case语句中channel序列,以达到去重防止对channel加锁时重复加锁的目的
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) {
    // generate permuted order
    norder := 0
    for i := range scases {
        cas := &scases[i]
		// Omit cases without channels from the poll and lock orders.
		if cas.c == nil {
			cas.elem = nil // allow GC
			continue
		}
		// ....
		i  
    }
    pollorder = pollorder[:norder]
	lockorder = lockorder
	
	// sort the cases by Hchan address to get the locking order.
	// simple heap sort, to guarantee n log n time and constant stack footprint.
	// 一系列堆排序操作
	
	// lock all the channels involved in the select
	sellock(scases, lockorder)
	
	// pass 1 - look for something already waiting
	for _, casei := range pollorder {
	}
	
	if !block {
		selunlock(scases, lockorder)
		casi = -1
		goto retc
	}
	
	// pass 2 - enqueue on all chans
	gp = getg()
	if gp.waiting != nil {
		throw("gp.waiting != nil")
	}
	nextp = &gp.waiting
	for _, casei := range lockorder {
	}
	
	// wait for someone to wake us up
	gp.param = nil
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)

	sellock(scases, lockorder)
	
	// pass 3 - dequeue from unsuccessful chans otherwise they stack up on quiet channels
	// record the successful case, if any. We singly-linked up the SudoGs in lock order.
	// Clear all elem before unlinking from gp.waiting.
	for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
		sg1.isSelect = false
		sg1.elem = nil
		sg1.c = nil
	}
	gp.waiting = nil
	for _, casei := range lockorder {
	}
}

// 伪代码
func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
    //1. 锁定scase语句中所有的channel
    //2. 按照随机顺序检测scase中的channel是否ready
    //   2.1 如果case可读,则读取channel中数据,解锁所有的channel,然后返回(case index, true)
    //   2.2 如果case可写,则将数据写入channel,解锁所有的channel,然后返回(case index, false)
    //   2.3 所有case都未ready,则解锁所有的channel,然后返回(default index, false)
    //3. 所有case都未ready,且没有default语句
    //   3.1 将当前协程加入到所有channel的等待队列
    //   3.2 当将协程转入阻塞,等待被唤醒
    //4. 唤醒后返回channel对应的case index
    //   4.1 如果是读操作,解锁所有的channel,然后返回(case index, true)
    //   4.2 如果是写操作,解锁所有的channel,然后返回(case index, false)
}

学新通

9. range

什么是range?
range 是 Go 语言中的一个关键字,常用于 for 循环中迭代array、slice、channel、map、string及涉及到遍历输出的东西。

迭代集合时,返回map的键值对(key-value)
迭代数组和切片时,返回元素的下标和下标对应的值(index-value)

迭代通道

// 会读出所有通道中的元素,这里和channel读机制是一样的
func main() {
	ch := make(chan int, 10)
	ch <- 1
	ch <- 2
	ch <- 3
	ch <- 4
	ch <- 5
	ch <- 6
	close(ch)
	for i := range ch {
		fmt.Println(i)
	}
	// 输出:1 2 3 4 5 6
}

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhfibgei
系列文章
更多 icon
同类精品
更多 icon
继续加载