2024-01-06  2024-09-18    1327 字  3 分钟

介绍

元素为[]byte的队列的golang实现(适用于多线程环境下,当然单线程也能用 如果想更改队列的元素类型,请自行将queue [][]byte中的[]byte替换为其他类型,同时修改函数中的相关代码

  1. 应使用NewQueue(size int) *Queue来创建队列以初始化队列最大长度
  2. 通过互斥锁和信号池保证了在队列非空非满的情况下能够同时pushpop,且在队列空和满的状态下pushpop不会冲突
  3. 如果队列为空,则pop会阻塞,同理队列为满时,push会阻塞
  4. 由于元素[]byte为slice,deepCopy参数决定pushpop出来的[]byte是深拷贝还是浅拷贝
  5. 队列的数据存储使用的循环数组,避免频繁append导致的资源损耗

queue.go

  1package queue
  2
  3import (
  4  "sync"
  5  "sync/atomic"
  6)
  7
  8type Queue struct {
  9  poolStart chan bool
 10  poolEnd   chan bool
 11  pushLock  sync.Mutex
 12  popLock   sync.Mutex
 13  maxSize   int
 14  curSize   int32
 15  wIndex    int
 16  rIndex    int
 17  queue     [][]byte
 18}
 19
 20func NewQueue(size int) *Queue {
 21  return &Queue{
 22    // Start和End信号池用于保证push和pop操作不会互相干扰
 23    // 每次Push和Pop操作后,两个信号池中的信号数量都会保持一致
 24    poolStart: make(chan bool, size),
 25    poolEnd:   make(chan bool, size),
 26    // 保证push操作完整性
 27    pushLock: sync.Mutex{},
 28    // 保证pop操作完整性
 29    popLock: sync.Mutex{},
 30    // 队列中元素最大数量
 31    maxSize: size,
 32    // 队列当前元素数量
 33    curSize: 0,
 34    // push指针
 35    wIndex: 0,
 36    // pop指针
 37    rIndex: 0,
 38    // 元素数组
 39    queue: make([][]byte, size),
 40  }
 41}
 42
 43func (q *Queue) Push(item []byte, deepCopy bool) {
 44  q.pushLock.Lock()
 45  defer func() {
 46    // push成功后队列大小+1
 47    atomic.AddInt32(&q.curSize, 1)
 48    q.pushLock.Unlock()
 49    // 操作必定成功,向End信号池发送一个信号,表示完成此次push
 50    q.poolEnd <- true
 51  }()
 52  // 操作成功代表队列不满,向Start信号池发送一个信号,表示开始push
 53  q.poolStart <- true
 54  
 55  if deepCopy {
 56    q.queue[q.wIndex] = make([]byte, len(item))
 57    copy(q.queue[q.wIndex], item)
 58  } else {
 59    q.queue[q.wIndex] = item
 60  }
 61  
 62  q.wIndex++
 63  if q.wIndex >= q.maxSize {
 64    q.wIndex = 0
 65  }
 66}
 67
 68func (q *Queue) Pop(deepCopy bool) (item []byte) {
 69  q.popLock.Lock()
 70  defer func() {
 71    // pop成功后队列大小-1
 72    atomic.AddInt32(&q.curSize, -1)
 73    q.popLock.Unlock()
 74    // 操作必定成功,当前元素已经成功取出,释放当前位置
 75    <-q.poolStart
 76  }()
 77  // 操作成功代表队列非空,只有End信号池中有信号,才能保证有完整的元素在队列中
 78  <-q.poolEnd
 79  
 80  if deepCopy {
 81    item = make([]byte, len(q.queue[q.rIndex]))
 82    copy(item, q.queue[q.rIndex])
 83  } else {
 84    item = q.queue[q.rIndex]
 85  }
 86  q.queue[q.rIndex] = nil
 87  
 88  q.rIndex++
 89  if q.rIndex >= q.maxSize {
 90    q.rIndex = 0
 91  }
 92  return
 93}
 94
 95func (q *Queue) Size() int32 {
 96  return atomic.LoadInt32(&q.curSize)
 97}
 98
 99func (q *Queue) IsEmpty() bool {
100  return atomic.LoadInt32(&q.curSize) == 0
101}

queue_test.go

 1package queue
 2
 3import (
 4  "fmt"
 5  "testing"
 6  "time"
 7)
 8
 9func TestQueue(t *testing.T) {
10  queue := NewQueue(3)
11  b1 := []byte{11, 11}
12  b2 := []byte{22, 22}
13  b3 := []byte{33, 33}
14  b4 := []byte{44, 44}
15  
16  fmt.Printf("push   b1 curSize:%d\n", queue.Size())
17  queue.Push(b1, true)
18  fmt.Printf("pushed b1 curSize:%d\n", queue.Size())
19  fmt.Printf("push   b2 curSize:%d\n", queue.Size())
20  queue.Push(b2, false)
21  fmt.Printf("pushed b2 curSize:%d\n", queue.Size())
22  fmt.Printf("push   b3 curSize:%d\n", queue.Size())
23  queue.Push(b3, false)
24  fmt.Printf("pushed b3 curSize:%d\n", queue.Size())
25  
26  // test: push堵塞
27  go func() {
28    time.Sleep(3 * time.Second)
29    fmt.Printf("pop    b10 curSize:%d\n", queue.Size())
30    // test: 深push, 浅pop
31    b1[0] = 22
32    b10 := queue.Pop(false)
33    fmt.Printf("poped  b10 [%d,%d] curSize:%d\n", b10[0], b10[1], queue.Size())
34    if b10[0] != 11 || b10[1] != 11 {
35      t.Errorf("b10 expected [11,11], but [%d,%d] got", b10[0], b10[1])
36    }
37  }()
38  fmt.Printf("push   b4 curSize:%d\n", queue.Size())
39  queue.Push(b4, true)
40  fmt.Printf("pushed b4 curSize:%d\n", queue.Size())
41  if queue.Size() != 3 {
42    t.Errorf("queue expected 3, but %d got", queue.Size())
43  }
44  
45  time.Sleep(5 * time.Second)
46  fmt.Printf("pop    b11 curSize:%d\n", queue.Size())
47  b11 := queue.Pop(true)
48  // test: 浅push, 深pop
49  b2[0] = 33
50  fmt.Printf("poped  b11 [%d,%d] curSize:%d\n", b11[0], b11[1], queue.Size())
51  if b11[0] != 22 || b11[1] != 22 {
52    t.Errorf("b11 expected [22,22], but [%d,%d] got", b11[0], b11[1])
53  }
54  fmt.Printf("pop    b12 curSize:%d\n", queue.Size())
55  b12 := queue.Pop(false)
56  // test: 浅push, 浅pop
57  b3[0] = 44
58  fmt.Printf("poped  b12 [%d,%d] curSize:%d\n", b12[0], b12[1], queue.Size())
59  if b12[0] != 44 || b12[1] != 33 {
60    t.Errorf("b12 expected [44,33], but [%d,%d] got", b12[0], b12[1])
61  }
62  fmt.Printf("pop    b13 curSize:%d\n", queue.Size())
63  b13 := queue.Pop(true)
64  // test: 深push, 深pop
65  b4[0] = 55
66  fmt.Printf("poped  b13 [%d,%d] curSize:%d\n", b13[0], b13[1], queue.Size())
67  if b13[0] != 44 || b13[1] != 44 {
68    t.Errorf("b13 expected [44,44], but [%d,%d] got", b13[0], b13[1])
69  }
70  
71  b5 := []byte{55, 55}
72  // test: pop堵塞
73  go func() {
74    time.Sleep(3 * time.Second)
75    fmt.Printf("push   b5 curSize:%d\n", queue.Size())
76    queue.Push(b5, false)
77    fmt.Printf("pushed b5 curSize:%d\n", queue.Size())
78  }()
79  fmt.Printf("pop    b14 curSize:%d\n", queue.Size())
80  b14 := queue.Pop(false)
81  // test: 浅push, 浅pop
82  fmt.Printf("poped  b14 [%d,%d] curSize:%d\n", b14[0], b14[1], queue.Size())
83  if b14[0] != 55 || b14[1] != 55 {
84    t.Errorf("b14 expected [55,55], but [%d,%d] got", b14[0], b14[1])
85  }
86  
87  if !queue.IsEmpty() {
88    t.Errorf("queue expected empty, but %v got", queue.IsEmpty())
89  }
90}

除另有声明外本博客文章均采用 知识共享 (Creative Commons) 署名 4.0 国际许可协议 进行许可转载请注明原作者与文章出处