mirror of
https://github.com/hibiken/asynq.git
synced 2025-04-22 16:50:18 +08:00
Allow task deletion by queue name and task ID
This commit is contained in:
parent
6d35d46461
commit
9bc80c6216
@ -135,6 +135,7 @@ func (i *Inspector) History(qname string, n int) ([]*DailyStats, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ErrQueueNotFound indicates that the specified queue does not exist.
|
// ErrQueueNotFound indicates that the specified queue does not exist.
|
||||||
|
// TODO: Consider renaming
|
||||||
type ErrQueueNotFound struct {
|
type ErrQueueNotFound struct {
|
||||||
qname string
|
qname string
|
||||||
}
|
}
|
||||||
@ -144,6 +145,7 @@ func (e *ErrQueueNotFound) Error() string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ErrQueueNotEmpty indicates that the specified queue is not empty.
|
// ErrQueueNotEmpty indicates that the specified queue is not empty.
|
||||||
|
// TODO: Consider renaming
|
||||||
type ErrQueueNotEmpty struct {
|
type ErrQueueNotEmpty struct {
|
||||||
qname string
|
qname string
|
||||||
}
|
}
|
||||||
@ -595,28 +597,13 @@ func (i *Inspector) DeleteAllArchivedTasks(qname string) (int, error) {
|
|||||||
return int(n), err
|
return int(n), err
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteTaskByKey deletes a task with the given key from the given queue.
|
// DeleteTaskByKey deletes a task with the given id from the given queue.
|
||||||
// TODO: We don't need score any more. Update this to delete task by ID
|
func (i *Inspector) DeleteTask(qname, id string) error {
|
||||||
func (i *Inspector) DeleteTaskByKey(qname, key string) error {
|
|
||||||
if err := base.ValidateQueueName(qname); err != nil {
|
if err := base.ValidateQueueName(qname); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
prefix, id, _, err := parseTaskKey(key)
|
// TODO: Return ErrTaskNotFound or meaningful error
|
||||||
if err != nil {
|
return i.rdb.DeleteTask(qname, id)
|
||||||
return err
|
|
||||||
}
|
|
||||||
switch prefix {
|
|
||||||
case keyPrefixPending:
|
|
||||||
return i.rdb.DeletePendingTask(qname, id)
|
|
||||||
case keyPrefixScheduled:
|
|
||||||
return i.rdb.DeleteScheduledTask(qname, id)
|
|
||||||
case keyPrefixRetry:
|
|
||||||
return i.rdb.DeleteRetryTask(qname, id)
|
|
||||||
case keyPrefixArchived:
|
|
||||||
return i.rdb.DeleteArchivedTask(qname, id)
|
|
||||||
default:
|
|
||||||
return fmt.Errorf("invalid key")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// RunAllScheduledTasks transition all scheduled tasks to pending state from the given queue,
|
// RunAllScheduledTasks transition all scheduled tasks to pending state from the given queue,
|
||||||
|
@ -1912,7 +1912,7 @@ func TestInspectorDeleteTaskByKeyDeletesPendingTask(t *testing.T) {
|
|||||||
tests := []struct {
|
tests := []struct {
|
||||||
pending map[string][]*base.TaskMessage
|
pending map[string][]*base.TaskMessage
|
||||||
qname string
|
qname string
|
||||||
key string
|
id string
|
||||||
wantPending map[string][]*base.TaskMessage
|
wantPending map[string][]*base.TaskMessage
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
@ -1921,7 +1921,7 @@ func TestInspectorDeleteTaskByKeyDeletesPendingTask(t *testing.T) {
|
|||||||
"custom": {m3},
|
"custom": {m3},
|
||||||
},
|
},
|
||||||
qname: "default",
|
qname: "default",
|
||||||
key: createPendingTask(m2).Key(),
|
id: createPendingTask(m2).ID,
|
||||||
wantPending: map[string][]*base.TaskMessage{
|
wantPending: map[string][]*base.TaskMessage{
|
||||||
"default": {m1},
|
"default": {m1},
|
||||||
"custom": {m3},
|
"custom": {m3},
|
||||||
@ -1933,7 +1933,7 @@ func TestInspectorDeleteTaskByKeyDeletesPendingTask(t *testing.T) {
|
|||||||
"custom": {m3},
|
"custom": {m3},
|
||||||
},
|
},
|
||||||
qname: "custom",
|
qname: "custom",
|
||||||
key: createPendingTask(m3).Key(),
|
id: createPendingTask(m3).ID,
|
||||||
wantPending: map[string][]*base.TaskMessage{
|
wantPending: map[string][]*base.TaskMessage{
|
||||||
"default": {m1, m2},
|
"default": {m1, m2},
|
||||||
"custom": {},
|
"custom": {},
|
||||||
@ -1945,9 +1945,8 @@ func TestInspectorDeleteTaskByKeyDeletesPendingTask(t *testing.T) {
|
|||||||
h.FlushDB(t, r)
|
h.FlushDB(t, r)
|
||||||
h.SeedAllPendingQueues(t, r, tc.pending)
|
h.SeedAllPendingQueues(t, r, tc.pending)
|
||||||
|
|
||||||
if err := inspector.DeleteTaskByKey(tc.qname, tc.key); err != nil {
|
if err := inspector.DeleteTask(tc.qname, tc.id); err != nil {
|
||||||
t.Errorf("DeleteTaskByKey(%q, %q) returned error: %v",
|
t.Errorf("DeleteTask(%q, %q) returned error: %v", tc.qname, tc.id, err)
|
||||||
tc.qname, tc.key, err)
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1978,7 +1977,7 @@ func TestInspectorDeleteTaskByKeyDeletesScheduledTask(t *testing.T) {
|
|||||||
tests := []struct {
|
tests := []struct {
|
||||||
scheduled map[string][]base.Z
|
scheduled map[string][]base.Z
|
||||||
qname string
|
qname string
|
||||||
key string
|
id string
|
||||||
wantScheduled map[string][]base.Z
|
wantScheduled map[string][]base.Z
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
@ -1987,7 +1986,7 @@ func TestInspectorDeleteTaskByKeyDeletesScheduledTask(t *testing.T) {
|
|||||||
"custom": {z3},
|
"custom": {z3},
|
||||||
},
|
},
|
||||||
qname: "default",
|
qname: "default",
|
||||||
key: createScheduledTask(z2).Key(),
|
id: createScheduledTask(z2).ID,
|
||||||
wantScheduled: map[string][]base.Z{
|
wantScheduled: map[string][]base.Z{
|
||||||
"default": {z1},
|
"default": {z1},
|
||||||
"custom": {z3},
|
"custom": {z3},
|
||||||
@ -1999,8 +1998,8 @@ func TestInspectorDeleteTaskByKeyDeletesScheduledTask(t *testing.T) {
|
|||||||
h.FlushDB(t, r)
|
h.FlushDB(t, r)
|
||||||
h.SeedAllScheduledQueues(t, r, tc.scheduled)
|
h.SeedAllScheduledQueues(t, r, tc.scheduled)
|
||||||
|
|
||||||
if err := inspector.DeleteTaskByKey(tc.qname, tc.key); err != nil {
|
if err := inspector.DeleteTask(tc.qname, tc.id); err != nil {
|
||||||
t.Errorf("DeleteTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err)
|
t.Errorf("DeleteTask(%q, %q) returned error: %v", tc.qname, tc.id, err)
|
||||||
}
|
}
|
||||||
for qname, want := range tc.wantScheduled {
|
for qname, want := range tc.wantScheduled {
|
||||||
gotScheduled := h.GetScheduledEntries(t, r, qname)
|
gotScheduled := h.GetScheduledEntries(t, r, qname)
|
||||||
@ -2028,7 +2027,7 @@ func TestInspectorDeleteTaskByKeyDeletesRetryTask(t *testing.T) {
|
|||||||
tests := []struct {
|
tests := []struct {
|
||||||
retry map[string][]base.Z
|
retry map[string][]base.Z
|
||||||
qname string
|
qname string
|
||||||
key string
|
id string
|
||||||
wantRetry map[string][]base.Z
|
wantRetry map[string][]base.Z
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
@ -2037,7 +2036,7 @@ func TestInspectorDeleteTaskByKeyDeletesRetryTask(t *testing.T) {
|
|||||||
"custom": {z3},
|
"custom": {z3},
|
||||||
},
|
},
|
||||||
qname: "default",
|
qname: "default",
|
||||||
key: createRetryTask(z2).Key(),
|
id: createRetryTask(z2).ID,
|
||||||
wantRetry: map[string][]base.Z{
|
wantRetry: map[string][]base.Z{
|
||||||
"default": {z1},
|
"default": {z1},
|
||||||
"custom": {z3},
|
"custom": {z3},
|
||||||
@ -2049,8 +2048,8 @@ func TestInspectorDeleteTaskByKeyDeletesRetryTask(t *testing.T) {
|
|||||||
h.FlushDB(t, r)
|
h.FlushDB(t, r)
|
||||||
h.SeedAllRetryQueues(t, r, tc.retry)
|
h.SeedAllRetryQueues(t, r, tc.retry)
|
||||||
|
|
||||||
if err := inspector.DeleteTaskByKey(tc.qname, tc.key); err != nil {
|
if err := inspector.DeleteTask(tc.qname, tc.id); err != nil {
|
||||||
t.Errorf("DeleteTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err)
|
t.Errorf("DeleteTask(%q, %q) returned error: %v", tc.qname, tc.id, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
for qname, want := range tc.wantRetry {
|
for qname, want := range tc.wantRetry {
|
||||||
@ -2078,7 +2077,7 @@ func TestInspectorDeleteTaskByKeyDeletesArchivedTask(t *testing.T) {
|
|||||||
tests := []struct {
|
tests := []struct {
|
||||||
archived map[string][]base.Z
|
archived map[string][]base.Z
|
||||||
qname string
|
qname string
|
||||||
key string
|
id string
|
||||||
wantArchived map[string][]base.Z
|
wantArchived map[string][]base.Z
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
@ -2087,7 +2086,7 @@ func TestInspectorDeleteTaskByKeyDeletesArchivedTask(t *testing.T) {
|
|||||||
"custom": {z3},
|
"custom": {z3},
|
||||||
},
|
},
|
||||||
qname: "default",
|
qname: "default",
|
||||||
key: createArchivedTask(z2).Key(),
|
id: createArchivedTask(z2).ID,
|
||||||
wantArchived: map[string][]base.Z{
|
wantArchived: map[string][]base.Z{
|
||||||
"default": {z1},
|
"default": {z1},
|
||||||
"custom": {z3},
|
"custom": {z3},
|
||||||
@ -2099,8 +2098,8 @@ func TestInspectorDeleteTaskByKeyDeletesArchivedTask(t *testing.T) {
|
|||||||
h.FlushDB(t, r)
|
h.FlushDB(t, r)
|
||||||
h.SeedAllArchivedQueues(t, r, tc.archived)
|
h.SeedAllArchivedQueues(t, r, tc.archived)
|
||||||
|
|
||||||
if err := inspector.DeleteTaskByKey(tc.qname, tc.key); err != nil {
|
if err := inspector.DeleteTask(tc.qname, tc.id); err != nil {
|
||||||
t.Errorf("DeleteTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err)
|
t.Errorf("DeleteTask(%q, %q) returned error: %v", tc.qname, tc.id, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
for qname, want := range tc.wantArchived {
|
for qname, want := range tc.wantArchived {
|
||||||
|
@ -9,6 +9,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"math"
|
"math"
|
||||||
"sort"
|
"sort"
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/go-redis/redis/v7"
|
"github.com/go-redis/redis/v7"
|
||||||
@ -18,6 +19,28 @@ import (
|
|||||||
"github.com/hibiken/asynq/internal/base"
|
"github.com/hibiken/asynq/internal/base"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type taskState int
|
||||||
|
|
||||||
|
const (
|
||||||
|
stateActive taskState = iota
|
||||||
|
statePending
|
||||||
|
stateScheduled
|
||||||
|
stateRetry
|
||||||
|
stateArchived
|
||||||
|
)
|
||||||
|
|
||||||
|
var taskStateNames = map[taskState]string{
|
||||||
|
stateActive: "active",
|
||||||
|
statePending: "pending",
|
||||||
|
stateScheduled: "scheduled",
|
||||||
|
stateRetry: "retry",
|
||||||
|
stateArchived: "archived",
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s taskState) String() string {
|
||||||
|
return taskStateNames[s]
|
||||||
|
}
|
||||||
|
|
||||||
// EquateInt64Approx returns a Comparer option that treats int64 values
|
// EquateInt64Approx returns a Comparer option that treats int64 values
|
||||||
// to be equal if they are within the given margin.
|
// to be equal if they are within the given margin.
|
||||||
func EquateInt64Approx(margin int64) cmp.Option {
|
func EquateInt64Approx(margin int64) cmp.Option {
|
||||||
@ -182,42 +205,42 @@ func FlushDB(tb testing.TB, r redis.UniversalClient) {
|
|||||||
func SeedPendingQueue(tb testing.TB, r redis.UniversalClient, msgs []*base.TaskMessage, qname string) {
|
func SeedPendingQueue(tb testing.TB, r redis.UniversalClient, msgs []*base.TaskMessage, qname string) {
|
||||||
tb.Helper()
|
tb.Helper()
|
||||||
r.SAdd(base.AllQueues, qname)
|
r.SAdd(base.AllQueues, qname)
|
||||||
seedRedisList(tb, r, base.PendingKey(qname), msgs)
|
seedRedisList(tb, r, qname, msgs, statePending)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SeedActiveQueue initializes the active queue with the given messages.
|
// SeedActiveQueue initializes the active queue with the given messages.
|
||||||
func SeedActiveQueue(tb testing.TB, r redis.UniversalClient, msgs []*base.TaskMessage, qname string) {
|
func SeedActiveQueue(tb testing.TB, r redis.UniversalClient, msgs []*base.TaskMessage, qname string) {
|
||||||
tb.Helper()
|
tb.Helper()
|
||||||
r.SAdd(base.AllQueues, qname)
|
r.SAdd(base.AllQueues, qname)
|
||||||
seedRedisList(tb, r, base.ActiveKey(qname), msgs)
|
seedRedisList(tb, r, qname, msgs, stateActive)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SeedScheduledQueue initializes the scheduled queue with the given messages.
|
// SeedScheduledQueue initializes the scheduled queue with the given messages.
|
||||||
func SeedScheduledQueue(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) {
|
func SeedScheduledQueue(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) {
|
||||||
tb.Helper()
|
tb.Helper()
|
||||||
r.SAdd(base.AllQueues, qname)
|
r.SAdd(base.AllQueues, qname)
|
||||||
seedRedisZSet(tb, r, base.ScheduledKey(qname), entries)
|
seedRedisZSet(tb, r, qname, entries, stateScheduled)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SeedRetryQueue initializes the retry queue with the given messages.
|
// SeedRetryQueue initializes the retry queue with the given messages.
|
||||||
func SeedRetryQueue(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) {
|
func SeedRetryQueue(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) {
|
||||||
tb.Helper()
|
tb.Helper()
|
||||||
r.SAdd(base.AllQueues, qname)
|
r.SAdd(base.AllQueues, qname)
|
||||||
seedRedisZSet(tb, r, base.RetryKey(qname), entries)
|
seedRedisZSet(tb, r, qname, entries, stateRetry)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SeedArchivedQueue initializes the archived queue with the given messages.
|
// SeedArchivedQueue initializes the archived queue with the given messages.
|
||||||
func SeedArchivedQueue(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) {
|
func SeedArchivedQueue(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) {
|
||||||
tb.Helper()
|
tb.Helper()
|
||||||
r.SAdd(base.AllQueues, qname)
|
r.SAdd(base.AllQueues, qname)
|
||||||
seedRedisZSet(tb, r, base.ArchivedKey(qname), entries)
|
seedRedisZSet(tb, r, qname, entries, stateArchived)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SeedDeadlines initializes the deadlines set with the given entries.
|
// SeedDeadlines initializes the deadlines set with the given entries.
|
||||||
func SeedDeadlines(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) {
|
func SeedDeadlines(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) {
|
||||||
tb.Helper()
|
tb.Helper()
|
||||||
r.SAdd(base.AllQueues, qname)
|
r.SAdd(base.AllQueues, qname)
|
||||||
seedRedisZSet(tb, r, base.DeadlinesKey(qname), entries)
|
seedRedisZSet(tb, r, qname, entries, stateActive)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SeedAllPendingQueues initializes all of the specified queues with the given messages.
|
// SeedAllPendingQueues initializes all of the specified queues with the given messages.
|
||||||
@ -270,8 +293,17 @@ func SeedAllDeadlines(tb testing.TB, r redis.UniversalClient, deadlines map[stri
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func seedRedisList(tb testing.TB, c redis.UniversalClient, key string, msgs []*base.TaskMessage) {
|
func seedRedisList(tb testing.TB, c redis.UniversalClient, qname string, msgs []*base.TaskMessage, state taskState) {
|
||||||
tb.Helper()
|
tb.Helper()
|
||||||
|
var key string
|
||||||
|
switch state {
|
||||||
|
case statePending:
|
||||||
|
key = base.PendingKey(qname)
|
||||||
|
case stateActive:
|
||||||
|
key = base.ActiveKey(qname)
|
||||||
|
default:
|
||||||
|
tb.Fatalf("cannot seed redis LIST with task state %s", state)
|
||||||
|
}
|
||||||
for _, msg := range msgs {
|
for _, msg := range msgs {
|
||||||
encoded := MustMarshal(tb, msg)
|
encoded := MustMarshal(tb, msg)
|
||||||
if err := c.LPush(key, msg.ID.String()).Err(); err != nil {
|
if err := c.LPush(key, msg.ID.String()).Err(); err != nil {
|
||||||
@ -282,6 +314,7 @@ func seedRedisList(tb testing.TB, c redis.UniversalClient, key string, msgs []*b
|
|||||||
"msg": encoded,
|
"msg": encoded,
|
||||||
"timeout": msg.Timeout,
|
"timeout": msg.Timeout,
|
||||||
"deadline": msg.Deadline,
|
"deadline": msg.Deadline,
|
||||||
|
"state": strings.ToUpper(state.String()),
|
||||||
}
|
}
|
||||||
if err := c.HSet(key, data).Err(); err != nil {
|
if err := c.HSet(key, data).Err(); err != nil {
|
||||||
tb.Fatal(err)
|
tb.Fatal(err)
|
||||||
@ -289,8 +322,19 @@ func seedRedisList(tb testing.TB, c redis.UniversalClient, key string, msgs []*b
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func seedRedisZSet(tb testing.TB, c redis.UniversalClient, key string, items []base.Z) {
|
func seedRedisZSet(tb testing.TB, c redis.UniversalClient, qname string, items []base.Z, state taskState) {
|
||||||
tb.Helper()
|
tb.Helper()
|
||||||
|
var key string
|
||||||
|
switch state {
|
||||||
|
case stateScheduled:
|
||||||
|
key = base.ScheduledKey(qname)
|
||||||
|
case stateRetry:
|
||||||
|
key = base.RetryKey(qname)
|
||||||
|
case stateArchived:
|
||||||
|
key = base.ArchivedKey(qname)
|
||||||
|
default:
|
||||||
|
tb.Fatalf("cannot seed redis ZSET with task state %s", state)
|
||||||
|
}
|
||||||
for _, item := range items {
|
for _, item := range items {
|
||||||
msg := item.Message
|
msg := item.Message
|
||||||
encoded := MustMarshal(tb, msg)
|
encoded := MustMarshal(tb, msg)
|
||||||
@ -303,6 +347,7 @@ func seedRedisZSet(tb testing.TB, c redis.UniversalClient, key string, items []b
|
|||||||
"msg": encoded,
|
"msg": encoded,
|
||||||
"timeout": msg.Timeout,
|
"timeout": msg.Timeout,
|
||||||
"deadline": msg.Deadline,
|
"deadline": msg.Deadline,
|
||||||
|
"state": strings.ToUpper(state.String()),
|
||||||
}
|
}
|
||||||
if err := c.HSet(key, data).Err(); err != nil {
|
if err := c.HSet(key, data).Err(); err != nil {
|
||||||
tb.Fatal(err)
|
tb.Fatal(err)
|
||||||
|
@ -46,9 +46,14 @@ func ValidateQueueName(qname string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// QueueKeyPrefix returns a prefix for a redis-key namespaced by queue name.
|
||||||
|
func QueueKeyPrefix(qname string) string {
|
||||||
|
return fmt.Sprintf("asynq:{%s}:", qname)
|
||||||
|
}
|
||||||
|
|
||||||
// TaskKeyPrefix returns a prefix for task key.
|
// TaskKeyPrefix returns a prefix for task key.
|
||||||
func TaskKeyPrefix(qname string) string {
|
func TaskKeyPrefix(qname string) string {
|
||||||
return fmt.Sprintf("asynq:{%s}:t:", qname)
|
return fmt.Sprintf("%st:", QueueKeyPrefix(qname))
|
||||||
}
|
}
|
||||||
|
|
||||||
// TaskKey returns a redis key for the given task message.
|
// TaskKey returns a redis key for the given task message.
|
||||||
@ -58,47 +63,47 @@ func TaskKey(qname, id string) string {
|
|||||||
|
|
||||||
// PendingKey returns a redis key for the given queue name.
|
// PendingKey returns a redis key for the given queue name.
|
||||||
func PendingKey(qname string) string {
|
func PendingKey(qname string) string {
|
||||||
return fmt.Sprintf("asynq:{%s}:pending", qname)
|
return fmt.Sprintf("%spending", QueueKeyPrefix(qname))
|
||||||
}
|
}
|
||||||
|
|
||||||
// ActiveKey returns a redis key for the active tasks.
|
// ActiveKey returns a redis key for the active tasks.
|
||||||
func ActiveKey(qname string) string {
|
func ActiveKey(qname string) string {
|
||||||
return fmt.Sprintf("asynq:{%s}:active", qname)
|
return fmt.Sprintf("%sactive", QueueKeyPrefix(qname))
|
||||||
}
|
}
|
||||||
|
|
||||||
// ScheduledKey returns a redis key for the scheduled tasks.
|
// ScheduledKey returns a redis key for the scheduled tasks.
|
||||||
func ScheduledKey(qname string) string {
|
func ScheduledKey(qname string) string {
|
||||||
return fmt.Sprintf("asynq:{%s}:scheduled", qname)
|
return fmt.Sprintf("%sscheduled", QueueKeyPrefix(qname))
|
||||||
}
|
}
|
||||||
|
|
||||||
// RetryKey returns a redis key for the retry tasks.
|
// RetryKey returns a redis key for the retry tasks.
|
||||||
func RetryKey(qname string) string {
|
func RetryKey(qname string) string {
|
||||||
return fmt.Sprintf("asynq:{%s}:retry", qname)
|
return fmt.Sprintf("%sretry", QueueKeyPrefix(qname))
|
||||||
}
|
}
|
||||||
|
|
||||||
// ArchivedKey returns a redis key for the archived tasks.
|
// ArchivedKey returns a redis key for the archived tasks.
|
||||||
func ArchivedKey(qname string) string {
|
func ArchivedKey(qname string) string {
|
||||||
return fmt.Sprintf("asynq:{%s}:archived", qname)
|
return fmt.Sprintf("%sarchived", QueueKeyPrefix(qname))
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeadlinesKey returns a redis key for the deadlines.
|
// DeadlinesKey returns a redis key for the deadlines.
|
||||||
func DeadlinesKey(qname string) string {
|
func DeadlinesKey(qname string) string {
|
||||||
return fmt.Sprintf("asynq:{%s}:deadlines", qname)
|
return fmt.Sprintf("%sdeadlines", QueueKeyPrefix(qname))
|
||||||
}
|
}
|
||||||
|
|
||||||
// PausedKey returns a redis key to indicate that the given queue is paused.
|
// PausedKey returns a redis key to indicate that the given queue is paused.
|
||||||
func PausedKey(qname string) string {
|
func PausedKey(qname string) string {
|
||||||
return fmt.Sprintf("asynq:{%s}:paused", qname)
|
return fmt.Sprintf("%spaused", QueueKeyPrefix(qname))
|
||||||
}
|
}
|
||||||
|
|
||||||
// ProcessedKey returns a redis key for processed count for the given day for the queue.
|
// ProcessedKey returns a redis key for processed count for the given day for the queue.
|
||||||
func ProcessedKey(qname string, t time.Time) string {
|
func ProcessedKey(qname string, t time.Time) string {
|
||||||
return fmt.Sprintf("asynq:{%s}:processed:%s", qname, t.UTC().Format("2006-01-02"))
|
return fmt.Sprintf("%sprocessed:%s", QueueKeyPrefix(qname), t.UTC().Format("2006-01-02"))
|
||||||
}
|
}
|
||||||
|
|
||||||
// FailedKey returns a redis key for failure count for the given day for the queue.
|
// FailedKey returns a redis key for failure count for the given day for the queue.
|
||||||
func FailedKey(qname string, t time.Time) string {
|
func FailedKey(qname string, t time.Time) string {
|
||||||
return fmt.Sprintf("asynq:{%s}:failed:%s", qname, t.UTC().Format("2006-01-02"))
|
return fmt.Sprintf("%sfailed:%s", QueueKeyPrefix(qname), t.UTC().Format("2006-01-02"))
|
||||||
}
|
}
|
||||||
|
|
||||||
// ServerInfoKey returns a redis key for process info.
|
// ServerInfoKey returns a redis key for process info.
|
||||||
@ -123,7 +128,7 @@ func SchedulerHistoryKey(entryID string) string {
|
|||||||
|
|
||||||
// UniqueKey returns a redis key with the given type, payload, and queue name.
|
// UniqueKey returns a redis key with the given type, payload, and queue name.
|
||||||
func UniqueKey(qname, tasktype string, payload []byte) string {
|
func UniqueKey(qname, tasktype string, payload []byte) string {
|
||||||
return fmt.Sprintf("asynq:{%s}:unique:%s:%s", qname, tasktype, string(payload))
|
return fmt.Sprintf("%sunique:%s:%s", QueueKeyPrefix(qname), tasktype, string(payload))
|
||||||
}
|
}
|
||||||
|
|
||||||
// TaskMessage is the internal representation of a task with additional metadata fields.
|
// TaskMessage is the internal representation of a task with additional metadata fields.
|
||||||
|
@ -738,65 +738,37 @@ func (r *RDB) removeAndArchiveAll(src, dst string) (int64, error) {
|
|||||||
return n, nil
|
return n, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteArchivedTask deletes an archived task that matches the given id and score from the given queue.
|
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
|
||||||
// If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
|
|
||||||
func (r *RDB) DeleteArchivedTask(qname string, id uuid.UUID) error {
|
|
||||||
return r.deleteTask(base.ArchivedKey(qname), qname, id.String())
|
|
||||||
}
|
|
||||||
|
|
||||||
// DeleteRetryTask deletes a retry task that matches the given id and score from the given queue.
|
|
||||||
// If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
|
|
||||||
func (r *RDB) DeleteRetryTask(qname string, id uuid.UUID) error {
|
|
||||||
return r.deleteTask(base.RetryKey(qname), qname, id.String())
|
|
||||||
}
|
|
||||||
|
|
||||||
// DeleteScheduledTask deletes a scheduled task that matches the given id and score from the given queue.
|
|
||||||
// If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
|
|
||||||
func (r *RDB) DeleteScheduledTask(qname string, id uuid.UUID) error {
|
|
||||||
return r.deleteTask(base.ScheduledKey(qname), qname, id.String())
|
|
||||||
}
|
|
||||||
|
|
||||||
// KEYS[1] -> asynq:{<qname>}:pending
|
|
||||||
// KEYS[2] -> asynq:{<qname>}:t:<task_id>
|
|
||||||
// ARGV[1] -> task ID
|
|
||||||
var deletePendingTaskCmd = redis.NewScript(`
|
|
||||||
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
|
|
||||||
return 0
|
|
||||||
end
|
|
||||||
return redis.call("DEL", KEYS[2])
|
|
||||||
`)
|
|
||||||
|
|
||||||
// DeletePendingTask deletes a pending tasks that matches the given id from the given queue.
|
|
||||||
// If there's no match, it returns ErrTaskNotFound.
|
|
||||||
func (r *RDB) DeletePendingTask(qname string, id uuid.UUID) error {
|
|
||||||
keys := []string{base.PendingKey(qname), base.TaskKey(qname, id.String())}
|
|
||||||
res, err := deletePendingTaskCmd.Run(r.client, keys, id.String()).Result()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
n, ok := res.(int64)
|
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("command error: unexpected return value %v", res)
|
|
||||||
}
|
|
||||||
if n == 0 {
|
|
||||||
return ErrTaskNotFound
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// KEYS[1] -> ZSET key to remove the task from (e.g. asynq:{<qname>}:retry)
|
|
||||||
// KEYS[2] -> asynq:{<qname>}:t:<task_id>
|
|
||||||
// ARGV[1] -> task ID
|
// ARGV[1] -> task ID
|
||||||
|
// ARGV[2] -> redis key prefix (asynq:{<qname>}:)
|
||||||
var deleteTaskCmd = redis.NewScript(`
|
var deleteTaskCmd = redis.NewScript(`
|
||||||
if redis.call("ZREM", KEYS[1], ARGV[1]) == 0 then
|
if redis.call("EXISTS", KEYS[1]) == 0 then
|
||||||
return 0
|
return 0
|
||||||
end
|
end
|
||||||
return redis.call("DEL", KEYS[2])
|
local state = redis.call("HGET", KEYS[1], "state")
|
||||||
|
local n
|
||||||
|
if state == "PENDING" then
|
||||||
|
n = redis.call("LREM", (ARGV[2] .. "pending"), 0, ARGV[1])
|
||||||
|
elseif state == "SCHEDULED" then
|
||||||
|
n = redis.call("ZREM", (ARGV[2] .. "scheduled"), ARGV[1])
|
||||||
|
elseif state == "RETRY" then
|
||||||
|
n = redis.call("ZREM", (ARGV[2] .. "retry"), ARGV[1])
|
||||||
|
elseif state == "ARCHIVED" then
|
||||||
|
n = redis.call("ZREM", (ARGV[2] .. "archived"), ARGV[1])
|
||||||
|
else
|
||||||
|
return redis.error_reply("unknown task state: " .. tostring(state))
|
||||||
|
end
|
||||||
|
if n == 0 then
|
||||||
|
return 0
|
||||||
|
end
|
||||||
|
return redis.call("DEL", KEYS[1])
|
||||||
`)
|
`)
|
||||||
|
|
||||||
func (r *RDB) deleteTask(key, qname, id string) error {
|
// DeleteTask deletes a task that matches the given id from the given queue.
|
||||||
keys := []string{key, base.TaskKey(qname, id)}
|
// If a task that matches the id does not exist, it returns ErrTaskNotFound.
|
||||||
argv := []interface{}{id}
|
func (r *RDB) DeleteTask(qname, id string) error {
|
||||||
|
keys := []string{base.TaskKey(qname, id)}
|
||||||
|
argv := []interface{}{id, base.QueueKeyPrefix(qname)}
|
||||||
res, err := deleteTaskCmd.Run(r.client, keys, argv...).Result()
|
res, err := deleteTaskCmd.Run(r.client, keys, argv...).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -2472,9 +2472,9 @@ func TestDeleteArchivedTask(t *testing.T) {
|
|||||||
h.FlushDB(t, r.client) // clean up db before each test case
|
h.FlushDB(t, r.client) // clean up db before each test case
|
||||||
h.SeedAllArchivedQueues(t, r.client, tc.archived)
|
h.SeedAllArchivedQueues(t, r.client, tc.archived)
|
||||||
|
|
||||||
got := r.DeleteArchivedTask(tc.qname, tc.id)
|
got := r.DeleteTask(tc.qname, tc.id.String())
|
||||||
if got != tc.want {
|
if got != tc.want {
|
||||||
t.Errorf("r.DeleteArchivedTask(%q, %v) = %v, want %v", tc.qname, tc.id, got, tc.want)
|
t.Errorf("r.DeleteTask(%q, %v) = %v, want %v", tc.qname, tc.id, got, tc.want)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2553,9 +2553,9 @@ func TestDeleteRetryTask(t *testing.T) {
|
|||||||
h.FlushDB(t, r.client) // clean up db before each test case
|
h.FlushDB(t, r.client) // clean up db before each test case
|
||||||
h.SeedAllRetryQueues(t, r.client, tc.retry)
|
h.SeedAllRetryQueues(t, r.client, tc.retry)
|
||||||
|
|
||||||
got := r.DeleteRetryTask(tc.qname, tc.id)
|
got := r.DeleteTask(tc.qname, tc.id.String())
|
||||||
if got != tc.want {
|
if got != tc.want {
|
||||||
t.Errorf("r.DeleteRetryTask(%q, %v) = %v, want %v", tc.qname, tc.id, got, tc.want)
|
t.Errorf("r.DeleteTask(%q, %v) = %v, want %v", tc.qname, tc.id, got, tc.want)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2634,9 +2634,9 @@ func TestDeleteScheduledTask(t *testing.T) {
|
|||||||
h.FlushDB(t, r.client) // clean up db before each test case
|
h.FlushDB(t, r.client) // clean up db before each test case
|
||||||
h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
|
h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
|
||||||
|
|
||||||
got := r.DeleteScheduledTask(tc.qname, tc.id)
|
got := r.DeleteTask(tc.qname, tc.id.String())
|
||||||
if got != tc.want {
|
if got != tc.want {
|
||||||
t.Errorf("r.DeleteScheduledTask(%q, %v) = %v, want %v", tc.qname, tc.id, got, tc.want)
|
t.Errorf("r.DeleteTask(%q, %v) = %v, want %v", tc.qname, tc.id, got, tc.want)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2704,9 +2704,9 @@ func TestDeletePendingTask(t *testing.T) {
|
|||||||
h.FlushDB(t, r.client)
|
h.FlushDB(t, r.client)
|
||||||
h.SeedAllPendingQueues(t, r.client, tc.pending)
|
h.SeedAllPendingQueues(t, r.client, tc.pending)
|
||||||
|
|
||||||
got := r.DeletePendingTask(tc.qname, tc.id)
|
got := r.DeleteTask(tc.qname, tc.id.String())
|
||||||
if got != tc.want {
|
if got != tc.want {
|
||||||
t.Errorf("r.DeletePendingTask(%q, %v) = %v, want %v", tc.qname, tc.id, got, tc.want)
|
t.Errorf("r.DeleteTask(%q, %v) = %v, want %v", tc.qname, tc.id, got, tc.want)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user