From d865d899007f6160dfdbacfc3fee3415f47a73dc Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Tue, 8 Feb 2022 06:47:31 -0800 Subject: [PATCH] Update RDB.Dequeue to insert task ID to lease set --- internal/asynqtest/asynqtest.go | 7 ++ internal/base/base.go | 2 +- internal/rdb/benchmark_test.go | 4 +- internal/rdb/rdb.go | 60 +++++----------- internal/rdb/rdb_test.go | 111 +++++++++++++----------------- internal/testbroker/testbroker.go | 4 +- 6 files changed, 77 insertions(+), 111 deletions(-) diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index ec6f510..932668c 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -423,6 +423,13 @@ func GetDeadlinesEntries(tb testing.TB, r redis.UniversalClient, qname string) [ return getMessagesFromZSetWithScores(tb, r, qname, base.DeadlinesKey, base.TaskStateActive) } +// GetLeaseEntries returns all task IDs and its score in the lease set for the given queue. +// It also asserts the state field of the task. +func GetLeaseEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z { + tb.Helper() + return getMessagesFromZSetWithScores(tb, r, qname, base.LeaseKey, base.TaskStateActive) +} + // GetCompletedEntries returns all completed messages and its score in the given queue. // It also asserts the state field of the task. func GetCompletedEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z { diff --git a/internal/base/base.go b/internal/base/base.go index 5eb999a..cede294 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -615,7 +615,7 @@ type Broker interface { Ping() error Enqueue(ctx context.Context, msg *TaskMessage) error EnqueueUnique(ctx context.Context, msg *TaskMessage, ttl time.Duration) error - Dequeue(qnames ...string) (*TaskMessage, time.Time, error) + Dequeue(qnames ...string) (*TaskMessage, error) Done(msg *TaskMessage) error MarkAsComplete(msg *TaskMessage) error Requeue(msg *TaskMessage) error diff --git a/internal/rdb/benchmark_test.go b/internal/rdb/benchmark_test.go index 468e309..3e28564 100644 --- a/internal/rdb/benchmark_test.go +++ b/internal/rdb/benchmark_test.go @@ -113,7 +113,7 @@ func BenchmarkDequeueSingleQueue(b *testing.B) { } b.StartTimer() - if _, _, err := r.Dequeue(base.DefaultQueueName); err != nil { + if _, err := r.Dequeue(base.DefaultQueueName); err != nil { b.Fatalf("Dequeue failed: %v", err) } } @@ -139,7 +139,7 @@ func BenchmarkDequeueMultipleQueues(b *testing.B) { } b.StartTimer() - if _, _, err := r.Dequeue(qnames...); err != nil { + if _, err := r.Dequeue(qnames...); err != nil { b.Fatalf("Dequeue failed: %v", err) } } diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index d0568a3..d3ab9c8 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -20,6 +20,8 @@ import ( const statsTTL = 90 * 24 * time.Hour // 90 days +const leaseDuration = 30 * time.Second + // RDB is a client interface to query and mutate task queues. type RDB struct { client redis.UniversalClient @@ -213,20 +215,17 @@ func (r *RDB) EnqueueUnique(ctx context.Context, msg *base.TaskMessage, ttl time // KEYS[1] -> asynq:{}:pending // KEYS[2] -> asynq:{}:paused // KEYS[3] -> asynq:{}:active -// KEYS[4] -> asynq:{}:deadlines +// KEYS[4] -> asynq:{}:lease // -- -// ARGV[1] -> current time in Unix time +// ARGV[1] -> initial lease expiration Unix time // ARGV[2] -> task key prefix // // Output: // Returns nil if no processable task is found in the given queue. -// Returns tuple {msg , deadline} if task is found, where `msg` is the encoded -// TaskMessage, and `deadline` is Unix time in seconds. +// Returns an encoded TaskMessage. // // Note: dequeueCmd checks whether a queue is paused first, before // calling RPOPLPUSH to pop a task from the queue. -// It computes the task deadline by inspecting Timout and Deadline fields, -// and inserts the task to the deadlines zset with the computed deadline. var dequeueCmd = redis.NewScript(` if redis.call("EXISTS", KEYS[2]) == 0 then local id = redis.call("RPOPLPUSH", KEYS[1], KEYS[3]) @@ -234,70 +233,45 @@ if redis.call("EXISTS", KEYS[2]) == 0 then local key = ARGV[2] .. id redis.call("HSET", key, "state", "active") redis.call("HDEL", key, "pending_since") - local data = redis.call("HMGET", key, "msg", "timeout", "deadline") - local msg = data[1] - local timeout = tonumber(data[2]) - local deadline = tonumber(data[3]) - local score - if timeout ~= 0 and deadline ~= 0 then - score = math.min(ARGV[1]+timeout, deadline) - elseif timeout ~= 0 then - score = ARGV[1] + timeout - elseif deadline ~= 0 then - score = deadline - else - return redis.error_reply("asynq internal error: both timeout and deadline are not set") - end - redis.call("ZADD", KEYS[4], score, id) - return {msg, score} + redis.call("ZADD", KEYS[4], ARGV[1], id) + return redis.call("HGET", key, "msg") end end return nil`) // Dequeue queries given queues in order and pops a task message -// off a queue if one exists and returns the message and deadline. +// off a queue if one exists and returns the message. // Dequeue skips a queue if the queue is paused. // If all queues are empty, ErrNoProcessableTask error is returned. -func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, deadline time.Time, err error) { +func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, err error) { var op errors.Op = "rdb.Dequeue" for _, qname := range qnames { keys := []string{ base.PendingKey(qname), base.PausedKey(qname), base.ActiveKey(qname), - base.DeadlinesKey(qname), + base.LeaseKey(qname), } argv := []interface{}{ - r.clock.Now().Unix(), + r.clock.Now().Add(leaseDuration).Unix(), base.TaskKeyPrefix(qname), } res, err := dequeueCmd.Run(context.Background(), r.client, keys, argv...).Result() if err == redis.Nil { continue } else if err != nil { - return nil, time.Time{}, errors.E(op, errors.Unknown, fmt.Sprintf("redis eval error: %v", err)) + return nil, errors.E(op, errors.Unknown, fmt.Sprintf("redis eval error: %v", err)) } - data, err := cast.ToSliceE(res) + encoded, err := cast.ToStringE(res) if err != nil { - return nil, time.Time{}, errors.E(op, errors.Internal, fmt.Sprintf("cast error: unexpected return value from Lua script: %v", res)) - } - if len(data) != 2 { - return nil, time.Time{}, errors.E(op, errors.Internal, fmt.Sprintf("Lua script returned %d values; expected 2", len(data))) - } - encoded, err := cast.ToStringE(data[0]) - if err != nil { - return nil, time.Time{}, errors.E(op, errors.Internal, fmt.Sprintf("cast error: unexpected return value from Lua script: %v", res)) - } - d, err := cast.ToInt64E(data[1]) - if err != nil { - return nil, time.Time{}, errors.E(op, errors.Internal, fmt.Sprintf("cast error: unexpected return value from Lua script: %v", res)) + return nil, errors.E(op, errors.Internal, fmt.Sprintf("cast error: unexpected return value from Lua script: %v", res)) } if msg, err = base.DecodeMessage([]byte(encoded)); err != nil { - return nil, time.Time{}, errors.E(op, errors.Internal, fmt.Sprintf("cannot decode message: %v", err)) + return nil, errors.E(op, errors.Internal, fmt.Sprintf("cannot decode message: %v", err)) } - return msg, time.Unix(d, 0), nil + return msg, nil } - return nil, time.Time{}, errors.E(op, errors.NotFound, errors.ErrNoProcessableTask) + return nil, errors.E(op, errors.NotFound, errors.ErrNoProcessableTask) } // KEYS[1] -> asynq:{}:active diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index b08d54e..fa8e058 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -322,7 +322,6 @@ func TestDequeue(t *testing.T) { Timeout: 1800, Deadline: 0, } - t1Deadline := now.Unix() + t1.Timeout t2 := &base.TaskMessage{ ID: uuid.NewString(), Type: "export_csv", @@ -331,7 +330,6 @@ func TestDequeue(t *testing.T) { Timeout: 0, Deadline: 1593021600, } - t2Deadline := t2.Deadline t3 := &base.TaskMessage{ ID: uuid.NewString(), Type: "reindex", @@ -342,29 +340,27 @@ func TestDequeue(t *testing.T) { } tests := []struct { - pending map[string][]*base.TaskMessage - args []string // list of queues to query - wantMsg *base.TaskMessage - wantDeadline time.Time - wantPending map[string][]*base.TaskMessage - wantActive map[string][]*base.TaskMessage - wantDeadlines map[string][]base.Z + pending map[string][]*base.TaskMessage + qnames []string // list of queues to query + wantMsg *base.TaskMessage + wantPending map[string][]*base.TaskMessage + wantActive map[string][]*base.TaskMessage + wantLease map[string][]base.Z }{ { pending: map[string][]*base.TaskMessage{ "default": {t1}, }, - args: []string{"default"}, - wantMsg: t1, - wantDeadline: time.Unix(t1Deadline, 0), + qnames: []string{"default"}, + wantMsg: t1, wantPending: map[string][]*base.TaskMessage{ "default": {}, }, wantActive: map[string][]*base.TaskMessage{ "default": {t1}, }, - wantDeadlines: map[string][]base.Z{ - "default": {{Message: t1, Score: t1Deadline}}, + wantLease: map[string][]base.Z{ + "default": {{Message: t1, Score: now.Add(leaseDuration).Unix()}}, }, }, { @@ -373,9 +369,8 @@ func TestDequeue(t *testing.T) { "critical": {t2}, "low": {t3}, }, - args: []string{"critical", "default", "low"}, - wantMsg: t2, - wantDeadline: time.Unix(t2Deadline, 0), + qnames: []string{"critical", "default", "low"}, + wantMsg: t2, wantPending: map[string][]*base.TaskMessage{ "default": {t1}, "critical": {}, @@ -386,9 +381,9 @@ func TestDequeue(t *testing.T) { "critical": {t2}, "low": {}, }, - wantDeadlines: map[string][]base.Z{ + wantLease: map[string][]base.Z{ "default": {}, - "critical": {{Message: t2, Score: t2Deadline}}, + "critical": {{Message: t2, Score: now.Add(leaseDuration).Unix()}}, "low": {}, }, }, @@ -398,9 +393,8 @@ func TestDequeue(t *testing.T) { "critical": {}, "low": {t3}, }, - args: []string{"critical", "default", "low"}, - wantMsg: t1, - wantDeadline: time.Unix(t1Deadline, 0), + qnames: []string{"critical", "default", "low"}, + wantMsg: t1, wantPending: map[string][]*base.TaskMessage{ "default": {}, "critical": {}, @@ -411,8 +405,8 @@ func TestDequeue(t *testing.T) { "critical": {}, "low": {}, }, - wantDeadlines: map[string][]base.Z{ - "default": {{Message: t1, Score: t1Deadline}}, + wantLease: map[string][]base.Z{ + "default": {{Message: t1, Score: now.Add(leaseDuration).Unix()}}, "critical": {}, "low": {}, }, @@ -423,19 +417,14 @@ func TestDequeue(t *testing.T) { h.FlushDB(t, r.client) // clean up db before each test case h.SeedAllPendingQueues(t, r.client, tc.pending) - gotMsg, gotDeadline, err := r.Dequeue(tc.args...) + gotMsg, err := r.Dequeue(tc.qnames...) if err != nil { - t.Errorf("(*RDB).Dequeue(%v) returned error %v", tc.args, err) + t.Errorf("(*RDB).Dequeue(%v) returned error %v", tc.qnames, err) continue } if !cmp.Equal(gotMsg, tc.wantMsg) { t.Errorf("(*RDB).Dequeue(%v) returned message %v; want %v", - tc.args, gotMsg, tc.wantMsg) - continue - } - if !cmp.Equal(gotDeadline, tc.wantDeadline) { - t.Errorf("(*RDB).Dequeue(%v) returned deadline %v; want %v", - tc.args, gotDeadline, tc.wantDeadline) + tc.qnames, gotMsg, tc.wantMsg) continue } @@ -451,10 +440,10 @@ func TestDequeue(t *testing.T) { t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.ActiveKey(queue), diff) } } - for queue, want := range tc.wantDeadlines { - gotDeadlines := h.GetDeadlinesEntries(t, r.client, queue) - if diff := cmp.Diff(want, gotDeadlines, h.SortZSetEntryOpt); diff != "" { - t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.DeadlinesKey(queue), diff) + for queue, want := range tc.wantLease { + gotLease := h.GetLeaseEntries(t, r.client, queue) + if diff := cmp.Diff(want, gotLease, h.SortZSetEntryOpt); diff != "" { + t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.LeaseKey(queue), diff) } } } @@ -465,18 +454,18 @@ func TestDequeueError(t *testing.T) { defer r.Close() tests := []struct { - pending map[string][]*base.TaskMessage - args []string // list of queues to query - wantErr error - wantPending map[string][]*base.TaskMessage - wantActive map[string][]*base.TaskMessage - wantDeadlines map[string][]base.Z + pending map[string][]*base.TaskMessage + qnames []string // list of queues to query + wantErr error + wantPending map[string][]*base.TaskMessage + wantActive map[string][]*base.TaskMessage + wantLease map[string][]base.Z }{ { pending: map[string][]*base.TaskMessage{ "default": {}, }, - args: []string{"default"}, + qnames: []string{"default"}, wantErr: errors.ErrNoProcessableTask, wantPending: map[string][]*base.TaskMessage{ "default": {}, @@ -484,7 +473,7 @@ func TestDequeueError(t *testing.T) { wantActive: map[string][]*base.TaskMessage{ "default": {}, }, - wantDeadlines: map[string][]base.Z{ + wantLease: map[string][]base.Z{ "default": {}, }, }, @@ -494,7 +483,7 @@ func TestDequeueError(t *testing.T) { "critical": {}, "low": {}, }, - args: []string{"critical", "default", "low"}, + qnames: []string{"critical", "default", "low"}, wantErr: errors.ErrNoProcessableTask, wantPending: map[string][]*base.TaskMessage{ "default": {}, @@ -506,7 +495,7 @@ func TestDequeueError(t *testing.T) { "critical": {}, "low": {}, }, - wantDeadlines: map[string][]base.Z{ + wantLease: map[string][]base.Z{ "default": {}, "critical": {}, "low": {}, @@ -518,18 +507,14 @@ func TestDequeueError(t *testing.T) { h.FlushDB(t, r.client) // clean up db before each test case h.SeedAllPendingQueues(t, r.client, tc.pending) - gotMsg, gotDeadline, gotErr := r.Dequeue(tc.args...) + gotMsg, gotErr := r.Dequeue(tc.qnames...) if !errors.Is(gotErr, tc.wantErr) { t.Errorf("(*RDB).Dequeue(%v) returned error %v; want %v", - tc.args, gotErr, tc.wantErr) + tc.qnames, gotErr, tc.wantErr) continue } if gotMsg != nil { - t.Errorf("(*RDB).Dequeue(%v) returned message %v; want nil", tc.args, gotMsg) - continue - } - if !gotDeadline.IsZero() { - t.Errorf("(*RDB).Dequeue(%v) returned deadline %v; want %v", tc.args, gotDeadline, time.Time{}) + t.Errorf("(*RDB).Dequeue(%v) returned message %v; want nil", tc.qnames, gotMsg) continue } @@ -545,10 +530,10 @@ func TestDequeueError(t *testing.T) { t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.ActiveKey(queue), diff) } } - for queue, want := range tc.wantDeadlines { - gotDeadlines := h.GetDeadlinesEntries(t, r.client, queue) - if diff := cmp.Diff(want, gotDeadlines, h.SortZSetEntryOpt); diff != "" { - t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.DeadlinesKey(queue), diff) + for queue, want := range tc.wantLease { + gotLease := h.GetLeaseEntries(t, r.client, queue) + if diff := cmp.Diff(want, gotLease, h.SortZSetEntryOpt); diff != "" { + t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.LeaseKey(queue), diff) } } } @@ -577,7 +562,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) { tests := []struct { paused []string // list of paused queues pending map[string][]*base.TaskMessage - args []string // list of queues to query + qnames []string // list of queues to query wantMsg *base.TaskMessage wantErr error wantPending map[string][]*base.TaskMessage @@ -589,7 +574,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) { "default": {t1}, "critical": {t2}, }, - args: []string{"default", "critical"}, + qnames: []string{"default", "critical"}, wantMsg: t2, wantErr: nil, wantPending: map[string][]*base.TaskMessage{ @@ -606,7 +591,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) { pending: map[string][]*base.TaskMessage{ "default": {t1}, }, - args: []string{"default"}, + qnames: []string{"default"}, wantMsg: nil, wantErr: errors.ErrNoProcessableTask, wantPending: map[string][]*base.TaskMessage{ @@ -622,7 +607,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) { "default": {t1}, "critical": {t2}, }, - args: []string{"default", "critical"}, + qnames: []string{"default", "critical"}, wantMsg: nil, wantErr: errors.ErrNoProcessableTask, wantPending: map[string][]*base.TaskMessage{ @@ -645,10 +630,10 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) { } h.SeedAllPendingQueues(t, r.client, tc.pending) - got, _, err := r.Dequeue(tc.args...) + got, err := r.Dequeue(tc.qnames...) if !cmp.Equal(got, tc.wantMsg) || !errors.Is(err, tc.wantErr) { t.Errorf("Dequeue(%v) = %v, %v; want %v, %v", - tc.args, got, err, tc.wantMsg, tc.wantErr) + tc.qnames, got, err, tc.wantMsg, tc.wantErr) continue } diff --git a/internal/testbroker/testbroker.go b/internal/testbroker/testbroker.go index 5f3a023..2b5e8f8 100644 --- a/internal/testbroker/testbroker.go +++ b/internal/testbroker/testbroker.go @@ -64,11 +64,11 @@ func (tb *TestBroker) EnqueueUnique(ctx context.Context, msg *base.TaskMessage, return tb.real.EnqueueUnique(ctx, msg, ttl) } -func (tb *TestBroker) Dequeue(qnames ...string) (*base.TaskMessage, time.Time, error) { +func (tb *TestBroker) Dequeue(qnames ...string) (*base.TaskMessage, error) { tb.mu.Lock() defer tb.mu.Unlock() if tb.sleeping { - return nil, time.Time{}, errRedisDown + return nil, errRedisDown } return tb.real.Dequeue(qnames...) }