diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 693c4f3..dec8ec6 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -753,32 +753,28 @@ func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error) { return workers, nil } -// KEYS[1] -> asynq:paused -// ARGV[1] -> asynq:queues: - queue to pause -var pauseCmd = redis.NewScript(` -local ismem = redis.call("SISMEMBER", KEYS[1], ARGV[1]) -if ismem == 1 then - return redis.error_reply("queue is already paused") -end -return redis.call("SADD", KEYS[1], ARGV[1])`) - // Pause pauses processing of tasks from the given queue. func (r *RDB) Pause(qname string) error { - qkey := base.QueueKey(qname) - return pauseCmd.Run(r.client, []string{base.PausedQueues}, qkey).Err() + key := base.PauseKey(qname) + exists, err := r.client.SetNX(key, time.Now().Unix(), 0).Result() + if err != nil { + return err + } + if exists { + return fmt.Errorf("queue %q is already paused", qname) + } + return nil } -// KEYS[1] -> asynq:paused -// ARGV[1] -> asynq:queues: - queue to unpause -var unpauseCmd = redis.NewScript(` -local ismem = redis.call("SISMEMBER", KEYS[1], ARGV[1]) -if ismem == 0 then - return redis.error_reply("queue is not paused") -end -return redis.call("SREM", KEYS[1], ARGV[1])`) - // Unpause resumes processing of tasks from the given queue. func (r *RDB) Unpause(qname string) error { - qkey := base.QueueKey(qname) - return unpauseCmd.Run(r.client, []string{base.PausedQueues}, qkey).Err() + key := base.PauseKey(qname) + deleted, err := r.client.Del(key).Result() + if err != nil { + return err + } + if deleted == 0 { + return fmt.Errorf("queue %q is not paused", qname) + } + return nil } diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index b954758..2be16c9 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -2194,37 +2194,22 @@ func TestPause(t *testing.T) { r := setup(t) tests := []struct { - initial []string // initial keys in the paused set - qname string // name of the queue to pause - want []string // expected keys in the paused set + qname string // name of the queue to pause }{ - {[]string{}, "default", []string{"asynq:queues:default"}}, - {[]string{"asynq:queues:default"}, "critical", []string{"asynq:queues:default", "asynq:queues:critical"}}, + {qname: "default"}, + {qname: "custom"}, } for _, tc := range tests { h.FlushDB(t, r.client) - // Set up initial state. - for _, qkey := range tc.initial { - if err := r.client.SAdd(base.PausedQueues, qkey).Err(); err != nil { - t.Fatal(err) - } - } - err := r.Pause(tc.qname) if err != nil { t.Errorf("Pause(%q) returned error: %v", tc.qname, err) } - - got, err := r.client.SMembers(base.PausedQueues).Result() - if err != nil { - t.Fatal(err) - } - - if diff := cmp.Diff(tc.want, got, h.SortStringSliceOpt); diff != "" { - t.Errorf("%q has members %v, want %v; (-want,+got)\n%s", - base.PausedQueues, got, tc.want, diff) + key := base.PauseKey(tc.qname) + if r.client.Exists(key).Val() == 0 { + t.Errorf("key %q does not exist", key) } } } @@ -2233,21 +2218,18 @@ func TestPauseError(t *testing.T) { r := setup(t) tests := []struct { - desc string // test case description - initial []string // initial keys in the paused set - qname string // name of the queue to pause - want []string // expected keys in the paused set + desc string // test case description + paused []string // already paused queues + qname string // name of the queue to pause }{ - {"queue already paused", []string{"asynq:queues:default"}, "default", []string{"asynq:queues:default"}}, + {"queue already paused", []string{"default", "custom"}, "default"}, } for _, tc := range tests { h.FlushDB(t, r.client) - - // Set up initial state. - for _, qkey := range tc.initial { - if err := r.client.SAdd(base.PausedQueues, qkey).Err(); err != nil { - t.Fatal(err) + for _, qname := range tc.paused { + if err := r.Pause(qname); err != nil { + t.Fatalf("could not pause %q: %v", qname, err) } } @@ -2255,16 +2237,6 @@ func TestPauseError(t *testing.T) { if err == nil { t.Errorf("%s; Pause(%q) returned nil: want error", tc.desc, tc.qname) } - - got, err := r.client.SMembers(base.PausedQueues).Result() - if err != nil { - t.Fatal(err) - } - - if diff := cmp.Diff(tc.want, got, h.SortStringSliceOpt); diff != "" { - t.Errorf("%s; %q has members %v, want %v; (-want,+got)\n%s", - tc.desc, base.PausedQueues, got, tc.want, diff) - } } } @@ -2272,21 +2244,17 @@ func TestUnpause(t *testing.T) { r := setup(t) tests := []struct { - initial []string // initial keys in the paused set - qname string // name of the queue to unpause - want []string // expected keys in the paused set + paused []string // already paused queues + qname string // name of the queue to unpause }{ - {[]string{"asynq:queues:default"}, "default", []string{}}, - {[]string{"asynq:queues:default", "asynq:queues:low"}, "low", []string{"asynq:queues:default"}}, + {[]string{"default", "custom"}, "default"}, } for _, tc := range tests { h.FlushDB(t, r.client) - - // Set up initial state. - for _, qkey := range tc.initial { - if err := r.client.SAdd(base.PausedQueues, qkey).Err(); err != nil { - t.Fatal(err) + for _, qname := range tc.paused { + if err := r.Pause(qname); err != nil { + t.Fatalf("could not pause %q: %v", qname, err) } } @@ -2294,15 +2262,9 @@ func TestUnpause(t *testing.T) { if err != nil { t.Errorf("Unpause(%q) returned error: %v", tc.qname, err) } - - got, err := r.client.SMembers(base.PausedQueues).Result() - if err != nil { - t.Fatal(err) - } - - if diff := cmp.Diff(tc.want, got, h.SortStringSliceOpt); diff != "" { - t.Errorf("%q has members %v, want %v; (-want,+got)\n%s", - base.PausedQueues, got, tc.want, diff) + key := base.PauseKey(tc.qname) + if r.client.Exists(key) == 1 { + t.Errorf("key %q exists", key) } } } @@ -2311,22 +2273,18 @@ func TestUnpauseError(t *testing.T) { r := setup(t) tests := []struct { - desc string // test case description - initial []string // initial keys in the paused set - qname string // name of the queue to unpause - want []string // expected keys in the paused set + desc string // test case description + paused []string // already paused queues + qname string // name of the queue to unpause }{ - {"set is empty", []string{}, "default", []string{}}, - {"queue is not in the set", []string{"asynq:queues:default"}, "low", []string{"asynq:queues:default"}}, + {"queue is not paused", []string{"default"}, "custom"}, } for _, tc := range tests { h.FlushDB(t, r.client) - - // Set up initial state. - for _, qkey := range tc.initial { - if err := r.client.SAdd(base.PausedQueues, qkey).Err(); err != nil { - t.Fatal(err) + for _, qname := range tc.paused { + if err := r.Pause(qname); err != nil { + t.Fatalf("could not pause %q: %v", qname, err) } } @@ -2334,15 +2292,5 @@ func TestUnpauseError(t *testing.T) { if err == nil { t.Errorf("%s; Unpause(%q) returned nil: want error", tc.desc, tc.qname) } - - got, err := r.client.SMembers(base.PausedQueues).Result() - if err != nil { - t.Fatal(err) - } - - if diff := cmp.Diff(tc.want, got, h.SortStringSliceOpt); diff != "" { - t.Errorf("%s; %q has members %v, want %v; (-want,+got)\n%s", - tc.desc, base.PausedQueues, got, tc.want, diff) - } } }