mirror of
https://github.com/hibiken/asynq.git
synced 2025-01-12 16:03:37 +08:00
Record time when task moved to pending state
This commit is contained in:
parent
9f2c321e98
commit
6e7106c8f2
@ -63,7 +63,7 @@ type DailyStats struct {
|
|||||||
Time time.Time
|
Time time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
// KEYS[1] -> asynq:<qname>
|
// KEYS[1] -> asynq:<qname>:pending
|
||||||
// KEYS[2] -> asynq:<qname>:active
|
// KEYS[2] -> asynq:<qname>:active
|
||||||
// KEYS[3] -> asynq:<qname>:scheduled
|
// KEYS[3] -> asynq:<qname>:scheduled
|
||||||
// KEYS[4] -> asynq:<qname>:retry
|
// KEYS[4] -> asynq:<qname>:retry
|
||||||
|
@ -73,6 +73,7 @@ func (r *RDB) runScriptWithErrorCode(ctx context.Context, op errors.Op, script *
|
|||||||
// ARGV[2] -> task ID
|
// ARGV[2] -> task ID
|
||||||
// ARGV[3] -> task timeout in seconds (0 if not timeout)
|
// ARGV[3] -> task timeout in seconds (0 if not timeout)
|
||||||
// ARGV[4] -> task deadline in unix time (0 if no deadline)
|
// ARGV[4] -> task deadline in unix time (0 if no deadline)
|
||||||
|
// ARGV[5] -> current time
|
||||||
//
|
//
|
||||||
// Output:
|
// Output:
|
||||||
// Returns 1 if successfully enqueued
|
// Returns 1 if successfully enqueued
|
||||||
@ -85,7 +86,8 @@ redis.call("HSET", KEYS[1],
|
|||||||
"msg", ARGV[1],
|
"msg", ARGV[1],
|
||||||
"state", "pending",
|
"state", "pending",
|
||||||
"timeout", ARGV[3],
|
"timeout", ARGV[3],
|
||||||
"deadline", ARGV[4])
|
"deadline", ARGV[4],
|
||||||
|
"pending_since", ARGV[5])
|
||||||
redis.call("LPUSH", KEYS[2], ARGV[2])
|
redis.call("LPUSH", KEYS[2], ARGV[2])
|
||||||
return 1
|
return 1
|
||||||
`)
|
`)
|
||||||
@ -109,6 +111,7 @@ func (r *RDB) Enqueue(ctx context.Context, msg *base.TaskMessage) error {
|
|||||||
msg.ID,
|
msg.ID,
|
||||||
msg.Timeout,
|
msg.Timeout,
|
||||||
msg.Deadline,
|
msg.Deadline,
|
||||||
|
time.Now().Unix(),
|
||||||
}
|
}
|
||||||
n, err := r.runScriptWithErrorCode(ctx, op, enqueueCmd, keys, argv...)
|
n, err := r.runScriptWithErrorCode(ctx, op, enqueueCmd, keys, argv...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -131,6 +134,7 @@ func (r *RDB) Enqueue(ctx context.Context, msg *base.TaskMessage) error {
|
|||||||
// ARGV[3] -> task message data
|
// ARGV[3] -> task message data
|
||||||
// ARGV[4] -> task timeout in seconds (0 if not timeout)
|
// ARGV[4] -> task timeout in seconds (0 if not timeout)
|
||||||
// ARGV[5] -> task deadline in unix time (0 if no deadline)
|
// ARGV[5] -> task deadline in unix time (0 if no deadline)
|
||||||
|
// ARGV[6] -> current time
|
||||||
//
|
//
|
||||||
// Output:
|
// Output:
|
||||||
// Returns 1 if successfully enqueued
|
// Returns 1 if successfully enqueued
|
||||||
@ -149,6 +153,7 @@ redis.call("HSET", KEYS[2],
|
|||||||
"state", "pending",
|
"state", "pending",
|
||||||
"timeout", ARGV[4],
|
"timeout", ARGV[4],
|
||||||
"deadline", ARGV[5],
|
"deadline", ARGV[5],
|
||||||
|
"pending_since", ARGV[6],
|
||||||
"unique_key", KEYS[1])
|
"unique_key", KEYS[1])
|
||||||
redis.call("LPUSH", KEYS[3], ARGV[1])
|
redis.call("LPUSH", KEYS[3], ARGV[1])
|
||||||
return 1
|
return 1
|
||||||
@ -176,6 +181,7 @@ func (r *RDB) EnqueueUnique(ctx context.Context, msg *base.TaskMessage, ttl time
|
|||||||
encoded,
|
encoded,
|
||||||
msg.Timeout,
|
msg.Timeout,
|
||||||
msg.Deadline,
|
msg.Deadline,
|
||||||
|
time.Now().Unix(),
|
||||||
}
|
}
|
||||||
n, err := r.runScriptWithErrorCode(ctx, op, enqueueUniqueCmd, keys, argv...)
|
n, err := r.runScriptWithErrorCode(ctx, op, enqueueUniqueCmd, keys, argv...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -762,15 +768,17 @@ local ids = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1], "LIMIT", 0, 10
|
|||||||
for _, id in ipairs(ids) do
|
for _, id in ipairs(ids) do
|
||||||
redis.call("LPUSH", KEYS[2], id)
|
redis.call("LPUSH", KEYS[2], id)
|
||||||
redis.call("ZREM", KEYS[1], id)
|
redis.call("ZREM", KEYS[1], id)
|
||||||
redis.call("HSET", ARGV[2] .. id, "state", "pending")
|
redis.call("HSET", ARGV[2] .. id,
|
||||||
|
"state", "pending",
|
||||||
|
"pending_since", ARGV[1])
|
||||||
end
|
end
|
||||||
return table.getn(ids)`)
|
return table.getn(ids)`)
|
||||||
|
|
||||||
// forward moves tasks with a score less than the current unix time
|
// forward moves tasks with a score less than the current unix time
|
||||||
// from the src zset to the dst list. It returns the number of tasks moved.
|
// from the src zset to the dst list. It returns the number of tasks moved.
|
||||||
func (r *RDB) forward(src, dst, taskKeyPrefix string) (int, error) {
|
func (r *RDB) forward(src, dst, taskKeyPrefix string) (int, error) {
|
||||||
now := float64(time.Now().Unix())
|
res, err := forwardCmd.Run(context.Background(), r.client,
|
||||||
res, err := forwardCmd.Run(context.Background(), r.client, []string{src, dst}, now, taskKeyPrefix).Result()
|
[]string{src, dst}, time.Now().Unix(), taskKeyPrefix).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, errors.E(errors.Internal, fmt.Sprintf("redis eval error: %v", err))
|
return 0, errors.E(errors.Internal, fmt.Sprintf("redis eval error: %v", err))
|
||||||
}
|
}
|
||||||
|
@ -78,6 +78,7 @@ func TestEnqueue(t *testing.T) {
|
|||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
h.FlushDB(t, r.client) // clean up db before each test case.
|
h.FlushDB(t, r.client) // clean up db before each test case.
|
||||||
|
|
||||||
|
enqueueTime := time.Now()
|
||||||
err := r.Enqueue(context.Background(), tc.msg)
|
err := r.Enqueue(context.Background(), tc.msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("(*RDB).Enqueue(msg) = %v, want nil", err)
|
t.Errorf("(*RDB).Enqueue(msg) = %v, want nil", err)
|
||||||
@ -115,6 +116,10 @@ func TestEnqueue(t *testing.T) {
|
|||||||
if want := strconv.Itoa(int(tc.msg.Deadline)); deadline != want {
|
if want := strconv.Itoa(int(tc.msg.Deadline)); deadline != want {
|
||||||
t.Errorf("deadline field under task-key is set to %v, want %v", deadline, want)
|
t.Errorf("deadline field under task-key is set to %v, want %v", deadline, want)
|
||||||
}
|
}
|
||||||
|
pendingSince := r.client.HGet(context.Background(), taskKey, "pending_since").Val() // "pending_since" field
|
||||||
|
if want := strconv.Itoa(int(enqueueTime.Unix())); pendingSince != want {
|
||||||
|
t.Errorf("pending_since field under task-key is set to %v, want %v", pendingSince, want)
|
||||||
|
}
|
||||||
|
|
||||||
// Check queue is in the AllQueues set.
|
// Check queue is in the AllQueues set.
|
||||||
if !r.client.SIsMember(context.Background(), base.AllQueues, tc.msg.Queue).Val() {
|
if !r.client.SIsMember(context.Background(), base.AllQueues, tc.msg.Queue).Val() {
|
||||||
@ -181,6 +186,7 @@ func TestEnqueueUnique(t *testing.T) {
|
|||||||
h.FlushDB(t, r.client) // clean up db before each test case.
|
h.FlushDB(t, r.client) // clean up db before each test case.
|
||||||
|
|
||||||
// Enqueue the first message, should succeed.
|
// Enqueue the first message, should succeed.
|
||||||
|
enqueueTime := time.Now()
|
||||||
err := r.EnqueueUnique(context.Background(), tc.msg, tc.ttl)
|
err := r.EnqueueUnique(context.Background(), tc.msg, tc.ttl)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("First message: (*RDB).EnqueueUnique(%v, %v) = %v, want nil",
|
t.Errorf("First message: (*RDB).EnqueueUnique(%v, %v) = %v, want nil",
|
||||||
@ -230,6 +236,10 @@ func TestEnqueueUnique(t *testing.T) {
|
|||||||
if want := strconv.Itoa(int(tc.msg.Deadline)); deadline != want {
|
if want := strconv.Itoa(int(tc.msg.Deadline)); deadline != want {
|
||||||
t.Errorf("deadline field under task-key is set to %v, want %v", deadline, want)
|
t.Errorf("deadline field under task-key is set to %v, want %v", deadline, want)
|
||||||
}
|
}
|
||||||
|
pendingSince := r.client.HGet(context.Background(), taskKey, "pending_since").Val() // "pending_since" field
|
||||||
|
if want := strconv.Itoa(int(enqueueTime.Unix())); pendingSince != want {
|
||||||
|
t.Errorf("pending_since field under task-key is set to %v, want %v", pendingSince, want)
|
||||||
|
}
|
||||||
uniqueKey := r.client.HGet(context.Background(), taskKey, "unique_key").Val() // "unique_key" field
|
uniqueKey := r.client.HGet(context.Background(), taskKey, "unique_key").Val() // "unique_key" field
|
||||||
if uniqueKey != tc.msg.UniqueKey {
|
if uniqueKey != tc.msg.UniqueKey {
|
||||||
t.Errorf("uniqueue_key field under task key is set to %q, want %q", uniqueKey, tc.msg.UniqueKey)
|
t.Errorf("uniqueue_key field under task key is set to %q, want %q", uniqueKey, tc.msg.UniqueKey)
|
||||||
@ -2055,6 +2065,7 @@ func TestForwardIfReady(t *testing.T) {
|
|||||||
h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
|
h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
|
||||||
h.SeedAllRetryQueues(t, r.client, tc.retry)
|
h.SeedAllRetryQueues(t, r.client, tc.retry)
|
||||||
|
|
||||||
|
now := time.Now() // time when the method is called
|
||||||
err := r.ForwardIfReady(tc.qnames...)
|
err := r.ForwardIfReady(tc.qnames...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("(*RDB).CheckScheduled(%v) = %v, want nil", tc.qnames, err)
|
t.Errorf("(*RDB).CheckScheduled(%v) = %v, want nil", tc.qnames, err)
|
||||||
@ -2066,6 +2077,13 @@ func TestForwardIfReady(t *testing.T) {
|
|||||||
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
|
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
|
||||||
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.PendingKey(qname), diff)
|
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.PendingKey(qname), diff)
|
||||||
}
|
}
|
||||||
|
// Make sure "pending_since" field is set
|
||||||
|
for _, msg := range gotPending {
|
||||||
|
pendingSince := r.client.HGet(context.Background(), base.TaskKey(msg.Queue, msg.ID), "pending_since").Val()
|
||||||
|
if want := strconv.Itoa(int(now.Unix())); pendingSince != want {
|
||||||
|
t.Error("pending_since field is not set for newly pending message")
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
for qname, want := range tc.wantScheduled {
|
for qname, want := range tc.wantScheduled {
|
||||||
gotScheduled := h.GetScheduledMessages(t, r.client, qname)
|
gotScheduled := h.GetScheduledMessages(t, r.client, qname)
|
||||||
|
Loading…
Reference in New Issue
Block a user