diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index ec10970..3abecf4 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -159,16 +159,10 @@ func FlushDB(tb testing.TB, r *redis.Client) { } // SeedEnqueuedQueue initializes the specified queue with the given messages. -// -// If queue name option is not passed, it defaults to the default queue. -func SeedEnqueuedQueue(tb testing.TB, r *redis.Client, msgs []*base.TaskMessage, queueOpt ...string) { +func SeedEnqueuedQueue(tb testing.TB, r *redis.Client, msgs []*base.TaskMessage, qname string) { tb.Helper() - queue := base.DefaultQueue - if len(queueOpt) > 0 { - queue = base.QueueKey(queueOpt[0]) - } - r.SAdd(base.AllQueues, queue) - seedRedisList(tb, r, queue, msgs) + r.SAdd(base.AllQueues, qname) + seedRedisList(tb, r, base.QueueKey(qname), msgs) } // SeedAllEnqueuedQueues initializes all of the specified queues with the given messages. @@ -180,30 +174,35 @@ func SeedAllEnqueuedQueues(tb testing.TB, r *redis.Client, enqueued map[string][ } } +// TODO: need to scope to a queue // SeedInProgressQueue initializes the in-progress queue with the given messages. func SeedInProgressQueue(tb testing.TB, r *redis.Client, msgs []*base.TaskMessage) { tb.Helper() seedRedisList(tb, r, base.InProgressQueue, msgs) } +// TODO: need to scope to a queue // SeedScheduledQueue initializes the scheduled queue with the given messages. func SeedScheduledQueue(tb testing.TB, r *redis.Client, entries []base.Z) { tb.Helper() seedRedisZSet(tb, r, base.ScheduledQueue, entries) } +// TODO: need to scope to a queue // SeedRetryQueue initializes the retry queue with the given messages. func SeedRetryQueue(tb testing.TB, r *redis.Client, entries []base.Z) { tb.Helper() seedRedisZSet(tb, r, base.RetryQueue, entries) } +// TODO: need to scope to a queue // SeedDeadQueue initializes the dead queue with the given messages. func SeedDeadQueue(tb testing.TB, r *redis.Client, entries []base.Z) { tb.Helper() seedRedisZSet(tb, r, base.DeadQueue, entries) } +// TODO: need to scope to a queue // SeedDeadlines initializes the deadlines set with the given entries. func SeedDeadlines(tb testing.TB, r *redis.Client, entries []base.Z) { tb.Helper() @@ -228,64 +227,58 @@ func seedRedisZSet(tb testing.TB, c *redis.Client, key string, items []base.Z) { } } -// GetEnqueuedMessages returns all task messages in the specified queue. -// -// If queue name option is not passed, it defaults to the default queue. -func GetEnqueuedMessages(tb testing.TB, r *redis.Client, queueOpt ...string) []*base.TaskMessage { +// GetEnqueuedMessages returns all enqueued messages in the given queue. +func GetEnqueuedMessages(tb testing.TB, r *redis.Client, qname string) []*base.TaskMessage { tb.Helper() - queue := base.DefaultQueue - if len(queueOpt) > 0 { - queue = base.QueueKey(queueOpt[0]) - } - return getListMessages(tb, r, queue) + return getListMessages(tb, r, base.QueueKey(qname)) } -// GetInProgressMessages returns all task messages in the in-progress queue. -func GetInProgressMessages(tb testing.TB, r *redis.Client) []*base.TaskMessage { +// GetInProgressMessages returns all in-progress messages in the given queue. +func GetInProgressMessages(tb testing.TB, r *redis.Client, qname string) []*base.TaskMessage { tb.Helper() - return getListMessages(tb, r, base.InProgressQueue) + return getListMessages(tb, r, base.InProgressKey(qname)) } -// GetScheduledMessages returns all task messages in the scheduled queue. -func GetScheduledMessages(tb testing.TB, r *redis.Client) []*base.TaskMessage { +// GetScheduledMessages returns all scheduled task messages in the given queue. +func GetScheduledMessages(tb testing.TB, r *redis.Client, qname string) []*base.TaskMessage { tb.Helper() - return getZSetMessages(tb, r, base.ScheduledQueue) + return getZSetMessages(tb, r, base.ScheduledKey(qname)) } -// GetRetryMessages returns all task messages in the retry queue. -func GetRetryMessages(tb testing.TB, r *redis.Client) []*base.TaskMessage { +// GetRetryMessages returns all retry messages in the given queue. +func GetRetryMessages(tb testing.TB, r *redis.Client, qname string) []*base.TaskMessage { tb.Helper() - return getZSetMessages(tb, r, base.RetryQueue) + return getZSetMessages(tb, r, base.RetryKey(qname)) } -// GetDeadMessages returns all task messages in the dead queue. -func GetDeadMessages(tb testing.TB, r *redis.Client) []*base.TaskMessage { +// GetDeadMessages returns all dead messages in the given queue. +func GetDeadMessages(tb testing.TB, r *redis.Client, qname string) []*base.TaskMessage { tb.Helper() - return getZSetMessages(tb, r, base.DeadQueue) + return getZSetMessages(tb, r, base.DeadKey(qname)) } -// GetScheduledEntries returns all task messages and its score in the scheduled queue. -func GetScheduledEntries(tb testing.TB, r *redis.Client) []base.Z { +// GetScheduledEntries returns all scheduled messages and its score in the given queue. +func GetScheduledEntries(tb testing.TB, r *redis.Client, qname string) []base.Z { tb.Helper() - return getZSetEntries(tb, r, base.ScheduledQueue) + return getZSetEntries(tb, r, base.ScheduledKey(qname)) } -// GetRetryEntries returns all task messages and its score in the retry queue. -func GetRetryEntries(tb testing.TB, r *redis.Client) []base.Z { +// GetRetryEntries returns all retry messages and its score in the given queue. +func GetRetryEntries(tb testing.TB, r *redis.Client, qname string) []base.Z { tb.Helper() - return getZSetEntries(tb, r, base.RetryQueue) + return getZSetEntries(tb, r, base.RetryKey(qname)) } -// GetDeadEntries returns all task messages and its score in the dead queue. -func GetDeadEntries(tb testing.TB, r *redis.Client) []base.Z { +// GetDeadEntries returns all dead messages and its score in the given queue. +func GetDeadEntries(tb testing.TB, r *redis.Client, qname string) []base.Z { tb.Helper() - return getZSetEntries(tb, r, base.DeadQueue) + return getZSetEntries(tb, r, base.DeadKey(qname)) } -// GetDeadlinesEntries returns all task messages and its score in the deadlines set. -func GetDeadlinesEntries(tb testing.TB, r *redis.Client) []base.Z { +// GetDeadlinesEntries returns all task messages and its score in the deadlines set for the given queue. +func GetDeadlinesEntries(tb testing.TB, r *redis.Client, qname string) []base.Z { tb.Helper() - return getZSetEntries(tb, r, base.KeyDeadlines) + return getZSetEntries(tb, r, base.DeadlinesKey(qname)) } func getListMessages(tb testing.TB, r *redis.Client, list string) []*base.TaskMessage { diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 69a90dd..4da6fd4 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -50,27 +50,22 @@ func (r *RDB) Ping() error { return r.client.Ping().Err() } -// KEYS[1] -> asynq:queues: -// KEYS[2] -> asynq:queues -// ARGV[1] -> task message data -var enqueueCmd = redis.NewScript(` -redis.call("LPUSH", KEYS[1], ARGV[1]) -redis.call("SADD", KEYS[2], KEYS[1]) -return 1`) - // Enqueue inserts the given task to the tail of the queue. func (r *RDB) Enqueue(msg *base.TaskMessage) error { encoded, err := base.EncodeMessage(msg) if err != nil { return err } + err := r.client.SAdd(base.AllQueues, msg.Queue).Err() + if err != nil { + return err + } key := base.QueueKey(msg.Queue) - return enqueueCmd.Run(r.client, []string{key, base.AllQueues}, encoded).Err() + return r.client.LPush(key, encoded).Err() } -// KEYS[1] -> unique key in the form :: -// KEYS[2] -> asynq:queues: -// KEYS[2] -> asynq:queues +// KEYS[1] -> unique key +// KEYS[2] -> asynq:{} // ARGV[1] -> task ID // ARGV[2] -> uniqueness lock TTL // ARGV[3] -> task message data @@ -80,7 +75,6 @@ if not ok then return 0 end redis.call("LPUSH", KEYS[2], ARGV[3]) -redis.call("SADD", KEYS[3], KEYS[2]) return 1 `) @@ -91,9 +85,12 @@ func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error { if err != nil { return err } - key := base.QueueKey(msg.Queue) + err := r.client.SAdd(base.AllQueues, msg.Queue).Err() + if err != nil { + return err + } res, err := enqueueUniqueCmd.Run(r.client, - []string{msg.UniqueKey, key, base.AllQueues}, + []string{msg.UniqueKey, base.QueueKey(msg.Queue)}, msg.ID.String(), int(ttl.Seconds()), encoded).Result() if err != nil { return err @@ -194,7 +191,7 @@ func (r *RDB) dequeue(qkeys ...interface{}) (msgjson string, deadline int64, err // KEYS[1] -> asynq:in_progress // KEYS[2] -> asynq:deadlines // KEYS[3] -> asynq:processed: -// KEYS[4] -> unique key in the format :: +// KEYS[4] -> unique key // ARGV[1] -> base.TaskMessage value // ARGV[2] -> stats expiration timestamp // ARGV[3] -> task ID @@ -257,45 +254,32 @@ func (r *RDB) Requeue(msg *base.TaskMessage) error { encoded).Err() } -// KEYS[1] -> asynq:scheduled -// KEYS[2] -> asynq:queues -// ARGV[1] -> score (process_at timestamp) -// ARGV[2] -> task message -// ARGV[3] -> queue key -var scheduleCmd = redis.NewScript(` -redis.call("ZADD", KEYS[1], ARGV[1], ARGV[2]) -redis.call("SADD", KEYS[2], ARGV[3]) -return 1 -`) - // Schedule adds the task to the backlog queue to be processed in the future. func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error { encoded, err := base.EncodeMessage(msg) if err != nil { return err } - qkey := base.QueueKey(msg.Queue) + err := r.client.SAdd(base.AllQueues, msg.Queue).Err() + if err != nil { + return err + } score := float64(processAt.Unix()) - return scheduleCmd.Run(r.client, - []string{base.ScheduledQueue, base.AllQueues}, - score, encoded, qkey).Err() + return r.client.ZAdd(base.ScheduledKey(msg.Queue), &redis.Z{Score: score, Member: encoded}).Err() } -// KEYS[1] -> unique key in the format :: -// KEYS[2] -> asynq:scheduled -// KEYS[3] -> asynq:queues +// KEYS[1] -> unique key +// KEYS[2] -> asynq:{}:scheduled // ARGV[1] -> task ID // ARGV[2] -> uniqueness lock TTL // ARGV[3] -> score (process_at timestamp) // ARGV[4] -> task message -// ARGV[5] -> queue key var scheduleUniqueCmd = redis.NewScript(` local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2]) if not ok then return 0 end redis.call("ZADD", KEYS[2], ARGV[3], ARGV[4]) -redis.call("SADD", KEYS[3], ARGV[5]) return 1 `) @@ -306,11 +290,14 @@ func (r *RDB) ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl tim if err != nil { return err } - qkey := base.QueueKey(msg.Queue) + err := r.client.SAdd(base.AllQueues, msg.Queue).Err() + if err != nil { + return err + } score := float64(processAt.Unix()) res, err := scheduleUniqueCmd.Run(r.client, - []string{msg.UniqueKey, base.ScheduledQueue, base.AllQueues}, - msg.ID.String(), int(ttl.Seconds()), score, encoded, qkey).Result() + []string{msg.UniqueKey, base.ScheduledKey(msg.Queue)}, + msg.ID.String(), int(ttl.Seconds()), score, encoded).Result() if err != nil { return err } diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index db4fa54..2e5d75d 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -34,10 +34,8 @@ func setup(t *testing.T) *RDB { func TestEnqueue(t *testing.T) { r := setup(t) t1 := h.NewTaskMessage("send_email", map[string]interface{}{"to": "exampleuser@gmail.com", "from": "noreply@example.com"}) - t2 := h.NewTaskMessage("generate_csv", map[string]interface{}{}) - t2.Queue = "csv" - t3 := h.NewTaskMessage("sync", nil) - t3.Queue = "low" + t2 := h.NewTaskMessageWithQueue("generate_csv", map[string]interface{}{}, "csv") + t3 := h.NewTaskMessageWithQueue("sync", nil, "low") tests := []struct { msg *base.TaskMessage @@ -55,17 +53,16 @@ func TestEnqueue(t *testing.T) { t.Errorf("(*RDB).Enqueue(msg) = %v, want nil", err) } - qkey := base.QueueKey(tc.msg.Queue) gotEnqueued := h.GetEnqueuedMessages(t, r.client, tc.msg.Queue) if len(gotEnqueued) != 1 { - t.Errorf("%q has length %d, want 1", qkey, len(gotEnqueued)) + t.Errorf("%q has length %d, want 1", base.QueueKey(tc.msg.Queue), len(gotEnqueued)) continue } if diff := cmp.Diff(tc.msg, gotEnqueued[0]); diff != "" { t.Errorf("persisted data differed from the original input (-want, +got)\n%s", diff) } - if !r.client.SIsMember(base.AllQueues, qkey).Val() { - t.Errorf("%q is not a member of SET %q", qkey, base.AllQueues) + if !r.client.SIsMember(base.AllQueues, tc.msg.Queue).Val() { + t.Errorf("%q is not a member of SET %q", tc.msg.Queue, base.AllQueues) } } } @@ -77,7 +74,7 @@ func TestEnqueueUnique(t *testing.T) { Type: "email", Payload: map[string]interface{}{"user_id": 123}, Queue: base.DefaultQueueName, - UniqueKey: "email:user_id=123:default", + UniqueKey: base.UniqueKey(base.DefaultQueueName, "email", map[string]interface{}{"user_id": 123}), } tests := []struct { @@ -103,12 +100,14 @@ func TestEnqueueUnique(t *testing.T) { tc.msg, tc.ttl, got, ErrDuplicateTask) continue } - gotTTL := r.client.TTL(tc.msg.UniqueKey).Val() if !cmp.Equal(tc.ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) { t.Errorf("TTL %q = %v, want %v", tc.msg.UniqueKey, gotTTL, tc.ttl) continue } + if !r.client.SIsMember(base.AllQueues, tc.msg.Queue).Val() { + t.Errorf("%q is not a member of SET %q", tc.msg.Queue, base.AllQueues) + } } } @@ -673,15 +672,20 @@ func TestSchedule(t *testing.T) { continue } - gotScheduled := h.GetScheduledEntries(t, r.client) + gotScheduled := h.GetScheduledEntries(t, r.client, tc.msg.Queue) if len(gotScheduled) != 1 { - t.Errorf("%s inserted %d items to %q, want 1 items inserted", desc, len(gotScheduled), base.ScheduledQueue) + t.Errorf("%s inserted %d items to %q, want 1 items inserted", + desc, len(gotScheduled), base.ScheduledKey(tc.msg.Queue)) continue } if int64(gotScheduled[0].Score) != tc.processAt.Unix() { - t.Errorf("%s inserted an item with score %d, want %d", desc, int64(gotScheduled[0].Score), tc.processAt.Unix()) + t.Errorf("%s inserted an item with score %d, want %d", + desc, int64(gotScheduled[0].Score), tc.processAt.Unix()) continue } + if !r.client.SIsMember(base.AllQueues, tc.msg.Queue).Val() { + t.Errorf("%q is not a member of SET %q", tc.msg.Queue, base.AllQueues) + } } } @@ -692,7 +696,7 @@ func TestScheduleUnique(t *testing.T) { Type: "email", Payload: map[string]interface{}{"user_id": 123}, Queue: base.DefaultQueueName, - UniqueKey: "email:user_id=123:default", + UniqueKey: base.UniqueKey(base.DefaultQueueName, "email", map[string]interface{}{"user_id": 123}), } tests := []struct { @@ -713,9 +717,9 @@ func TestScheduleUnique(t *testing.T) { continue } - gotScheduled := h.GetScheduledEntries(t, r.client) + gotScheduled := h.GetScheduledEntries(t, r.client, tc.msg.Queue) if len(gotScheduled) != 1 { - t.Errorf("%s inserted %d items to %q, want 1 items inserted", desc, len(gotScheduled), base.ScheduledQueue) + t.Errorf("%s inserted %d items to %q, want 1 items inserted", desc, len(gotScheduled), base.ScheduledKey(tc.msg.Queue)) continue } if int64(gotScheduled[0].Score) != tc.processAt.Unix() { @@ -734,6 +738,10 @@ func TestScheduleUnique(t *testing.T) { t.Errorf("TTL %q = %v, want %v", tc.msg.UniqueKey, gotTTL, tc.ttl) continue } + if !r.client.SIsMember(base.AllQueues, tc.msg.Queue).Val() { + t.Errorf("%q is not a member of SET %q", tc.msg.Queue, base.AllQueues) + continue + } } }