Skip to content

Commit 96ff7c2

Browse files
committed
update
1 parent 026ba4d commit 96ff7c2

File tree

23 files changed

+1217
-31
lines changed

23 files changed

+1217
-31
lines changed

examples/task-queue/go.mod

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
module task-queue
2+
3+
go 1.17

examples/task-queue/main.go

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
package main
2+
3+
import (
4+
"container/list"
5+
"context"
6+
"errors"
7+
"fmt"
8+
"log"
9+
"math/rand"
10+
"sort"
11+
"sync"
12+
"time"
13+
)
14+
15+
//1. 队列存储调度系统中各种操作任务
16+
//2. 支持add,pop,close接口,会有多个并发的调用方,pop接口如果没有任务可以pop的话会阻塞
17+
//3. add函数包含一个额外参数指定任务能被pop的最早延时,默认为0(马上能pop)
18+
//4. pop函数支持传入一个处理函数,如果处理函数失败,自动把任务重新add到队列中,重新add时的延时和任务重试的次数有关系,重试次数越多,等待越长
19+
//5. pop时按照任务能被pop的最早时间顺序,没有到任务指定的时间不能pop
20+
//6. 队列需要支持关闭,关闭后pop操作直接报错
21+
22+
type Handler func() error
23+
24+
type Task struct {
25+
Name string
26+
MinPopAt time.Duration //最小的调用时延
27+
RunAt time.Time
28+
Handler Handler // 处理函数
29+
}
30+
31+
type Queue struct {
32+
ctx context.Context
33+
cancel context.CancelFunc
34+
wg *sync.WaitGroup
35+
Locker *sync.Mutex
36+
Tasks []*Task
37+
Tasks2 *list.List
38+
Task chan *Task
39+
}
40+
41+
func NewQueue(size uint) *Queue {
42+
ctx, cancel := context.WithCancel(context.Background())
43+
q := &Queue{
44+
ctx: ctx,
45+
cancel: cancel,
46+
wg: new(sync.WaitGroup),
47+
Locker: new(sync.Mutex),
48+
Task: make(chan *Task, size),
49+
Tasks2: list.New(),
50+
}
51+
52+
q.wg.Add(1)
53+
go func() {
54+
defer q.wg.Done()
55+
for {
56+
select {
57+
case <-ctx.Done():
58+
return
59+
default:
60+
q.Locker.Lock()
61+
if len(q.Tasks) > 0 {
62+
time.Sleep(q.Tasks[0].RunAt.Sub(time.Now()))
63+
q.Task <- q.Tasks[0]
64+
if len(q.Tasks) > 1 {
65+
q.Tasks = q.Tasks[1:]
66+
}else {
67+
q.Tasks = nil
68+
}
69+
//for i, task := range q.Tasks {
70+
// if task.RunAt.Before(time.Now()) {
71+
// q.Task <- task
72+
// log.Println("i ", i, len(q.Tasks))
73+
// q.Tasks = append(q.Tasks[:i], q.Tasks[i+1:]...)
74+
// }
75+
//}
76+
}
77+
q.Locker.Unlock()
78+
}
79+
}
80+
}()
81+
return q
82+
}
83+
84+
// Add 添加任务
85+
func (q *Queue) Add(task *Task, minPopAt time.Duration) {
86+
task.RunAt = time.Now().Add(minPopAt)
87+
task.MinPopAt = minPopAt
88+
q.Locker.Lock()
89+
//if len(q.Tasks) == 0 {
90+
// q.Tasks = append(q.Tasks, task)
91+
//}
92+
//for i, v := range q.Tasks {
93+
//if v.RunAt.After(task.RunAt) {
94+
// q.Tasks = append(q.Tasks[:i], task)
95+
// q.Tasks = append(q.Tasks, q.Tasks[i+1:]...)
96+
// break
97+
//}
98+
//}
99+
// 1 2
100+
log.Printf("add task %s, pop at %s", task.Name, task.RunAt.Format("2006/01/02 15:04:05"))
101+
//if q.Tasks2.Len() == 0 {
102+
// q.Tasks2.PushBack(task)
103+
//} else {
104+
// for e := q.Tasks2.Front(); e != nil; e = e.Next() {
105+
// if e.Value.(*Task).RunAt.Before() {}
106+
//}
107+
//}
108+
q.Tasks = append(q.Tasks, task)
109+
sort.Slice(q.Tasks, func(i, j int) bool {
110+
return q.Tasks[i].RunAt.Before(q.Tasks[j].RunAt)
111+
})
112+
q.Locker.Unlock()
113+
}
114+
115+
// Pop 执行任务
116+
func (q *Queue) Pop(handler Handler) {
117+
task, ok := <-q.Task
118+
if !ok {
119+
panic("channel already close")
120+
}
121+
log.Printf("pop task %s", task.Name)
122+
if err := handler(); err != nil {
123+
log.Printf("pop task %s err reAdd", task.Name)
124+
q.Add(task, task.MinPopAt*2)
125+
return
126+
}
127+
}
128+
129+
// Close 关闭队列
130+
func (q *Queue) Close() {
131+
q.cancel()
132+
close(q.Task)
133+
q.wg.Wait()
134+
log.Println("close queue...")
135+
}
136+
137+
func main1() {
138+
queue := NewQueue(10)
139+
for i := 0; i < 10; i++ {
140+
go queue.Add(&Task{Name: fmt.Sprintf("%d", i+1), Handler: func() error { return nil }}, time.Second*time.Duration(rand.Intn(5)+2))
141+
}
142+
143+
for i := 0; i < 8; i++ {
144+
if i == 6 {
145+
go queue.Pop(func() error { return errors.New("pop err") })
146+
}
147+
go queue.Pop(func() error { return nil })
148+
}
149+
time.AfterFunc(time.Second*10, func() {
150+
queue.Close()
151+
})
152+
go queue.Pop(func() error { return nil })
153+
time.Sleep(time.Second * 10)
154+
}

examples/task-queue/queue.go

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
package main
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"log"
7+
"sort"
8+
"sync"
9+
"time"
10+
)
11+
12+
//1. 队列存储调度系统中各种操作任务
13+
//2. 支持add,pop,close接口,会有多个并发的调用方,pop接口如果没有任务可以pop的话会阻塞
14+
//3. add函数包含一个额外参数指定任务能被pop的最早延时,默认为0(马上能pop)
15+
//4. pop函数支持传入一个处理函数,如果处理函数失败,自动把任务重新add到队列中,重新add时的延时和任务重试的次数有关系,重试次数越多,等待越长
16+
//5. pop时按照任务能被pop的最早时间顺序,没有到任务指定的时间不能pop
17+
//6. 队列需要支持关闭,关闭后pop操作直接报错
18+
19+
type job struct {
20+
name string // 任务名
21+
at time.Time // 任务执行时间
22+
duration time.Duration // 最小的延时
23+
action func() error // 执行函数
24+
}
25+
26+
type jobs []job
27+
28+
type queue struct {
29+
jobs jobs // 任务队列
30+
jobChAdd chan job // 添加的任务
31+
jobChPop chan job // 可以消费的任务
32+
stop chan struct{} // 关闭队列标志
33+
once sync.Once // 单例执行
34+
wg sync.WaitGroup // 等待协程完成
35+
mu sync.Mutex // 锁
36+
}
37+
38+
// do 任务监控
39+
func (q *queue) do() {
40+
q.wg.Add(1)
41+
go func() {
42+
defer q.wg.Done()
43+
for {
44+
select {
45+
case <-q.stop:
46+
close(q.stop)
47+
close(q.jobChAdd)
48+
close(q.jobChPop)
49+
return
50+
case job, ok := <-q.jobChAdd:
51+
if !ok {
52+
return
53+
}
54+
// 将来的执行时间
55+
job.at = time.Now().Add(job.duration)
56+
q.mu.Lock()
57+
q.jobs = append(q.jobs, job)
58+
log.Printf("add job %s run at %s size %d\n", job.name, job.at.Format("2006/01/02 15:04:05"), len(q.jobs))
59+
// 对队列里面的任务根据最小的时延排序,最先执行的任务在队首
60+
sort.Slice(q.jobs, func(i, j int) bool {
61+
return q.jobs[i].at.Before(q.jobs[j].at)
62+
})
63+
q.mu.Unlock()
64+
default:
65+
q.mu.Lock()
66+
if len(q.jobs) > 0 {
67+
sub := q.jobs[0].at.Sub(time.Now())
68+
if sub > time.Duration(0) {
69+
time.Sleep(sub)
70+
}
71+
select {
72+
case q.jobChPop <- q.jobs[0]:
73+
if len(q.jobs) == 1 {
74+
q.jobs = nil
75+
} else {
76+
q.jobs = q.jobs[1:]
77+
}
78+
//default: // 消费队列已满,丢掉当前的任务,或去掉default分支阻塞消费
79+
// log.Printf("break %s", q.jobs[0].name)
80+
}
81+
}
82+
q.mu.Unlock()
83+
time.Sleep(time.Millisecond)
84+
}
85+
}
86+
}()
87+
}
88+
89+
// add 添加任务
90+
// 1. 当队列里面的任务满了,会丢弃添加的任务
91+
// 2. 如果不丢弃的,去掉default分支,会阻塞当前任务添加,当队列里面的任务有被消费时候添加
92+
func (q *queue) add(job job, duration time.Duration) {
93+
job.duration = duration
94+
select {
95+
case q.jobChAdd <- job:
96+
//default:
97+
}
98+
}
99+
100+
// pop 任务出队
101+
func (q *queue) pop(handler func() error) {
102+
job, ok := <-q.jobChPop
103+
if !ok {
104+
panic("sorry close channel")
105+
}
106+
if err := handler(); err != nil {
107+
log.Printf("pop %s error \n", job.name)
108+
q.add(job, job.duration*2) // 执行失败,最小的延时*2
109+
return
110+
}
111+
log.Printf("pop %s success\n", job.name)
112+
113+
}
114+
115+
// 关闭队列
116+
func (q *queue) close() {
117+
q.once.Do(func() {
118+
q.stop <- struct{}{}
119+
q.wg.Wait()
120+
log.Println("close...")
121+
})
122+
}
123+
124+
func main() {
125+
q := queue{
126+
jobChAdd: make(chan job, 10),
127+
jobChPop: make(chan job, 10),
128+
stop: make(chan struct{}),
129+
}
130+
q.do()
131+
for i := 1; i < 110; i++ {
132+
go q.add(job{name: fmt.Sprintf("%d", i), action: func() error { return nil }}, time.Second*time.Duration(i%4+1))
133+
if i%10 == 0 {
134+
go q.pop(func() error { return errors.New("pop err") })
135+
} else {
136+
go q.pop(func() error { return nil })
137+
}
138+
}
139+
for i := 0; i < 50; i++ {
140+
go q.pop(func() error { return nil })
141+
}
142+
143+
time.AfterFunc(time.Second*30, func() {
144+
q.close()
145+
q.pop(func() error { return nil })
146+
})
147+
time.Sleep(time.Second * 100)
148+
}

go.mod

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,15 @@ require (
2020
github.com/lib/pq v1.10.3
2121
github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5
2222
github.com/robfig/cron/v3 v3.0.1
23+
github.com/shirou/gopsutil/v3 v3.22.6
2324
github.com/sirupsen/logrus v1.8.1
2425
github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07
2526
github.com/tbrandon/mbserver v0.0.0-20210320091329-a1f8ae952881
2627
github.com/tealeg/xlsx v1.0.5
2728
github.com/tidwall/evio v1.0.8
2829
github.com/xiegeo/modbusone v0.2.3
2930
github.com/zing-dev/4g-lte-sdk v0.0.1
30-
golang.org/x/sys v0.0.0-20210910150752-751e447fb3d0
31+
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a
3132
golang.org/x/tour v0.1.0
3233
google.golang.org/protobuf v1.27.1
3334
gorm.io/driver/mysql v1.1.2
@@ -40,6 +41,7 @@ require (
4041
require (
4142
github.com/denisenkom/go-mssqldb v0.10.0 // indirect
4243
github.com/emirpasic/gods v1.12.0 // indirect
44+
github.com/go-ole/go-ole v1.2.6 // indirect
4345
github.com/go-sql-driver/mysql v1.6.0 // indirect
4446
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe // indirect
4547
github.com/golang/mock v1.3.1 // indirect
@@ -58,14 +60,17 @@ require (
5860
github.com/jinzhu/now v1.1.2 // indirect
5961
github.com/json-iterator/go v1.1.9 // indirect
6062
github.com/kavu/go_reuseport v1.5.0 // indirect
63+
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
6164
github.com/mattn/go-sqlite3 v1.14.8 // indirect
6265
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect
6366
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 // indirect
6467
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect
6568
github.com/pkg/errors v0.9.1 // indirect
69+
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
6670
github.com/tidwall/gjson v1.2.1 // indirect
6771
github.com/tidwall/match v1.0.1 // indirect
6872
github.com/tidwall/pretty v0.0.0-20190325153808-1166b9ac2b65 // indirect
73+
github.com/yusufpapurcu/wmi v1.2.2 // indirect
6974
go.uber.org/atomic v1.6.0 // indirect
7075
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5 // indirect
7176
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8 // indirect

0 commit comments

Comments
 (0)