From 67f381269ab692b87113f271407953c87b320442 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Thu, 9 Jan 2020 06:21:43 -0800 Subject: [PATCH] Maintain a set of queue names in redis set --- internal/base/base.go | 1 + internal/rdb/rdb.go | 7 ++++++- internal/rdb/rdb_test.go | 24 +++++++++++++++++------- 3 files changed, 24 insertions(+), 8 deletions(-) diff --git a/internal/base/base.go b/internal/base/base.go index d14cc40..2fae65c 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -19,6 +19,7 @@ const ( processedPrefix = "asynq:processed:" // STRING - asynq:processed: failurePrefix = "asynq:failure:" // STRING - asynq:failure: QueuePrefix = "asynq:queues:" // LIST - asynq:queues: + AllQueues = "asynq:queues" // SET DefaultQueue = QueuePrefix + DefaultQueueName // LIST ScheduledQueue = "asynq:scheduled" // ZSET RetryQueue = "asynq:retry" // ZSET diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 39bbd1d..f2db220 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -48,7 +48,12 @@ func (r *RDB) Enqueue(msg *base.TaskMessage) error { return err } key := base.QueueKey(msg.Queue) - return r.client.LPush(key, string(bytes)).Err() + script := redis.NewScript(` + redis.call("LPUSH", KEYS[1], ARGV[1]) + redis.call("SADD", KEYS[2], KEYS[1]) + return 1 + `) + return script.Run(r.client, []string{key, base.AllQueues}, string(bytes)).Err() } // Dequeue queries given queues in order and pops a task message if there diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index bb29aef..4e34785 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -29,12 +29,18 @@ func setup(t *testing.T) *RDB { func TestEnqueue(t *testing.T) { r := setup(t) + t1 := h.NewTaskMessage("send_email", map[string]interface{}{"to": "exampleuser@gmail.com", "from": "noreply@example.com"}) + t2 := h.NewTaskMessage("generate_csv", map[string]interface{}{}) + t2.Queue = "csv" + t3 := h.NewTaskMessage("sync", nil) + t3.Queue = "low" + tests := []struct { msg *base.TaskMessage }{ - {h.NewTaskMessage("send_email", map[string]interface{}{"to": "exampleuser@gmail.com", "from": "noreply@example.com"})}, - {h.NewTaskMessage("generate_csv", map[string]interface{}{})}, - {h.NewTaskMessage("sync", nil)}, + {t1}, + {t2}, + {t3}, } for _, tc := range tests { @@ -42,17 +48,21 @@ func TestEnqueue(t *testing.T) { err := r.Enqueue(tc.msg) if err != nil { - t.Errorf("(*RDB).Enqueue = %v, want nil", err) - continue + t.Errorf("(*RDB).Enqueue(msg) = %v, want nil", err) } - gotEnqueued := h.GetEnqueuedMessages(t, r.client) + + qkey := base.QueueKey(tc.msg.Queue) + gotEnqueued := h.GetEnqueuedMessages(t, r.client, tc.msg.Queue) if len(gotEnqueued) != 1 { - t.Errorf("%q has length %d, want 1", base.DefaultQueue, len(gotEnqueued)) + t.Errorf("%q has length %d, want 1", qkey, len(gotEnqueued)) continue } if diff := cmp.Diff(tc.msg, gotEnqueued[0]); diff != "" { t.Errorf("persisted data differed from the original input (-want, +got)\n%s", diff) } + if !r.client.SIsMember(base.AllQueues, qkey).Val() { + t.Errorf("%q is not a member of SET %q", qkey, base.AllQueues) + } } }