mirror of
https://github.com/hibiken/asynq.git
synced 2024-11-10 11:31:58 +08:00
Replace DeleteTaskByKey with DeleteTask in Inspector
This commit is contained in:
parent
456edb6b71
commit
4bcc5ab6aa
@ -543,27 +543,25 @@ func (i *Inspector) DeleteAllArchivedTasks(qname string) (int, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// DeleteTaskByKey deletes a task with the given key from the given queue.
|
// DeleteTaskByKey deletes a task with the given key from the given queue.
|
||||||
// TODO: We don't need score any more. Update this to delete task by ID
|
func (i *Inspector) DeleteTask(qname, id string) error {
|
||||||
func (i *Inspector) DeleteTaskByKey(qname, key string) error {
|
|
||||||
if err := base.ValidateQueueName(qname); err != nil {
|
if err := base.ValidateQueueName(qname); err != nil {
|
||||||
return err
|
return fmt.Errorf("asynq: %v", err)
|
||||||
}
|
}
|
||||||
prefix, id, _, err := parseTaskKey(key)
|
taskid, err := uuid.Parse(id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("asynq: %s is not a valid task id", id)
|
||||||
}
|
}
|
||||||
switch prefix {
|
err = i.rdb.DeleteTask(qname, taskid)
|
||||||
case keyPrefixPending:
|
switch {
|
||||||
return i.rdb.DeleteTask(qname, id)
|
case errors.IsQueueNotFound(err):
|
||||||
case keyPrefixScheduled:
|
return fmt.Errorf("asynq: %w", ErrQueueNotFound)
|
||||||
return i.rdb.DeleteTask(qname, id)
|
case errors.IsTaskNotFound(err):
|
||||||
case keyPrefixRetry:
|
return fmt.Errorf("asynq: %w", ErrTaskNotFound)
|
||||||
return i.rdb.DeleteTask(qname, id)
|
case err != nil:
|
||||||
case keyPrefixArchived:
|
return fmt.Errorf("asynq: %v", err)
|
||||||
return i.rdb.DeleteTask(qname, id)
|
|
||||||
default:
|
|
||||||
return fmt.Errorf("invalid key")
|
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// RunAllScheduledTasks transition all scheduled tasks to pending state from the given queue,
|
// RunAllScheduledTasks transition all scheduled tasks to pending state from the given queue,
|
||||||
|
@ -1903,7 +1903,7 @@ func TestInspectorRunAllArchivedTasks(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestInspectorDeleteTaskByKeyDeletesPendingTask(t *testing.T) {
|
func TestInspectorDeleteTaskDeletesPendingTask(t *testing.T) {
|
||||||
r := setup(t)
|
r := setup(t)
|
||||||
defer r.Close()
|
defer r.Close()
|
||||||
m1 := h.NewTaskMessage("task1", nil)
|
m1 := h.NewTaskMessage("task1", nil)
|
||||||
@ -1914,7 +1914,7 @@ func TestInspectorDeleteTaskByKeyDeletesPendingTask(t *testing.T) {
|
|||||||
tests := []struct {
|
tests := []struct {
|
||||||
pending map[string][]*base.TaskMessage
|
pending map[string][]*base.TaskMessage
|
||||||
qname string
|
qname string
|
||||||
key string
|
id string
|
||||||
wantPending map[string][]*base.TaskMessage
|
wantPending map[string][]*base.TaskMessage
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
@ -1923,7 +1923,7 @@ func TestInspectorDeleteTaskByKeyDeletesPendingTask(t *testing.T) {
|
|||||||
"custom": {m3},
|
"custom": {m3},
|
||||||
},
|
},
|
||||||
qname: "default",
|
qname: "default",
|
||||||
key: createPendingTask(m2).Key(),
|
id: createPendingTask(m2).ID,
|
||||||
wantPending: map[string][]*base.TaskMessage{
|
wantPending: map[string][]*base.TaskMessage{
|
||||||
"default": {m1},
|
"default": {m1},
|
||||||
"custom": {m3},
|
"custom": {m3},
|
||||||
@ -1935,7 +1935,7 @@ func TestInspectorDeleteTaskByKeyDeletesPendingTask(t *testing.T) {
|
|||||||
"custom": {m3},
|
"custom": {m3},
|
||||||
},
|
},
|
||||||
qname: "custom",
|
qname: "custom",
|
||||||
key: createPendingTask(m3).Key(),
|
id: createPendingTask(m3).ID,
|
||||||
wantPending: map[string][]*base.TaskMessage{
|
wantPending: map[string][]*base.TaskMessage{
|
||||||
"default": {m1, m2},
|
"default": {m1, m2},
|
||||||
"custom": {},
|
"custom": {},
|
||||||
@ -1947,9 +1947,8 @@ func TestInspectorDeleteTaskByKeyDeletesPendingTask(t *testing.T) {
|
|||||||
h.FlushDB(t, r)
|
h.FlushDB(t, r)
|
||||||
h.SeedAllPendingQueues(t, r, tc.pending)
|
h.SeedAllPendingQueues(t, r, tc.pending)
|
||||||
|
|
||||||
if err := inspector.DeleteTaskByKey(tc.qname, tc.key); err != nil {
|
if err := inspector.DeleteTask(tc.qname, tc.id); err != nil {
|
||||||
t.Errorf("DeleteTaskByKey(%q, %q) returned error: %v",
|
t.Errorf("DeleteTask(%q, %q) returned error: %v", tc.qname, tc.id, err)
|
||||||
tc.qname, tc.key, err)
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1964,7 +1963,7 @@ func TestInspectorDeleteTaskByKeyDeletesPendingTask(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestInspectorDeleteTaskByKeyDeletesScheduledTask(t *testing.T) {
|
func TestInspectorDeleteTaskDeletesScheduledTask(t *testing.T) {
|
||||||
r := setup(t)
|
r := setup(t)
|
||||||
defer r.Close()
|
defer r.Close()
|
||||||
m1 := h.NewTaskMessage("task1", nil)
|
m1 := h.NewTaskMessage("task1", nil)
|
||||||
@ -1980,7 +1979,7 @@ func TestInspectorDeleteTaskByKeyDeletesScheduledTask(t *testing.T) {
|
|||||||
tests := []struct {
|
tests := []struct {
|
||||||
scheduled map[string][]base.Z
|
scheduled map[string][]base.Z
|
||||||
qname string
|
qname string
|
||||||
key string
|
id string
|
||||||
wantScheduled map[string][]base.Z
|
wantScheduled map[string][]base.Z
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
@ -1989,7 +1988,7 @@ func TestInspectorDeleteTaskByKeyDeletesScheduledTask(t *testing.T) {
|
|||||||
"custom": {z3},
|
"custom": {z3},
|
||||||
},
|
},
|
||||||
qname: "default",
|
qname: "default",
|
||||||
key: createScheduledTask(z2).Key(),
|
id: createScheduledTask(z2).ID,
|
||||||
wantScheduled: map[string][]base.Z{
|
wantScheduled: map[string][]base.Z{
|
||||||
"default": {z1},
|
"default": {z1},
|
||||||
"custom": {z3},
|
"custom": {z3},
|
||||||
@ -2001,8 +2000,8 @@ func TestInspectorDeleteTaskByKeyDeletesScheduledTask(t *testing.T) {
|
|||||||
h.FlushDB(t, r)
|
h.FlushDB(t, r)
|
||||||
h.SeedAllScheduledQueues(t, r, tc.scheduled)
|
h.SeedAllScheduledQueues(t, r, tc.scheduled)
|
||||||
|
|
||||||
if err := inspector.DeleteTaskByKey(tc.qname, tc.key); err != nil {
|
if err := inspector.DeleteTask(tc.qname, tc.id); err != nil {
|
||||||
t.Errorf("DeleteTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err)
|
t.Errorf("DeleteTask(%q, %q) returned error: %v", tc.qname, tc.id, err)
|
||||||
}
|
}
|
||||||
for qname, want := range tc.wantScheduled {
|
for qname, want := range tc.wantScheduled {
|
||||||
gotScheduled := h.GetScheduledEntries(t, r, qname)
|
gotScheduled := h.GetScheduledEntries(t, r, qname)
|
||||||
@ -2014,7 +2013,7 @@ func TestInspectorDeleteTaskByKeyDeletesScheduledTask(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestInspectorDeleteTaskByKeyDeletesRetryTask(t *testing.T) {
|
func TestInspectorDeleteTaskDeletesRetryTask(t *testing.T) {
|
||||||
r := setup(t)
|
r := setup(t)
|
||||||
defer r.Close()
|
defer r.Close()
|
||||||
m1 := h.NewTaskMessage("task1", nil)
|
m1 := h.NewTaskMessage("task1", nil)
|
||||||
@ -2030,7 +2029,7 @@ func TestInspectorDeleteTaskByKeyDeletesRetryTask(t *testing.T) {
|
|||||||
tests := []struct {
|
tests := []struct {
|
||||||
retry map[string][]base.Z
|
retry map[string][]base.Z
|
||||||
qname string
|
qname string
|
||||||
key string
|
id string
|
||||||
wantRetry map[string][]base.Z
|
wantRetry map[string][]base.Z
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
@ -2039,7 +2038,7 @@ func TestInspectorDeleteTaskByKeyDeletesRetryTask(t *testing.T) {
|
|||||||
"custom": {z3},
|
"custom": {z3},
|
||||||
},
|
},
|
||||||
qname: "default",
|
qname: "default",
|
||||||
key: createRetryTask(z2).Key(),
|
id: createRetryTask(z2).ID,
|
||||||
wantRetry: map[string][]base.Z{
|
wantRetry: map[string][]base.Z{
|
||||||
"default": {z1},
|
"default": {z1},
|
||||||
"custom": {z3},
|
"custom": {z3},
|
||||||
@ -2051,8 +2050,8 @@ func TestInspectorDeleteTaskByKeyDeletesRetryTask(t *testing.T) {
|
|||||||
h.FlushDB(t, r)
|
h.FlushDB(t, r)
|
||||||
h.SeedAllRetryQueues(t, r, tc.retry)
|
h.SeedAllRetryQueues(t, r, tc.retry)
|
||||||
|
|
||||||
if err := inspector.DeleteTaskByKey(tc.qname, tc.key); err != nil {
|
if err := inspector.DeleteTask(tc.qname, tc.id); err != nil {
|
||||||
t.Errorf("DeleteTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err)
|
t.Errorf("DeleteTask(%q, %q) returned error: %v", tc.qname, tc.id, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
for qname, want := range tc.wantRetry {
|
for qname, want := range tc.wantRetry {
|
||||||
@ -2064,7 +2063,7 @@ func TestInspectorDeleteTaskByKeyDeletesRetryTask(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestInspectorDeleteTaskByKeyDeletesArchivedTask(t *testing.T) {
|
func TestInspectorDeleteTaskDeletesArchivedTask(t *testing.T) {
|
||||||
r := setup(t)
|
r := setup(t)
|
||||||
defer r.Close()
|
defer r.Close()
|
||||||
m1 := h.NewTaskMessage("task1", nil)
|
m1 := h.NewTaskMessage("task1", nil)
|
||||||
@ -2080,7 +2079,7 @@ func TestInspectorDeleteTaskByKeyDeletesArchivedTask(t *testing.T) {
|
|||||||
tests := []struct {
|
tests := []struct {
|
||||||
archived map[string][]base.Z
|
archived map[string][]base.Z
|
||||||
qname string
|
qname string
|
||||||
key string
|
id string
|
||||||
wantArchived map[string][]base.Z
|
wantArchived map[string][]base.Z
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
@ -2089,7 +2088,7 @@ func TestInspectorDeleteTaskByKeyDeletesArchivedTask(t *testing.T) {
|
|||||||
"custom": {z3},
|
"custom": {z3},
|
||||||
},
|
},
|
||||||
qname: "default",
|
qname: "default",
|
||||||
key: createArchivedTask(z2).Key(),
|
id: createArchivedTask(z2).ID,
|
||||||
wantArchived: map[string][]base.Z{
|
wantArchived: map[string][]base.Z{
|
||||||
"default": {z1},
|
"default": {z1},
|
||||||
"custom": {z3},
|
"custom": {z3},
|
||||||
@ -2101,8 +2100,73 @@ func TestInspectorDeleteTaskByKeyDeletesArchivedTask(t *testing.T) {
|
|||||||
h.FlushDB(t, r)
|
h.FlushDB(t, r)
|
||||||
h.SeedAllArchivedQueues(t, r, tc.archived)
|
h.SeedAllArchivedQueues(t, r, tc.archived)
|
||||||
|
|
||||||
if err := inspector.DeleteTaskByKey(tc.qname, tc.key); err != nil {
|
if err := inspector.DeleteTask(tc.qname, tc.id); err != nil {
|
||||||
t.Errorf("DeleteTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err)
|
t.Errorf("DeleteTask(%q, %q) returned error: %v", tc.qname, tc.id, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
for qname, want := range tc.wantArchived {
|
||||||
|
wantArchived := h.GetArchivedEntries(t, r, qname)
|
||||||
|
if diff := cmp.Diff(want, wantArchived, h.SortZSetEntryOpt); diff != "" {
|
||||||
|
t.Errorf("unexpected archived tasks in queue %q: (-want, +got)\n%s", qname, diff)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestInspectorDeleteTaskError(t *testing.T) {
|
||||||
|
r := setup(t)
|
||||||
|
defer r.Close()
|
||||||
|
m1 := h.NewTaskMessage("task1", nil)
|
||||||
|
m2 := h.NewTaskMessage("task2", nil)
|
||||||
|
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
|
||||||
|
now := time.Now()
|
||||||
|
z1 := base.Z{Message: m1, Score: now.Add(-5 * time.Minute).Unix()}
|
||||||
|
z2 := base.Z{Message: m2, Score: now.Add(-15 * time.Minute).Unix()}
|
||||||
|
z3 := base.Z{Message: m3, Score: now.Add(-2 * time.Minute).Unix()}
|
||||||
|
|
||||||
|
inspector := New(getRedisConnOpt(t))
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
archived map[string][]base.Z
|
||||||
|
qname string
|
||||||
|
id string
|
||||||
|
wantErr error
|
||||||
|
wantArchived map[string][]base.Z
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
archived: map[string][]base.Z{
|
||||||
|
"default": {z1, z2},
|
||||||
|
"custom": {z3},
|
||||||
|
},
|
||||||
|
qname: "nonexistent",
|
||||||
|
id: createArchivedTask(z2).ID,
|
||||||
|
wantErr: ErrQueueNotFound,
|
||||||
|
wantArchived: map[string][]base.Z{
|
||||||
|
"default": {z1, z2},
|
||||||
|
"custom": {z3},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
archived: map[string][]base.Z{
|
||||||
|
"default": {z1, z2},
|
||||||
|
"custom": {z3},
|
||||||
|
},
|
||||||
|
qname: "default",
|
||||||
|
id: uuid.NewString(),
|
||||||
|
wantErr: ErrTaskNotFound,
|
||||||
|
wantArchived: map[string][]base.Z{
|
||||||
|
"default": {z1, z2},
|
||||||
|
"custom": {z3},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tests {
|
||||||
|
h.FlushDB(t, r)
|
||||||
|
h.SeedAllArchivedQueues(t, r, tc.archived)
|
||||||
|
|
||||||
|
if err := inspector.DeleteTask(tc.qname, tc.id); !errors.Is(err, tc.wantErr) {
|
||||||
|
t.Errorf("DeleteTask(%q, %q) = %v, want %v", tc.qname, tc.id, err, tc.wantErr)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
for qname, want := range tc.wantArchived {
|
for qname, want := range tc.wantArchived {
|
||||||
|
Loading…
Reference in New Issue
Block a user