mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-26 07:42:17 +08:00
Add methods to rdb to enqueue all tasks from dead, retry and scheduled
queue
This commit is contained in:
parent
c0185061eb
commit
0d74c518bf
7
asynq.go
7
asynq.go
@ -5,13 +5,18 @@ import "github.com/go-redis/redis/v7"
|
|||||||
/*
|
/*
|
||||||
TODOs:
|
TODOs:
|
||||||
- [P0] enqall command to enq all tasks from "scheduled" "retry", "dead" queue
|
- [P0] enqall command to enq all tasks from "scheduled" "retry", "dead" queue
|
||||||
|
- [P0] asynqmon del <taskID>, asynqmon delall <qname>
|
||||||
|
- [P0] asynqmon kill <taskID>, asynqmon killall <qname>
|
||||||
|
- [P0] Redis Memory Usage, Connection info in stats
|
||||||
|
- [P0] Processed, Failed count for today
|
||||||
- [P0] Go docs + CONTRIBUTION.md + Github issue template
|
- [P0] Go docs + CONTRIBUTION.md + Github issue template
|
||||||
|
- [P0] Redis Sentinel support
|
||||||
- [P1] Add Support for multiple queues and priority
|
- [P1] Add Support for multiple queues and priority
|
||||||
- [P1] User defined max-retry count
|
- [P1] User defined max-retry count
|
||||||
*/
|
*/
|
||||||
|
|
||||||
// Max retry count by default
|
// Max retry count by default
|
||||||
const defaultMaxRetry = 25
|
const defaultMaxRetry = 1
|
||||||
|
|
||||||
// Task represents a task to be performed.
|
// Task represents a task to be performed.
|
||||||
type Task struct {
|
type Task struct {
|
||||||
|
@ -272,6 +272,21 @@ func (r *RDB) EnqueueScheduledTask(id uuid.UUID, score int64) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// EnqueueAllScheduledTasks enqueues all tasks from scheduled queue.
|
||||||
|
func (r *RDB) EnqueueAllScheduledTasks() error {
|
||||||
|
return r.removeAndEnqueueAll(scheduledQ)
|
||||||
|
}
|
||||||
|
|
||||||
|
// EnqueueAllRetryTasks enqueues all tasks from retry queue.
|
||||||
|
func (r *RDB) EnqueueAllRetryTasks() error {
|
||||||
|
return r.removeAndEnqueueAll(retryQ)
|
||||||
|
}
|
||||||
|
|
||||||
|
// EnqueueAllDeadTasks enqueues all tasks from dead queue.
|
||||||
|
func (r *RDB) EnqueueAllDeadTasks() error {
|
||||||
|
return r.removeAndEnqueueAll(deadQ)
|
||||||
|
}
|
||||||
|
|
||||||
func (r *RDB) removeAndEnqueue(zset, id string, score float64) (int64, error) {
|
func (r *RDB) removeAndEnqueue(zset, id string, score float64) (int64, error) {
|
||||||
script := redis.NewScript(`
|
script := redis.NewScript(`
|
||||||
local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], ARGV[1], ARGV[1])
|
local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], ARGV[1], ARGV[1])
|
||||||
@ -295,3 +310,19 @@ func (r *RDB) removeAndEnqueue(zset, id string, score float64) (int64, error) {
|
|||||||
}
|
}
|
||||||
return n, nil
|
return n, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *RDB) removeAndEnqueueAll(zset string) error {
|
||||||
|
script := redis.NewScript(`
|
||||||
|
local msgs = redis.call("ZRANGE", KEYS[1], 0, -1)
|
||||||
|
for _, msg in ipairs(msgs) do
|
||||||
|
redis.call("ZREM", KEYS[1], msg)
|
||||||
|
redis.call("LPUSH", KEYS[2], msg)
|
||||||
|
end
|
||||||
|
return table.getn(msgs)
|
||||||
|
`)
|
||||||
|
_, err := script.Run(r.client, []string{zset, defaultQ}).Result()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@ -698,3 +698,156 @@ func TestEnqueueScheduledTask(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestEnqueueAllScheduledTasks(t *testing.T) {
|
||||||
|
r := setup(t)
|
||||||
|
t1 := randomTask("send_email", "default", nil)
|
||||||
|
t2 := randomTask("gen_thumbnail", "default", nil)
|
||||||
|
t3 := randomTask("reindex", "default", nil)
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
description string
|
||||||
|
scheduled []*TaskMessage
|
||||||
|
wantEnqueued []*TaskMessage
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
description: "with tasks in scheduled queue",
|
||||||
|
scheduled: []*TaskMessage{t1, t2, t3},
|
||||||
|
wantEnqueued: []*TaskMessage{t1, t2, t3},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
description: "with empty scheduled queue",
|
||||||
|
scheduled: []*TaskMessage{},
|
||||||
|
wantEnqueued: []*TaskMessage{},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tests {
|
||||||
|
// clean up db before each test case.
|
||||||
|
if err := r.client.FlushDB().Err(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
// initialize scheduled queue
|
||||||
|
for _, msg := range tc.scheduled {
|
||||||
|
err := r.client.ZAdd(scheduledQ, &redis.Z{
|
||||||
|
Member: mustMarshal(t, msg),
|
||||||
|
Score: float64(time.Now().Add(time.Hour).Unix())}).Err()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
err := r.EnqueueAllScheduledTasks()
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("%s; r.EnqueueAllScheduledTasks = %v, want nil", tc.description, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
gotEnqueuedRaw := r.client.LRange(defaultQ, 0, -1).Val()
|
||||||
|
gotEnqueued := mustUnmarshalSlice(t, gotEnqueuedRaw)
|
||||||
|
if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, sortMsgOpt); diff != "" {
|
||||||
|
t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.description, defaultQ, diff)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEnqueueAllRetryTasks(t *testing.T) {
|
||||||
|
r := setup(t)
|
||||||
|
t1 := randomTask("send_email", "default", nil)
|
||||||
|
t2 := randomTask("gen_thumbnail", "default", nil)
|
||||||
|
t3 := randomTask("reindex", "default", nil)
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
description string
|
||||||
|
retry []*TaskMessage
|
||||||
|
wantEnqueued []*TaskMessage
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
description: "with tasks in retry queue",
|
||||||
|
retry: []*TaskMessage{t1, t2, t3},
|
||||||
|
wantEnqueued: []*TaskMessage{t1, t2, t3},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
description: "with empty retry queue",
|
||||||
|
retry: []*TaskMessage{},
|
||||||
|
wantEnqueued: []*TaskMessage{},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tests {
|
||||||
|
// clean up db before each test case.
|
||||||
|
if err := r.client.FlushDB().Err(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
// initialize retry queue
|
||||||
|
for _, msg := range tc.retry {
|
||||||
|
err := r.client.ZAdd(retryQ, &redis.Z{
|
||||||
|
Member: mustMarshal(t, msg),
|
||||||
|
Score: float64(time.Now().Add(time.Hour).Unix())}).Err()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
err := r.EnqueueAllRetryTasks()
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("%s; r.EnqueueAllRetryTasks = %v, want nil", tc.description, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
gotEnqueuedRaw := r.client.LRange(defaultQ, 0, -1).Val()
|
||||||
|
gotEnqueued := mustUnmarshalSlice(t, gotEnqueuedRaw)
|
||||||
|
if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, sortMsgOpt); diff != "" {
|
||||||
|
t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.description, defaultQ, diff)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEnqueueAllDeadTasks(t *testing.T) {
|
||||||
|
r := setup(t)
|
||||||
|
t1 := randomTask("send_email", "default", nil)
|
||||||
|
t2 := randomTask("gen_thumbnail", "default", nil)
|
||||||
|
t3 := randomTask("reindex", "default", nil)
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
description string
|
||||||
|
dead []*TaskMessage
|
||||||
|
wantEnqueued []*TaskMessage
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
description: "with tasks in dead queue",
|
||||||
|
dead: []*TaskMessage{t1, t2, t3},
|
||||||
|
wantEnqueued: []*TaskMessage{t1, t2, t3},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
description: "with empty dead queue",
|
||||||
|
dead: []*TaskMessage{},
|
||||||
|
wantEnqueued: []*TaskMessage{},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tests {
|
||||||
|
// clean up db before each test case.
|
||||||
|
if err := r.client.FlushDB().Err(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
// initialize dead queue
|
||||||
|
for _, msg := range tc.dead {
|
||||||
|
err := r.client.ZAdd(deadQ, &redis.Z{
|
||||||
|
Member: mustMarshal(t, msg),
|
||||||
|
Score: float64(time.Now().Add(time.Hour).Unix())}).Err()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
err := r.EnqueueAllDeadTasks()
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("%s; r.EnqueueAllDeadTasks = %v, want nil", tc.description, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
gotEnqueuedRaw := r.client.LRange(defaultQ, 0, -1).Val()
|
||||||
|
gotEnqueued := mustUnmarshalSlice(t, gotEnqueuedRaw)
|
||||||
|
if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, sortMsgOpt); diff != "" {
|
||||||
|
t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.description, defaultQ, diff)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user