Skip to main content

chan for

code

// 在前面的例子中,我们用 [互斥锁](mutexes) 进行了明确的锁定,
// 来让共享的 state 跨多个 Go 协程同步访问。
// 另一个选择是,使用内建协程和通道的同步特性来达到同样的效果。
// Go 共享内存的思想是,通过通信使每个数据仅被单个协程所拥有,即通过通信实现共享内存。
// 基于通道的方法与该思想完全一致!

package main

import (
"fmt"
"math/rand"
"sync/atomic"
"time"
)

// 在这个例子中,state 将被一个单独的协程拥有。
// 这能保证数据在并行读取时不会混乱。
// 为了对 state 进行读取或者写入,
// 其它的协程将发送一条数据到目前拥有数据的协程中,
// 然后等待接收对应的回复。
// 结构体 `readOp` 和 `writeOp` 封装了这些请求,并提供了响应协程的方法。
type readOp struct {
key int
resp chan int
}
type writeOp struct {
key int
val int
resp chan bool
}

func main() {

// 和前面的例子一样,我们会计算操作执行的次数。
var readOps uint64
var writeOps uint64

// 其他协程将通过 `reads` 和 `writes` 通道来发布 `读` 和 `写` 请求。
reads := make(chan readOp)
writes := make(chan writeOp)

// 这就是拥有 `state` 的那个协程,
// 和前面例子中的 map 一样,不过这里的 state 是被这个状态协程私有的。
// 这个协程不断地在 `reads` 和 `writes` 通道上进行选择,并在请求到达时做出响应。
// 首先,执行请求的操作;然后,执行响应,在响应通道 `resp` 上发送一个值,表明请求成功(`reads` 的值则为 state 对应的值)。
go func() {
var state = make(map[int]int)
for {
select {
case read := <-reads:
read.resp <- state[read.key]
case write := <-writes:
state[write.key] = write.val
write.resp <- true
}
}
}()

// 启动 100 个协程通过 `reads` 通道向拥有 state 的协程发起读取请求。
// 每个读取请求需要构造一个 `readOp`,发送它到 `reads` 通道中,
// 并通过给定的 `resp` 通道接收结果。
for r := 0; r < 100; r++ {
go func() {
for {
read := readOp{
key: rand.Intn(5),
resp: make(chan int)
}
// reads <- read:将 readOp 结构体发送到 reads 通道。这是一个请求读取操作。
// <-read.resp:等待从 resp 通道接收结果。这个通道用于返回读取操作的结果。
reads <- read
<-read.resp
atomic.AddUint64(&readOps, 1)
time.Sleep(time.Millisecond)
}
}()
}

// 用相同的方法启动 10 个写操作。
for w := 0; w < 10; w++ {
go func() {
for {
write := writeOp{
key: rand.Intn(5),
val: rand.Intn(100),
resp: make(chan bool)}
// writes <- write:将 writeOp 结构体发送到 writes 通道。这是一个请求写入操作。
// <-write.resp:等待从 resp 通道接收确认。这个通道用于返回写入操作的确认信号。
writes <- write
<-write.resp
atomic.AddUint64(&writeOps, 1)
time.Sleep(time.Millisecond)
}
}()
}

// 让协程们跑 1s。
time.Sleep(time.Second)

// 最后,获取并报告 `ops` 值。
readOpsFinal := atomic.LoadUint64(&readOps)
fmt.Println("readOps:", readOpsFinal)
writeOpsFinal := atomic.LoadUint64(&writeOps)
fmt.Println("writeOps:", writeOpsFinal)
}

for 循环作用

这段代码中有三个 for 循环,每个循环都有特定的作用:

  1. 状态管理循环

    • 位置:在一个 goroutine 内部。
    • 作用:持续监听 readswrites 通道,处理读取和写入请求。
    • 逻辑:使用 select 语句选择处理 readswrites 通道中的请求,更新状态或返回结果。
  2. 读取操作循环

    • 位置:启动 100 个 goroutine,每个 goroutine 内部有一个循环。
    • 作用:模拟并发读取操作。
    • 逻辑
      • 创建 readOp,随机选择一个键。
      • 发送到 reads 通道并等待响应。
      • 增加 readOps 计数器。
      • 睡眠一段时间以限制操作频率。
  3. 写入操作循环

    • 位置:启动 10 个 goroutine,每个 goroutine 内部有一个循环。
    • 作用:模拟并发写入操作。
    • 逻辑
      • 创建 writeOp,随机选择一个键和值。
      • 发送到 writes 通道并等待响应。
      • 增加 writeOps 计数器。
      • 睡眠一段时间以限制操作频率。

总结

  • 状态管理循环:处理所有的读写请求。
  • 读取操作循环:生成并发读取请求。
  • 写入操作循环:生成并发写入请求。

这种设计通过 goroutine 和通道的结合,实现了对共享状态的安全并发访问。

⚡️ 上述代码:不同的读 goroutine 之间阻塞?不同的写 goroutine 之间阻塞?

是的,在使用无缓冲通道的情况下:

不同的读 goroutine 之间的阻塞

  • 读操作:当多个读 goroutine 试图同时发送请求到 reads 通道时,由于通道是无缓冲的,它们必须等待处理 goroutine 处理完当前请求后,才能继续处理下一个请求。这意味着读操作是同步的,一个接一个地进行。

不同的写 goroutine 之间的阻塞

  • 写操作:同样地,当多个写 goroutine 试图同时发送请求到 writes 通道时,它们也会因为无缓冲通道而阻塞,直到当前写请求被处理 goroutine 处理完毕。写入操作也是同步的。

影响

  • 同步性:这种设计确保了操作的同步性,避免了数据竞争。
  • 性能:由于操作是逐个处理的,可能会影响并发性能,特别是在高并发情况下。
  • 一致性:确保数据的一致性,因为每次只有一个操作(读或写)在进行。