2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-10 11:31:58 +08:00

Update ForwardIfReady test with group

This commit is contained in:
Ken Hibino 2022-03-06 06:04:56 -08:00
parent 4e8885276c
commit ed69667e86
3 changed files with 10 additions and 7 deletions

View File

@ -316,6 +316,7 @@ func seedRedisList(tb testing.TB, c redis.UniversalClient, key string,
"msg": encoded, "msg": encoded,
"state": state.String(), "state": state.String(),
"unique_key": msg.UniqueKey, "unique_key": msg.UniqueKey,
"group": msg.GroupKey,
} }
if err := c.HSet(context.Background(), key, data).Err(); err != nil { if err := c.HSet(context.Background(), key, data).Err(); err != nil {
tb.Fatal(err) tb.Fatal(err)
@ -344,6 +345,7 @@ func seedRedisZSet(tb testing.TB, c redis.UniversalClient, key string,
"msg": encoded, "msg": encoded,
"state": state.String(), "state": state.String(),
"unique_key": msg.UniqueKey, "unique_key": msg.UniqueKey,
"group": msg.GroupKey,
} }
if err := c.HSet(context.Background(), key, data).Err(); err != nil { if err := c.HSet(context.Background(), key, data).Err(); err != nil {
tb.Fatal(err) tb.Fatal(err)

View File

@ -925,7 +925,7 @@ local ids = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1], "LIMIT", 0, 10
for _, id in ipairs(ids) do for _, id in ipairs(ids) do
local taskKey = ARGV[2] .. id local taskKey = ARGV[2] .. id
local group = redis.call("HGET", taskKey, "group") local group = redis.call("HGET", taskKey, "group")
if group then if group and group ~= '' then
redis.call("ZADD", ARGV[4] .. group, ARGV[1], id) redis.call("ZADD", ARGV[4] .. group, ARGV[1], id)
redis.call("ZREM", KEYS[1], id) redis.call("ZREM", KEYS[1], id)
redis.call("HSET", taskKey, redis.call("HSET", taskKey,

View File

@ -2180,12 +2180,18 @@ func TestForwardIfReadyWithGroup(t *testing.T) {
now := time.Now() now := time.Now()
r.SetClock(timeutil.NewSimulatedClock(now)) r.SetClock(timeutil.NewSimulatedClock(now))
ctx := context.Background()
t1 := h.NewTaskMessage("send_email", nil) t1 := h.NewTaskMessage("send_email", nil)
t2 := h.NewTaskMessage("generate_csv", nil) t2 := h.NewTaskMessage("generate_csv", nil)
t3 := h.NewTaskMessage("gen_thumbnail", nil) t3 := h.NewTaskMessage("gen_thumbnail", nil)
t4 := h.NewTaskMessageWithQueue("important_task", nil, "critical") t4 := h.NewTaskMessageWithQueue("important_task", nil, "critical")
t5 := h.NewTaskMessageWithQueue("minor_task", nil, "low") t5 := h.NewTaskMessageWithQueue("minor_task", nil, "low")
// Set group keys for the tasks.
t1.GroupKey = "notifications"
t2.GroupKey = "csv"
t4.GroupKey = "critical_task_group"
t5.GroupKey = "minor_task_group"
ctx := context.Background()
secondAgo := now.Add(-time.Second) secondAgo := now.Add(-time.Second)
tests := []struct { tests := []struct {
@ -2269,11 +2275,6 @@ func TestForwardIfReadyWithGroup(t *testing.T) {
h.FlushDB(t, r.client) // clean up db before each test case h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllScheduledQueues(t, r.client, tc.scheduled) h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
h.SeedAllRetryQueues(t, r.client, tc.retry) h.SeedAllRetryQueues(t, r.client, tc.retry)
// Set "group" field under the task key.
r.client.HSet(ctx, base.TaskKey(t1.Queue, t1.ID), "group", "notifications")
r.client.HSet(ctx, base.TaskKey(t2.Queue, t2.ID), "group", "csv")
r.client.HSet(ctx, base.TaskKey(t4.Queue, t4.ID), "group", "critical_task_group")
r.client.HSet(ctx, base.TaskKey(t5.Queue, t5.ID), "group", "minor_task_group")
err := r.ForwardIfReady(tc.qnames...) err := r.ForwardIfReady(tc.qnames...)
if err != nil { if err != nil {