2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-22 08:40:22 +08:00

Update RDB.RunArchivedTask, RDB.RunRetryTask, RDB.ScheduledTask

This commit is contained in:
Ken Hibino 2021-02-26 06:40:28 -08:00
parent 3f10e97e30
commit 0f09f936a9
2 changed files with 28 additions and 40 deletions

View File

@ -431,8 +431,8 @@ func (r *RDB) listZSetEntries(key, qname string, pgn Pagination) ([]base.Z, erro
// RunArchivedTask finds an archived task that matches the given id and score from
// the given queue and enqueues it for processing.
// If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (r *RDB) RunArchivedTask(qname string, id uuid.UUID, score int64) error {
n, err := r.removeAndRun(base.ArchivedKey(qname), base.PendingKey(qname), id.String(), float64(score))
func (r *RDB) RunArchivedTask(qname string, id uuid.UUID) error {
n, err := r.removeAndRun(base.ArchivedKey(qname), base.PendingKey(qname), id.String())
if err != nil {
return err
}
@ -445,8 +445,8 @@ func (r *RDB) RunArchivedTask(qname string, id uuid.UUID, score int64) error {
// RunRetryTask finds a retry task that matches the given id and score from
// the given queue and enqueues it for processing.
// If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (r *RDB) RunRetryTask(qname string, id uuid.UUID, score int64) error {
n, err := r.removeAndRun(base.RetryKey(qname), base.PendingKey(qname), id.String(), float64(score))
func (r *RDB) RunRetryTask(qname string, id uuid.UUID) error {
n, err := r.removeAndRun(base.RetryKey(qname), base.PendingKey(qname), id.String())
if err != nil {
return err
}
@ -459,8 +459,8 @@ func (r *RDB) RunRetryTask(qname string, id uuid.UUID, score int64) error {
// RunScheduledTask finds a scheduled task that matches the given id and score from
// from the given queue and enqueues it for processing.
// If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (r *RDB) RunScheduledTask(qname string, id uuid.UUID, score int64) error {
n, err := r.removeAndRun(base.ScheduledKey(qname), base.PendingKey(qname), id.String(), float64(score))
func (r *RDB) RunScheduledTask(qname string, id uuid.UUID) error {
n, err := r.removeAndRun(base.ScheduledKey(qname), base.PendingKey(qname), id.String())
if err != nil {
return err
}
@ -488,20 +488,20 @@ func (r *RDB) RunAllArchivedTasks(qname string) (int64, error) {
return r.removeAndRunAll(base.ArchivedKey(qname), base.PendingKey(qname))
}
// KEYS[1] -> sorted set to remove the id from
// KEYS[2] -> asynq:{<qname>}:pending
// ARGV[1] -> task ID
var removeAndRunCmd = redis.NewScript(`
local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], ARGV[1], ARGV[1])
for _, msg in ipairs(msgs) do
local decoded = cjson.decode(msg)
if decoded["ID"] == ARGV[2] then
redis.call("LPUSH", KEYS[2], msg)
redis.call("ZREM", KEYS[1], msg)
return 1
local n = redis.call("ZREM", KEYS[1], ARGV[1])
if n == 0 then
return 0
end
end
return 0`)
redis.call("LPUSH", KEYS[2], ARGV[1])
return 1
`)
func (r *RDB) removeAndRun(zset, qkey, id string, score float64) (int64, error) {
res, err := removeAndRunCmd.Run(r.client, []string{zset, qkey}, score, id).Result()
func (r *RDB) removeAndRun(zset, qkey, id string) (int64, error) {
res, err := removeAndRunCmd.Run(r.client, []string{zset, qkey}, id).Result()
if err != nil {
return 0, err
}

View File

@ -996,7 +996,7 @@ var (
zScoreCmpOpt = h.EquateInt64Approx(2) // allow for 2 seconds margin in Z.Score
)
func TestRunDeadTask(t *testing.T) {
func TestRunArchivedTask(t *testing.T) {
r := setup(t)
defer r.Close()
t1 := h.NewTaskMessage("send_email", nil)
@ -1008,9 +1008,8 @@ func TestRunDeadTask(t *testing.T) {
tests := []struct {
archived map[string][]base.Z
qname string
score int64
id uuid.UUID
want error // expected return value from calling RunDeadTask
want error // expected return value from calling RunArchivedTask
wantArchived map[string][]*base.TaskMessage
wantPending map[string][]*base.TaskMessage
}{
@ -1022,7 +1021,6 @@ func TestRunDeadTask(t *testing.T) {
},
},
qname: "default",
score: s2,
id: t2.ID,
want: nil,
wantArchived: map[string][]*base.TaskMessage{
@ -1040,8 +1038,7 @@ func TestRunDeadTask(t *testing.T) {
},
},
qname: "default",
score: 123,
id: t2.ID,
id: uuid.New(),
want: ErrTaskNotFound,
wantArchived: map[string][]*base.TaskMessage{
"default": {t1, t2},
@ -1061,7 +1058,6 @@ func TestRunDeadTask(t *testing.T) {
},
},
qname: "critical",
score: s1,
id: t3.ID,
want: nil,
wantArchived: map[string][]*base.TaskMessage{
@ -1079,9 +1075,9 @@ func TestRunDeadTask(t *testing.T) {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllArchivedQueues(t, r.client, tc.archived)
got := r.RunArchivedTask(tc.qname, tc.id, tc.score)
got := r.RunArchivedTask(tc.qname, tc.id)
if got != tc.want {
t.Errorf("r.RunDeadTask(%q, %s, %d) = %v, want %v", tc.qname, tc.id, tc.score, got, tc.want)
t.Errorf("r.RunDeadTask(%q, %s) = %v, want %v", tc.qname, tc.id, got, tc.want)
continue
}
@ -1113,7 +1109,6 @@ func TestRunRetryTask(t *testing.T) {
tests := []struct {
retry map[string][]base.Z
qname string
score int64
id uuid.UUID
want error // expected return value from calling RunRetryTask
wantRetry map[string][]*base.TaskMessage
@ -1127,7 +1122,6 @@ func TestRunRetryTask(t *testing.T) {
},
},
qname: "default",
score: s2,
id: t2.ID,
want: nil,
wantRetry: map[string][]*base.TaskMessage{
@ -1145,8 +1139,7 @@ func TestRunRetryTask(t *testing.T) {
},
},
qname: "default",
score: 123,
id: t2.ID,
id: uuid.New(),
want: ErrTaskNotFound,
wantRetry: map[string][]*base.TaskMessage{
"default": {t1, t2},
@ -1166,7 +1159,6 @@ func TestRunRetryTask(t *testing.T) {
},
},
qname: "low",
score: s2,
id: t3.ID,
want: nil,
wantRetry: map[string][]*base.TaskMessage{
@ -1184,9 +1176,9 @@ func TestRunRetryTask(t *testing.T) {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllRetryQueues(t, r.client, tc.retry) // initialize retry queue
got := r.RunRetryTask(tc.qname, tc.id, tc.score)
got := r.RunRetryTask(tc.qname, tc.id)
if got != tc.want {
t.Errorf("r.RunRetryTask(%q, %s, %d) = %v, want %v", tc.qname, tc.id, tc.score, got, tc.want)
t.Errorf("r.RunRetryTask(%q, %s) = %v, want %v", tc.qname, tc.id, got, tc.want)
continue
}
@ -1218,7 +1210,6 @@ func TestRunScheduledTask(t *testing.T) {
tests := []struct {
scheduled map[string][]base.Z
qname string
score int64
id uuid.UUID
want error // expected return value from calling RunScheduledTask
wantScheduled map[string][]*base.TaskMessage
@ -1232,7 +1223,6 @@ func TestRunScheduledTask(t *testing.T) {
},
},
qname: "default",
score: s2,
id: t2.ID,
want: nil,
wantScheduled: map[string][]*base.TaskMessage{
@ -1250,8 +1240,7 @@ func TestRunScheduledTask(t *testing.T) {
},
},
qname: "default",
score: 123,
id: t2.ID,
id: uuid.New(),
want: ErrTaskNotFound,
wantScheduled: map[string][]*base.TaskMessage{
"default": {t1, t2},
@ -1271,7 +1260,6 @@ func TestRunScheduledTask(t *testing.T) {
},
},
qname: "notifications",
score: s1,
id: t3.ID,
want: nil,
wantScheduled: map[string][]*base.TaskMessage{
@ -1289,9 +1277,9 @@ func TestRunScheduledTask(t *testing.T) {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
got := r.RunScheduledTask(tc.qname, tc.id, tc.score)
got := r.RunScheduledTask(tc.qname, tc.id)
if got != tc.want {
t.Errorf("r.RunRetryTask(%q, %s, %d) = %v, want %v", tc.qname, tc.id, tc.score, got, tc.want)
t.Errorf("r.RunRetryTask(%q, %s) = %v, want %v", tc.qname, tc.id, got, tc.want)
continue
}