2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-14 11:31:18 +08:00

Update Pause and Unpause methods in RDB

This commit is contained in:
Ken Hibino 2020-08-11 21:36:49 -07:00
parent 24b13bd865
commit 44a3d177f0
2 changed files with 47 additions and 103 deletions

View File

@ -753,32 +753,28 @@ func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error) {
return workers, nil return workers, nil
} }
// KEYS[1] -> asynq:paused
// ARGV[1] -> asynq:queues:<qname> - queue to pause
var pauseCmd = redis.NewScript(`
local ismem = redis.call("SISMEMBER", KEYS[1], ARGV[1])
if ismem == 1 then
return redis.error_reply("queue is already paused")
end
return redis.call("SADD", KEYS[1], ARGV[1])`)
// Pause pauses processing of tasks from the given queue. // Pause pauses processing of tasks from the given queue.
func (r *RDB) Pause(qname string) error { func (r *RDB) Pause(qname string) error {
qkey := base.QueueKey(qname) key := base.PauseKey(qname)
return pauseCmd.Run(r.client, []string{base.PausedQueues}, qkey).Err() exists, err := r.client.SetNX(key, time.Now().Unix(), 0).Result()
if err != nil {
return err
}
if exists {
return fmt.Errorf("queue %q is already paused", qname)
}
return nil
} }
// KEYS[1] -> asynq:paused
// ARGV[1] -> asynq:queues:<qname> - queue to unpause
var unpauseCmd = redis.NewScript(`
local ismem = redis.call("SISMEMBER", KEYS[1], ARGV[1])
if ismem == 0 then
return redis.error_reply("queue is not paused")
end
return redis.call("SREM", KEYS[1], ARGV[1])`)
// Unpause resumes processing of tasks from the given queue. // Unpause resumes processing of tasks from the given queue.
func (r *RDB) Unpause(qname string) error { func (r *RDB) Unpause(qname string) error {
qkey := base.QueueKey(qname) key := base.PauseKey(qname)
return unpauseCmd.Run(r.client, []string{base.PausedQueues}, qkey).Err() deleted, err := r.client.Del(key).Result()
if err != nil {
return err
}
if deleted == 0 {
return fmt.Errorf("queue %q is not paused", qname)
}
return nil
} }

View File

@ -2194,37 +2194,22 @@ func TestPause(t *testing.T) {
r := setup(t) r := setup(t)
tests := []struct { tests := []struct {
initial []string // initial keys in the paused set
qname string // name of the queue to pause qname string // name of the queue to pause
want []string // expected keys in the paused set
}{ }{
{[]string{}, "default", []string{"asynq:queues:default"}}, {qname: "default"},
{[]string{"asynq:queues:default"}, "critical", []string{"asynq:queues:default", "asynq:queues:critical"}}, {qname: "custom"},
} }
for _, tc := range tests { for _, tc := range tests {
h.FlushDB(t, r.client) h.FlushDB(t, r.client)
// Set up initial state.
for _, qkey := range tc.initial {
if err := r.client.SAdd(base.PausedQueues, qkey).Err(); err != nil {
t.Fatal(err)
}
}
err := r.Pause(tc.qname) err := r.Pause(tc.qname)
if err != nil { if err != nil {
t.Errorf("Pause(%q) returned error: %v", tc.qname, err) t.Errorf("Pause(%q) returned error: %v", tc.qname, err)
} }
key := base.PauseKey(tc.qname)
got, err := r.client.SMembers(base.PausedQueues).Result() if r.client.Exists(key).Val() == 0 {
if err != nil { t.Errorf("key %q does not exist", key)
t.Fatal(err)
}
if diff := cmp.Diff(tc.want, got, h.SortStringSliceOpt); diff != "" {
t.Errorf("%q has members %v, want %v; (-want,+got)\n%s",
base.PausedQueues, got, tc.want, diff)
} }
} }
} }
@ -2234,20 +2219,17 @@ func TestPauseError(t *testing.T) {
tests := []struct { tests := []struct {
desc string // test case description desc string // test case description
initial []string // initial keys in the paused set paused []string // already paused queues
qname string // name of the queue to pause qname string // name of the queue to pause
want []string // expected keys in the paused set
}{ }{
{"queue already paused", []string{"asynq:queues:default"}, "default", []string{"asynq:queues:default"}}, {"queue already paused", []string{"default", "custom"}, "default"},
} }
for _, tc := range tests { for _, tc := range tests {
h.FlushDB(t, r.client) h.FlushDB(t, r.client)
for _, qname := range tc.paused {
// Set up initial state. if err := r.Pause(qname); err != nil {
for _, qkey := range tc.initial { t.Fatalf("could not pause %q: %v", qname, err)
if err := r.client.SAdd(base.PausedQueues, qkey).Err(); err != nil {
t.Fatal(err)
} }
} }
@ -2255,16 +2237,6 @@ func TestPauseError(t *testing.T) {
if err == nil { if err == nil {
t.Errorf("%s; Pause(%q) returned nil: want error", tc.desc, tc.qname) t.Errorf("%s; Pause(%q) returned nil: want error", tc.desc, tc.qname)
} }
got, err := r.client.SMembers(base.PausedQueues).Result()
if err != nil {
t.Fatal(err)
}
if diff := cmp.Diff(tc.want, got, h.SortStringSliceOpt); diff != "" {
t.Errorf("%s; %q has members %v, want %v; (-want,+got)\n%s",
tc.desc, base.PausedQueues, got, tc.want, diff)
}
} }
} }
@ -2272,21 +2244,17 @@ func TestUnpause(t *testing.T) {
r := setup(t) r := setup(t)
tests := []struct { tests := []struct {
initial []string // initial keys in the paused set paused []string // already paused queues
qname string // name of the queue to unpause qname string // name of the queue to unpause
want []string // expected keys in the paused set
}{ }{
{[]string{"asynq:queues:default"}, "default", []string{}}, {[]string{"default", "custom"}, "default"},
{[]string{"asynq:queues:default", "asynq:queues:low"}, "low", []string{"asynq:queues:default"}},
} }
for _, tc := range tests { for _, tc := range tests {
h.FlushDB(t, r.client) h.FlushDB(t, r.client)
for _, qname := range tc.paused {
// Set up initial state. if err := r.Pause(qname); err != nil {
for _, qkey := range tc.initial { t.Fatalf("could not pause %q: %v", qname, err)
if err := r.client.SAdd(base.PausedQueues, qkey).Err(); err != nil {
t.Fatal(err)
} }
} }
@ -2294,15 +2262,9 @@ func TestUnpause(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("Unpause(%q) returned error: %v", tc.qname, err) t.Errorf("Unpause(%q) returned error: %v", tc.qname, err)
} }
key := base.PauseKey(tc.qname)
got, err := r.client.SMembers(base.PausedQueues).Result() if r.client.Exists(key) == 1 {
if err != nil { t.Errorf("key %q exists", key)
t.Fatal(err)
}
if diff := cmp.Diff(tc.want, got, h.SortStringSliceOpt); diff != "" {
t.Errorf("%q has members %v, want %v; (-want,+got)\n%s",
base.PausedQueues, got, tc.want, diff)
} }
} }
} }
@ -2312,21 +2274,17 @@ func TestUnpauseError(t *testing.T) {
tests := []struct { tests := []struct {
desc string // test case description desc string // test case description
initial []string // initial keys in the paused set paused []string // already paused queues
qname string // name of the queue to unpause qname string // name of the queue to unpause
want []string // expected keys in the paused set
}{ }{
{"set is empty", []string{}, "default", []string{}}, {"queue is not paused", []string{"default"}, "custom"},
{"queue is not in the set", []string{"asynq:queues:default"}, "low", []string{"asynq:queues:default"}},
} }
for _, tc := range tests { for _, tc := range tests {
h.FlushDB(t, r.client) h.FlushDB(t, r.client)
for _, qname := range tc.paused {
// Set up initial state. if err := r.Pause(qname); err != nil {
for _, qkey := range tc.initial { t.Fatalf("could not pause %q: %v", qname, err)
if err := r.client.SAdd(base.PausedQueues, qkey).Err(); err != nil {
t.Fatal(err)
} }
} }
@ -2334,15 +2292,5 @@ func TestUnpauseError(t *testing.T) {
if err == nil { if err == nil {
t.Errorf("%s; Unpause(%q) returned nil: want error", tc.desc, tc.qname) t.Errorf("%s; Unpause(%q) returned nil: want error", tc.desc, tc.qname)
} }
got, err := r.client.SMembers(base.PausedQueues).Result()
if err != nil {
t.Fatal(err)
}
if diff := cmp.Diff(tc.want, got, h.SortStringSliceOpt); diff != "" {
t.Errorf("%s; %q has members %v, want %v; (-want,+got)\n%s",
tc.desc, base.PausedQueues, got, tc.want, diff)
}
} }
} }