• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Go|Channel 底层实现和运行调度逻辑

武飞扬头像
Mengo_x
帮助1

1、Channel 与 CSP并发模型

CSP(Communicating Sequential Process)通信顺序进程,是一种很强大的并发数据模型,用于描述两个独立的并发实体通过共享的通讯 channel(管道)进行通信的并发模型。

Golang 其实只用到了 CSP 的很小一部分,即理论中的 Process/Channel(对应到语言中的 goroutine/channel):这两个并发原语之间没有从属关系, Process 可以订阅任意个 Channel,Channel 也并不关心是哪个 Process 在利用它进行通信;Process 围绕 Channel 进行读写,形成一套有序阻塞和可预测的并发模型。

DO NOT COMMUNICATE BY SHARING MEMORY; INSTEAD, SHARE MEMORY BY COMMUNICATING.
不要以共享内存的方式来通信,而要通过通信来共享内存。

无论是通过共享内存来通信还是通过通信来共享内存,最终我们应用程序都是读取的内存当中的数据,只是前者是直接读取内存的数据,而后者是通过发送消息的方式来进行同步。

大部分的语言采用的都是第一种方式直接去操作内存,然后通过互斥锁,CAS 等操作来保证并发安全。共享内存的方式在高并发场景下的锁竞争激烈,开销大。采用channel进行通信可以控制并发的数量,可以使得生产者和消费者解耦,提高代码可读性。Go的CSP并发模型,是通过goroutinechannel来实现的。

Go中的channel 是一个队列,遵循先进先出的原则,负责协程之间的通信。Channel 其实和消息队列很相似。

Go 的 GMP 协程调度模型不展开讲,可以查看之前的博客

Channel

channel 使用场景:

  • 停止信号监听
  • 定时任务
  • 生产方和消费方解耦
  • 控制并发数

channel有3种状态:未初始化、正常、关闭

状态 未初始化 关闭 正常
关闭 panic panic 正常关闭
发送 阻塞 panic 阻塞或者成功发送
接收 阻塞 缓冲区为空则为零值, 否则可以继续读 阻塞或者成功接收
  1. 一个 channel不能多次关闭,会导致painc
  2. 如果多个 goroutine 都监听同一个 channel,那么 channel 上的数据都可能随机被某一个 goroutine 取走进行消费
  3. 如果多个 goroutine 监听同一个 channel,如果这个 channel 被关闭,则所有 goroutine 都能收到退出信号

channel死锁场景:

  • 非缓存channel只写不读
  • 非缓存channel读在写后面
  • 缓存channel写入超过缓冲区数量
  • 空读
  • 多个协程互相等待

2、Channel 底层结构

channel 变量是一个存储在函数栈帧上的指针,占用8个字节,指向堆上的 hchan 结构体。

type hchan struct {
 closed   uint32   // channel是否关闭的标志
 elemtype *_type   // channel中的元素类型
 
 // channel分为无缓冲和有缓冲两种。
 // 对于有缓冲的channel存储数据,使用了 ring buffer(环形缓冲区) 来缓存写入的数据,本质是循环数组
 // 为啥是循环数组?普通数组不行吗,普通数组容量固定更适合指定的空间,弹出元素时,普通数组需要全部都前移
 // 当下标超过数组容量后会回到第一个位置,所以需要有两个字段记录当前读和写的下标位置
 buf      unsafe.Pointer // 指向底层循环数组的指针(环形缓冲区)
 qcount   uint           // 循环数组中的元素数量
 dataqsiz uint           // 循环数组的长度
 elemsize uint16         // 元素的大小
 sendx    uint           // 下一次写下标的位置
 recvx    uint           // 下一次读下标的位置
  
 // 尝试读取channel或向channel写入数据而被阻塞的goroutine
 recvq    waitq  // 读等待队列
 sendq    waitq  // 写等待队列

 lock mutex //互斥锁,保证读写channel时不存在并发竞争问题
}
学新通

对于有缓冲的channel存储数据,使用了 ring buffer(环形缓冲区) 来缓存写入的数据,本质是循环数组。环形缓存的内存空间可以复用,减少了GC的压力。

环形缓存由hchan的五个字段构成。qcout是环形缓存中已经保存数据的多少,dataqsize表示的是最多能缓存多少数据,buf是一个指向环形缓存第一个成员的指针,elemsizeelemtype是缓存数据的大小和类型。
学新通

channel中含有两个队列分别是:接收队列 recvq发送队列 sendq。其底层结构为双向链表 waitq,包含一个头结点和一个尾结点。每个节点是一个sudog结构体变量,记录哪个协程 g 在等待,等待的是哪个 hchan,等待发送/接收的数据 elem 在哪里。

sudog代表着等待队列中的一个goroutine,G与同步对象(指chan)关系是多对多的。一个 G 可以出现在许多等待队列上,因此一个 G 可能有多个sudog。并且多个 G 可能正在等待同一个同步对象,因此一个对象可能有许多 sudog。sudog 是从特殊池中分配出来的。使用 acquireSudog 和 releaseSudog 分配和释放它们。

type waitq struct {
  first *sudog
  last  *sudog
}

type sudog struct {
    g *g
    next *sudog
    prev *sudog
    elem unsafe.Pointer 
    c        *hchan 
    ...
}

3、Channel 运行逻辑

此部分源码解析可参考这篇博客

阻塞唤醒调度示例可参考这篇博客

创建 channel

// 带缓冲
ch := make(chan int, 3)
// 不带缓冲
ch := make(chan int)

创建时的策略:

  • 如果是无缓冲的 channel,会直接在堆上给 hchan 分配内存;
  • 如果是有缓冲的 channel,并且元素不包含指针,那么会在堆上为 hchan 和底层 buf 数组分配一段连续的地址;
  • 如果是有缓冲的 channel,并且元素包含指针,那么会在堆上为 hchan 和底层 buf 数组分别分配地址;

当存储在 buf 中的元素不包含指针时,hchan 中也不包含 GC 关心的指针。buf 指向一段相同元素类型的内存,elemtype 固定不变。受到垃圾回收器的限制,指针类型的缓冲 buf 需要单独分配内存。

创建channel实际上就是在堆内存中实例化了一个hchan的结构体,并返回一个ch指针,我们使用过程中channel在函数之间的传递都是用的这个指针,这就是为什么函数传递中无需使用channel的指针,而直接用channel就行了。

发送数据

向 channel 中发送数据时大概分为两大块:检查和数据发送

  • 如果 channel 的读等待队列 recvq 存在接收者goroutine
    • 将数据直接发送给第一个等待的 goroutine(将数据拷贝到了接收者的内存地址上), 唤醒接收的 goroutine(将等待接收的阻塞 goroutine 的状态从 Gwaiting 或者 Gscanwaiting 改变成 Grunnable)
  • 如果 channel 的读等待队列 recvq 不存在接收者goroutine
    • 如果循环数组buf未满,那么将会把数据发送到循环数组 buf 的队尾(需要给 buf** 加锁**)
    • 如果循环数组buf已满,这个时候就会走阻塞发送的流程,将当前 goroutine 加入写等待队列 sendq,并挂起等待唤醒

接收数据

向 channel 中接收数据时大概分为两大块,检查和数据接收

  • 如果 channel 的写等待队列 sendq 存在发送者goroutine
    • 如果是无缓冲 channel,直接从第一个发送者goroutine那里把数据拷贝给接收变量,唤醒发送的 goroutine
    • 如果是有缓冲 channel(已满),将循环数组buf的队首元素拷贝给接收变量,将第一个发送者goroutine的数据拷贝到 buf循环数组队尾,唤醒发送的 goroutine
  • 如果 channel 的写等待队列 sendq 不存在发送者goroutine
    • 如果循环数组buf非空,将循环数组buf的队首元素拷贝给接收变量
    • 如果循环数组buf为空,这个时候就会走阻塞接收的流程,将当前 goroutine 加入读等待队列 recvq ,并挂起等待唤醒

发送/接收操作细节

缓存链表如果要使用 buf,每一步的操作,都是需要加锁的,每一步的操作的细节可以细化为:

  • 第一,加锁
  • 第二,把数据从goroutine中copy到“队列”中(或者从队列中copy到goroutine中)。
  • 第三,释放锁

Go中那句经典的话:Do not communicate by sharing memory; instead, share memory by communicating.的具体实现就是利用channel把数据从一个G copy到了另一个 G。

goroutine 阻塞唤醒与调度

当channel缓存满了,或者没有缓存的时候,我们继续 send(ch <- xxx) 或者 recv(<- ch) 会阻塞当前goroutine。这里就是 Go 运行时的 scheduler 完成的调度,具体的 GMP 调度模型可以看以前的博客

goroutine的阻塞操作,实际上是调用 send (ch <- xx)或者 recv ( <-ch) 的时候主动触发的。

以发送为例,G1 协程执行以下操作,导致 channel 已经满了。

//goroutine1 中,记做G1
ch := make(chan int, 3)
ch <- 1
ch <- 1
ch <- 1

再次进行send操作 (ch<-1) 的时候,会主动调用Go的调度器,让G1等待并让出M,让其他G去使用。
学新通

同时G1也会被抽象成含有G1指针和send元素的sudog结构体保存到hchan的sendq等待被唤醒

随后 G2 执行了recv操作 p := <-ch,G2从缓存队列中取出数据,channel会将等待队列 sendq 中的G1推出,将G1 要发送的数据推到缓存中,然后调用Go的scheduler,唤醒G1,并把G1放到 P 可运行的 Goroutine 队列中。

如果是 G2 在接收数据时被阻塞,加入recvq 等待唤醒。然后 G1 发送了数据,此时不需要经过 buf 队列,而是直接把数据从G1直接copy到了G2的栈中,然后调度器唤醒 G2,无需获取锁

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgfibha
系列文章
更多 icon
同类精品
更多 icon
继续加载