2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-09-20 02:55:54 +08:00

Remove timeout and deadline fields under task key

This commit is contained in:
Ken Hibino 2022-02-16 06:17:08 -08:00
parent 8211167de2
commit 59927509d8
2 changed files with 7 additions and 61 deletions

View File

@ -87,9 +87,7 @@ func (r *RDB) runScriptWithErrorCode(ctx context.Context, op errors.Op, script *
// -- // --
// ARGV[1] -> task message data // ARGV[1] -> task message data
// ARGV[2] -> task ID // ARGV[2] -> task ID
// ARGV[3] -> task timeout in seconds (0 if not timeout) // ARGV[3] -> current unix time in nsec
// ARGV[4] -> task deadline in unix time (0 if no deadline)
// ARGV[5] -> current unix time in nsec
// //
// Output: // Output:
// Returns 1 if successfully enqueued // Returns 1 if successfully enqueued
@ -101,9 +99,7 @@ end
redis.call("HSET", KEYS[1], redis.call("HSET", KEYS[1],
"msg", ARGV[1], "msg", ARGV[1],
"state", "pending", "state", "pending",
"timeout", ARGV[3], "pending_since", ARGV[3])
"deadline", ARGV[4],
"pending_since", ARGV[5])
redis.call("LPUSH", KEYS[2], ARGV[2]) redis.call("LPUSH", KEYS[2], ARGV[2])
return 1 return 1
`) `)
@ -125,8 +121,6 @@ func (r *RDB) Enqueue(ctx context.Context, msg *base.TaskMessage) error {
argv := []interface{}{ argv := []interface{}{
encoded, encoded,
msg.ID, msg.ID,
msg.Timeout,
msg.Deadline,
r.clock.Now().UnixNano(), r.clock.Now().UnixNano(),
} }
n, err := r.runScriptWithErrorCode(ctx, op, enqueueCmd, keys, argv...) n, err := r.runScriptWithErrorCode(ctx, op, enqueueCmd, keys, argv...)
@ -148,9 +142,7 @@ func (r *RDB) Enqueue(ctx context.Context, msg *base.TaskMessage) error {
// ARGV[1] -> task ID // ARGV[1] -> task ID
// ARGV[2] -> uniqueness lock TTL // ARGV[2] -> uniqueness lock TTL
// ARGV[3] -> task message data // ARGV[3] -> task message data
// ARGV[4] -> task timeout in seconds (0 if not timeout) // ARGV[4] -> current unix time in nsec
// ARGV[5] -> task deadline in unix time (0 if no deadline)
// ARGV[6] -> current unix time in nsec
// //
// Output: // Output:
// Returns 1 if successfully enqueued // Returns 1 if successfully enqueued
@ -167,9 +159,7 @@ end
redis.call("HSET", KEYS[2], redis.call("HSET", KEYS[2],
"msg", ARGV[3], "msg", ARGV[3],
"state", "pending", "state", "pending",
"timeout", ARGV[4], "pending_since", ARGV[4],
"deadline", ARGV[5],
"pending_since", ARGV[6],
"unique_key", KEYS[1]) "unique_key", KEYS[1])
redis.call("LPUSH", KEYS[3], ARGV[1]) redis.call("LPUSH", KEYS[3], ARGV[1])
return 1 return 1
@ -195,8 +185,6 @@ func (r *RDB) EnqueueUnique(ctx context.Context, msg *base.TaskMessage, ttl time
msg.ID, msg.ID,
int(ttl.Seconds()), int(ttl.Seconds()),
encoded, encoded,
msg.Timeout,
msg.Deadline,
r.clock.Now().UnixNano(), r.clock.Now().UnixNano(),
} }
n, err := r.runScriptWithErrorCode(ctx, op, enqueueUniqueCmd, keys, argv...) n, err := r.runScriptWithErrorCode(ctx, op, enqueueUniqueCmd, keys, argv...)
@ -511,11 +499,10 @@ func (r *RDB) Requeue(ctx context.Context, msg *base.TaskMessage) error {
// KEYS[1] -> asynq:{<qname>}:t:<task_id> // KEYS[1] -> asynq:{<qname>}:t:<task_id>
// KEYS[2] -> asynq:{<qname>}:scheduled // KEYS[2] -> asynq:{<qname>}:scheduled
// -------
// ARGV[1] -> task message data // ARGV[1] -> task message data
// ARGV[2] -> process_at time in Unix time // ARGV[2] -> process_at time in Unix time
// ARGV[3] -> task ID // ARGV[3] -> task ID
// ARGV[4] -> task timeout in seconds (0 if not timeout)
// ARGV[5] -> task deadline in unix time (0 if no deadline)
// //
// Output: // Output:
// Returns 1 if successfully enqueued // Returns 1 if successfully enqueued
@ -526,9 +513,7 @@ if redis.call("EXISTS", KEYS[1]) == 1 then
end end
redis.call("HSET", KEYS[1], redis.call("HSET", KEYS[1],
"msg", ARGV[1], "msg", ARGV[1],
"state", "scheduled", "state", "scheduled")
"timeout", ARGV[4],
"deadline", ARGV[5])
redis.call("ZADD", KEYS[2], ARGV[2], ARGV[3]) redis.call("ZADD", KEYS[2], ARGV[2], ARGV[3])
return 1 return 1
`) `)
@ -551,8 +536,6 @@ func (r *RDB) Schedule(ctx context.Context, msg *base.TaskMessage, processAt tim
encoded, encoded,
processAt.Unix(), processAt.Unix(),
msg.ID, msg.ID,
msg.Timeout,
msg.Deadline,
} }
n, err := r.runScriptWithErrorCode(ctx, op, scheduleCmd, keys, argv...) n, err := r.runScriptWithErrorCode(ctx, op, scheduleCmd, keys, argv...)
if err != nil { if err != nil {
@ -567,12 +550,11 @@ func (r *RDB) Schedule(ctx context.Context, msg *base.TaskMessage, processAt tim
// KEYS[1] -> unique key // KEYS[1] -> unique key
// KEYS[2] -> asynq:{<qname>}:t:<task_id> // KEYS[2] -> asynq:{<qname>}:t:<task_id>
// KEYS[3] -> asynq:{<qname>}:scheduled // KEYS[3] -> asynq:{<qname>}:scheduled
// -------
// ARGV[1] -> task ID // ARGV[1] -> task ID
// ARGV[2] -> uniqueness lock TTL // ARGV[2] -> uniqueness lock TTL
// ARGV[3] -> score (process_at timestamp) // ARGV[3] -> score (process_at timestamp)
// ARGV[4] -> task message // ARGV[4] -> task message
// ARGV[5] -> task timeout in seconds (0 if not timeout)
// ARGV[6] -> task deadline in unix time (0 if no deadline)
// //
// Output: // Output:
// Returns 1 if successfully scheduled // Returns 1 if successfully scheduled
@ -589,8 +571,6 @@ end
redis.call("HSET", KEYS[2], redis.call("HSET", KEYS[2],
"msg", ARGV[4], "msg", ARGV[4],
"state", "scheduled", "state", "scheduled",
"timeout", ARGV[5],
"deadline", ARGV[6],
"unique_key", KEYS[1]) "unique_key", KEYS[1])
redis.call("ZADD", KEYS[3], ARGV[3], ARGV[1]) redis.call("ZADD", KEYS[3], ARGV[3], ARGV[1])
return 1 return 1
@ -617,8 +597,6 @@ func (r *RDB) ScheduleUnique(ctx context.Context, msg *base.TaskMessage, process
int(ttl.Seconds()), int(ttl.Seconds()),
processAt.Unix(), processAt.Unix(),
encoded, encoded,
msg.Timeout,
msg.Deadline,
} }
n, err := r.runScriptWithErrorCode(ctx, op, scheduleUniqueCmd, keys, argv...) n, err := r.runScriptWithErrorCode(ctx, op, scheduleUniqueCmd, keys, argv...)
if err != nil { if err != nil {

View File

@ -112,14 +112,6 @@ func TestEnqueue(t *testing.T) {
if state != "pending" { if state != "pending" {
t.Errorf("state field under task-key is set to %q, want %q", state, "pending") t.Errorf("state field under task-key is set to %q, want %q", state, "pending")
} }
timeout := r.client.HGet(context.Background(), taskKey, "timeout").Val() // "timeout" field
if want := strconv.Itoa(int(tc.msg.Timeout)); timeout != want {
t.Errorf("timeout field under task-key is set to %v, want %v", timeout, want)
}
deadline := r.client.HGet(context.Background(), taskKey, "deadline").Val() // "deadline" field
if want := strconv.Itoa(int(tc.msg.Deadline)); deadline != want {
t.Errorf("deadline field under task-key is set to %v, want %v", deadline, want)
}
pendingSince := r.client.HGet(context.Background(), taskKey, "pending_since").Val() // "pending_since" field pendingSince := r.client.HGet(context.Background(), taskKey, "pending_since").Val() // "pending_since" field
if want := strconv.Itoa(int(enqueueTime.UnixNano())); pendingSince != want { if want := strconv.Itoa(int(enqueueTime.UnixNano())); pendingSince != want {
t.Errorf("pending_since field under task-key is set to %v, want %v", pendingSince, want) t.Errorf("pending_since field under task-key is set to %v, want %v", pendingSince, want)
@ -234,14 +226,6 @@ func TestEnqueueUnique(t *testing.T) {
if state != "pending" { if state != "pending" {
t.Errorf("state field under task-key is set to %q, want %q", state, "pending") t.Errorf("state field under task-key is set to %q, want %q", state, "pending")
} }
timeout := r.client.HGet(context.Background(), taskKey, "timeout").Val() // "timeout" field
if want := strconv.Itoa(int(tc.msg.Timeout)); timeout != want {
t.Errorf("timeout field under task-key is set to %v, want %v", timeout, want)
}
deadline := r.client.HGet(context.Background(), taskKey, "deadline").Val() // "deadline" field
if want := strconv.Itoa(int(tc.msg.Deadline)); deadline != want {
t.Errorf("deadline field under task-key is set to %v, want %v", deadline, want)
}
pendingSince := r.client.HGet(context.Background(), taskKey, "pending_since").Val() // "pending_since" field pendingSince := r.client.HGet(context.Background(), taskKey, "pending_since").Val() // "pending_since" field
if want := strconv.Itoa(int(enqueueTime.UnixNano())); pendingSince != want { if want := strconv.Itoa(int(enqueueTime.UnixNano())); pendingSince != want {
t.Errorf("pending_since field under task-key is set to %v, want %v", pendingSince, want) t.Errorf("pending_since field under task-key is set to %v, want %v", pendingSince, want)
@ -1236,14 +1220,6 @@ func TestSchedule(t *testing.T) {
t.Errorf("state field under task-key is set to %q, want %q", t.Errorf("state field under task-key is set to %q, want %q",
state, want) state, want)
} }
timeout := r.client.HGet(context.Background(), taskKey, "timeout").Val() // "timeout" field
if want := strconv.Itoa(int(tc.msg.Timeout)); timeout != want {
t.Errorf("timeout field under task-key is set to %v, want %v", timeout, want)
}
deadline := r.client.HGet(context.Background(), taskKey, "deadline").Val() // "deadline" field
if want := strconv.Itoa(int(tc.msg.Deadline)); deadline != want {
t.Errorf("deadline field under task-ke is set to %v, want %v", deadline, want)
}
// Check queue is in the AllQueues set. // Check queue is in the AllQueues set.
if !r.client.SIsMember(context.Background(), base.AllQueues, tc.msg.Queue).Val() { if !r.client.SIsMember(context.Background(), base.AllQueues, tc.msg.Queue).Val() {
@ -1352,14 +1328,6 @@ func TestScheduleUnique(t *testing.T) {
t.Errorf("state field under task-key is set to %q, want %q", t.Errorf("state field under task-key is set to %q, want %q",
state, want) state, want)
} }
timeout := r.client.HGet(context.Background(), taskKey, "timeout").Val() // "timeout" field
if want := strconv.Itoa(int(tc.msg.Timeout)); timeout != want {
t.Errorf("timeout field under task-key is set to %v, want %v", timeout, want)
}
deadline := r.client.HGet(context.Background(), taskKey, "deadline").Val() // "deadline" field
if want := strconv.Itoa(int(tc.msg.Deadline)); deadline != want {
t.Errorf("deadline field under task-key is set to %v, want %v", deadline, want)
}
uniqueKey := r.client.HGet(context.Background(), taskKey, "unique_key").Val() // "unique_key" field uniqueKey := r.client.HGet(context.Background(), taskKey, "unique_key").Val() // "unique_key" field
if uniqueKey != tc.msg.UniqueKey { if uniqueKey != tc.msg.UniqueKey {
t.Errorf("uniqueue_key field under task key is set to %q, want %q", uniqueKey, tc.msg.UniqueKey) t.Errorf("uniqueue_key field under task key is set to %q, want %q", uniqueKey, tc.msg.UniqueKey)