• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

go语言管道(channel)

武飞扬头像
两片空白
帮助1

前言

        channel式go语言协程中数据通信的双向通道。但是在实际应用中,为了代码的简单和易懂,一般使用的channel是单向的。

学新通

使用

1. channel的定义和收发数据

  1.  
    package channel
  2.  
     
  3.  
     
  4.  
    func main(){
  5.  
    //var c chan int c的默认值位nil,一般不使用
  6.  
    c := make(chan int)
  7.  
    c <- 1 //发数据
  8.  
    n := <-c //收数据
  9.  
    }

但是上面收发数据的写法是错误的,因为一个协程往管道里发送数据,必须要有一个协程来接受数据,否则会造成死锁。(协程往管道发送数据和从管道接受数据为了防止数据紊乱,会加上锁)

注意:

        协程从管道里接收数据时,当没有收到数据时,会一直等待。这样效率不是很高,使用select,可以实现同步的功能,后面有介绍。

正确写法:

  1.  
    package main
  2.  
     
  3.  
    import (
  4.  
    "fmt"
  5.  
    "time"
  6.  
    )
  7.  
     
  8.  
    func chanDemo() {
  9.  
    c := make(chan int)
  10.  
    go func() {
  11.  
    for {
  12.  
    n := <-c //收数据
  13.  
    fmt.Println(n)
  14.  
    }
  15.  
    }()
  16.  
    c <- 1 //发数据
  17.  
    c <- 2
  18.  
     
  19.  
    //防止数据还没有在协程里打印,main函数退出
  20.  
    time.Sleep(time.Millisecond)
  21.  
    }
  22.  
     
  23.  
    func main() {
  24.  
    chanDemo()
  25.  
    }
学新通

2. channel作为参数

        channel在go语言里作为一等公民,可以作为参数和返回值

  1.  
    package main
  2.  
     
  3.  
    import (
  4.  
    "fmt"
  5.  
    "time"
  6.  
    )
  7.  
     
  8.  
    func worker(c chan int) {
  9.  
    for {
  10.  
    n := <-c
  11.  
    fmt.Println(n)
  12.  
    }
  13.  
    }
  14.  
     
  15.  
    func chanDemo() {
  16.  
    var channels [10]chan int
  17.  
    for i := 0; i < 10; i {
  18.  
    channels[i] = make(chan int)
  19.  
    go worker(channels[i])
  20.  
    }
  21.  
     
  22.  
    for i := 0; i < 10; i {
  23.  
    channels[i] <- i
  24.  
    }
  25.  
     
  26.  
    //防止数据还没有在协程里打印,main函数退出
  27.  
    time.Sleep(time.Second)
  28.  
    }
  29.  
     
  30.  
    func main() {
  31.  
    chanDemo()
  32.  
    }
学新通

3. channel做返回值

  1.  
    package main
  2.  
     
  3.  
    import (
  4.  
    "fmt"
  5.  
    "time"
  6.  
    )
  7.  
     
  8.  
    func worker(id int, c chan int) {
  9.  
    for {
  10.  
    fmt.Printf("worker %d, reseived %d\n", id, <-c)
  11.  
    }
  12.  
    }
  13.  
     
  14.  
    //返回一个channel
  15.  
    func createWoker(id int) chan int {
  16.  
    c := make(chan int)
  17.  
    go worker(id, c)
  18.  
    return c
  19.  
    }
  20.  
     
  21.  
    func chanDemo() {
  22.  
    var channels [10]chan int
  23.  
    for i := 0; i < 10; i {
  24.  
    channels[i] = createWoker(i)
  25.  
    }
  26.  
     
  27.  
    for i := 0; i < 10; i {
  28.  
    channels[i] <- i
  29.  
    }
  30.  
     
  31.  
    //防止数据还没有在协程里打印,main函数退出
  32.  
    time.Sleep(time.Second)
  33.  
    }
  34.  
     
  35.  
    func main() {
  36.  
    chanDemo()
  37.  
    }
学新通

但是在实际应用中,当项目很复杂时,createWoker返回的channel需要告诉调用方,返回的channel该如何使用,我们可以在返回的channel加上限制。如下:

  1.  
    //返回一个channel,只能被接收数据
  2.  
    func createWoker(id int) <-chan int {
  3.  
    c := make(chan int)
  4.  
    go worker(id, c)
  5.  
    return c
  6.  
    }
  7.  
     
  8.  
    //返回一个channel,只能发送数据
  9.  
    func createWoker(id int) chan<- int {
  10.  
    c := make(chan int)
  11.  
    go worker(id, c)
  12.  
    return c
  13.  
    }

Channel缓存

        上面说,当一个协程向channel发送数据时,必须要有一个协程来接收数据,否则会造成死锁。

  1.  
    package main
  2.  
     
  3.  
    import (
  4.  
    "fmt"
  5.  
    "time"
  6.  
    )
  7.  
     
  8.  
    func channel() {
  9.  
    c := make(chan int)
  10.  
    c <- 1
  11.  
    }
  12.  
     
  13.  
    func main() {
  14.  
    channel()
  15.  
    }
学新通

学新通

         但是我们可以建立一个channel缓存,先将数据放到缓存中,需要时再使用,即使,没有协程接收也不会造成死锁。

  1.  
    package main
  2.  
     
  3.  
    import (
  4.  
    "fmt"
  5.  
    "time"
  6.  
    )
  7.  
     
  8.  
    func bufferedChannel() {
  9.  
    //channel缓存
  10.  
    c := make(chan int, 3)
  11.  
     
  12.  
    c <- 1
  13.  
    c <- 2
  14.  
    c <- 3
  15.  
    }
  16.  
     
  17.  
    func main() {
  18.  
    bufferedChannel()
  19.  
    }
学新通

学新通

 关闭channel,并不是销毁

        当发送完数据,channel是可以被关闭的。

  1.  
    package main
  2.  
     
  3.  
    import (
  4.  
    "fmt"
  5.  
    "time"
  6.  
    )
  7.  
     
  8.  
    func worker(id int, c chan int) {
  9.  
    for {
  10.  
    fmt.Printf("worker %d, reseived %d\n", id, <-c)
  11.  
    }
  12.  
    }
  13.  
     
  14.  
    func closeChannel() {
  15.  
    c := make(chan int)
  16.  
    go worker(0, c)
  17.  
    c <- 1
  18.  
    c <- 2
  19.  
    c <- 3
  20.  
    close(c)
  21.  
     
  22.  
    time.Sleep(time.Millisecond)
  23.  
    }
  24.  
     
  25.  
    func main() {
  26.  
    bufferedChannel()
  27.  
    }
学新通

        当发送方关闭channel后,接收方还是可以从channel接收到数据,接收到的数据是后面类型的默认值。

学新通

         但是,这样是不合理的,接收方同样可以判断发送方是否关闭了channel

        有两种方法可以判断发送方是否关闭了channel

方法一:利用判断。

  1.  
    package main
  2.  
     
  3.  
    import (
  4.  
    "fmt"
  5.  
    "time"
  6.  
    )
  7.  
     
  8.  
    func worker(id int, c chan int) {
  9.  
    for {
  10.  
    n, ok := <-c
  11.  
    //说明发送方关闭了channel
  12.  
    if !ok {
  13.  
    break
  14.  
    }
  15.  
    fmt.Printf("worker %d, reseived %d\n", id, n)
  16.  
    }
  17.  
    }
  18.  
     
  19.  
    func closeChannel() {
  20.  
    c := make(chan int)
  21.  
    go worker(0, c)
  22.  
    c <- 1
  23.  
    c <- 2
  24.  
    c <- 3
  25.  
    close(c)
  26.  
     
  27.  
    time.Sleep(time.Millisecond)
  28.  
    }
  29.  
     
  30.  
    func main() {
  31.  
    closeChannel()
  32.  
    }
学新通

学新通

 方法二:利用range,当发送方关闭了channel,range也会自动退出。

  1.  
    package main
  2.  
     
  3.  
    import (
  4.  
    "fmt"
  5.  
    "time"
  6.  
    )
  7.  
     
  8.  
    func worker(id int, c chan int) {
  9.  
    for n := range c {
  10.  
    fmt.Printf("worker %d, reseived %d\n", id, n)
  11.  
    }
  12.  
    }
  13.  
     
  14.  
    func closeChannel() {
  15.  
    c := make(chan int)
  16.  
    go worker(0, c)
  17.  
    c <- 1
  18.  
    c <- 2
  19.  
    c <- 3
  20.  
    close(c)
  21.  
     
  22.  
    time.Sleep(time.Millisecond)
  23.  
    }
  24.  
     
  25.  
    func main() {
  26.  
    closeChannel()
  27.  
    }
学新通

使用channel等待任务结束

        上面在每一个函数中,我们都加了一段延时,来等待协程的结束。但是,这种做法很不好。

        处理一般是,当协程任务结束,通知外层任务结束。一种思想,通过通信来共享内存,而不是通过共享内存来通信。延时就不需要了。

方法一:用一个channel来通知外层

  1.  
    package main
  2.  
     
  3.  
    import (
  4.  
    "fmt"
  5.  
    )
  6.  
     
  7.  
    type worker struct {
  8.  
    in chan int
  9.  
    done chan bool
  10.  
    }
  11.  
     
  12.  
    func doWorker(id int, in chan int, done chan bool) {
  13.  
    for n := range in {
  14.  
    fmt.Printf("worker %d, reseived %d\n", id, n)
  15.  
    //说明任务结束
  16.  
    done <- true
  17.  
    }
  18.  
    }
  19.  
     
  20.  
    //返回一个channel
  21.  
    func createWoker(id int) worker {
  22.  
    w := worker{
  23.  
    in: make(chan int),
  24.  
    done: make(chan bool),
  25.  
    }
  26.  
    go doWorker(id, w.in, w.done)
  27.  
    return w
  28.  
    }
  29.  
     
  30.  
    func chanDemo() {
  31.  
    var channels [10]worker
  32.  
    for i := 0; i < 10; i {
  33.  
    channels[i] = createWoker(i)
  34.  
    }
  35.  
     
  36.  
    for i := 0; i < 10; i {
  37.  
    channels[i].in <- i
  38.  
    //等到任务结束
  39.  
    <-channels[i].done
  40.  
    }
  41.  
     
  42.  
    }
  43.  
     
  44.  
    func main() {
  45.  
    chanDemo()
  46.  
    }
学新通

学新通

注意:

  1.  
    //等到任务结束
  2.  
    <-channels[i].done

channel发送和接收数据时阻塞式的,必须要有人接收和发送数据,不然会一直等待。

如果在doWorker中,不返回任务结束的消息,会造成死锁。

  1.  
    package main
  2.  
     
  3.  
    import (
  4.  
    "fmt"
  5.  
    )
  6.  
     
  7.  
    type worker struct {
  8.  
    in chan int
  9.  
    done chan bool
  10.  
    }
  11.  
     
  12.  
    func doWorker(id int, in chan int, done chan bool) {
  13.  
    for n := range in {
  14.  
    fmt.Printf("worker %d, reseived %d\n", id, n)
  15.  
    // //说明任务结束
  16.  
    // done <- true
  17.  
    }
  18.  
    }
  19.  
     
  20.  
    //返回一个channel
  21.  
    func createWoker(id int) worker {
  22.  
    w := worker{
  23.  
    in: make(chan int),
  24.  
    done: make(chan bool),
  25.  
    }
  26.  
    go doWorker(id, w.in, w.done)
  27.  
    return w
  28.  
    }
  29.  
     
  30.  
    func chanDemo() {
  31.  
    var channels [10]worker
  32.  
    for i := 0; i < 10; i {
  33.  
    channels[i] = createWoker(i)
  34.  
    }
  35.  
     
  36.  
    for i := 0; i < 10; i {
  37.  
    channels[i].in <- i
  38.  
    //等到任务结束
  39.  
    <-channels[i].done
  40.  
    }
  41.  
     
  42.  
    }
  43.  
     
  44.  
    func main() {
  45.  
    chanDemo()
  46.  
    }
学新通

学新通

 但是,上面正确的情况打印是顺序打印的,没有体现并行的好处。这是因为发送数据时,是发一个数据,等一个数据。我们需要做到,将数据全部发出去,再进行全部的等待。

  1.  
    package main
  2.  
     
  3.  
    import (
  4.  
    "fmt"
  5.  
    )
  6.  
     
  7.  
    type worker struct {
  8.  
    in chan int
  9.  
    done chan bool
  10.  
    }
  11.  
     
  12.  
    func doWorker(id int, in chan int, done chan bool) {
  13.  
    for n := range in {
  14.  
    fmt.Printf("worker %d, reseived %d\n", id, n)
  15.  
    // //说明任务结束
  16.  
    done <- true
  17.  
    }
  18.  
    }
  19.  
     
  20.  
    //返回一个channel
  21.  
    func createWoker(id int) worker {
  22.  
    w := worker{
  23.  
    in: make(chan int),
  24.  
    done: make(chan bool),
  25.  
    }
  26.  
    go doWorker(id, w.in, w.done)
  27.  
    return w
  28.  
    }
  29.  
     
  30.  
    func chanDemo() {
  31.  
    var channels [10]worker
  32.  
    for i := 0; i < 10; i {
  33.  
    channels[i] = createWoker(i)
  34.  
    }
  35.  
     
  36.  
    //发送一次
  37.  
    for i, worker := range channels {
  38.  
    worker.in <- i
  39.  
    }
  40.  
    //发送第二次
  41.  
    for i, worker := range channels {
  42.  
    worker.in <- i * 10
  43.  
    }
  44.  
     
  45.  
    //等待任务结束
  46.  
    for _, worker := range channels {
  47.  
    //发送了两次,接收两次
  48.  
    <-worker.done
  49.  
    <-worker.done
  50.  
    }
  51.  
     
  52.  
    }
  53.  
     
  54.  
    func main() {
  55.  
    chanDemo()
  56.  
    }
学新通

但是上面会造成死锁,因为,发送第一次数据的时候,在协程中,往done中发送了数据,没有接收,一直在done<-true等待,第二次往in中再发送数据,没有接收,造成死锁。

学新通

 解决:

1. 我们可以在在协程里再开一个协程,专门往done中发送数据,那么从in中接收数据的协程就不会阻塞了。

  1.  
    package main
  2.  
     
  3.  
    import (
  4.  
    "fmt"
  5.  
    )
  6.  
     
  7.  
    type worker struct {
  8.  
    in chan int
  9.  
    done chan bool
  10.  
    }
  11.  
     
  12.  
    func doWorker(id int, in chan int, done chan bool) {
  13.  
    for n := range in {
  14.  
    fmt.Printf("worker %d, reseived %d\n", id, n)
  15.  
    // //说明任务结束
  16.  
    go func() {
  17.  
    done <- true
  18.  
    }()
  19.  
    }
  20.  
    }
  21.  
     
  22.  
    //返回一个channel
  23.  
    func createWoker(id int) worker {
  24.  
    w := worker{
  25.  
    in: make(chan int),
  26.  
    done: make(chan bool),
  27.  
    }
  28.  
    go doWorker(id, w.in, w.done)
  29.  
    return w
  30.  
    }
  31.  
     
  32.  
    func chanDemo() {
  33.  
    var channels [10]worker
  34.  
    for i := 0; i < 10; i {
  35.  
    channels[i] = createWoker(i)
  36.  
    }
  37.  
     
  38.  
    //发送一次
  39.  
    for i, worker := range channels {
  40.  
    worker.in <- i
  41.  
    }
  42.  
    //发送第二次
  43.  
    for i, worker := range channels {
  44.  
    worker.in <- i * 10
  45.  
    }
  46.  
     
  47.  
    //等待任务结束
  48.  
    for _, worker := range channels {
  49.  
    //发送了两次,接收两次
  50.  
    <-worker.done
  51.  
    <-worker.done
  52.  
    }
  53.  
     
  54.  
    }
  55.  
     
  56.  
    func main() {
  57.  
    chanDemo()
  58.  
    }
学新通

学新通

 解决2:使用sync官方库的waitgroup

  1.  
    package main
  2.  
     
  3.  
    import (
  4.  
    "fmt"
  5.  
    "sync"
  6.  
    )
  7.  
     
  8.  
    type worker struct {
  9.  
    in chan int
  10.  
    wg *sync.WaitGroup
  11.  
    }
  12.  
     
  13.  
    func doWorker(id int, in chan int, wg *sync.WaitGroup) {
  14.  
    for n := range in {
  15.  
    fmt.Printf("worker %d, reseived %d\n", id, n)
  16.  
     
  17.  
    //任务做完
  18.  
    wg.Done()
  19.  
    }
  20.  
    }
  21.  
     
  22.  
    //返回一个channel
  23.  
    func createWoker(id int, wg *sync.WaitGroup) worker {
  24.  
    w := worker{
  25.  
    in: make(chan int),
  26.  
    wg: wg,
  27.  
    }
  28.  
    go doWorker(id, w.in, w.wg)
  29.  
    return w
  30.  
    }
  31.  
     
  32.  
    func chanDemo() {
  33.  
    var wg sync.WaitGroup
  34.  
     
  35.  
    var channels [10]worker
  36.  
    for i := 0; i < 10; i {
  37.  
    channels[i] = createWoker(i, &wg)
  38.  
    }
  39.  
     
  40.  
    //增加20个任务进行等待,总共有20个任务
  41.  
    wg.Add(20)
  42.  
    //发送一次
  43.  
    for i, worker := range channels {
  44.  
    worker.in <- i
  45.  
    }
  46.  
    //发送第二次
  47.  
    for i, worker := range channels {
  48.  
    worker.in <- i * 10
  49.  
    }
  50.  
     
  51.  
    //等待任务结束
  52.  
    wg.Wait()
  53.  
    }
  54.  
     
  55.  
    func main() {
  56.  
    chanDemo()
  57.  
    }
学新通

学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgfibbg
系列文章
更多 icon
同类精品
更多 icon
继续加载