mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-25 23:32:17 +08:00
Merge pull request #17 from hibiken/feature/stop
Implement better signal handling
This commit is contained in:
commit
bc7991c757
2
asynq.go
2
asynq.go
@ -4,9 +4,9 @@ import "github.com/go-redis/redis/v7"
|
|||||||
|
|
||||||
/*
|
/*
|
||||||
TODOs:
|
TODOs:
|
||||||
- [P0] Proper OS Signal handling - TTIN to stop the processor
|
|
||||||
- [P0] asynqmon kill <taskID>, asynqmon killall <qname>
|
- [P0] asynqmon kill <taskID>, asynqmon killall <qname>
|
||||||
- [P0] Better Payload API - Assigning int or any number type to Payload will be converted to float64 in handler
|
- [P0] Better Payload API - Assigning int or any number type to Payload will be converted to float64 in handler
|
||||||
|
- [P0] Show elapsed time for InProgress tasks (asynqmon ls inprogress)
|
||||||
- [P0] Redis Memory Usage, Connection info in stats
|
- [P0] Redis Memory Usage, Connection info in stats
|
||||||
- [P0] Processed, Failed count for today
|
- [P0] Processed, Failed count for today
|
||||||
- [P0] Go docs + CONTRIBUTION.md + Github issue template + License comment
|
- [P0] Go docs + CONTRIBUTION.md + Github issue template + License comment
|
||||||
|
@ -6,6 +6,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"sync"
|
"sync"
|
||||||
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/hibiken/asynq/internal/rdb"
|
"github.com/hibiken/asynq/internal/rdb"
|
||||||
@ -74,10 +75,17 @@ func (bg *Background) Run(handler Handler) {
|
|||||||
bg.start(handler)
|
bg.start(handler)
|
||||||
defer bg.stop()
|
defer bg.stop()
|
||||||
|
|
||||||
// Wait for a signal to exit.
|
// Wait for a signal to terminate.
|
||||||
sigs := make(chan os.Signal, 1)
|
sigs := make(chan os.Signal, 1)
|
||||||
signal.Notify(sigs, os.Interrupt, os.Kill)
|
signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT, syscall.SIGTSTP)
|
||||||
<-sigs
|
for {
|
||||||
|
sig := <-sigs
|
||||||
|
if sig == syscall.SIGTSTP {
|
||||||
|
bg.processor.stop()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
fmt.Println()
|
fmt.Println()
|
||||||
log.Println("[INFO] Starting graceful shutdown...")
|
log.Println("[INFO] Starting graceful shutdown...")
|
||||||
}
|
}
|
||||||
|
@ -116,6 +116,26 @@ func (r *RDB) Done(msg *TaskMessage) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Requeue moves the task from in-progress queue to the default
|
||||||
|
// queue.
|
||||||
|
func (r *RDB) Requeue(msg *TaskMessage) error {
|
||||||
|
bytes, err := json.Marshal(msg)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not marshal %+v to json: %v", msg, err)
|
||||||
|
}
|
||||||
|
// Note: Use RPUSH to push to the head of the queue.
|
||||||
|
// KEYS[1] -> asynq:in_progress
|
||||||
|
// KEYS[2] -> asynq:queues:default
|
||||||
|
// ARGV[1] -> taskMessage value
|
||||||
|
script := redis.NewScript(`
|
||||||
|
redis.call("LREM", KEYS[1], 0, ARGV[1])
|
||||||
|
redis.call("RPUSH", KEYS[2], ARGV[1])
|
||||||
|
return redis.status_reply("OK")
|
||||||
|
`)
|
||||||
|
_, err = script.Run(r.client, []string{inProgressQ, defaultQ}, string(bytes)).Result()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// Schedule adds the task to the backlog queue to be processed in the future.
|
// Schedule adds the task to the backlog queue to be processed in the future.
|
||||||
func (r *RDB) Schedule(msg *TaskMessage, processAt time.Time) error {
|
func (r *RDB) Schedule(msg *TaskMessage, processAt time.Time) error {
|
||||||
bytes, err := json.Marshal(msg)
|
bytes, err := json.Marshal(msg)
|
||||||
@ -144,7 +164,7 @@ func (r *RDB) Retry(msg *TaskMessage, processAt time.Time, errMsg string) error
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not marshal %+v to json: %v", modified, err)
|
return fmt.Errorf("could not marshal %+v to json: %v", modified, err)
|
||||||
}
|
}
|
||||||
// KEYS[1] -> asynq:in_progress
|
// KEYS[1] -> asynq:in_progress
|
||||||
// KEYS[2] -> asynq:retry
|
// KEYS[2] -> asynq:retry
|
||||||
// ARGV[1] -> TaskMessage value to remove from InProgress queue
|
// ARGV[1] -> TaskMessage value to remove from InProgress queue
|
||||||
// ARGV[2] -> TaskMessage value to add to Retry queue
|
// ARGV[2] -> TaskMessage value to add to Retry queue
|
||||||
@ -196,8 +216,9 @@ func (r *RDB) Kill(msg *TaskMessage, errMsg string) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// RestoreUnfinished moves all tasks from in-progress list to the queue.
|
// RestoreUnfinished moves all tasks from in-progress list to the queue
|
||||||
func (r *RDB) RestoreUnfinished() error {
|
// and reports the number of tasks restored.
|
||||||
|
func (r *RDB) RestoreUnfinished() (int64, error) {
|
||||||
script := redis.NewScript(`
|
script := redis.NewScript(`
|
||||||
local len = redis.call("LLEN", KEYS[1])
|
local len = redis.call("LLEN", KEYS[1])
|
||||||
for i = len, 1, -1 do
|
for i = len, 1, -1 do
|
||||||
@ -205,8 +226,15 @@ func (r *RDB) RestoreUnfinished() error {
|
|||||||
end
|
end
|
||||||
return len
|
return len
|
||||||
`)
|
`)
|
||||||
_, err := script.Run(r.client, []string{inProgressQ, defaultQ}).Result()
|
res, err := script.Run(r.client, []string{inProgressQ, defaultQ}).Result()
|
||||||
return err
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
n, ok := res.(int64)
|
||||||
|
if !ok {
|
||||||
|
return 0, fmt.Errorf("could not cast %v to int64", res)
|
||||||
|
}
|
||||||
|
return n, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// CheckAndEnqueue checks for all scheduled tasks and enqueues any tasks that
|
// CheckAndEnqueue checks for all scheduled tasks and enqueues any tasks that
|
||||||
|
@ -112,6 +112,59 @@ func TestDone(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRequeue(t *testing.T) {
|
||||||
|
r := setup(t)
|
||||||
|
t1 := newTaskMessage("send_email", nil)
|
||||||
|
t2 := newTaskMessage("export_csv", nil)
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
enqueued []*TaskMessage // initial state of the default queue
|
||||||
|
inProgress []*TaskMessage // initial state of the in-progress list
|
||||||
|
target *TaskMessage // task to requeue
|
||||||
|
wantEnqueued []*TaskMessage // final state of the default queue
|
||||||
|
wantInProgress []*TaskMessage // final state of the in-progress list
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
enqueued: []*TaskMessage{},
|
||||||
|
inProgress: []*TaskMessage{t1, t2},
|
||||||
|
target: t1,
|
||||||
|
wantEnqueued: []*TaskMessage{t1},
|
||||||
|
wantInProgress: []*TaskMessage{t2},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
enqueued: []*TaskMessage{t1},
|
||||||
|
inProgress: []*TaskMessage{t2},
|
||||||
|
target: t2,
|
||||||
|
wantEnqueued: []*TaskMessage{t1, t2},
|
||||||
|
wantInProgress: []*TaskMessage{},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tests {
|
||||||
|
flushDB(t, r) // clean up db before each test case
|
||||||
|
seedDefaultQueue(t, r, tc.enqueued)
|
||||||
|
seedInProgressQueue(t, r, tc.inProgress)
|
||||||
|
|
||||||
|
err := r.Requeue(tc.target)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("(*RDB).Requeue(task) = %v, want nil", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
gotEnqueuedRaw := r.client.LRange(defaultQ, 0, -1).Val()
|
||||||
|
gotEnqueued := mustUnmarshalSlice(t, gotEnqueuedRaw)
|
||||||
|
if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, sortMsgOpt); diff != "" {
|
||||||
|
t.Errorf("mismatch found in %q: (-want, +got):\n%s", defaultQ, diff)
|
||||||
|
}
|
||||||
|
|
||||||
|
gotInProgressRaw := r.client.LRange(inProgressQ, 0, -1).Val()
|
||||||
|
gotInProgress := mustUnmarshalSlice(t, gotInProgressRaw)
|
||||||
|
if diff := cmp.Diff(tc.wantInProgress, gotInProgress, sortMsgOpt); diff != "" {
|
||||||
|
t.Errorf("mismatch found in %q: (-want, +got):\n%s", inProgressQ, diff)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestKill(t *testing.T) {
|
func TestKill(t *testing.T) {
|
||||||
r := setup(t)
|
r := setup(t)
|
||||||
t1 := newTaskMessage("send_email", nil)
|
t1 := newTaskMessage("send_email", nil)
|
||||||
@ -202,24 +255,28 @@ func TestRestoreUnfinished(t *testing.T) {
|
|||||||
tests := []struct {
|
tests := []struct {
|
||||||
inProgress []*TaskMessage
|
inProgress []*TaskMessage
|
||||||
enqueued []*TaskMessage
|
enqueued []*TaskMessage
|
||||||
|
want int64
|
||||||
wantInProgress []*TaskMessage
|
wantInProgress []*TaskMessage
|
||||||
wantEnqueued []*TaskMessage
|
wantEnqueued []*TaskMessage
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
inProgress: []*TaskMessage{t1, t2, t3},
|
inProgress: []*TaskMessage{t1, t2, t3},
|
||||||
enqueued: []*TaskMessage{},
|
enqueued: []*TaskMessage{},
|
||||||
|
want: 3,
|
||||||
wantInProgress: []*TaskMessage{},
|
wantInProgress: []*TaskMessage{},
|
||||||
wantEnqueued: []*TaskMessage{t1, t2, t3},
|
wantEnqueued: []*TaskMessage{t1, t2, t3},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
inProgress: []*TaskMessage{},
|
inProgress: []*TaskMessage{},
|
||||||
enqueued: []*TaskMessage{t1, t2, t3},
|
enqueued: []*TaskMessage{t1, t2, t3},
|
||||||
|
want: 0,
|
||||||
wantInProgress: []*TaskMessage{},
|
wantInProgress: []*TaskMessage{},
|
||||||
wantEnqueued: []*TaskMessage{t1, t2, t3},
|
wantEnqueued: []*TaskMessage{t1, t2, t3},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
inProgress: []*TaskMessage{t2, t3},
|
inProgress: []*TaskMessage{t2, t3},
|
||||||
enqueued: []*TaskMessage{t1},
|
enqueued: []*TaskMessage{t1},
|
||||||
|
want: 2,
|
||||||
wantInProgress: []*TaskMessage{},
|
wantInProgress: []*TaskMessage{},
|
||||||
wantEnqueued: []*TaskMessage{t1, t2, t3},
|
wantEnqueued: []*TaskMessage{t1, t2, t3},
|
||||||
},
|
},
|
||||||
@ -230,8 +287,10 @@ func TestRestoreUnfinished(t *testing.T) {
|
|||||||
seedInProgressQueue(t, r, tc.inProgress)
|
seedInProgressQueue(t, r, tc.inProgress)
|
||||||
seedDefaultQueue(t, r, tc.enqueued)
|
seedDefaultQueue(t, r, tc.enqueued)
|
||||||
|
|
||||||
if err := r.RestoreUnfinished(); err != nil {
|
got, err := r.RestoreUnfinished()
|
||||||
t.Errorf("(*RDB).RestoreUnfinished() = %v, want nil", err)
|
|
||||||
|
if got != tc.want || err != nil {
|
||||||
|
t.Errorf("(*RDB).RestoreUnfinished() = %v %v, want %v nil", got, err, tc.want)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -39,9 +39,8 @@ func (p *poller) start() {
|
|||||||
case <-p.done:
|
case <-p.done:
|
||||||
log.Println("[INFO] Poller done.")
|
log.Println("[INFO] Poller done.")
|
||||||
return
|
return
|
||||||
default:
|
case <-time.After(p.avgInterval):
|
||||||
p.exec()
|
p.exec()
|
||||||
time.Sleep(p.avgInterval)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
95
processor.go
95
processor.go
@ -5,6 +5,7 @@ import (
|
|||||||
"log"
|
"log"
|
||||||
"math"
|
"math"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/hibiken/asynq/internal/rdb"
|
"github.com/hibiken/asynq/internal/rdb"
|
||||||
@ -25,7 +26,12 @@ type processor struct {
|
|||||||
sema chan struct{}
|
sema chan struct{}
|
||||||
|
|
||||||
// channel to communicate back to the long running "processor" goroutine.
|
// channel to communicate back to the long running "processor" goroutine.
|
||||||
|
// once is used to send value to the channel only once.
|
||||||
done chan struct{}
|
done chan struct{}
|
||||||
|
once sync.Once
|
||||||
|
|
||||||
|
// abort channel is closed when the shutdown of the "processor" goroutine starts.
|
||||||
|
abort chan struct{}
|
||||||
|
|
||||||
// quit channel communicates to the in-flight worker goroutines to stop.
|
// quit channel communicates to the in-flight worker goroutines to stop.
|
||||||
quit chan struct{}
|
quit chan struct{}
|
||||||
@ -35,18 +41,30 @@ func newProcessor(r *rdb.RDB, numWorkers int, handler Handler) *processor {
|
|||||||
return &processor{
|
return &processor{
|
||||||
rdb: r,
|
rdb: r,
|
||||||
handler: handler,
|
handler: handler,
|
||||||
dequeueTimeout: 5 * time.Second,
|
dequeueTimeout: 2 * time.Second,
|
||||||
sema: make(chan struct{}, numWorkers),
|
sema: make(chan struct{}, numWorkers),
|
||||||
done: make(chan struct{}),
|
done: make(chan struct{}),
|
||||||
|
abort: make(chan struct{}),
|
||||||
quit: make(chan struct{}),
|
quit: make(chan struct{}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Note: stops only the "processor" goroutine, does not stop workers.
|
||||||
|
// It's safe to call this method multiple times.
|
||||||
|
func (p *processor) stop() {
|
||||||
|
p.once.Do(func() {
|
||||||
|
log.Println("[INFO] Processor shutting down...")
|
||||||
|
// Unblock if processor is waiting for sema token.
|
||||||
|
close(p.abort)
|
||||||
|
// Signal the processor goroutine to stop processing tasks
|
||||||
|
// from the queue.
|
||||||
|
p.done <- struct{}{}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// NOTE: once terminated, processor cannot be re-started.
|
// NOTE: once terminated, processor cannot be re-started.
|
||||||
func (p *processor) terminate() {
|
func (p *processor) terminate() {
|
||||||
log.Println("[INFO] Processor shutting down...")
|
p.stop()
|
||||||
// Signal the processor goroutine to stop processing tasks from the queue.
|
|
||||||
p.done <- struct{}{}
|
|
||||||
|
|
||||||
// TODO(hibiken): Allow user to customize this timeout value.
|
// TODO(hibiken): Allow user to customize this timeout value.
|
||||||
const timeout = 8 * time.Second
|
const timeout = 8 * time.Second
|
||||||
@ -90,46 +108,63 @@ func (p *processor) exec() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
p.sema <- struct{}{} // acquire token
|
select {
|
||||||
go func() {
|
case <-p.abort:
|
||||||
defer func() { <-p.sema /* release token */ }()
|
// shutdown is starting, return immediately after requeuing the message.
|
||||||
|
p.requeue(msg)
|
||||||
resCh := make(chan error, 1)
|
return
|
||||||
task := &Task{Type: msg.Type, Payload: msg.Payload}
|
case p.sema <- struct{}{}: // acquire token
|
||||||
go func() {
|
go func() {
|
||||||
resCh <- perform(p.handler, task)
|
defer func() { <-p.sema /* release token */ }()
|
||||||
}()
|
|
||||||
|
|
||||||
select {
|
resCh := make(chan error, 1)
|
||||||
case <-p.quit:
|
task := &Task{Type: msg.Type, Payload: msg.Payload}
|
||||||
// time is up, quit this worker goroutine.
|
go func() {
|
||||||
return
|
resCh <- perform(p.handler, task)
|
||||||
case resErr := <-resCh:
|
}()
|
||||||
// Note: One of three things should happen.
|
|
||||||
// 1) Done -> Removes the message from InProgress
|
select {
|
||||||
// 2) Retry -> Removes the message from InProgress & Adds the message to Retry
|
case <-p.quit:
|
||||||
// 3) Kill -> Removes the message from InProgress & Adds the message to Dead
|
// time is up, quit this worker goroutine.
|
||||||
if resErr != nil {
|
log.Printf("[WARN] Terminating in-progress task %+v\n", msg)
|
||||||
if msg.Retried >= msg.Retry {
|
return
|
||||||
p.kill(msg, resErr.Error())
|
case resErr := <-resCh:
|
||||||
|
// Note: One of three things should happen.
|
||||||
|
// 1) Done -> Removes the message from InProgress
|
||||||
|
// 2) Retry -> Removes the message from InProgress & Adds the message to Retry
|
||||||
|
// 3) Kill -> Removes the message from InProgress & Adds the message to Dead
|
||||||
|
if resErr != nil {
|
||||||
|
if msg.Retried >= msg.Retry {
|
||||||
|
p.kill(msg, resErr.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
p.retry(msg, resErr.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
p.retry(msg, resErr.Error())
|
p.markAsDone(msg)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
p.markAsDone(msg)
|
}()
|
||||||
return
|
}
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// restore moves all tasks from "in-progress" back to queue
|
// restore moves all tasks from "in-progress" back to queue
|
||||||
// to restore all unfinished tasks.
|
// to restore all unfinished tasks.
|
||||||
func (p *processor) restore() {
|
func (p *processor) restore() {
|
||||||
err := p.rdb.RestoreUnfinished()
|
n, err := p.rdb.RestoreUnfinished()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("[ERROR] Could not restore unfinished tasks: %v\n", err)
|
log.Printf("[ERROR] Could not restore unfinished tasks: %v\n", err)
|
||||||
}
|
}
|
||||||
|
if n > 0 {
|
||||||
|
log.Printf("[INFO] Restored %d unfinished tasks back to queue.\n", n)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *processor) requeue(msg *rdb.TaskMessage) {
|
||||||
|
err := p.rdb.Requeue(msg)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("[ERROR] Could not move task from InProgress back to queue: %v\n", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *processor) markAsDone(msg *rdb.TaskMessage) {
|
func (p *processor) markAsDone(msg *rdb.TaskMessage) {
|
||||||
|
Loading…
Reference in New Issue
Block a user