diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index c7e1eb3..ad861fb 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -63,7 +63,7 @@ type DailyStats struct { Time time.Time } -// KEYS[1] -> asynq: +// KEYS[1] -> asynq::pending // KEYS[2] -> asynq::active // KEYS[3] -> asynq::scheduled // KEYS[4] -> asynq::retry diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index a2a0fe7..191f930 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -73,6 +73,7 @@ func (r *RDB) runScriptWithErrorCode(ctx context.Context, op errors.Op, script * // ARGV[2] -> task ID // ARGV[3] -> task timeout in seconds (0 if not timeout) // ARGV[4] -> task deadline in unix time (0 if no deadline) +// ARGV[5] -> current time // // Output: // Returns 1 if successfully enqueued @@ -85,7 +86,8 @@ redis.call("HSET", KEYS[1], "msg", ARGV[1], "state", "pending", "timeout", ARGV[3], - "deadline", ARGV[4]) + "deadline", ARGV[4], + "pending_since", ARGV[5]) redis.call("LPUSH", KEYS[2], ARGV[2]) return 1 `) @@ -109,6 +111,7 @@ func (r *RDB) Enqueue(ctx context.Context, msg *base.TaskMessage) error { msg.ID, msg.Timeout, msg.Deadline, + time.Now().Unix(), } n, err := r.runScriptWithErrorCode(ctx, op, enqueueCmd, keys, argv...) if err != nil { @@ -131,6 +134,7 @@ func (r *RDB) Enqueue(ctx context.Context, msg *base.TaskMessage) error { // ARGV[3] -> task message data // ARGV[4] -> task timeout in seconds (0 if not timeout) // ARGV[5] -> task deadline in unix time (0 if no deadline) +// ARGV[6] -> current time // // Output: // Returns 1 if successfully enqueued @@ -149,6 +153,7 @@ redis.call("HSET", KEYS[2], "state", "pending", "timeout", ARGV[4], "deadline", ARGV[5], + "pending_since", ARGV[6], "unique_key", KEYS[1]) redis.call("LPUSH", KEYS[3], ARGV[1]) return 1 @@ -176,6 +181,7 @@ func (r *RDB) EnqueueUnique(ctx context.Context, msg *base.TaskMessage, ttl time encoded, msg.Timeout, msg.Deadline, + time.Now().Unix(), } n, err := r.runScriptWithErrorCode(ctx, op, enqueueUniqueCmd, keys, argv...) if err != nil { @@ -762,15 +768,17 @@ local ids = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1], "LIMIT", 0, 10 for _, id in ipairs(ids) do redis.call("LPUSH", KEYS[2], id) redis.call("ZREM", KEYS[1], id) - redis.call("HSET", ARGV[2] .. id, "state", "pending") + redis.call("HSET", ARGV[2] .. id, + "state", "pending", + "pending_since", ARGV[1]) end return table.getn(ids)`) // forward moves tasks with a score less than the current unix time // from the src zset to the dst list. It returns the number of tasks moved. func (r *RDB) forward(src, dst, taskKeyPrefix string) (int, error) { - now := float64(time.Now().Unix()) - res, err := forwardCmd.Run(context.Background(), r.client, []string{src, dst}, now, taskKeyPrefix).Result() + res, err := forwardCmd.Run(context.Background(), r.client, + []string{src, dst}, time.Now().Unix(), taskKeyPrefix).Result() if err != nil { return 0, errors.E(errors.Internal, fmt.Sprintf("redis eval error: %v", err)) } diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 8d6a464..b60b24b 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -78,6 +78,7 @@ func TestEnqueue(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case. + enqueueTime := time.Now() err := r.Enqueue(context.Background(), tc.msg) if err != nil { t.Errorf("(*RDB).Enqueue(msg) = %v, want nil", err) @@ -115,6 +116,10 @@ func TestEnqueue(t *testing.T) { if want := strconv.Itoa(int(tc.msg.Deadline)); deadline != want { t.Errorf("deadline field under task-key is set to %v, want %v", deadline, want) } + pendingSince := r.client.HGet(context.Background(), taskKey, "pending_since").Val() // "pending_since" field + if want := strconv.Itoa(int(enqueueTime.Unix())); pendingSince != want { + t.Errorf("pending_since field under task-key is set to %v, want %v", pendingSince, want) + } // Check queue is in the AllQueues set. if !r.client.SIsMember(context.Background(), base.AllQueues, tc.msg.Queue).Val() { @@ -181,6 +186,7 @@ func TestEnqueueUnique(t *testing.T) { h.FlushDB(t, r.client) // clean up db before each test case. // Enqueue the first message, should succeed. + enqueueTime := time.Now() err := r.EnqueueUnique(context.Background(), tc.msg, tc.ttl) if err != nil { t.Errorf("First message: (*RDB).EnqueueUnique(%v, %v) = %v, want nil", @@ -230,6 +236,10 @@ func TestEnqueueUnique(t *testing.T) { if want := strconv.Itoa(int(tc.msg.Deadline)); deadline != want { t.Errorf("deadline field under task-key is set to %v, want %v", deadline, want) } + pendingSince := r.client.HGet(context.Background(), taskKey, "pending_since").Val() // "pending_since" field + if want := strconv.Itoa(int(enqueueTime.Unix())); pendingSince != want { + t.Errorf("pending_since field under task-key is set to %v, want %v", pendingSince, want) + } uniqueKey := r.client.HGet(context.Background(), taskKey, "unique_key").Val() // "unique_key" field if uniqueKey != tc.msg.UniqueKey { t.Errorf("uniqueue_key field under task key is set to %q, want %q", uniqueKey, tc.msg.UniqueKey) @@ -2055,6 +2065,7 @@ func TestForwardIfReady(t *testing.T) { h.SeedAllScheduledQueues(t, r.client, tc.scheduled) h.SeedAllRetryQueues(t, r.client, tc.retry) + now := time.Now() // time when the method is called err := r.ForwardIfReady(tc.qnames...) if err != nil { t.Errorf("(*RDB).CheckScheduled(%v) = %v, want nil", tc.qnames, err) @@ -2066,6 +2077,13 @@ func TestForwardIfReady(t *testing.T) { if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" { t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.PendingKey(qname), diff) } + // Make sure "pending_since" field is set + for _, msg := range gotPending { + pendingSince := r.client.HGet(context.Background(), base.TaskKey(msg.Queue, msg.ID), "pending_since").Val() + if want := strconv.Itoa(int(now.Unix())); pendingSince != want { + t.Error("pending_since field is not set for newly pending message") + } + } } for qname, want := range tc.wantScheduled { gotScheduled := h.GetScheduledMessages(t, r.client, qname)