From 911e600c41b8318c690f887ec29c9fd5ed5a5d32 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Mon, 16 Dec 2019 20:19:05 -0800 Subject: [PATCH 1/9] Terminate background upon receiving SIGTERM or SIGINT --- background.go | 5 +++-- internal/rdb/rdb.go | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/background.go b/background.go index 42731fa..8cc03ea 100644 --- a/background.go +++ b/background.go @@ -6,6 +6,7 @@ import ( "os" "os/signal" "sync" + "syscall" "time" "github.com/hibiken/asynq/internal/rdb" @@ -74,9 +75,9 @@ func (bg *Background) Run(handler Handler) { bg.start(handler) defer bg.stop() - // Wait for a signal to exit. + // Wait for a signal to terminate. sigs := make(chan os.Signal, 1) - signal.Notify(sigs, os.Interrupt, os.Kill) + signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT) <-sigs fmt.Println() log.Println("[INFO] Starting graceful shutdown...") diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 859e0b7..c8f9e5f 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -144,7 +144,7 @@ func (r *RDB) Retry(msg *TaskMessage, processAt time.Time, errMsg string) error if err != nil { return fmt.Errorf("could not marshal %+v to json: %v", modified, err) } - // KEYS[1] -> asynq:in_progress + // KEYS[1] -> asynq:in_progress // KEYS[2] -> asynq:retry // ARGV[1] -> TaskMessage value to remove from InProgress queue // ARGV[2] -> TaskMessage value to add to Retry queue From 3e30c5916b91434155dca060503b4d68c03705f8 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Tue, 17 Dec 2019 05:32:31 -0800 Subject: [PATCH 2/9] Trap and handle TSTP signal --- background.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/background.go b/background.go index 8cc03ea..9649a2a 100644 --- a/background.go +++ b/background.go @@ -77,8 +77,16 @@ func (bg *Background) Run(handler Handler) { // Wait for a signal to terminate. sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT) - <-sigs + signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT, syscall.SIGTSTP) + for { + sig := <-sigs + fmt.Printf("[DEBUG] Got %v\n", sig) // TODO: Remove this + if sig == syscall.SIGTSTP { + fmt.Println("[DEBUG] Stop processing tasks") + continue + } + break + } fmt.Println() log.Println("[INFO] Starting graceful shutdown...") } From 24dd78b31c2be16c2e2c2619cdfd58ce721220bc Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Tue, 17 Dec 2019 06:18:22 -0800 Subject: [PATCH 3/9] Stop processing more tasks from the queue once TSTP signal is received --- background.go | 3 +-- processor.go | 18 +++++++++++++++--- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/background.go b/background.go index 9649a2a..4571af1 100644 --- a/background.go +++ b/background.go @@ -80,9 +80,8 @@ func (bg *Background) Run(handler Handler) { signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT, syscall.SIGTSTP) for { sig := <-sigs - fmt.Printf("[DEBUG] Got %v\n", sig) // TODO: Remove this if sig == syscall.SIGTSTP { - fmt.Println("[DEBUG] Stop processing tasks") + bg.processor.stop() continue } break diff --git a/processor.go b/processor.go index 8653401..d391257 100644 --- a/processor.go +++ b/processor.go @@ -5,6 +5,7 @@ import ( "log" "math" "math/rand" + "sync" "time" "github.com/hibiken/asynq/internal/rdb" @@ -25,7 +26,9 @@ type processor struct { sema chan struct{} // channel to communicate back to the long running "processor" goroutine. + // once is used to send value to the channel only once. done chan struct{} + once sync.Once // quit channel communicates to the in-flight worker goroutines to stop. quit chan struct{} @@ -42,11 +45,20 @@ func newProcessor(r *rdb.RDB, numWorkers int, handler Handler) *processor { } } +// Note: stops only the "processor" goroutine, does not stop workers. +// It's safe to call this method multiple times. +func (p *processor) stop() { + p.once.Do(func() { + log.Println("[INFO] Processor shutting down...") + // Signal the processor goroutine to stop processing tasks + // from the queue. + p.done <- struct{}{} + }) +} + // NOTE: once terminated, processor cannot be re-started. func (p *processor) terminate() { - log.Println("[INFO] Processor shutting down...") - // Signal the processor goroutine to stop processing tasks from the queue. - p.done <- struct{}{} + p.stop() // TODO(hibiken): Allow user to customize this timeout value. const timeout = 8 * time.Second From 5ddba8ca981e5e8ddf361fca31c3bbf3cabe1eef Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Tue, 17 Dec 2019 20:07:17 -0800 Subject: [PATCH 4/9] Unblock processor shutdown process if processor is waiting for semaphore token --- internal/rdb/rdb.go | 19 ++++++++ internal/rdb/rdb_test.go | 53 +++++++++++++++++++++ processor.go | 99 ++++++++++++++++++++++++++-------------- 3 files changed, 137 insertions(+), 34 deletions(-) diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index c8f9e5f..e44c051 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -116,6 +116,25 @@ func (r *RDB) Done(msg *TaskMessage) error { return nil } +// Requeue moves the task from in-progress queue to the default +// queue. +func (r *RDB) Requeue(msg *TaskMessage) error { + bytes, err := json.Marshal(msg) + if err != nil { + return fmt.Errorf("could not marshal %+v to json: %v", msg, err) + } + // KEYS[1] -> asynq:in_progress + // KEYS[2] -> asynq:queues:default + // ARGV[1] -> taskMessage value + script := redis.NewScript(` + redis.call("LREM", KEYS[1], 0, ARGV[1]) + redis.call("RPUSH", KEYS[2], ARGV[1]) + return redis.status_reply("OK") + `) + _, err = script.Run(r.client, []string{inProgressQ, defaultQ}, string(bytes)).Result() + return err +} + // Schedule adds the task to the backlog queue to be processed in the future. func (r *RDB) Schedule(msg *TaskMessage, processAt time.Time) error { bytes, err := json.Marshal(msg) diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index b6f7089..951523a 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -112,6 +112,59 @@ func TestDone(t *testing.T) { } } +func TestRequeue(t *testing.T) { + r := setup(t) + t1 := newTaskMessage("send_email", nil) + t2 := newTaskMessage("export_csv", nil) + + tests := []struct { + enqueued []*TaskMessage // initial state of the default queue + inProgress []*TaskMessage // initial state of the in-progress list + target *TaskMessage // task to requeue + wantEnqueued []*TaskMessage // final state of the default queue + wantInProgress []*TaskMessage // final state of the in-progress list + }{ + { + enqueued: []*TaskMessage{}, + inProgress: []*TaskMessage{t1, t2}, + target: t1, + wantEnqueued: []*TaskMessage{t1}, + wantInProgress: []*TaskMessage{t2}, + }, + { + enqueued: []*TaskMessage{t1}, + inProgress: []*TaskMessage{t2}, + target: t2, + wantEnqueued: []*TaskMessage{t1, t2}, + wantInProgress: []*TaskMessage{}, + }, + } + + for _, tc := range tests { + flushDB(t, r) // clean up db before each test case + seedDefaultQueue(t, r, tc.enqueued) + seedInProgressQueue(t, r, tc.inProgress) + + err := r.Requeue(tc.target) + if err != nil { + t.Errorf("(*RDB).Requeue(task) = %v, want nil", err) + continue + } + + gotEnqueuedRaw := r.client.LRange(defaultQ, 0, -1).Val() + gotEnqueued := mustUnmarshalSlice(t, gotEnqueuedRaw) + if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, sortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q: (-want, +got):\n%s", defaultQ, diff) + } + + gotInProgressRaw := r.client.LRange(inProgressQ, 0, -1).Val() + gotInProgress := mustUnmarshalSlice(t, gotInProgressRaw) + if diff := cmp.Diff(tc.wantInProgress, gotInProgress, sortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q: (-want, +got):\n%s", inProgressQ, diff) + } + } +} + func TestKill(t *testing.T) { r := setup(t) t1 := newTaskMessage("send_email", nil) diff --git a/processor.go b/processor.go index d391257..9d88ffd 100644 --- a/processor.go +++ b/processor.go @@ -21,14 +21,19 @@ type processor struct { // in case of a program shutdown or additon of a new queue. dequeueTimeout time.Duration + // running represents the state of the "processor" goroutine + mu sync.Mutex + running bool + // sema is a counting semaphore to ensure the number of active workers // does not exceed the limit. sema chan struct{} // channel to communicate back to the long running "processor" goroutine. - // once is used to send value to the channel only once. done chan struct{} - once sync.Once + + // shutdown channel is closed when the shutdown of the "processor" goroutine starts. + shutdown chan struct{} // quit channel communicates to the in-flight worker goroutines to stop. quit chan struct{} @@ -38,9 +43,10 @@ func newProcessor(r *rdb.RDB, numWorkers int, handler Handler) *processor { return &processor{ rdb: r, handler: handler, - dequeueTimeout: 5 * time.Second, + dequeueTimeout: 2 * time.Second, sema: make(chan struct{}, numWorkers), done: make(chan struct{}), + shutdown: make(chan struct{}), quit: make(chan struct{}), } } @@ -48,12 +54,18 @@ func newProcessor(r *rdb.RDB, numWorkers int, handler Handler) *processor { // Note: stops only the "processor" goroutine, does not stop workers. // It's safe to call this method multiple times. func (p *processor) stop() { - p.once.Do(func() { - log.Println("[INFO] Processor shutting down...") - // Signal the processor goroutine to stop processing tasks - // from the queue. - p.done <- struct{}{} - }) + p.mu.Lock() + defer p.mu.Unlock() + if !p.running { + return + } + log.Println("[INFO] Processor shutting down...") + // Unblock if processor is waiting for sema token. + close(p.shutdown) + // Signal the processor goroutine to stop processing tasks + // from the queue. + p.done <- struct{}{} + p.running = false } // NOTE: once terminated, processor cannot be re-started. @@ -73,6 +85,11 @@ func (p *processor) terminate() { } func (p *processor) start() { + p.mu.Lock() + defer p.mu.Unlock() + if p.running { + return + } // NOTE: The call to "restore" needs to complete before starting // the processor goroutine. p.restore() @@ -87,6 +104,7 @@ func (p *processor) start() { } } }() + p.running = true } // exec pulls a task out of the queue and starts a worker goroutine to @@ -102,37 +120,43 @@ func (p *processor) exec() { return } - p.sema <- struct{}{} // acquire token - go func() { - defer func() { <-p.sema /* release token */ }() - - resCh := make(chan error, 1) - task := &Task{Type: msg.Type, Payload: msg.Payload} + select { + case <-p.shutdown: + // shutdown is starting, return immediately after requeuing the message. + p.requeue(msg) + return + case p.sema <- struct{}{}: // acquire token go func() { - resCh <- perform(p.handler, task) - }() + defer func() { <-p.sema /* release token */ }() - select { - case <-p.quit: - // time is up, quit this worker goroutine. - return - case resErr := <-resCh: - // Note: One of three things should happen. - // 1) Done -> Removes the message from InProgress - // 2) Retry -> Removes the message from InProgress & Adds the message to Retry - // 3) Kill -> Removes the message from InProgress & Adds the message to Dead - if resErr != nil { - if msg.Retried >= msg.Retry { - p.kill(msg, resErr.Error()) + resCh := make(chan error, 1) + task := &Task{Type: msg.Type, Payload: msg.Payload} + go func() { + resCh <- perform(p.handler, task) + }() + + select { + case <-p.quit: + // time is up, quit this worker goroutine. + return + case resErr := <-resCh: + // Note: One of three things should happen. + // 1) Done -> Removes the message from InProgress + // 2) Retry -> Removes the message from InProgress & Adds the message to Retry + // 3) Kill -> Removes the message from InProgress & Adds the message to Dead + if resErr != nil { + if msg.Retried >= msg.Retry { + p.kill(msg, resErr.Error()) + return + } + p.retry(msg, resErr.Error()) return } - p.retry(msg, resErr.Error()) + p.markAsDone(msg) return } - p.markAsDone(msg) - return - } - }() + }() + } } // restore moves all tasks from "in-progress" back to queue @@ -144,6 +168,13 @@ func (p *processor) restore() { } } +func (p *processor) requeue(msg *rdb.TaskMessage) { + err := p.rdb.Requeue(msg) + if err != nil { + log.Printf("[ERROR] Could not move task from InProgress back to queue: %v\n", err) + } +} + func (p *processor) markAsDone(msg *rdb.TaskMessage) { err := p.rdb.Done(msg) if err != nil { From c40e779fdb4387bc0351f9bfe98dc271a3ee1937 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Tue, 17 Dec 2019 20:13:41 -0800 Subject: [PATCH 5/9] Modify poller to wait for Time.After channel instead of time.Sleep --- poller.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/poller.go b/poller.go index e64ceb8..9fdd8b6 100644 --- a/poller.go +++ b/poller.go @@ -39,9 +39,8 @@ func (p *poller) start() { case <-p.done: log.Println("[INFO] Poller done.") return - default: + case <-time.After(p.avgInterval): p.exec() - time.Sleep(p.avgInterval) } } }() From 69b46a7f0d07d09def6277ec0c6e788c78471bc0 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Tue, 17 Dec 2019 20:34:56 -0800 Subject: [PATCH 6/9] Use sync.Once --- internal/rdb/rdb.go | 1 + processor.go | 32 ++++++++++---------------------- 2 files changed, 11 insertions(+), 22 deletions(-) diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index e44c051..b75922c 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -123,6 +123,7 @@ func (r *RDB) Requeue(msg *TaskMessage) error { if err != nil { return fmt.Errorf("could not marshal %+v to json: %v", msg, err) } + // Note: Use RPUSH to push to the head of the queue. // KEYS[1] -> asynq:in_progress // KEYS[2] -> asynq:queues:default // ARGV[1] -> taskMessage value diff --git a/processor.go b/processor.go index 9d88ffd..f0650fa 100644 --- a/processor.go +++ b/processor.go @@ -21,16 +21,14 @@ type processor struct { // in case of a program shutdown or additon of a new queue. dequeueTimeout time.Duration - // running represents the state of the "processor" goroutine - mu sync.Mutex - running bool - // sema is a counting semaphore to ensure the number of active workers // does not exceed the limit. sema chan struct{} // channel to communicate back to the long running "processor" goroutine. + // once is used to send value to the channel only once. done chan struct{} + once sync.Once // shutdown channel is closed when the shutdown of the "processor" goroutine starts. shutdown chan struct{} @@ -54,18 +52,14 @@ func newProcessor(r *rdb.RDB, numWorkers int, handler Handler) *processor { // Note: stops only the "processor" goroutine, does not stop workers. // It's safe to call this method multiple times. func (p *processor) stop() { - p.mu.Lock() - defer p.mu.Unlock() - if !p.running { - return - } - log.Println("[INFO] Processor shutting down...") - // Unblock if processor is waiting for sema token. - close(p.shutdown) - // Signal the processor goroutine to stop processing tasks - // from the queue. - p.done <- struct{}{} - p.running = false + p.once.Do(func() { + log.Println("[INFO] Processor shutting down...") + // Unblock if processor is waiting for sema token. + close(p.shutdown) + // Signal the processor goroutine to stop processing tasks + // from the queue. + p.done <- struct{}{} + }) } // NOTE: once terminated, processor cannot be re-started. @@ -85,11 +79,6 @@ func (p *processor) terminate() { } func (p *processor) start() { - p.mu.Lock() - defer p.mu.Unlock() - if p.running { - return - } // NOTE: The call to "restore" needs to complete before starting // the processor goroutine. p.restore() @@ -104,7 +93,6 @@ func (p *processor) start() { } } }() - p.running = true } // exec pulls a task out of the queue and starts a worker goroutine to From b2bc0ef91ce2c0cc676b9d82cc4e0104a84fdde3 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Tue, 17 Dec 2019 20:37:54 -0800 Subject: [PATCH 7/9] [ci skip] Update todos --- asynq.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/asynq.go b/asynq.go index 9f45ddd..d652160 100644 --- a/asynq.go +++ b/asynq.go @@ -4,9 +4,9 @@ import "github.com/go-redis/redis/v7" /* TODOs: -- [P0] Proper OS Signal handling - TTIN to stop the processor - [P0] asynqmon kill , asynqmon killall - [P0] Better Payload API - Assigning int or any number type to Payload will be converted to float64 in handler +- [P0] Show elapsed time for InProgress tasks (asynqmon ls inprogress) - [P0] Redis Memory Usage, Connection info in stats - [P0] Processed, Failed count for today - [P0] Go docs + CONTRIBUTION.md + Github issue template + License comment From 33e9da953d3d00b2c4c0c8a45955c26552094988 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Wed, 18 Dec 2019 18:55:08 -0800 Subject: [PATCH 8/9] Log warning and info messages when unfinished tasks get aborted --- internal/rdb/rdb.go | 16 ++++++++++++---- internal/rdb/rdb_test.go | 10 ++++++++-- processor.go | 6 +++++- 3 files changed, 25 insertions(+), 7 deletions(-) diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index b75922c..1adf18a 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -216,8 +216,9 @@ func (r *RDB) Kill(msg *TaskMessage, errMsg string) error { return err } -// RestoreUnfinished moves all tasks from in-progress list to the queue. -func (r *RDB) RestoreUnfinished() error { +// RestoreUnfinished moves all tasks from in-progress list to the queue +// and reports the number of tasks restored. +func (r *RDB) RestoreUnfinished() (int64, error) { script := redis.NewScript(` local len = redis.call("LLEN", KEYS[1]) for i = len, 1, -1 do @@ -225,8 +226,15 @@ func (r *RDB) RestoreUnfinished() error { end return len `) - _, err := script.Run(r.client, []string{inProgressQ, defaultQ}).Result() - return err + res, err := script.Run(r.client, []string{inProgressQ, defaultQ}).Result() + if err != nil { + return 0, err + } + n, ok := res.(int64) + if !ok { + return 0, fmt.Errorf("could not cast %v to int64", res) + } + return n, nil } // CheckAndEnqueue checks for all scheduled tasks and enqueues any tasks that diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 951523a..f5d3324 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -255,24 +255,28 @@ func TestRestoreUnfinished(t *testing.T) { tests := []struct { inProgress []*TaskMessage enqueued []*TaskMessage + want int64 wantInProgress []*TaskMessage wantEnqueued []*TaskMessage }{ { inProgress: []*TaskMessage{t1, t2, t3}, enqueued: []*TaskMessage{}, + want: 3, wantInProgress: []*TaskMessage{}, wantEnqueued: []*TaskMessage{t1, t2, t3}, }, { inProgress: []*TaskMessage{}, enqueued: []*TaskMessage{t1, t2, t3}, + want: 0, wantInProgress: []*TaskMessage{}, wantEnqueued: []*TaskMessage{t1, t2, t3}, }, { inProgress: []*TaskMessage{t2, t3}, enqueued: []*TaskMessage{t1}, + want: 2, wantInProgress: []*TaskMessage{}, wantEnqueued: []*TaskMessage{t1, t2, t3}, }, @@ -283,8 +287,10 @@ func TestRestoreUnfinished(t *testing.T) { seedInProgressQueue(t, r, tc.inProgress) seedDefaultQueue(t, r, tc.enqueued) - if err := r.RestoreUnfinished(); err != nil { - t.Errorf("(*RDB).RestoreUnfinished() = %v, want nil", err) + got, err := r.RestoreUnfinished() + + if got != tc.want || err != nil { + t.Errorf("(*RDB).RestoreUnfinished() = %v %v, want %v nil", got, err, tc.want) continue } diff --git a/processor.go b/processor.go index f0650fa..4e86282 100644 --- a/processor.go +++ b/processor.go @@ -126,6 +126,7 @@ func (p *processor) exec() { select { case <-p.quit: // time is up, quit this worker goroutine. + log.Printf("[WARN] Terminating in-progress task %+v\n", msg) return case resErr := <-resCh: // Note: One of three things should happen. @@ -150,10 +151,13 @@ func (p *processor) exec() { // restore moves all tasks from "in-progress" back to queue // to restore all unfinished tasks. func (p *processor) restore() { - err := p.rdb.RestoreUnfinished() + n, err := p.rdb.RestoreUnfinished() if err != nil { log.Printf("[ERROR] Could not restore unfinished tasks: %v\n", err) } + if n > 0 { + log.Printf("[INFO] Restored %d unfinished tasks back to queue.\n", n) + } } func (p *processor) requeue(msg *rdb.TaskMessage) { From 8b98b6e5a04ef358c5d50d8c5fdbe1a39f6ca93c Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Wed, 18 Dec 2019 18:57:48 -0800 Subject: [PATCH 9/9] Rename channel name --- processor.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/processor.go b/processor.go index 4e86282..8dcee58 100644 --- a/processor.go +++ b/processor.go @@ -30,8 +30,8 @@ type processor struct { done chan struct{} once sync.Once - // shutdown channel is closed when the shutdown of the "processor" goroutine starts. - shutdown chan struct{} + // abort channel is closed when the shutdown of the "processor" goroutine starts. + abort chan struct{} // quit channel communicates to the in-flight worker goroutines to stop. quit chan struct{} @@ -44,7 +44,7 @@ func newProcessor(r *rdb.RDB, numWorkers int, handler Handler) *processor { dequeueTimeout: 2 * time.Second, sema: make(chan struct{}, numWorkers), done: make(chan struct{}), - shutdown: make(chan struct{}), + abort: make(chan struct{}), quit: make(chan struct{}), } } @@ -55,7 +55,7 @@ func (p *processor) stop() { p.once.Do(func() { log.Println("[INFO] Processor shutting down...") // Unblock if processor is waiting for sema token. - close(p.shutdown) + close(p.abort) // Signal the processor goroutine to stop processing tasks // from the queue. p.done <- struct{}{} @@ -109,7 +109,7 @@ func (p *processor) exec() { } select { - case <-p.shutdown: + case <-p.abort: // shutdown is starting, return immediately after requeuing the message. p.requeue(msg) return