From ed69667e86ef9df4da31b7d7c91482765b363f45 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sun, 6 Mar 2022 06:04:56 -0800 Subject: [PATCH] Update ForwardIfReady test with group --- internal/asynqtest/asynqtest.go | 2 ++ internal/rdb/rdb.go | 2 +- internal/rdb/rdb_test.go | 13 +++++++------ 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index ce440e5..259c82b 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -316,6 +316,7 @@ func seedRedisList(tb testing.TB, c redis.UniversalClient, key string, "msg": encoded, "state": state.String(), "unique_key": msg.UniqueKey, + "group": msg.GroupKey, } if err := c.HSet(context.Background(), key, data).Err(); err != nil { tb.Fatal(err) @@ -344,6 +345,7 @@ func seedRedisZSet(tb testing.TB, c redis.UniversalClient, key string, "msg": encoded, "state": state.String(), "unique_key": msg.UniqueKey, + "group": msg.GroupKey, } if err := c.HSet(context.Background(), key, data).Err(); err != nil { tb.Fatal(err) diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 73ad529..7b9fab6 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -925,7 +925,7 @@ local ids = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1], "LIMIT", 0, 10 for _, id in ipairs(ids) do local taskKey = ARGV[2] .. id local group = redis.call("HGET", taskKey, "group") - if group then + if group and group ~= '' then redis.call("ZADD", ARGV[4] .. group, ARGV[1], id) redis.call("ZREM", KEYS[1], id) redis.call("HSET", taskKey, diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index a1df964..9444961 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -2180,12 +2180,18 @@ func TestForwardIfReadyWithGroup(t *testing.T) { now := time.Now() r.SetClock(timeutil.NewSimulatedClock(now)) - ctx := context.Background() t1 := h.NewTaskMessage("send_email", nil) t2 := h.NewTaskMessage("generate_csv", nil) t3 := h.NewTaskMessage("gen_thumbnail", nil) t4 := h.NewTaskMessageWithQueue("important_task", nil, "critical") t5 := h.NewTaskMessageWithQueue("minor_task", nil, "low") + // Set group keys for the tasks. + t1.GroupKey = "notifications" + t2.GroupKey = "csv" + t4.GroupKey = "critical_task_group" + t5.GroupKey = "minor_task_group" + + ctx := context.Background() secondAgo := now.Add(-time.Second) tests := []struct { @@ -2269,11 +2275,6 @@ func TestForwardIfReadyWithGroup(t *testing.T) { h.FlushDB(t, r.client) // clean up db before each test case h.SeedAllScheduledQueues(t, r.client, tc.scheduled) h.SeedAllRetryQueues(t, r.client, tc.retry) - // Set "group" field under the task key. - r.client.HSet(ctx, base.TaskKey(t1.Queue, t1.ID), "group", "notifications") - r.client.HSet(ctx, base.TaskKey(t2.Queue, t2.ID), "group", "csv") - r.client.HSet(ctx, base.TaskKey(t4.Queue, t4.ID), "group", "critical_task_group") - r.client.HSet(ctx, base.TaskKey(t5.Queue, t5.ID), "group", "minor_task_group") err := r.ForwardIfReady(tc.qnames...) if err != nil {