mirror of
https://github.com/hibiken/asynq.git
synced 2025-04-22 16:50:18 +08:00
Replace RunTaskByKey with RunTask in Inspector
This commit is contained in:
parent
c2998b7f57
commit
55a8ef036e
@ -140,8 +140,13 @@ var (
|
|||||||
ErrQueueNotFound = errors.New("queue not found")
|
ErrQueueNotFound = errors.New("queue not found")
|
||||||
// ErrQueueNotEmpty indicates that the specified queue is not empty.
|
// ErrQueueNotEmpty indicates that the specified queue is not empty.
|
||||||
ErrQueueNotEmpty = errors.New("queue is not empty")
|
ErrQueueNotEmpty = errors.New("queue is not empty")
|
||||||
|
// ErrTaskNotFound indicates that the specified task cannot be found in the queue.
|
||||||
|
ErrTaskNotFound = errors.New("task not found")
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type taskNotFoundError struct {
|
||||||
|
}
|
||||||
|
|
||||||
// DeleteQueue removes the specified queue.
|
// DeleteQueue removes the specified queue.
|
||||||
//
|
//
|
||||||
// If force is set to true, DeleteQueue will remove the queue regardless of
|
// If force is set to true, DeleteQueue will remove the queue regardless of
|
||||||
@ -591,28 +596,31 @@ func (i *Inspector) RunAllArchivedTasks(qname string) (int, error) {
|
|||||||
return int(n), err
|
return int(n), err
|
||||||
}
|
}
|
||||||
|
|
||||||
// RunTaskByKey transition a task to pending state given task key and queue name.
|
// RunTask updates the task to pending state given a queue name and task id.
|
||||||
// TODO: Update this to run task by ID.
|
// The task needs to be in scheduled, retry, or archived state, otherwise RunTask
|
||||||
func (i *Inspector) RunTaskByKey(qname, key string) error {
|
// will return an error.
|
||||||
|
//
|
||||||
|
// If a queue with the given name doesn't exist, it returns ErrQueueNotFound.
|
||||||
|
// If a task with the given id doesn't exist in the queue, it returns ErrTaskNotFound.
|
||||||
|
// If the task is in pending or active state, it returns a non-nil error.
|
||||||
|
func (i *Inspector) RunTask(qname, id string) error {
|
||||||
if err := base.ValidateQueueName(qname); err != nil {
|
if err := base.ValidateQueueName(qname); err != nil {
|
||||||
return err
|
return fmt.Errorf("asynq: %v", err)
|
||||||
}
|
}
|
||||||
prefix, id, _, err := parseTaskKey(key)
|
taskid, err := uuid.Parse(id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("asynq: %s is not a valid task id", id)
|
||||||
}
|
}
|
||||||
switch prefix {
|
err = i.rdb.RunTask(qname, taskid)
|
||||||
case keyPrefixScheduled:
|
switch {
|
||||||
return i.rdb.RunTask(qname, id)
|
case errors.IsQueueNotFound(err):
|
||||||
case keyPrefixRetry:
|
return fmt.Errorf("asynq: %w", ErrQueueNotFound)
|
||||||
return i.rdb.RunTask(qname, id)
|
case errors.IsTaskNotFound(err):
|
||||||
case keyPrefixArchived:
|
return fmt.Errorf("asynq: %w", ErrTaskNotFound)
|
||||||
return i.rdb.RunTask(qname, id)
|
case err != nil:
|
||||||
case keyPrefixPending:
|
return fmt.Errorf("asynq: %v", err)
|
||||||
return fmt.Errorf("task is already pending for run")
|
|
||||||
default:
|
|
||||||
return fmt.Errorf("invalid key")
|
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ArchiveAllPendingTasks archives all pending tasks from the given queue,
|
// ArchiveAllPendingTasks archives all pending tasks from the given queue,
|
||||||
|
@ -17,6 +17,7 @@ import (
|
|||||||
"github.com/go-redis/redis/v7"
|
"github.com/go-redis/redis/v7"
|
||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
"github.com/google/go-cmp/cmp/cmpopts"
|
"github.com/google/go-cmp/cmp/cmpopts"
|
||||||
|
"github.com/google/uuid"
|
||||||
"github.com/hibiken/asynq"
|
"github.com/hibiken/asynq"
|
||||||
h "github.com/hibiken/asynq/internal/asynqtest"
|
h "github.com/hibiken/asynq/internal/asynqtest"
|
||||||
"github.com/hibiken/asynq/internal/base"
|
"github.com/hibiken/asynq/internal/base"
|
||||||
@ -2113,7 +2114,7 @@ func TestInspectorDeleteTaskByKeyDeletesArchivedTask(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestInspectorRunTaskByKeyRunsScheduledTask(t *testing.T) {
|
func TestInspectorRunTaskRunsScheduledTask(t *testing.T) {
|
||||||
r := setup(t)
|
r := setup(t)
|
||||||
defer r.Close()
|
defer r.Close()
|
||||||
m1 := h.NewTaskMessage("task1", nil)
|
m1 := h.NewTaskMessage("task1", nil)
|
||||||
@ -2130,7 +2131,7 @@ func TestInspectorRunTaskByKeyRunsScheduledTask(t *testing.T) {
|
|||||||
scheduled map[string][]base.Z
|
scheduled map[string][]base.Z
|
||||||
pending map[string][]*base.TaskMessage
|
pending map[string][]*base.TaskMessage
|
||||||
qname string
|
qname string
|
||||||
key string
|
id string
|
||||||
wantScheduled map[string][]base.Z
|
wantScheduled map[string][]base.Z
|
||||||
wantPending map[string][]*base.TaskMessage
|
wantPending map[string][]*base.TaskMessage
|
||||||
}{
|
}{
|
||||||
@ -2144,7 +2145,7 @@ func TestInspectorRunTaskByKeyRunsScheduledTask(t *testing.T) {
|
|||||||
"custom": {},
|
"custom": {},
|
||||||
},
|
},
|
||||||
qname: "default",
|
qname: "default",
|
||||||
key: createScheduledTask(z2).Key(),
|
id: createScheduledTask(z2).ID,
|
||||||
wantScheduled: map[string][]base.Z{
|
wantScheduled: map[string][]base.Z{
|
||||||
"default": {z1},
|
"default": {z1},
|
||||||
"custom": {z3},
|
"custom": {z3},
|
||||||
@ -2161,8 +2162,8 @@ func TestInspectorRunTaskByKeyRunsScheduledTask(t *testing.T) {
|
|||||||
h.SeedAllScheduledQueues(t, r, tc.scheduled)
|
h.SeedAllScheduledQueues(t, r, tc.scheduled)
|
||||||
h.SeedAllPendingQueues(t, r, tc.pending)
|
h.SeedAllPendingQueues(t, r, tc.pending)
|
||||||
|
|
||||||
if err := inspector.RunTaskByKey(tc.qname, tc.key); err != nil {
|
if err := inspector.RunTask(tc.qname, tc.id); err != nil {
|
||||||
t.Errorf("RunTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err)
|
t.Errorf("RunTask(%q, %q) returned error: %v", tc.qname, tc.id, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
for qname, want := range tc.wantScheduled {
|
for qname, want := range tc.wantScheduled {
|
||||||
@ -2183,7 +2184,7 @@ func TestInspectorRunTaskByKeyRunsScheduledTask(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestInspectorRunTaskByKeyRunsRetryTask(t *testing.T) {
|
func TestInspectorRunTaskRunsRetryTask(t *testing.T) {
|
||||||
r := setup(t)
|
r := setup(t)
|
||||||
defer r.Close()
|
defer r.Close()
|
||||||
m1 := h.NewTaskMessage("task1", nil)
|
m1 := h.NewTaskMessage("task1", nil)
|
||||||
@ -2200,7 +2201,7 @@ func TestInspectorRunTaskByKeyRunsRetryTask(t *testing.T) {
|
|||||||
retry map[string][]base.Z
|
retry map[string][]base.Z
|
||||||
pending map[string][]*base.TaskMessage
|
pending map[string][]*base.TaskMessage
|
||||||
qname string
|
qname string
|
||||||
key string
|
id string
|
||||||
wantRetry map[string][]base.Z
|
wantRetry map[string][]base.Z
|
||||||
wantPending map[string][]*base.TaskMessage
|
wantPending map[string][]*base.TaskMessage
|
||||||
}{
|
}{
|
||||||
@ -2214,7 +2215,7 @@ func TestInspectorRunTaskByKeyRunsRetryTask(t *testing.T) {
|
|||||||
"custom": {},
|
"custom": {},
|
||||||
},
|
},
|
||||||
qname: "custom",
|
qname: "custom",
|
||||||
key: createRetryTask(z2).Key(),
|
id: createRetryTask(z2).ID,
|
||||||
wantRetry: map[string][]base.Z{
|
wantRetry: map[string][]base.Z{
|
||||||
"default": {z1},
|
"default": {z1},
|
||||||
"custom": {z3},
|
"custom": {z3},
|
||||||
@ -2231,8 +2232,8 @@ func TestInspectorRunTaskByKeyRunsRetryTask(t *testing.T) {
|
|||||||
h.SeedAllRetryQueues(t, r, tc.retry)
|
h.SeedAllRetryQueues(t, r, tc.retry)
|
||||||
h.SeedAllPendingQueues(t, r, tc.pending)
|
h.SeedAllPendingQueues(t, r, tc.pending)
|
||||||
|
|
||||||
if err := inspector.RunTaskByKey(tc.qname, tc.key); err != nil {
|
if err := inspector.RunTask(tc.qname, tc.id); err != nil {
|
||||||
t.Errorf("RunTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err)
|
t.Errorf("RunTaskBy(%q, %q) returned error: %v", tc.qname, tc.id, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
for qname, want := range tc.wantRetry {
|
for qname, want := range tc.wantRetry {
|
||||||
@ -2252,7 +2253,7 @@ func TestInspectorRunTaskByKeyRunsRetryTask(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestInspectorRunTaskByKeyRunsArchivedTask(t *testing.T) {
|
func TestInspectorRunTaskRunsArchivedTask(t *testing.T) {
|
||||||
r := setup(t)
|
r := setup(t)
|
||||||
defer r.Close()
|
defer r.Close()
|
||||||
m1 := h.NewTaskMessage("task1", nil)
|
m1 := h.NewTaskMessage("task1", nil)
|
||||||
@ -2269,7 +2270,7 @@ func TestInspectorRunTaskByKeyRunsArchivedTask(t *testing.T) {
|
|||||||
archived map[string][]base.Z
|
archived map[string][]base.Z
|
||||||
pending map[string][]*base.TaskMessage
|
pending map[string][]*base.TaskMessage
|
||||||
qname string
|
qname string
|
||||||
key string
|
id string
|
||||||
wantArchived map[string][]base.Z
|
wantArchived map[string][]base.Z
|
||||||
wantPending map[string][]*base.TaskMessage
|
wantPending map[string][]*base.TaskMessage
|
||||||
}{
|
}{
|
||||||
@ -2285,7 +2286,7 @@ func TestInspectorRunTaskByKeyRunsArchivedTask(t *testing.T) {
|
|||||||
"low": {},
|
"low": {},
|
||||||
},
|
},
|
||||||
qname: "critical",
|
qname: "critical",
|
||||||
key: createArchivedTask(z2).Key(),
|
id: createArchivedTask(z2).ID,
|
||||||
wantArchived: map[string][]base.Z{
|
wantArchived: map[string][]base.Z{
|
||||||
"default": {z1},
|
"default": {z1},
|
||||||
"critical": {},
|
"critical": {},
|
||||||
@ -2304,8 +2305,108 @@ func TestInspectorRunTaskByKeyRunsArchivedTask(t *testing.T) {
|
|||||||
h.SeedAllArchivedQueues(t, r, tc.archived)
|
h.SeedAllArchivedQueues(t, r, tc.archived)
|
||||||
h.SeedAllPendingQueues(t, r, tc.pending)
|
h.SeedAllPendingQueues(t, r, tc.pending)
|
||||||
|
|
||||||
if err := inspector.RunTaskByKey(tc.qname, tc.key); err != nil {
|
if err := inspector.RunTask(tc.qname, tc.id); err != nil {
|
||||||
t.Errorf("RunTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err)
|
t.Errorf("RunTask(%q, %q) returned error: %v", tc.qname, tc.id, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
for qname, want := range tc.wantArchived {
|
||||||
|
wantArchived := h.GetArchivedEntries(t, r, qname)
|
||||||
|
if diff := cmp.Diff(want, wantArchived, h.SortZSetEntryOpt); diff != "" {
|
||||||
|
t.Errorf("unexpected archived tasks in queue %q: (-want, +got)\n%s",
|
||||||
|
qname, diff)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for qname, want := range tc.wantPending {
|
||||||
|
gotPending := h.GetPendingMessages(t, r, qname)
|
||||||
|
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
|
||||||
|
t.Errorf("unexpected pending tasks in queue %q: (-want, +got)\n%s",
|
||||||
|
qname, diff)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestInspectorRunTaskError(t *testing.T) {
|
||||||
|
r := setup(t)
|
||||||
|
defer r.Close()
|
||||||
|
m1 := h.NewTaskMessage("task1", nil)
|
||||||
|
m2 := h.NewTaskMessageWithQueue("task2", nil, "critical")
|
||||||
|
m3 := h.NewTaskMessageWithQueue("task3", nil, "low")
|
||||||
|
now := time.Now()
|
||||||
|
z1 := base.Z{Message: m1, Score: now.Add(-5 * time.Minute).Unix()}
|
||||||
|
z2 := base.Z{Message: m2, Score: now.Add(-15 * time.Minute).Unix()}
|
||||||
|
z3 := base.Z{Message: m3, Score: now.Add(-2 * time.Minute).Unix()}
|
||||||
|
|
||||||
|
inspector := New(getRedisConnOpt(t))
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
archived map[string][]base.Z
|
||||||
|
pending map[string][]*base.TaskMessage
|
||||||
|
qname string
|
||||||
|
id string
|
||||||
|
wantErr error
|
||||||
|
wantArchived map[string][]base.Z
|
||||||
|
wantPending map[string][]*base.TaskMessage
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
archived: map[string][]base.Z{
|
||||||
|
"default": {z1},
|
||||||
|
"critical": {z2},
|
||||||
|
"low": {z3},
|
||||||
|
},
|
||||||
|
pending: map[string][]*base.TaskMessage{
|
||||||
|
"default": {},
|
||||||
|
"critical": {},
|
||||||
|
"low": {},
|
||||||
|
},
|
||||||
|
qname: "nonexistent",
|
||||||
|
id: createArchivedTask(z2).ID,
|
||||||
|
wantErr: ErrQueueNotFound,
|
||||||
|
wantArchived: map[string][]base.Z{
|
||||||
|
"default": {z1},
|
||||||
|
"critical": {z2},
|
||||||
|
"low": {z3},
|
||||||
|
},
|
||||||
|
wantPending: map[string][]*base.TaskMessage{
|
||||||
|
"default": {},
|
||||||
|
"critical": {},
|
||||||
|
"low": {},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
archived: map[string][]base.Z{
|
||||||
|
"default": {z1},
|
||||||
|
"critical": {z2},
|
||||||
|
"low": {z3},
|
||||||
|
},
|
||||||
|
pending: map[string][]*base.TaskMessage{
|
||||||
|
"default": {},
|
||||||
|
"critical": {},
|
||||||
|
"low": {},
|
||||||
|
},
|
||||||
|
qname: "default",
|
||||||
|
id: uuid.NewString(),
|
||||||
|
wantErr: ErrTaskNotFound,
|
||||||
|
wantArchived: map[string][]base.Z{
|
||||||
|
"default": {z1},
|
||||||
|
"critical": {z2},
|
||||||
|
"low": {z3},
|
||||||
|
},
|
||||||
|
wantPending: map[string][]*base.TaskMessage{
|
||||||
|
"default": {},
|
||||||
|
"critical": {},
|
||||||
|
"low": {},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tests {
|
||||||
|
h.FlushDB(t, r)
|
||||||
|
h.SeedAllArchivedQueues(t, r, tc.archived)
|
||||||
|
h.SeedAllPendingQueues(t, r, tc.pending)
|
||||||
|
|
||||||
|
if err := inspector.RunTask(tc.qname, tc.id); !errors.Is(err, tc.wantErr) {
|
||||||
|
t.Errorf("RunTask(%q, %q) = %v, want %v", tc.qname, tc.id, err, tc.wantErr)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
for qname, want := range tc.wantArchived {
|
for qname, want := range tc.wantArchived {
|
||||||
|
@ -26,7 +26,7 @@ type Error struct {
|
|||||||
Err error
|
Err error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Error) Error() string {
|
func (e *Error) DebugString() string {
|
||||||
var b strings.Builder
|
var b strings.Builder
|
||||||
if e.Op != "" {
|
if e.Op != "" {
|
||||||
b.WriteString(string(e.Op))
|
b.WriteString(string(e.Op))
|
||||||
@ -46,6 +46,20 @@ func (e *Error) Error() string {
|
|||||||
return b.String()
|
return b.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (e *Error) Error() string {
|
||||||
|
var b strings.Builder
|
||||||
|
if e.Code != Unspecified {
|
||||||
|
b.WriteString(e.Code.String())
|
||||||
|
}
|
||||||
|
if e.Err != nil {
|
||||||
|
if b.Len() > 0 {
|
||||||
|
b.WriteString(": ")
|
||||||
|
}
|
||||||
|
b.WriteString(e.Err.Error())
|
||||||
|
}
|
||||||
|
return b.String()
|
||||||
|
}
|
||||||
|
|
||||||
func (e *Error) Unwrap() error {
|
func (e *Error) Unwrap() error {
|
||||||
return e.Err
|
return e.Err
|
||||||
}
|
}
|
||||||
|
@ -6,7 +6,9 @@ package errors
|
|||||||
|
|
||||||
import "testing"
|
import "testing"
|
||||||
|
|
||||||
func TestErrorString(t *testing.T) {
|
func TestErrorDebugString(t *testing.T) {
|
||||||
|
// DebugString should include Op since its meant to be used by
|
||||||
|
// maintainers/contributors of the asynq package.
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
desc string
|
desc string
|
||||||
err error
|
err error
|
||||||
@ -24,6 +26,33 @@ func TestErrorString(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for _, tc := range tests {
|
||||||
|
if got := tc.err.(*Error).DebugString(); got != tc.want {
|
||||||
|
t.Errorf("%s: got=%q, want=%q", tc.desc, got, tc.want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestErrorString(t *testing.T) {
|
||||||
|
// String method should omit Op since op is an internal detail
|
||||||
|
// and we don't want to provide it to users of the package.
|
||||||
|
tests := []struct {
|
||||||
|
desc string
|
||||||
|
err error
|
||||||
|
want string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
desc: "With Op, Code, and string",
|
||||||
|
err: E(Op("rdb.DeleteTask"), NotFound, "cannot find task with id=123"),
|
||||||
|
want: "NOT_FOUND: cannot find task with id=123",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "With Op, Code and error",
|
||||||
|
err: E(Op("rdb.DeleteTask"), NotFound, &TaskNotFoundError{Queue: "default", ID: "123"}),
|
||||||
|
want: `NOT_FOUND: cannot find task with id=123 in queue "default"`,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
if got := tc.err.Error(); got != tc.want {
|
if got := tc.err.Error(); got != tc.want {
|
||||||
t.Errorf("%s: got=%q, want=%q", tc.desc, got, tc.want)
|
t.Errorf("%s: got=%q, want=%q", tc.desc, got, tc.want)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user