介绍
元素为[]byte
的队列的golang实现(适用于多线程环境下,当然单线程也能用
如果想更改队列的元素类型,请自行将queue [][]byte
中的[]byte
替换为其他类型,同时修改函数中的相关代码
- 应使用
NewQueue(size int) *Queue
来创建队列以初始化队列最大长度 - 通过互斥锁和信号池保证了在队列非空非满的情况下能够同时
push
和pop
,且在队列空和满的状态下push
和pop
不会冲突 - 如果队列为空,则
pop
会阻塞,同理队列为满时,push
会阻塞 - 由于元素
[]byte
为slice,deepCopy
参数决定push
和pop
出来的[]byte
是深拷贝还是浅拷贝 - 队列的数据存储使用的循环数组,避免频繁append导致的资源损耗
queue.go
1package queue
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8type Queue struct {
9 poolStart chan bool
10 poolEnd chan bool
11 pushLock sync.Mutex
12 popLock sync.Mutex
13 maxSize int
14 curSize int32
15 wIndex int
16 rIndex int
17 queue [][]byte
18}
19
20func NewQueue(size int) *Queue {
21 return &Queue{
22 // Start和End信号池用于保证push和pop操作不会互相干扰
23 // 每次Push和Pop操作后,两个信号池中的信号数量都会保持一致
24 poolStart: make(chan bool, size),
25 poolEnd: make(chan bool, size),
26 // 保证push操作完整性
27 pushLock: sync.Mutex{},
28 // 保证pop操作完整性
29 popLock: sync.Mutex{},
30 // 队列中元素最大数量
31 maxSize: size,
32 // 队列当前元素数量
33 curSize: 0,
34 // push指针
35 wIndex: 0,
36 // pop指针
37 rIndex: 0,
38 // 元素数组
39 queue: make([][]byte, size),
40 }
41}
42
43func (q *Queue) Push(item []byte, deepCopy bool) {
44 q.pushLock.Lock()
45 defer func() {
46 // push成功后队列大小+1
47 atomic.AddInt32(&q.curSize, 1)
48 q.pushLock.Unlock()
49 // 操作必定成功,向End信号池发送一个信号,表示完成此次push
50 q.poolEnd <- true
51 }()
52 // 操作成功代表队列不满,向Start信号池发送一个信号,表示开始push
53 q.poolStart <- true
54
55 if deepCopy {
56 q.queue[q.wIndex] = make([]byte, len(item))
57 copy(q.queue[q.wIndex], item)
58 } else {
59 q.queue[q.wIndex] = item
60 }
61
62 q.wIndex++
63 if q.wIndex >= q.maxSize {
64 q.wIndex = 0
65 }
66}
67
68func (q *Queue) Pop(deepCopy bool) (item []byte) {
69 q.popLock.Lock()
70 defer func() {
71 // pop成功后队列大小-1
72 atomic.AddInt32(&q.curSize, -1)
73 q.popLock.Unlock()
74 // 操作必定成功,当前元素已经成功取出,释放当前位置
75 <-q.poolStart
76 }()
77 // 操作成功代表队列非空,只有End信号池中有信号,才能保证有完整的元素在队列中
78 <-q.poolEnd
79
80 if deepCopy {
81 item = make([]byte, len(q.queue[q.rIndex]))
82 copy(item, q.queue[q.rIndex])
83 } else {
84 item = q.queue[q.rIndex]
85 }
86 q.queue[q.rIndex] = nil
87
88 q.rIndex++
89 if q.rIndex >= q.maxSize {
90 q.rIndex = 0
91 }
92 return
93}
94
95func (q *Queue) Size() int32 {
96 return atomic.LoadInt32(&q.curSize)
97}
98
99func (q *Queue) IsEmpty() bool {
100 return atomic.LoadInt32(&q.curSize) == 0
101}
queue_test.go
1package queue
2
3import (
4 "fmt"
5 "testing"
6 "time"
7)
8
9func TestQueue(t *testing.T) {
10 queue := NewQueue(3)
11 b1 := []byte{11, 11}
12 b2 := []byte{22, 22}
13 b3 := []byte{33, 33}
14 b4 := []byte{44, 44}
15
16 fmt.Printf("push b1 curSize:%d\n", queue.Size())
17 queue.Push(b1, true)
18 fmt.Printf("pushed b1 curSize:%d\n", queue.Size())
19 fmt.Printf("push b2 curSize:%d\n", queue.Size())
20 queue.Push(b2, false)
21 fmt.Printf("pushed b2 curSize:%d\n", queue.Size())
22 fmt.Printf("push b3 curSize:%d\n", queue.Size())
23 queue.Push(b3, false)
24 fmt.Printf("pushed b3 curSize:%d\n", queue.Size())
25
26 // test: push堵塞
27 go func() {
28 time.Sleep(3 * time.Second)
29 fmt.Printf("pop b10 curSize:%d\n", queue.Size())
30 // test: 深push, 浅pop
31 b1[0] = 22
32 b10 := queue.Pop(false)
33 fmt.Printf("poped b10 [%d,%d] curSize:%d\n", b10[0], b10[1], queue.Size())
34 if b10[0] != 11 || b10[1] != 11 {
35 t.Errorf("b10 expected [11,11], but [%d,%d] got", b10[0], b10[1])
36 }
37 }()
38 fmt.Printf("push b4 curSize:%d\n", queue.Size())
39 queue.Push(b4, true)
40 fmt.Printf("pushed b4 curSize:%d\n", queue.Size())
41 if queue.Size() != 3 {
42 t.Errorf("queue expected 3, but %d got", queue.Size())
43 }
44
45 time.Sleep(5 * time.Second)
46 fmt.Printf("pop b11 curSize:%d\n", queue.Size())
47 b11 := queue.Pop(true)
48 // test: 浅push, 深pop
49 b2[0] = 33
50 fmt.Printf("poped b11 [%d,%d] curSize:%d\n", b11[0], b11[1], queue.Size())
51 if b11[0] != 22 || b11[1] != 22 {
52 t.Errorf("b11 expected [22,22], but [%d,%d] got", b11[0], b11[1])
53 }
54 fmt.Printf("pop b12 curSize:%d\n", queue.Size())
55 b12 := queue.Pop(false)
56 // test: 浅push, 浅pop
57 b3[0] = 44
58 fmt.Printf("poped b12 [%d,%d] curSize:%d\n", b12[0], b12[1], queue.Size())
59 if b12[0] != 44 || b12[1] != 33 {
60 t.Errorf("b12 expected [44,33], but [%d,%d] got", b12[0], b12[1])
61 }
62 fmt.Printf("pop b13 curSize:%d\n", queue.Size())
63 b13 := queue.Pop(true)
64 // test: 深push, 深pop
65 b4[0] = 55
66 fmt.Printf("poped b13 [%d,%d] curSize:%d\n", b13[0], b13[1], queue.Size())
67 if b13[0] != 44 || b13[1] != 44 {
68 t.Errorf("b13 expected [44,44], but [%d,%d] got", b13[0], b13[1])
69 }
70
71 b5 := []byte{55, 55}
72 // test: pop堵塞
73 go func() {
74 time.Sleep(3 * time.Second)
75 fmt.Printf("push b5 curSize:%d\n", queue.Size())
76 queue.Push(b5, false)
77 fmt.Printf("pushed b5 curSize:%d\n", queue.Size())
78 }()
79 fmt.Printf("pop b14 curSize:%d\n", queue.Size())
80 b14 := queue.Pop(false)
81 // test: 浅push, 浅pop
82 fmt.Printf("poped b14 [%d,%d] curSize:%d\n", b14[0], b14[1], queue.Size())
83 if b14[0] != 55 || b14[1] != 55 {
84 t.Errorf("b14 expected [55,55], but [%d,%d] got", b14[0], b14[1])
85 }
86
87 if !queue.IsEmpty() {
88 t.Errorf("queue expected empty, but %v got", queue.IsEmpty())
89 }
90}
除另有声明外,本博客文章均采用 知识共享 (Creative Commons) 署名 4.0 国际许可协议 进行许可。转载请注明原作者与文章出处。