From 7f30fa2bb6819b9aef47cf77f6a0cd73e3338d5d Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sat, 13 Jun 2020 06:09:54 -0700 Subject: [PATCH] Fix requeue logic in processor --- internal/base/base.go | 1 - internal/rdb/rdb.go | 26 --------- internal/rdb/rdb_test.go | 92 ------------------------------- internal/testbroker/testbroker.go | 9 --- processor.go | 21 ++----- 5 files changed, 4 insertions(+), 145 deletions(-) diff --git a/internal/base/base.go b/internal/base/base.go index f97a964..4733aa8 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -271,7 +271,6 @@ type Broker interface { ScheduleUnique(msg *TaskMessage, processAt time.Time, ttl time.Duration) error Retry(msg *TaskMessage, processAt time.Time, errMsg string) error Kill(msg *TaskMessage, errMsg string) error - RequeueAll() (int64, error) CheckAndEnqueue() error WriteServerState(info *ServerInfo, workers []*WorkerInfo, ttl time.Duration) error ClearServerState(host string, pid int, serverID string) error diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 5112242..af71be9 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -374,32 +374,6 @@ func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error { msgToRemove, msgToAdd, now.Unix(), limit, maxDeadTasks, expireAt.Unix()).Err() } -// KEYS[1] -> asynq:in_progress -// ARGV[1] -> queue prefix -var requeueAllCmd = redis.NewScript(` -local msgs = redis.call("LRANGE", KEYS[1], 0, -1) -for _, msg in ipairs(msgs) do - local decoded = cjson.decode(msg) - local qkey = ARGV[1] .. decoded["Queue"] - redis.call("RPUSH", qkey, msg) - redis.call("LREM", KEYS[1], 0, msg) -end -return table.getn(msgs)`) - -// RequeueAll moves all tasks from in-progress list to the queue -// and reports the number of tasks restored. -func (r *RDB) RequeueAll() (int64, error) { - res, err := requeueAllCmd.Run(r.client, []string{base.InProgressQueue}, base.QueuePrefix).Result() - if err != nil { - return 0, err - } - n, ok := res.(int64) - if !ok { - return 0, fmt.Errorf("could not cast %v to int64", res) - } - return n, nil -} - // CheckAndEnqueue checks for all scheduled/retry tasks and enqueues any tasks that // are ready to be processed. func (r *RDB) CheckAndEnqueue() (err error) { diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 417dd17..ca3389d 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -753,98 +753,6 @@ func TestKill(t *testing.T) { } } -func TestRequeueAll(t *testing.T) { - r := setup(t) - t1 := h.NewTaskMessage("send_email", nil) - t2 := h.NewTaskMessage("export_csv", nil) - t3 := h.NewTaskMessage("sync_stuff", nil) - t4 := h.NewTaskMessageWithQueue("important", nil, "critical") - t5 := h.NewTaskMessageWithQueue("minor", nil, "low") - - tests := []struct { - inProgress []*base.TaskMessage - enqueued map[string][]*base.TaskMessage - want int64 - wantInProgress []*base.TaskMessage - wantEnqueued map[string][]*base.TaskMessage - }{ - { - inProgress: []*base.TaskMessage{t1, t2, t3}, - enqueued: map[string][]*base.TaskMessage{ - base.DefaultQueueName: {}, - }, - want: 3, - wantInProgress: []*base.TaskMessage{}, - wantEnqueued: map[string][]*base.TaskMessage{ - base.DefaultQueueName: {t1, t2, t3}, - }, - }, - { - inProgress: []*base.TaskMessage{}, - enqueued: map[string][]*base.TaskMessage{ - base.DefaultQueueName: {t1, t2, t3}, - }, - want: 0, - wantInProgress: []*base.TaskMessage{}, - wantEnqueued: map[string][]*base.TaskMessage{ - base.DefaultQueueName: {t1, t2, t3}, - }, - }, - { - inProgress: []*base.TaskMessage{t2, t3}, - enqueued: map[string][]*base.TaskMessage{ - base.DefaultQueueName: {t1}, - }, - want: 2, - wantInProgress: []*base.TaskMessage{}, - wantEnqueued: map[string][]*base.TaskMessage{ - base.DefaultQueueName: {t1, t2, t3}, - }, - }, - { - inProgress: []*base.TaskMessage{t2, t3, t4, t5}, - enqueued: map[string][]*base.TaskMessage{ - base.DefaultQueueName: {t1}, - "critical": {}, - "low": {}, - }, - want: 4, - wantInProgress: []*base.TaskMessage{}, - wantEnqueued: map[string][]*base.TaskMessage{ - base.DefaultQueueName: {t1, t2, t3}, - "critical": {t4}, - "low": {t5}, - }, - }, - } - - for _, tc := range tests { - h.FlushDB(t, r.client) // clean up db before each test case - h.SeedInProgressQueue(t, r.client, tc.inProgress) - for qname, msgs := range tc.enqueued { - h.SeedEnqueuedQueue(t, r.client, msgs, qname) - } - - got, err := r.RequeueAll() - if got != tc.want || err != nil { - t.Errorf("(*RDB).RequeueAll() = %v %v, want %v nil", got, err, tc.want) - continue - } - - gotInProgress := h.GetInProgressMessages(t, r.client) - if diff := cmp.Diff(tc.wantInProgress, gotInProgress, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q: (-want, +got):\n%s", base.InProgressQueue, diff) - } - - for qname, want := range tc.wantEnqueued { - gotEnqueued := h.GetEnqueuedMessages(t, r.client, qname) - if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q: (-want, +got):\n%s", base.QueueKey(qname), diff) - } - } - } -} - func TestCheckAndEnqueue(t *testing.T) { r := setup(t) t1 := h.NewTaskMessage("send_email", nil) diff --git a/internal/testbroker/testbroker.go b/internal/testbroker/testbroker.go index 9044e17..08e407f 100644 --- a/internal/testbroker/testbroker.go +++ b/internal/testbroker/testbroker.go @@ -123,15 +123,6 @@ func (tb *TestBroker) Kill(msg *base.TaskMessage, errMsg string) error { return tb.real.Kill(msg, errMsg) } -func (tb *TestBroker) RequeueAll() (int64, error) { - tb.mu.Lock() - defer tb.mu.Unlock() - if tb.sleeping { - return 0, errRedisDown - } - return tb.real.RequeueAll() -} - func (tb *TestBroker) CheckAndEnqueue() error { tb.mu.Lock() defer tb.mu.Unlock() diff --git a/processor.go b/processor.go index 4b2f03b..5216a62 100644 --- a/processor.go +++ b/processor.go @@ -137,13 +137,9 @@ func (p *processor) terminate() { p.sema <- struct{}{} } p.logger.Info("All workers have finished") - p.restore() // move any unfinished tasks back to the queue. } func (p *processor) start(wg *sync.WaitGroup) { - // NOTE: The call to "restore" needs to complete before starting - // the processor goroutine. - p.restore() wg.Add(1) go func() { defer wg.Done() @@ -206,8 +202,9 @@ func (p *processor) exec() { select { case <-p.quit: - // time is up, quit this worker goroutine. + // time is up, push the message back to queue and quit this worker goroutine. p.logger.Warnf("Quitting worker. task id=%s", msg.ID) + p.requeue(msg) return case resErr := <-resCh: // Note: One of three things should happen. @@ -231,22 +228,12 @@ func (p *processor) exec() { } } -// restore moves all tasks from "in-progress" back to queue -// to restore all unfinished tasks. -func (p *processor) restore() { - n, err := p.broker.RequeueAll() - if err != nil { - p.logger.Errorf("Could not restore unfinished tasks: %v", err) - } - if n > 0 { - p.logger.Infof("Restored %d unfinished tasks back to queue", n) - } -} - func (p *processor) requeue(msg *base.TaskMessage) { err := p.broker.Requeue(msg) if err != nil { p.logger.Errorf("Could not push task id=%s back to queue: %v", msg.ID, err) + } else { + p.logger.Infof("Pushed task id=%s back to queue", msg.ID) } }