diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 3606fb1..81fbd82 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -9,6 +9,7 @@ import ( "encoding/json" "errors" "fmt" + "strconv" "time" "github.com/go-redis/redis/v7" @@ -470,6 +471,44 @@ func (r *RDB) forward(src string) (int, error) { return cast.ToInt(res), nil } +// KEYS[1] -> asynq:deadlines +// KEYS[2] -> asynq:in_progress +// ARGV[1] -> max deadline score in unix time +// ARGV[2] -> queue prefix +/* +var requeueDeadlineExceededCmd = redis.NewScript(` +local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1], "LIMIT", 0, 100) +for _, msg in ipairs(msgs) do + local decoded = cjson.decode(msg) + local qkey = ARGV[2] .. decoded["Queue"] + redis.call("LPUSH", qkey, msg) + redis.call("ZREM", KEYS[1], msg) + redis.call("LREM", KEYS[2], 0, msg) +end +return table.getn(msgs)`) +*/ + +// ListDeadlineExceeded returns a list of task messages that have exceeded the given deadline. +func (r *RDB) ListDeadlineExceeded(deadline time.Time) ([]*base.TaskMessage, error) { + var msgs []*base.TaskMessage + opt := &redis.ZRangeBy{ + Min: "-inf", + Max: strconv.FormatInt(deadline.Unix(), 10), + } + res, err := r.client.ZRangeByScore(base.KeyDeadlines, opt).Result() + if err != nil { + return nil, err + } + for _, s := range res { + msg, err := base.DecodeMessage(s) + if err != nil { + return nil, err + } + msgs = append(msgs, msg) + } + return msgs, nil +} + // KEYS[1] -> asynq:servers: // KEYS[2] -> asynq:servers // KEYS[3] -> asynq:workers @@ -517,7 +556,7 @@ func (r *RDB) WriteServerState(info *base.ServerInfo, workers []*base.WorkerInfo // KEYS[2] -> asynq:servers: // KEYS[3] -> asynq:workers // KEYS[4] -> asynq:workers -var clearProcessInfoCmd = redis.NewScript(` +var clearServerStateCmd = redis.NewScript(` redis.call("ZREM", KEYS[1], KEYS[2]) redis.call("DEL", KEYS[2]) redis.call("ZREM", KEYS[3], KEYS[4]) @@ -528,7 +567,7 @@ return redis.status_reply("OK")`) func (r *RDB) ClearServerState(host string, pid int, serverID string) error { skey := base.ServerInfoKey(host, pid, serverID) wkey := base.WorkersKey(host, pid, serverID) - return clearProcessInfoCmd.Run(r.client, + return clearServerStateCmd.Run(r.client, []string{base.AllServers, skey, base.AllWorkers, wkey}).Err() } diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 713b48c..77e88c0 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -1121,6 +1121,226 @@ func TestCheckAndEnqueue(t *testing.T) { } } +func TestListDeadlineExceeded(t *testing.T) { + t1 := h.NewTaskMessage("task1", nil) + t2 := h.NewTaskMessage("task2", nil) + t3 := h.NewTaskMessageWithQueue("task3", nil, "critical") + + now := time.Now() + oneHourFromNow := now.Add(1 * time.Hour) + fiveMinutesFromNow := now.Add(5 * time.Minute) + fiveMinutesAgo := now.Add(-5 * time.Minute) + oneHourAgo := now.Add(-1 * time.Hour) + + tests := []struct { + desc string + deadlines []h.ZSetEntry + t time.Time + want []*base.TaskMessage + }{ + { + desc: "with one task in-progress", + deadlines: []h.ZSetEntry{ + {Msg: t1, Score: float64(fiveMinutesAgo.Unix())}, + }, + t: time.Now(), + want: []*base.TaskMessage{t1}, + }, + { + desc: "with multiple tasks in-progress, and one expired", + deadlines: []h.ZSetEntry{ + {Msg: t1, Score: float64(oneHourAgo.Unix())}, + {Msg: t2, Score: float64(fiveMinutesFromNow.Unix())}, + {Msg: t3, Score: float64(oneHourFromNow.Unix())}, + }, + t: time.Now(), + want: []*base.TaskMessage{t1}, + }, + { + desc: "with multiple expired tasks in-progress", + deadlines: []h.ZSetEntry{ + {Msg: t1, Score: float64(oneHourAgo.Unix())}, + {Msg: t2, Score: float64(fiveMinutesAgo.Unix())}, + {Msg: t3, Score: float64(oneHourFromNow.Unix())}, + }, + t: time.Now(), + want: []*base.TaskMessage{t1, t2}, + }, + { + desc: "with empty in-progress queue", + deadlines: []h.ZSetEntry{}, + t: time.Now(), + want: []*base.TaskMessage{}, + }, + } + + r := setup(t) + for _, tc := range tests { + h.FlushDB(t, r.client) + h.SeedDeadlines(t, r.client, tc.deadlines) + + got, err := r.ListDeadlineExceeded(tc.t) + if err != nil { + t.Errorf("%s; ListDeadlineExceeded(%v) returned error: %v", tc.desc, tc.t, err) + continue + } + + if diff := cmp.Diff(tc.want, got, h.SortMsgOpt); diff != "" { + t.Errorf("%s; ListDeadlineExceeded(%v) returned %v, want %v;(-want,+got)\n%s", + tc.desc, tc.t, got, tc.want, diff) + } + } +} + +/* +func TestRequeueDeadlineExceeded(t *testing.T) { + t1 := h.NewTaskMessage("task1", nil) + t2 := h.NewTaskMessage("task2", nil) + t3 := h.NewTaskMessageWithQueue("task3", nil, "critical") + + now := time.Now() + oneHourFromNow := now.Add(1 * time.Hour) + fiveMinutesFromNow := now.Add(5 * time.Minute) + fiveMinutesAgo := now.Add(-5 * time.Minute) + oneHourAgo := now.Add(-1 * time.Hour) + + tests := []struct { + desc string + inProgress []*base.TaskMessage + deadlines []h.ZSetEntry + enqueued map[string][]*base.TaskMessage + wantN int + wantInProgress []*base.TaskMessage + wantDeadlines []h.ZSetEntry + wantEnqueued map[string][]*base.TaskMessage + }{ + { + desc: "with one task in-progress", + inProgress: []*base.TaskMessage{t1}, + deadlines: []h.ZSetEntry{ + {Msg: t1, Score: float64(fiveMinutesAgo.Unix())}, + }, + enqueued: map[string][]*base.TaskMessage{ + "default": {}, + }, + wantN: 1, + wantInProgress: []*base.TaskMessage{}, + wantDeadlines: []h.ZSetEntry{}, + wantEnqueued: map[string][]*base.TaskMessage{ + "default": {t1}, + }, + }, + { + desc: "with multiple tasks in-progress, and one expired", + inProgress: []*base.TaskMessage{t1, t2, t3}, + deadlines: []h.ZSetEntry{ + {Msg: t1, Score: float64(oneHourAgo.Unix())}, + {Msg: t2, Score: float64(fiveMinutesFromNow.Unix())}, + {Msg: t3, Score: float64(oneHourFromNow.Unix())}, + }, + enqueued: map[string][]*base.TaskMessage{ + "default": {}, + }, + wantN: 1, + wantInProgress: []*base.TaskMessage{t2, t3}, + wantDeadlines: []h.ZSetEntry{ + {Msg: t2, Score: float64(fiveMinutesFromNow.Unix())}, + {Msg: t3, Score: float64(oneHourFromNow.Unix())}, + }, + wantEnqueued: map[string][]*base.TaskMessage{ + "default": {t1}, + }, + }, + { + desc: "with multiple expired tasks in-progress", + inProgress: []*base.TaskMessage{t1, t2, t3}, + deadlines: []h.ZSetEntry{ + {Msg: t1, Score: float64(oneHourAgo.Unix())}, + {Msg: t2, Score: float64(fiveMinutesAgo.Unix())}, + {Msg: t3, Score: float64(oneHourFromNow.Unix())}, + }, + enqueued: map[string][]*base.TaskMessage{ + "default": {}, + }, + wantN: 2, + wantInProgress: []*base.TaskMessage{t3}, + wantDeadlines: []h.ZSetEntry{ + {Msg: t3, Score: float64(oneHourFromNow.Unix())}, + }, + wantEnqueued: map[string][]*base.TaskMessage{ + "default": {t1, t2}, + }, + }, + { + desc: "with empty in-progress queue", + inProgress: []*base.TaskMessage{}, + deadlines: []h.ZSetEntry{}, + enqueued: map[string][]*base.TaskMessage{ + "default": {}, + }, + wantN: 0, + wantInProgress: []*base.TaskMessage{}, + wantDeadlines: []h.ZSetEntry{}, + wantEnqueued: map[string][]*base.TaskMessage{ + "default": {}, + }, + }, + { + desc: "push back task to appropriate queue", + inProgress: []*base.TaskMessage{t3}, + deadlines: []h.ZSetEntry{ + {Msg: t3, Score: float64(fiveMinutesAgo.Unix())}, + }, + enqueued: map[string][]*base.TaskMessage{ + "default": {}, + "critical": {}, + }, + wantN: 1, + wantInProgress: []*base.TaskMessage{}, + wantDeadlines: []h.ZSetEntry{}, + wantEnqueued: map[string][]*base.TaskMessage{ + "default": {}, + "critical": {t3}, + }, + }, + } + + r := setup(t) + for _, tc := range tests { + h.FlushDB(t, r.client) + h.SeedInProgressQueue(t, r.client, tc.inProgress) + h.SeedDeadlines(t, r.client, tc.deadlines) + for queue, msgs := range tc.enqueued { + h.SeedEnqueuedQueue(t, r.client, msgs, queue) + } + + gotN, err := r.RequeueDeadlineExceeded() + if err != nil { + t.Errorf("%s; RequeueDeadlineExceeded() returned error: %v", tc.desc, err) + continue + } + if gotN != tc.wantN { + t.Errorf("%s; RequeueDeadlineExeeded() == %d want %d", tc.desc, gotN, tc.wantN) + } + + gotInProgress := h.GetInProgressMessages(t, r.client) + if diff := cmp.Diff(tc.wantInProgress, gotInProgress, h.SortMsgOpt); diff != "" { + t.Errorf("%s; mismatch found in %q; (-want,+got)\n%s", tc.desc, base.InProgressQueue, diff) + } + gotDeadlines := h.GetDeadlinesEntries(t, r.client) + if diff := cmp.Diff(tc.wantDeadlines, gotDeadlines, h.SortZSetEntryOpt); diff != "" { + t.Errorf("%s; mismatch found in %q; (-want,+got)\n%s", tc.desc, base.KeyDeadlines, diff) + } + for queue, want := range tc.wantEnqueued { + gotEnqueued := h.GetEnqueuedMessages(t, r.client, queue) + if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" { + t.Errorf("%s; mismatch found in %q: (-want,+got):\n%s", tc.desc, base.QueueKey(queue), diff) + } + } + } +} +*/ + func TestWriteServerState(t *testing.T) { r := setup(t)