mirror of
https://github.com/hibiken/asynq.git
synced 2025-10-20 09:16:12 +08:00
Compare commits
23 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
42c7ac0746 | ||
|
d331ff055d | ||
|
ccb682853e | ||
|
7c3ad9e45c | ||
|
ea23db4f6b | ||
|
00a25ca570 | ||
|
7235041128 | ||
|
a150d18ed7 | ||
|
0712e90f23 | ||
|
c5100a9c23 | ||
|
196d66f221 | ||
|
38509e309f | ||
|
f4dd8fe962 | ||
|
c06e9de97d | ||
|
52d536a8f5 | ||
|
f9c0673116 | ||
|
b604d25937 | ||
|
dfdf530a24 | ||
|
e9239260ae | ||
|
8f9d5a3352 | ||
|
c4dc993241 | ||
|
37dfd746d4 | ||
|
8d6e4167ab |
54
CHANGELOG.md
54
CHANGELOG.md
@@ -7,13 +7,33 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
## [0.14.0] - 2021-01-14
|
||||
|
||||
**IMPORTATNT**: Please run `asynq migrate` command to migrate from the previous versions.
|
||||
|
||||
### Changed
|
||||
|
||||
- Renamed `DeadTask` to `ArchivedTask`.
|
||||
- Renamed the operation `Kill` to `Archive` in `Inpsector`.
|
||||
- Print stack trace when Handler panics.
|
||||
- Include a file name and a line number in the error message when recovering from a panic.
|
||||
|
||||
### Added
|
||||
|
||||
- `DefaultRetryDelayFunc` is now a public API, which can be used in the custom `RetryDelayFunc`.
|
||||
- `SkipRetry` error is added to be used as a return value from `Handler`.
|
||||
- `Servers` method is added to `Inspector`
|
||||
- `CancelActiveTask` method is added to `Inspector`.
|
||||
- `ListSchedulerEnqueueEvents` method is added to `Inspector`.
|
||||
- `SchedulerEntries` method is added to `Inspector`.
|
||||
- `DeleteQueue` method is added to `Inspector`.
|
||||
|
||||
## [0.13.1] - 2020-11-22
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed processor to wait for specified time duration before forcefully shutdown workers.
|
||||
|
||||
|
||||
## [0.13.0] - 2020-10-13
|
||||
|
||||
### Added
|
||||
@@ -28,17 +48,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Added
|
||||
|
||||
- `Payload.String() string` method is added
|
||||
- `Payload.String() string` method is added
|
||||
- `Payload.MarshalJSON() ([]byte, error)` method is added
|
||||
|
||||
## [0.12.0] - 2020-09-12
|
||||
|
||||
**IMPORTANT**: If you are upgrading from a previous version, please install the latest version of the CLI `go get -u github.com/hibiken/asynq/tools/asynq` and run `asynq migrate` command. No process should be writing to Redis while you run the migration command.
|
||||
**IMPORTANT**: If you are upgrading from a previous version, please install the latest version of the CLI `go get -u github.com/hibiken/asynq/tools/asynq` and run `asynq migrate` command. No process should be writing to Redis while you run the migration command.
|
||||
|
||||
## The semantics of queue have changed
|
||||
Previously, we called tasks that are ready to be processed *"Enqueued tasks"*, and other tasks that are scheduled to be processed in the future *"Scheduled tasks"*, etc.
|
||||
We changed the semantics of *"Enqueue"* slightly; All tasks that client pushes to Redis are *Enqueued* to a queue. Within a queue, tasks will transition from one state to another.
|
||||
|
||||
Previously, we called tasks that are ready to be processed _"Enqueued tasks"_, and other tasks that are scheduled to be processed in the future _"Scheduled tasks"_, etc.
|
||||
We changed the semantics of _"Enqueue"_ slightly; All tasks that client pushes to Redis are _Enqueued_ to a queue. Within a queue, tasks will transition from one state to another.
|
||||
Possible task states are:
|
||||
|
||||
- `Pending`: task is ready to be processed (previously called "Enqueued")
|
||||
- `Active`: tasks is currently being processed (previously called "InProgress")
|
||||
- `Scheduled`: task is scheduled to be processed in the future
|
||||
@@ -50,23 +72,28 @@ Possible task states are:
|
||||
---
|
||||
|
||||
### Changed
|
||||
|
||||
#### `Client`
|
||||
|
||||
Use `ProcessIn` or `ProcessAt` option to schedule a task instead of `EnqueueIn` or `EnqueueAt`.
|
||||
|
||||
| Previously | v0.12.0 |
|
||||
|-----------------------------|--------------------------------------------|
|
||||
| `client.EnqueueAt(t, task)` | `client.Enqueue(task, asynq.ProcessAt(t))` |
|
||||
| `client.EnqueueIn(d, task)` | `client.Enqueue(task, asynq.ProcessIn(d))` |
|
||||
| --------------------------- | ------------------------------------------ |
|
||||
| `client.EnqueueAt(t, task)` | `client.Enqueue(task, asynq.ProcessAt(t))` |
|
||||
| `client.EnqueueIn(d, task)` | `client.Enqueue(task, asynq.ProcessIn(d))` |
|
||||
|
||||
#### `Inspector`
|
||||
|
||||
All Inspector methods are scoped to a queue, and the methods take `qname (string)` as the first argument.
|
||||
`EnqueuedTask` is renamed to `PendingTask` and its corresponding methods.
|
||||
`InProgressTask` is renamed to `ActiveTask` and its corresponding methods.
|
||||
Command "Enqueue" is replaced by the verb "Run" (e.g. `EnqueueAllScheduledTasks` --> `RunAllScheduledTasks`)
|
||||
|
||||
#### `CLI`
|
||||
|
||||
CLI commands are restructured to use subcommands. Commands are organized into a few management commands:
|
||||
To view details on any command, use `asynq help <command> <subcommand>`.
|
||||
|
||||
- `asynq stats`
|
||||
- `asynq queue [ls inspect history rm pause unpause]`
|
||||
- `asynq task [ls cancel delete kill run delete-all kill-all run-all]`
|
||||
@@ -75,19 +102,23 @@ To view details on any command, use `asynq help <command> <subcommand>`.
|
||||
### Added
|
||||
|
||||
#### `RedisConnOpt`
|
||||
|
||||
- `RedisClusterClientOpt` is added to connect to Redis Cluster.
|
||||
- `Username` field is added to all `RedisConnOpt` types in order to authenticate connection when Redis ACLs are used.
|
||||
|
||||
#### `Client`
|
||||
|
||||
- `ProcessIn(d time.Duration) Option` and `ProcessAt(t time.Time) Option` are added to replace `EnqueueIn` and `EnqueueAt` functionality.
|
||||
|
||||
#### `Inspector`
|
||||
|
||||
- `Queues() ([]string, error)` method is added to get all queue names.
|
||||
- `ClusterKeySlot(qname string) (int64, error)` method is added to get queue's hash slot in Redis cluster.
|
||||
- `ClusterNodes(qname string) ([]ClusterNode, error)` method is added to get a list of Redis cluster nodes for the given queue.
|
||||
- `ClusterNodes(qname string) ([]ClusterNode, error)` method is added to get a list of Redis cluster nodes for the given queue.
|
||||
- `Close() error` method is added to close connection with redis.
|
||||
|
||||
### `Handler`
|
||||
### `Handler`
|
||||
|
||||
- `GetQueueName(ctx context.Context) (string, bool)` helper is added to extract queue name from a context.
|
||||
|
||||
## [0.11.0] - 2020-07-28
|
||||
@@ -104,7 +135,7 @@ To view details on any command, use `asynq help <command> <subcommand>`.
|
||||
|
||||
- All tasks now requires timeout or deadline. By default, timeout is set to 30 mins.
|
||||
- Tasks that exceed its deadline are automatically retried.
|
||||
- Encoding schema for task message has changed. Please install the latest CLI and run `migrate` command if
|
||||
- Encoding schema for task message has changed. Please install the latest CLI and run `migrate` command if
|
||||
you have tasks enqueued with the previous version of asynq.
|
||||
- API of `(*Client).Enqueue`, `(*Client).EnqueueIn`, and `(*Client).EnqueueAt` has changed to return a `*Result`.
|
||||
- API of `ErrorHandler` has changed. It now takes context as the first argument and removed `retried`, `maxRetry` from the argument list.
|
||||
@@ -122,7 +153,6 @@ To view details on any command, use `asynq help <command> <subcommand>`.
|
||||
|
||||
- Fixes the JSON number overflow issue (https://github.com/hibiken/asynq/issues/166).
|
||||
|
||||
|
||||
## [0.9.2] - 2020-06-08
|
||||
|
||||
### Added
|
||||
|
11
README.md
11
README.md
@@ -1,11 +1,10 @@
|
||||
# Asynq
|
||||
|
||||
[](https://travis-ci.com/hibiken/asynq)
|
||||
[](https://opensource.org/licenses/MIT)
|
||||
[](https://goreportcard.com/report/github.com/hibiken/asynq)
|
||||
[](https://godoc.org/github.com/hibiken/asynq)
|
||||
[](https://goreportcard.com/report/github.com/hibiken/asynq)
|
||||
[](https://opensource.org/licenses/MIT)
|
||||
[](https://gitter.im/go-asynq/community)
|
||||
[](https://codecov.io/gh/hibiken/asynq)
|
||||
|
||||
## Overview
|
||||
|
||||
@@ -110,7 +109,7 @@ func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error {
|
||||
}
|
||||
|
||||
// ImageProcessor implements asynq.Handler interface.
|
||||
type ImageProcesser struct {
|
||||
type ImageProcessor struct {
|
||||
// ... fields for struct
|
||||
}
|
||||
|
||||
@@ -135,6 +134,8 @@ In your application code, import the above package and use [`Client`](https://pk
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/hibiken/asynq"
|
||||
@@ -179,7 +180,7 @@ func main() {
|
||||
// Options include MaxRetry, Queue, Timeout, Deadline, Unique etc.
|
||||
// ----------------------------------------------------------------------------
|
||||
|
||||
c.SetDefaultOptions(tasks.ImageProcessing, asynq.MaxRetry(10), asynq.Timeout(3*time.Minute))
|
||||
c.SetDefaultOptions(tasks.TypeImageResize, asynq.MaxRetry(10), asynq.Timeout(3*time.Minute))
|
||||
|
||||
t = tasks.NewImageResizeTask("some/blobstore/path")
|
||||
res, err = c.Enqueue(t)
|
||||
|
91
client.go
91
client.go
@@ -7,6 +7,7 @@ package asynq
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -85,7 +86,7 @@ func MaxRetry(n int) Option {
|
||||
|
||||
func (n retryOption) String() string { return fmt.Sprintf("MaxRetry(%d)", int(n)) }
|
||||
func (n retryOption) Type() OptionType { return MaxRetryOpt }
|
||||
func (n retryOption) Value() interface{} { return n }
|
||||
func (n retryOption) Value() interface{} { return int(n) }
|
||||
|
||||
// Queue returns an option to specify the queue to enqueue the task into.
|
||||
//
|
||||
@@ -96,7 +97,7 @@ func Queue(qname string) Option {
|
||||
|
||||
func (qname queueOption) String() string { return fmt.Sprintf("Queue(%q)", string(qname)) }
|
||||
func (qname queueOption) Type() OptionType { return QueueOpt }
|
||||
func (qname queueOption) Value() interface{} { return qname }
|
||||
func (qname queueOption) Value() interface{} { return string(qname) }
|
||||
|
||||
// Timeout returns an option to specify how long a task may run.
|
||||
// If the timeout elapses before the Handler returns, then the task
|
||||
@@ -112,7 +113,7 @@ func Timeout(d time.Duration) Option {
|
||||
|
||||
func (d timeoutOption) String() string { return fmt.Sprintf("Timeout(%v)", time.Duration(d)) }
|
||||
func (d timeoutOption) Type() OptionType { return TimeoutOpt }
|
||||
func (d timeoutOption) Value() interface{} { return d }
|
||||
func (d timeoutOption) Value() interface{} { return time.Duration(d) }
|
||||
|
||||
// Deadline returns an option to specify the deadline for the given task.
|
||||
// If it reaches the deadline before the Handler returns, then the task
|
||||
@@ -124,9 +125,11 @@ func Deadline(t time.Time) Option {
|
||||
return deadlineOption(t)
|
||||
}
|
||||
|
||||
func (t deadlineOption) String() string { return fmt.Sprintf("Deadline(%v)", time.Time(t)) }
|
||||
func (t deadlineOption) String() string {
|
||||
return fmt.Sprintf("Deadline(%v)", time.Time(t).Format(time.UnixDate))
|
||||
}
|
||||
func (t deadlineOption) Type() OptionType { return DeadlineOpt }
|
||||
func (t deadlineOption) Value() interface{} { return t }
|
||||
func (t deadlineOption) Value() interface{} { return time.Time(t) }
|
||||
|
||||
// Unique returns an option to enqueue a task only if the given task is unique.
|
||||
// Task enqueued with this option is guaranteed to be unique within the given ttl.
|
||||
@@ -143,7 +146,7 @@ func Unique(ttl time.Duration) Option {
|
||||
|
||||
func (ttl uniqueOption) String() string { return fmt.Sprintf("Unique(%v)", time.Duration(ttl)) }
|
||||
func (ttl uniqueOption) Type() OptionType { return UniqueOpt }
|
||||
func (ttl uniqueOption) Value() interface{} { return ttl }
|
||||
func (ttl uniqueOption) Value() interface{} { return time.Duration(ttl) }
|
||||
|
||||
// ProcessAt returns an option to specify when to process the given task.
|
||||
//
|
||||
@@ -152,9 +155,11 @@ func ProcessAt(t time.Time) Option {
|
||||
return processAtOption(t)
|
||||
}
|
||||
|
||||
func (t processAtOption) String() string { return fmt.Sprintf("ProcessAt(%v)", time.Time(t)) }
|
||||
func (t processAtOption) String() string {
|
||||
return fmt.Sprintf("ProcessAt(%v)", time.Time(t).Format(time.UnixDate))
|
||||
}
|
||||
func (t processAtOption) Type() OptionType { return ProcessAtOpt }
|
||||
func (t processAtOption) Value() interface{} { return t }
|
||||
func (t processAtOption) Value() interface{} { return time.Time(t) }
|
||||
|
||||
// ProcessIn returns an option to specify when to process the given task relative to the current time.
|
||||
//
|
||||
@@ -165,7 +170,75 @@ func ProcessIn(d time.Duration) Option {
|
||||
|
||||
func (d processInOption) String() string { return fmt.Sprintf("ProcessIn(%v)", time.Duration(d)) }
|
||||
func (d processInOption) Type() OptionType { return ProcessInOpt }
|
||||
func (d processInOption) Value() interface{} { return d }
|
||||
func (d processInOption) Value() interface{} { return time.Duration(d) }
|
||||
|
||||
// parseOption interprets a string s as an Option and returns the Option if parsing is successful,
|
||||
// otherwise returns non-nil error.
|
||||
func parseOption(s string) (Option, error) {
|
||||
fn, arg := parseOptionFunc(s), parseOptionArg(s)
|
||||
switch fn {
|
||||
case "Queue":
|
||||
qname, err := strconv.Unquote(arg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return Queue(qname), nil
|
||||
case "MaxRetry":
|
||||
n, err := strconv.Atoi(arg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return MaxRetry(n), nil
|
||||
case "Timeout":
|
||||
d, err := time.ParseDuration(arg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return Timeout(d), nil
|
||||
case "Deadline":
|
||||
t, err := time.Parse(time.UnixDate, arg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return Deadline(t), nil
|
||||
case "Unique":
|
||||
d, err := time.ParseDuration(arg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return Unique(d), nil
|
||||
case "ProcessAt":
|
||||
t, err := time.Parse(time.UnixDate, arg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return ProcessAt(t), nil
|
||||
case "ProcessIn":
|
||||
d, err := time.ParseDuration(arg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return ProcessIn(d), nil
|
||||
default:
|
||||
return nil, fmt.Errorf("cannot not parse option string %q", s)
|
||||
}
|
||||
}
|
||||
|
||||
func parseOptionFunc(s string) string {
|
||||
i := strings.Index(s, "(")
|
||||
return s[:i]
|
||||
}
|
||||
|
||||
func parseOptionArg(s string) string {
|
||||
i := strings.Index(s, "(")
|
||||
if i >= 0 {
|
||||
j := strings.Index(s, ")")
|
||||
if j > i {
|
||||
return s[i+1 : j]
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// ErrDuplicateTask indicates that the given task could not be enqueued since it's a duplicate of another task.
|
||||
//
|
||||
|
@@ -774,3 +774,71 @@ func TestClientEnqueueUniqueWithProcessAtOption(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseOption(t *testing.T) {
|
||||
oneHourFromNow := time.Now().Add(1 * time.Hour)
|
||||
tests := []struct {
|
||||
s string
|
||||
wantType OptionType
|
||||
wantVal interface{}
|
||||
}{
|
||||
{`MaxRetry(10)`, MaxRetryOpt, 10},
|
||||
{`Queue("email")`, QueueOpt, "email"},
|
||||
{`Timeout(3m)`, TimeoutOpt, 3 * time.Minute},
|
||||
{Deadline(oneHourFromNow).String(), DeadlineOpt, oneHourFromNow},
|
||||
{`Unique(1h)`, UniqueOpt, 1 * time.Hour},
|
||||
{ProcessAt(oneHourFromNow).String(), ProcessAtOpt, oneHourFromNow},
|
||||
{`ProcessIn(10m)`, ProcessInOpt, 10 * time.Minute},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.s, func(t *testing.T) {
|
||||
got, err := parseOption(tc.s)
|
||||
if err != nil {
|
||||
t.Fatalf("returned error: %v", err)
|
||||
}
|
||||
if got == nil {
|
||||
t.Fatal("returned nil")
|
||||
}
|
||||
if got.Type() != tc.wantType {
|
||||
t.Fatalf("got type %v, want type %v ", got.Type(), tc.wantType)
|
||||
}
|
||||
switch tc.wantType {
|
||||
case QueueOpt:
|
||||
gotVal, ok := got.Value().(string)
|
||||
if !ok {
|
||||
t.Fatal("returned Option with non-string value")
|
||||
}
|
||||
if gotVal != tc.wantVal.(string) {
|
||||
t.Fatalf("got value %v, want %v", gotVal, tc.wantVal)
|
||||
}
|
||||
case MaxRetryOpt:
|
||||
gotVal, ok := got.Value().(int)
|
||||
if !ok {
|
||||
t.Fatal("returned Option with non-int value")
|
||||
}
|
||||
if gotVal != tc.wantVal.(int) {
|
||||
t.Fatalf("got value %v, want %v", gotVal, tc.wantVal)
|
||||
}
|
||||
case TimeoutOpt, UniqueOpt, ProcessInOpt:
|
||||
gotVal, ok := got.Value().(time.Duration)
|
||||
if !ok {
|
||||
t.Fatal("returned Option with non duration value")
|
||||
}
|
||||
if gotVal != tc.wantVal.(time.Duration) {
|
||||
t.Fatalf("got value %v, want %v", gotVal, tc.wantVal)
|
||||
}
|
||||
case DeadlineOpt, ProcessAtOpt:
|
||||
gotVal, ok := got.Value().(time.Time)
|
||||
if !ok {
|
||||
t.Fatal("returned Option with non time value")
|
||||
}
|
||||
if cmp.Equal(gotVal, tc.wantVal.(time.Time)) {
|
||||
t.Fatalf("got value %v, want %v", gotVal, tc.wantVal)
|
||||
}
|
||||
default:
|
||||
t.Fatalf("returned Option with unexpected type: %v", got.Type())
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
15
heartbeat.go
15
heartbeat.go
@@ -147,13 +147,14 @@ func (h *heartbeater) beat() {
|
||||
var ws []*base.WorkerInfo
|
||||
for id, stat := range h.workers {
|
||||
ws = append(ws, &base.WorkerInfo{
|
||||
Host: h.host,
|
||||
PID: h.pid,
|
||||
ID: id,
|
||||
Type: stat.msg.Type,
|
||||
Queue: stat.msg.Queue,
|
||||
Payload: stat.msg.Payload,
|
||||
Started: stat.started,
|
||||
Host: h.host,
|
||||
PID: h.pid,
|
||||
ServerID: h.serverID,
|
||||
ID: id,
|
||||
Type: stat.msg.Type,
|
||||
Queue: stat.msg.Queue,
|
||||
Payload: stat.msg.Payload,
|
||||
Started: stat.started,
|
||||
})
|
||||
}
|
||||
|
||||
|
293
inspector.go
293
inspector.go
@@ -20,7 +20,7 @@ type Inspector struct {
|
||||
rdb *rdb.RDB
|
||||
}
|
||||
|
||||
// New returns a new instance of Inspector.
|
||||
// NewInspector returns a new instance of Inspector.
|
||||
func NewInspector(r RedisConnOpt) *Inspector {
|
||||
return &Inspector{
|
||||
rdb: rdb.NewRDB(createRedisClient(r)),
|
||||
@@ -42,7 +42,7 @@ type QueueStats struct {
|
||||
// Name of the queue.
|
||||
Queue string
|
||||
// Size is the total number of tasks in the queue.
|
||||
// The value is the sum of Pending, Active, Scheduled, Retry, and Dead.
|
||||
// The value is the sum of Pending, Active, Scheduled, Retry, and Archived.
|
||||
Size int
|
||||
// Number of pending tasks.
|
||||
Pending int
|
||||
@@ -52,8 +52,8 @@ type QueueStats struct {
|
||||
Scheduled int
|
||||
// Number of retry tasks.
|
||||
Retry int
|
||||
// Number of dead tasks.
|
||||
Dead int
|
||||
// Number of archived tasks.
|
||||
Archived int
|
||||
// Total number of tasks being processed during the given date.
|
||||
// The number includes both succeeded and failed tasks.
|
||||
Processed int
|
||||
@@ -82,7 +82,7 @@ func (i *Inspector) CurrentStats(qname string) (*QueueStats, error) {
|
||||
Active: stats.Active,
|
||||
Scheduled: stats.Scheduled,
|
||||
Retry: stats.Retry,
|
||||
Dead: stats.Dead,
|
||||
Archived: stats.Archived,
|
||||
Processed: stats.Processed,
|
||||
Failed: stats.Failed,
|
||||
Paused: stats.Paused,
|
||||
@@ -124,6 +124,45 @@ func (i *Inspector) History(qname string, n int) ([]*DailyStats, error) {
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// ErrQueueNotFound indicates that the specified queue does not exist.
|
||||
type ErrQueueNotFound struct {
|
||||
qname string
|
||||
}
|
||||
|
||||
func (e *ErrQueueNotFound) Error() string {
|
||||
return fmt.Sprintf("queue %q does not exist", e.qname)
|
||||
}
|
||||
|
||||
// ErrQueueNotEmpty indicates that the specified queue is not empty.
|
||||
type ErrQueueNotEmpty struct {
|
||||
qname string
|
||||
}
|
||||
|
||||
func (e *ErrQueueNotEmpty) Error() string {
|
||||
return fmt.Sprintf("queue %q is not empty", e.qname)
|
||||
}
|
||||
|
||||
// DeleteQueue removes the specified queue.
|
||||
//
|
||||
// If force is set to true, DeleteQueue will remove the queue regardless of
|
||||
// the queue size as long as no tasks are active in the queue.
|
||||
// If force is set to false, DeleteQueue will remove the queue only if
|
||||
// the queue is empty.
|
||||
//
|
||||
// If the specified queue does not exist, DeleteQueue returns ErrQueueNotFound.
|
||||
// If force is set to false and the specified queue is not empty, DeleteQueue
|
||||
// returns ErrQueueNotEmpty.
|
||||
func (i *Inspector) DeleteQueue(qname string, force bool) error {
|
||||
err := i.rdb.RemoveQueue(qname, force)
|
||||
if _, ok := err.(*rdb.ErrQueueNotFound); ok {
|
||||
return &ErrQueueNotFound{qname}
|
||||
}
|
||||
if _, ok := err.(*rdb.ErrQueueNotEmpty); ok {
|
||||
return &ErrQueueNotEmpty{qname}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// PendingTask is a task in a queue and is ready to be processed.
|
||||
type PendingTask struct {
|
||||
*Task
|
||||
@@ -162,9 +201,11 @@ type RetryTask struct {
|
||||
score int64
|
||||
}
|
||||
|
||||
// DeadTask is a task exhausted its retries.
|
||||
// DeadTask won't be retried automatically.
|
||||
type DeadTask struct {
|
||||
// ArchivedTask is a task archived for debugging and inspection purposes, and
|
||||
// it won't be retried automatically.
|
||||
// A task can be archived when the task exhausts its retry counts or manually
|
||||
// archived by a user via the CLI or Inspector.
|
||||
type ArchivedTask struct {
|
||||
*Task
|
||||
ID string
|
||||
Queue string
|
||||
@@ -176,19 +217,19 @@ type DeadTask struct {
|
||||
score int64
|
||||
}
|
||||
|
||||
// Key returns a key used to delete, run, and kill the task.
|
||||
// Key returns a key used to delete, run, and archive the task.
|
||||
func (t *ScheduledTask) Key() string {
|
||||
return fmt.Sprintf("s:%v:%v", t.ID, t.score)
|
||||
}
|
||||
|
||||
// Key returns a key used to delete, run, and kill the task.
|
||||
// Key returns a key used to delete, run, and archive the task.
|
||||
func (t *RetryTask) Key() string {
|
||||
return fmt.Sprintf("r:%v:%v", t.ID, t.score)
|
||||
}
|
||||
|
||||
// Key returns a key used to delete, run, and kill the task.
|
||||
func (t *DeadTask) Key() string {
|
||||
return fmt.Sprintf("d:%v:%v", t.ID, t.score)
|
||||
// Key returns a key used to delete, run, and archive the task.
|
||||
func (t *ArchivedTask) Key() string {
|
||||
return fmt.Sprintf("a:%v:%v", t.ID, t.score)
|
||||
}
|
||||
|
||||
// parseTaskKey parses a key string and returns each part of key with proper
|
||||
@@ -207,7 +248,7 @@ func parseTaskKey(key string) (id uuid.UUID, score int64, state string, err erro
|
||||
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
|
||||
}
|
||||
state = parts[0]
|
||||
if len(state) != 1 || !strings.Contains("srd", state) {
|
||||
if len(state) != 1 || !strings.Contains("sra", state) {
|
||||
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
|
||||
}
|
||||
return id, score, state, nil
|
||||
@@ -384,25 +425,25 @@ func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*RetryTa
|
||||
return tasks, nil
|
||||
}
|
||||
|
||||
// ListDeadTasks retrieves dead tasks from the specified queue.
|
||||
// ListArchivedTasks retrieves archived tasks from the specified queue.
|
||||
// Tasks are sorted by LastFailedAt field in descending order.
|
||||
//
|
||||
// By default, it retrieves the first 30 tasks.
|
||||
func (i *Inspector) ListDeadTasks(qname string, opts ...ListOption) ([]*DeadTask, error) {
|
||||
func (i *Inspector) ListArchivedTasks(qname string, opts ...ListOption) ([]*ArchivedTask, error) {
|
||||
if err := validateQueueName(qname); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
opt := composeListOptions(opts...)
|
||||
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
|
||||
zs, err := i.rdb.ListDead(qname, pgn)
|
||||
zs, err := i.rdb.ListArchived(qname, pgn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var tasks []*DeadTask
|
||||
var tasks []*ArchivedTask
|
||||
for _, z := range zs {
|
||||
failedAt := time.Unix(z.Score, 0)
|
||||
t := NewTask(z.Message.Type, z.Message.Payload)
|
||||
tasks = append(tasks, &DeadTask{
|
||||
tasks = append(tasks, &ArchivedTask{
|
||||
Task: t,
|
||||
ID: z.Message.ID.String(),
|
||||
Queue: z.Message.Queue,
|
||||
@@ -414,7 +455,6 @@ func (i *Inspector) ListDeadTasks(qname string, opts ...ListOption) ([]*DeadTask
|
||||
})
|
||||
}
|
||||
return tasks, nil
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// DeleteAllScheduledTasks deletes all scheduled tasks from the specified queue,
|
||||
@@ -437,13 +477,13 @@ func (i *Inspector) DeleteAllRetryTasks(qname string) (int, error) {
|
||||
return int(n), err
|
||||
}
|
||||
|
||||
// DeleteAllDeadTasks deletes all dead tasks from the specified queue,
|
||||
// DeleteAllArchivedTasks deletes all archived tasks from the specified queue,
|
||||
// and reports the number tasks deleted.
|
||||
func (i *Inspector) DeleteAllDeadTasks(qname string) (int, error) {
|
||||
func (i *Inspector) DeleteAllArchivedTasks(qname string) (int, error) {
|
||||
if err := validateQueueName(qname); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
n, err := i.rdb.DeleteAllDeadTasks(qname)
|
||||
n, err := i.rdb.DeleteAllArchivedTasks(qname)
|
||||
return int(n), err
|
||||
}
|
||||
|
||||
@@ -461,8 +501,8 @@ func (i *Inspector) DeleteTaskByKey(qname, key string) error {
|
||||
return i.rdb.DeleteScheduledTask(qname, id, score)
|
||||
case "r":
|
||||
return i.rdb.DeleteRetryTask(qname, id, score)
|
||||
case "d":
|
||||
return i.rdb.DeleteDeadTask(qname, id, score)
|
||||
case "a":
|
||||
return i.rdb.DeleteArchivedTask(qname, id, score)
|
||||
default:
|
||||
return fmt.Errorf("invalid key")
|
||||
}
|
||||
@@ -488,13 +528,13 @@ func (i *Inspector) RunAllRetryTasks(qname string) (int, error) {
|
||||
return int(n), err
|
||||
}
|
||||
|
||||
// RunAllDeadTasks transition all dead tasks to pending state within the given queue,
|
||||
// RunAllArchivedTasks transition all archived tasks to pending state within the given queue,
|
||||
// and reports the number of tasks transitioned.
|
||||
func (i *Inspector) RunAllDeadTasks(qname string) (int, error) {
|
||||
func (i *Inspector) RunAllArchivedTasks(qname string) (int, error) {
|
||||
if err := validateQueueName(qname); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
n, err := i.rdb.RunAllDeadTasks(qname)
|
||||
n, err := i.rdb.RunAllArchivedTasks(qname)
|
||||
return int(n), err
|
||||
}
|
||||
|
||||
@@ -512,35 +552,35 @@ func (i *Inspector) RunTaskByKey(qname, key string) error {
|
||||
return i.rdb.RunScheduledTask(qname, id, score)
|
||||
case "r":
|
||||
return i.rdb.RunRetryTask(qname, id, score)
|
||||
case "d":
|
||||
return i.rdb.RunDeadTask(qname, id, score)
|
||||
case "a":
|
||||
return i.rdb.RunArchivedTask(qname, id, score)
|
||||
default:
|
||||
return fmt.Errorf("invalid key")
|
||||
}
|
||||
}
|
||||
|
||||
// KillAllScheduledTasks kills all scheduled tasks within the given queue,
|
||||
// and reports the number of tasks killed.
|
||||
func (i *Inspector) KillAllScheduledTasks(qname string) (int, error) {
|
||||
// ArchiveAllScheduledTasks archives all scheduled tasks within the given queue,
|
||||
// and reports the number of tasks archiveed.
|
||||
func (i *Inspector) ArchiveAllScheduledTasks(qname string) (int, error) {
|
||||
if err := validateQueueName(qname); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
n, err := i.rdb.KillAllScheduledTasks(qname)
|
||||
n, err := i.rdb.ArchiveAllScheduledTasks(qname)
|
||||
return int(n), err
|
||||
}
|
||||
|
||||
// KillAllRetryTasks kills all retry tasks within the given queue,
|
||||
// and reports the number of tasks killed.
|
||||
func (i *Inspector) KillAllRetryTasks(qname string) (int, error) {
|
||||
// ArchiveAllRetryTasks archives all retry tasks within the given queue,
|
||||
// and reports the number of tasks archiveed.
|
||||
func (i *Inspector) ArchiveAllRetryTasks(qname string) (int, error) {
|
||||
if err := validateQueueName(qname); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
n, err := i.rdb.KillAllRetryTasks(qname)
|
||||
n, err := i.rdb.ArchiveAllRetryTasks(qname)
|
||||
return int(n), err
|
||||
}
|
||||
|
||||
// KillTaskByKey kills a task with the given key in the given queue.
|
||||
func (i *Inspector) KillTaskByKey(qname, key string) error {
|
||||
// ArchiveTaskByKey archives a task with the given key in the given queue.
|
||||
func (i *Inspector) ArchiveTaskByKey(qname, key string) error {
|
||||
if err := validateQueueName(qname); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -550,16 +590,24 @@ func (i *Inspector) KillTaskByKey(qname, key string) error {
|
||||
}
|
||||
switch state {
|
||||
case "s":
|
||||
return i.rdb.KillScheduledTask(qname, id, score)
|
||||
return i.rdb.ArchiveScheduledTask(qname, id, score)
|
||||
case "r":
|
||||
return i.rdb.KillRetryTask(qname, id, score)
|
||||
case "d":
|
||||
return fmt.Errorf("task already dead")
|
||||
return i.rdb.ArchiveRetryTask(qname, id, score)
|
||||
case "a":
|
||||
return fmt.Errorf("task already archived")
|
||||
default:
|
||||
return fmt.Errorf("invalid key")
|
||||
}
|
||||
}
|
||||
|
||||
// CancelActiveTask sends a signal to cancel processing of the task with
|
||||
// the given id. CancelActiveTask is best-effort, which means that it does not
|
||||
// guarantee that the task with the given id will be canceled. The return
|
||||
// value only indicates whether the cancelation signal has been sent.
|
||||
func (i *Inspector) CancelActiveTask(id string) error {
|
||||
return i.rdb.PublishCancelation(id)
|
||||
}
|
||||
|
||||
// PauseQueue pauses task processing on the specified queue.
|
||||
// If the queue is already paused, it will return a non-nil error.
|
||||
func (i *Inspector) PauseQueue(qname string) error {
|
||||
@@ -578,6 +626,84 @@ func (i *Inspector) UnpauseQueue(qname string) error {
|
||||
return i.rdb.Unpause(qname)
|
||||
}
|
||||
|
||||
// Servers return a list of running servers' information.
|
||||
func (i *Inspector) Servers() ([]*ServerInfo, error) {
|
||||
servers, err := i.rdb.ListServers()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
workers, err := i.rdb.ListWorkers()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
m := make(map[string]*ServerInfo) // ServerInfo keyed by serverID
|
||||
for _, s := range servers {
|
||||
m[s.ServerID] = &ServerInfo{
|
||||
ID: s.ServerID,
|
||||
Host: s.Host,
|
||||
PID: s.PID,
|
||||
Concurrency: s.Concurrency,
|
||||
Queues: s.Queues,
|
||||
StrictPriority: s.StrictPriority,
|
||||
Started: s.Started,
|
||||
Status: s.Status,
|
||||
ActiveWorkers: make([]*WorkerInfo, 0),
|
||||
}
|
||||
}
|
||||
for _, w := range workers {
|
||||
srvInfo, ok := m[w.ServerID]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
wrkInfo := &WorkerInfo{
|
||||
Started: w.Started,
|
||||
Task: &ActiveTask{
|
||||
Task: NewTask(w.Type, w.Payload),
|
||||
ID: w.ID,
|
||||
Queue: w.Queue,
|
||||
},
|
||||
}
|
||||
srvInfo.ActiveWorkers = append(srvInfo.ActiveWorkers, wrkInfo)
|
||||
}
|
||||
var out []*ServerInfo
|
||||
for _, srvInfo := range m {
|
||||
out = append(out, srvInfo)
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// ServerInfo describes a running Server instance.
|
||||
type ServerInfo struct {
|
||||
// Unique Identifier for the server.
|
||||
ID string
|
||||
// Host machine on which the server is running.
|
||||
Host string
|
||||
// PID of the process in which the server is running.
|
||||
PID int
|
||||
|
||||
// Server configuration details.
|
||||
// See Config doc for field descriptions.
|
||||
Concurrency int
|
||||
Queues map[string]int
|
||||
StrictPriority bool
|
||||
|
||||
// Time the server started.
|
||||
Started time.Time
|
||||
// Status indicates the status of the server.
|
||||
// TODO: Update comment with more details.
|
||||
Status string
|
||||
// A List of active workers currently processing tasks.
|
||||
ActiveWorkers []*WorkerInfo
|
||||
}
|
||||
|
||||
// WorkerInfo describes a running worker processing a task.
|
||||
type WorkerInfo struct {
|
||||
// The task the worker is processing.
|
||||
Task *ActiveTask
|
||||
// Time the worker started processing the task.
|
||||
Started time.Time
|
||||
}
|
||||
|
||||
// ClusterKeySlot returns an integer identifying the hash slot the given queue hashes to.
|
||||
func (i *Inspector) ClusterKeySlot(qname string) (int64, error) {
|
||||
return i.rdb.ClusterKeySlot(qname)
|
||||
@@ -592,7 +718,7 @@ type ClusterNode struct {
|
||||
Addr string
|
||||
}
|
||||
|
||||
// ClusterNode returns a list of nodes the given queue belongs to.
|
||||
// ClusterNodes returns a list of nodes the given queue belongs to.
|
||||
func (i *Inspector) ClusterNodes(qname string) ([]ClusterNode, error) {
|
||||
nodes, err := i.rdb.ClusterNodes(qname)
|
||||
if err != nil {
|
||||
@@ -604,3 +730,80 @@ func (i *Inspector) ClusterNodes(qname string) ([]ClusterNode, error) {
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// SchedulerEntry holds information about a periodic task registered with a scheduler.
|
||||
type SchedulerEntry struct {
|
||||
// Identifier of this entry.
|
||||
ID string
|
||||
|
||||
// Spec describes the schedule of this entry.
|
||||
Spec string
|
||||
|
||||
// Periodic Task registered for this entry.
|
||||
Task *Task
|
||||
|
||||
// Opts is the options for the periodic task.
|
||||
Opts []Option
|
||||
|
||||
// Next shows the next time the task will be enqueued.
|
||||
Next time.Time
|
||||
|
||||
// Prev shows the last time the task was enqueued.
|
||||
// Zero time if task was never enqueued.
|
||||
Prev time.Time
|
||||
}
|
||||
|
||||
// SchedulerEntries returns a list of all entries registered with
|
||||
// currently running schedulers.
|
||||
func (i *Inspector) SchedulerEntries() ([]*SchedulerEntry, error) {
|
||||
var entries []*SchedulerEntry
|
||||
res, err := i.rdb.ListSchedulerEntries()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, e := range res {
|
||||
task := NewTask(e.Type, e.Payload)
|
||||
var opts []Option
|
||||
for _, s := range e.Opts {
|
||||
if o, err := parseOption(s); err == nil {
|
||||
// ignore bad data
|
||||
opts = append(opts, o)
|
||||
}
|
||||
}
|
||||
entries = append(entries, &SchedulerEntry{
|
||||
ID: e.ID,
|
||||
Spec: e.Spec,
|
||||
Task: task,
|
||||
Opts: opts,
|
||||
Next: e.Next,
|
||||
Prev: e.Prev,
|
||||
})
|
||||
}
|
||||
return entries, nil
|
||||
}
|
||||
|
||||
// SchedulerEnqueueEvent holds information about an enqueue event by a scheduler.
|
||||
type SchedulerEnqueueEvent struct {
|
||||
// ID of the task that was enqueued.
|
||||
TaskID string
|
||||
|
||||
// Time the task was enqueued.
|
||||
EnqueuedAt time.Time
|
||||
}
|
||||
|
||||
// ListSchedulerEnqueueEvents retrieves a list of enqueue events from the specified scheduler entry.
|
||||
//
|
||||
// By default, it retrieves the first 30 tasks.
|
||||
func (i *Inspector) ListSchedulerEnqueueEvents(entryID string, opts ...ListOption) ([]*SchedulerEnqueueEvent, error) {
|
||||
opt := composeListOptions(opts...)
|
||||
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
|
||||
data, err := i.rdb.ListSchedulerEnqueueEvents(entryID, pgn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var events []*SchedulerEnqueueEvent
|
||||
for _, e := range data {
|
||||
events = append(events, &SchedulerEnqueueEvent{TaskID: e.TaskID, EnqueuedAt: e.EnqueuedAt})
|
||||
}
|
||||
return events, nil
|
||||
}
|
||||
|
File diff suppressed because it is too large
Load Diff
@@ -220,11 +220,11 @@ func SeedRetryQueue(tb testing.TB, r redis.UniversalClient, entries []base.Z, qn
|
||||
seedRedisZSet(tb, r, base.RetryKey(qname), entries)
|
||||
}
|
||||
|
||||
// SeedDeadQueue initializes the dead queue with the given messages.
|
||||
func SeedDeadQueue(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) {
|
||||
// SeedArchivedQueue initializes the archived queue with the given messages.
|
||||
func SeedArchivedQueue(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) {
|
||||
tb.Helper()
|
||||
r.SAdd(base.AllQueues, qname)
|
||||
seedRedisZSet(tb, r, base.DeadKey(qname), entries)
|
||||
seedRedisZSet(tb, r, base.ArchivedKey(qname), entries)
|
||||
}
|
||||
|
||||
// SeedDeadlines initializes the deadlines set with the given entries.
|
||||
@@ -264,10 +264,10 @@ func SeedAllRetryQueues(tb testing.TB, r redis.UniversalClient, retry map[string
|
||||
}
|
||||
}
|
||||
|
||||
// SeedAllDeadQueues initializes all of the specified dead queues with the given entries.
|
||||
func SeedAllDeadQueues(tb testing.TB, r redis.UniversalClient, dead map[string][]base.Z) {
|
||||
for q, entries := range dead {
|
||||
SeedDeadQueue(tb, r, entries, q)
|
||||
// SeedAllArchivedQueues initializes all of the specified archived queues with the given entries.
|
||||
func SeedAllArchivedQueues(tb testing.TB, r redis.UniversalClient, archived map[string][]base.Z) {
|
||||
for q, entries := range archived {
|
||||
SeedArchivedQueue(tb, r, entries, q)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -320,10 +320,10 @@ func GetRetryMessages(tb testing.TB, r redis.UniversalClient, qname string) []*b
|
||||
return getZSetMessages(tb, r, base.RetryKey(qname))
|
||||
}
|
||||
|
||||
// GetDeadMessages returns all dead messages in the given queue.
|
||||
func GetDeadMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage {
|
||||
// GetArchivedMessages returns all archived messages in the given queue.
|
||||
func GetArchivedMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage {
|
||||
tb.Helper()
|
||||
return getZSetMessages(tb, r, base.DeadKey(qname))
|
||||
return getZSetMessages(tb, r, base.ArchivedKey(qname))
|
||||
}
|
||||
|
||||
// GetScheduledEntries returns all scheduled messages and its score in the given queue.
|
||||
@@ -338,10 +338,10 @@ func GetRetryEntries(tb testing.TB, r redis.UniversalClient, qname string) []bas
|
||||
return getZSetEntries(tb, r, base.RetryKey(qname))
|
||||
}
|
||||
|
||||
// GetDeadEntries returns all dead messages and its score in the given queue.
|
||||
func GetDeadEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z {
|
||||
// GetArchivedEntries returns all archived messages and its score in the given queue.
|
||||
func GetArchivedEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z {
|
||||
tb.Helper()
|
||||
return getZSetEntries(tb, r, base.DeadKey(qname))
|
||||
return getZSetEntries(tb, r, base.ArchivedKey(qname))
|
||||
}
|
||||
|
||||
// GetDeadlinesEntries returns all task messages and its score in the deadlines set for the given queue.
|
||||
|
@@ -19,7 +19,7 @@ import (
|
||||
)
|
||||
|
||||
// Version of asynq library and CLI.
|
||||
const Version = "0.13.0"
|
||||
const Version = "0.14.0"
|
||||
|
||||
// DefaultQueueName is the queue name used if none are specified by user.
|
||||
const DefaultQueueName = "default"
|
||||
@@ -56,9 +56,9 @@ func RetryKey(qname string) string {
|
||||
return fmt.Sprintf("asynq:{%s}:retry", qname)
|
||||
}
|
||||
|
||||
// DeadKey returns a redis key for the dead tasks.
|
||||
func DeadKey(qname string) string {
|
||||
return fmt.Sprintf("asynq:{%s}:dead", qname)
|
||||
// ArchivedKey returns a redis key for the archived tasks.
|
||||
func ArchivedKey(qname string) string {
|
||||
return fmt.Sprintf("asynq:{%s}:archived", qname)
|
||||
}
|
||||
|
||||
// DeadlinesKey returns a redis key for the deadlines.
|
||||
@@ -156,7 +156,7 @@ type TaskMessage struct {
|
||||
|
||||
// Timeout specifies timeout in seconds.
|
||||
// If task processing doesn't complete within the timeout, the task will be retried
|
||||
// if retry count is remaining. Otherwise it will be moved to the dead queue.
|
||||
// if retry count is remaining. Otherwise it will be moved to the archive.
|
||||
//
|
||||
// Use zero to indicate no timeout.
|
||||
Timeout int64
|
||||
@@ -164,7 +164,7 @@ type TaskMessage struct {
|
||||
// Deadline specifies the deadline for the task in Unix time,
|
||||
// the number of seconds elapsed since January 1, 1970 UTC.
|
||||
// If task processing doesn't complete before the deadline, the task will be retried
|
||||
// if retry count is remaining. Otherwise it will be moved to the dead queue.
|
||||
// if retry count is remaining. Otherwise it will be moved to the archive.
|
||||
//
|
||||
// Use zero to indicate no deadline.
|
||||
Deadline int64
|
||||
@@ -275,13 +275,14 @@ type ServerInfo struct {
|
||||
|
||||
// WorkerInfo holds information about a running worker.
|
||||
type WorkerInfo struct {
|
||||
Host string
|
||||
PID int
|
||||
ID string
|
||||
Type string
|
||||
Queue string
|
||||
Payload map[string]interface{}
|
||||
Started time.Time
|
||||
Host string
|
||||
PID int
|
||||
ServerID string
|
||||
ID string
|
||||
Type string
|
||||
Queue string
|
||||
Payload map[string]interface{}
|
||||
Started time.Time
|
||||
}
|
||||
|
||||
// SchedulerEntry holds information about a periodic task registered with a scheduler.
|
||||
@@ -304,7 +305,7 @@ type SchedulerEntry struct {
|
||||
// Next shows the next time the task will be enqueued.
|
||||
Next time.Time
|
||||
|
||||
// Prev shows the last time the task was enqueued.
|
||||
// Prev shows the last time the task was enqueued.
|
||||
// Zero time if task was never enqueued.
|
||||
Prev time.Time
|
||||
}
|
||||
@@ -368,7 +369,7 @@ type Broker interface {
|
||||
Schedule(msg *TaskMessage, processAt time.Time) error
|
||||
ScheduleUnique(msg *TaskMessage, processAt time.Time, ttl time.Duration) error
|
||||
Retry(msg *TaskMessage, processAt time.Time, errMsg string) error
|
||||
Kill(msg *TaskMessage, errMsg string) error
|
||||
Archive(msg *TaskMessage, errMsg string) error
|
||||
CheckAndEnqueue(qnames ...string) error
|
||||
ListDeadlineExceeded(deadline time.Time, qnames ...string) ([]*TaskMessage, error)
|
||||
WriteServerState(info *ServerInfo, workers []*WorkerInfo, ttl time.Duration) error
|
||||
|
@@ -100,19 +100,19 @@ func TestRetryKey(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeadKey(t *testing.T) {
|
||||
func TestArchivedKey(t *testing.T) {
|
||||
tests := []struct {
|
||||
qname string
|
||||
want string
|
||||
}{
|
||||
{"default", "asynq:{default}:dead"},
|
||||
{"custom", "asynq:{custom}:dead"},
|
||||
{"default", "asynq:{default}:archived"},
|
||||
{"custom", "asynq:{custom}:archived"},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
got := DeadKey(tc.qname)
|
||||
got := ArchivedKey(tc.qname)
|
||||
if got != tc.want {
|
||||
t.Errorf("DeadKey(%q) = %q, want %q", tc.qname, got, tc.want)
|
||||
t.Errorf("ArchivedKey(%q) = %q, want %q", tc.qname, got, tc.want)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -363,14 +363,14 @@ func TestStatusConcurrentAccess(t *testing.T) {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
status.Get()
|
||||
status.String()
|
||||
_ = status.String()
|
||||
}()
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
status.Set(StatusStopped)
|
||||
status.String()
|
||||
_ = status.String()
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
|
@@ -35,7 +35,7 @@ type Stats struct {
|
||||
Active int
|
||||
Scheduled int
|
||||
Retry int
|
||||
Dead int
|
||||
Archived int
|
||||
// Total number of tasks processed during the current date.
|
||||
// The number includes both succeeded and failed tasks.
|
||||
Processed int
|
||||
@@ -62,7 +62,7 @@ type DailyStats struct {
|
||||
// KEYS[2] -> asynq:<qname>:active
|
||||
// KEYS[3] -> asynq:<qname>:scheduled
|
||||
// KEYS[4] -> asynq:<qname>:retry
|
||||
// KEYS[5] -> asynq:<qname>:dead
|
||||
// KEYS[5] -> asynq:<qname>:archived
|
||||
// KEYS[6] -> asynq:<qname>:processed:<yyyy-mm-dd>
|
||||
// KEYS[7] -> asynq:<qname>:failed:<yyyy-mm-dd>
|
||||
// KEYS[8] -> asynq:<qname>:paused
|
||||
@@ -111,7 +111,7 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
|
||||
base.ActiveKey(qname),
|
||||
base.ScheduledKey(qname),
|
||||
base.RetryKey(qname),
|
||||
base.DeadKey(qname),
|
||||
base.ArchivedKey(qname),
|
||||
base.ProcessedKey(qname, now),
|
||||
base.FailedKey(qname, now),
|
||||
base.PausedKey(qname),
|
||||
@@ -144,8 +144,8 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
|
||||
case base.RetryKey(qname):
|
||||
stats.Retry = val
|
||||
size += val
|
||||
case base.DeadKey(qname):
|
||||
stats.Dead = val
|
||||
case base.ArchivedKey(qname):
|
||||
stats.Archived = val
|
||||
size += val
|
||||
case base.ProcessedKey(qname, now):
|
||||
stats.Processed = val
|
||||
@@ -328,12 +328,12 @@ func (r *RDB) ListRetry(qname string, pgn Pagination) ([]base.Z, error) {
|
||||
return r.listZSetEntries(base.RetryKey(qname), pgn)
|
||||
}
|
||||
|
||||
// ListDead returns all tasks from the given queue that have exhausted its retry limit.
|
||||
func (r *RDB) ListDead(qname string, pgn Pagination) ([]base.Z, error) {
|
||||
// ListArchived returns all tasks from the given queue that have exhausted its retry limit.
|
||||
func (r *RDB) ListArchived(qname string, pgn Pagination) ([]base.Z, error) {
|
||||
if !r.client.SIsMember(base.AllQueues, qname).Val() {
|
||||
return nil, fmt.Errorf("queue %q does not exist", qname)
|
||||
}
|
||||
return r.listZSetEntries(base.DeadKey(qname), pgn)
|
||||
return r.listZSetEntries(base.ArchivedKey(qname), pgn)
|
||||
}
|
||||
|
||||
// listZSetEntries returns a list of message and score pairs in Redis sorted-set
|
||||
@@ -353,16 +353,16 @@ func (r *RDB) listZSetEntries(key string, pgn Pagination) ([]base.Z, error) {
|
||||
if err != nil {
|
||||
continue // bad data, ignore and continue
|
||||
}
|
||||
res = append(res, base.Z{msg, int64(z.Score)})
|
||||
res = append(res, base.Z{Message: msg, Score: int64(z.Score)})
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// RunDeadTask finds a dead task that matches the given id and score from
|
||||
// RunArchivedTask finds an archived task that matches the given id and score from
|
||||
// the given queue and enqueues it for processing.
|
||||
//If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
|
||||
func (r *RDB) RunDeadTask(qname string, id uuid.UUID, score int64) error {
|
||||
n, err := r.removeAndRun(base.DeadKey(qname), base.QueueKey(qname), id.String(), float64(score))
|
||||
// If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
|
||||
func (r *RDB) RunArchivedTask(qname string, id uuid.UUID, score int64) error {
|
||||
n, err := r.removeAndRun(base.ArchivedKey(qname), base.QueueKey(qname), id.String(), float64(score))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -412,10 +412,10 @@ func (r *RDB) RunAllRetryTasks(qname string) (int64, error) {
|
||||
return r.removeAndRunAll(base.RetryKey(qname), base.QueueKey(qname))
|
||||
}
|
||||
|
||||
// RunAllDeadTasks enqueues all tasks from dead queue
|
||||
// RunAllArchivedTasks enqueues all archived tasks from the given queue
|
||||
// and returns the number of tasks enqueued.
|
||||
func (r *RDB) RunAllDeadTasks(qname string) (int64, error) {
|
||||
return r.removeAndRunAll(base.DeadKey(qname), base.QueueKey(qname))
|
||||
func (r *RDB) RunAllArchivedTasks(qname string) (int64, error) {
|
||||
return r.removeAndRunAll(base.ArchivedKey(qname), base.QueueKey(qname))
|
||||
}
|
||||
|
||||
var removeAndRunCmd = redis.NewScript(`
|
||||
@@ -462,10 +462,10 @@ func (r *RDB) removeAndRunAll(zset, qkey string) (int64, error) {
|
||||
return n, nil
|
||||
}
|
||||
|
||||
// KillRetryTask finds a retry task that matches the given id and score from the given queue
|
||||
// and kills it. If a task that maches the id and score does not exist, it returns ErrTaskNotFound.
|
||||
func (r *RDB) KillRetryTask(qname string, id uuid.UUID, score int64) error {
|
||||
n, err := r.removeAndKill(base.RetryKey(qname), base.DeadKey(qname), id.String(), float64(score))
|
||||
// ArchiveRetryTask finds a retry task that matches the given id and score from the given queue
|
||||
// and archives it. If a task that maches the id and score does not exist, it returns ErrTaskNotFound.
|
||||
func (r *RDB) ArchiveRetryTask(qname string, id uuid.UUID, score int64) error {
|
||||
n, err := r.removeAndArchive(base.RetryKey(qname), base.ArchivedKey(qname), id.String(), float64(score))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -475,10 +475,10 @@ func (r *RDB) KillRetryTask(qname string, id uuid.UUID, score int64) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// KillScheduledTask finds a scheduled task that matches the given id and score from the given queue
|
||||
// and kills it. If a task that maches the id and score does not exist, it returns ErrTaskNotFound.
|
||||
func (r *RDB) KillScheduledTask(qname string, id uuid.UUID, score int64) error {
|
||||
n, err := r.removeAndKill(base.ScheduledKey(qname), base.DeadKey(qname), id.String(), float64(score))
|
||||
// ArchiveScheduledTask finds a scheduled task that matches the given id and score from the given queue
|
||||
// and archives it. If a task that maches the id and score does not exist, it returns ErrTaskNotFound.
|
||||
func (r *RDB) ArchiveScheduledTask(qname string, id uuid.UUID, score int64) error {
|
||||
n, err := r.removeAndArchive(base.ScheduledKey(qname), base.ArchivedKey(qname), id.String(), float64(score))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -488,26 +488,26 @@ func (r *RDB) KillScheduledTask(qname string, id uuid.UUID, score int64) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// KillAllRetryTasks kills all retry tasks from the given queue and
|
||||
// ArchiveAllRetryTasks archives all retry tasks from the given queue and
|
||||
// returns the number of tasks that were moved.
|
||||
func (r *RDB) KillAllRetryTasks(qname string) (int64, error) {
|
||||
return r.removeAndKillAll(base.RetryKey(qname), base.DeadKey(qname))
|
||||
func (r *RDB) ArchiveAllRetryTasks(qname string) (int64, error) {
|
||||
return r.removeAndArchiveAll(base.RetryKey(qname), base.ArchivedKey(qname))
|
||||
}
|
||||
|
||||
// KillAllScheduledTasks kills all scheduled tasks from the given queue and
|
||||
// ArchiveAllScheduledTasks archives all scheduled tasks from the given queue and
|
||||
// returns the number of tasks that were moved.
|
||||
func (r *RDB) KillAllScheduledTasks(qname string) (int64, error) {
|
||||
return r.removeAndKillAll(base.ScheduledKey(qname), base.DeadKey(qname))
|
||||
func (r *RDB) ArchiveAllScheduledTasks(qname string) (int64, error) {
|
||||
return r.removeAndArchiveAll(base.ScheduledKey(qname), base.ArchivedKey(qname))
|
||||
}
|
||||
|
||||
// KEYS[1] -> ZSET to move task from (e.g., retry queue)
|
||||
// KEYS[2] -> asynq:{<qname>}:dead
|
||||
// ARGV[1] -> score of the task to kill
|
||||
// ARGV[2] -> id of the task to kill
|
||||
// KEYS[2] -> asynq:{<qname>}:archived
|
||||
// ARGV[1] -> score of the task to archive
|
||||
// ARGV[2] -> id of the task to archive
|
||||
// ARGV[3] -> current timestamp
|
||||
// ARGV[4] -> cutoff timestamp (e.g., 90 days ago)
|
||||
// ARGV[5] -> max number of tasks in dead queue (e.g., 100)
|
||||
var removeAndKillCmd = redis.NewScript(`
|
||||
// ARGV[5] -> max number of tasks in archived state (e.g., 100)
|
||||
var removeAndArchiveCmd = redis.NewScript(`
|
||||
local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], ARGV[1], ARGV[1])
|
||||
for _, msg in ipairs(msgs) do
|
||||
local decoded = cjson.decode(msg)
|
||||
@@ -521,12 +521,12 @@ for _, msg in ipairs(msgs) do
|
||||
end
|
||||
return 0`)
|
||||
|
||||
func (r *RDB) removeAndKill(src, dst, id string, score float64) (int64, error) {
|
||||
func (r *RDB) removeAndArchive(src, dst, id string, score float64) (int64, error) {
|
||||
now := time.Now()
|
||||
limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago
|
||||
res, err := removeAndKillCmd.Run(r.client,
|
||||
limit := now.AddDate(0, 0, -archivedExpirationInDays).Unix() // 90 days ago
|
||||
res, err := removeAndArchiveCmd.Run(r.client,
|
||||
[]string{src, dst},
|
||||
score, id, now.Unix(), limit, maxDeadTasks).Result()
|
||||
score, id, now.Unix(), limit, maxArchiveSize).Result()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
@@ -538,11 +538,11 @@ func (r *RDB) removeAndKill(src, dst, id string, score float64) (int64, error) {
|
||||
}
|
||||
|
||||
// KEYS[1] -> ZSET to move task from (e.g., retry queue)
|
||||
// KEYS[2] -> asynq:{<qname>}:dead
|
||||
// KEYS[2] -> asynq:{<qname>}:archived
|
||||
// ARGV[1] -> current timestamp
|
||||
// ARGV[2] -> cutoff timestamp (e.g., 90 days ago)
|
||||
// ARGV[3] -> max number of tasks in dead queue (e.g., 100)
|
||||
var removeAndKillAllCmd = redis.NewScript(`
|
||||
// ARGV[3] -> max number of tasks in archive (e.g., 100)
|
||||
var removeAndArchiveAllCmd = redis.NewScript(`
|
||||
local msgs = redis.call("ZRANGE", KEYS[1], 0, -1)
|
||||
for _, msg in ipairs(msgs) do
|
||||
redis.call("ZADD", KEYS[2], ARGV[1], msg)
|
||||
@@ -552,11 +552,11 @@ for _, msg in ipairs(msgs) do
|
||||
end
|
||||
return table.getn(msgs)`)
|
||||
|
||||
func (r *RDB) removeAndKillAll(src, dst string) (int64, error) {
|
||||
func (r *RDB) removeAndArchiveAll(src, dst string) (int64, error) {
|
||||
now := time.Now()
|
||||
limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago
|
||||
res, err := removeAndKillAllCmd.Run(r.client, []string{src, dst},
|
||||
now.Unix(), limit, maxDeadTasks).Result()
|
||||
limit := now.AddDate(0, 0, -archivedExpirationInDays).Unix() // 90 days ago
|
||||
res, err := removeAndArchiveAllCmd.Run(r.client, []string{src, dst},
|
||||
now.Unix(), limit, maxArchiveSize).Result()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
@@ -567,10 +567,10 @@ func (r *RDB) removeAndKillAll(src, dst string) (int64, error) {
|
||||
return n, nil
|
||||
}
|
||||
|
||||
// DeleteDeadTask deletes a dead task that matches the given id and score from the given queue.
|
||||
// DeleteArchivedTask deletes an archived task that matches the given id and score from the given queue.
|
||||
// If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
|
||||
func (r *RDB) DeleteDeadTask(qname string, id uuid.UUID, score int64) error {
|
||||
return r.deleteTask(base.DeadKey(qname), id.String(), float64(score))
|
||||
func (r *RDB) DeleteArchivedTask(qname string, id uuid.UUID, score int64) error {
|
||||
return r.deleteTask(base.ArchivedKey(qname), id.String(), float64(score))
|
||||
}
|
||||
|
||||
// DeleteRetryTask deletes a retry task that matches the given id and score from the given queue.
|
||||
@@ -617,10 +617,10 @@ local n = redis.call("ZCARD", KEYS[1])
|
||||
redis.call("DEL", KEYS[1])
|
||||
return n`)
|
||||
|
||||
// DeleteAllDeadTasks deletes all dead tasks from the given queue
|
||||
// DeleteAllArchivedTasks deletes all archived tasks from the given queue
|
||||
// and returns the number of tasks deleted.
|
||||
func (r *RDB) DeleteAllDeadTasks(qname string) (int64, error) {
|
||||
return r.deleteAll(base.DeadKey(qname))
|
||||
func (r *RDB) DeleteAllArchivedTasks(qname string) (int64, error) {
|
||||
return r.deleteAll(base.ArchivedKey(qname))
|
||||
}
|
||||
|
||||
// DeleteAllRetryTasks deletes all retry tasks from the given queue
|
||||
@@ -670,7 +670,7 @@ func (e *ErrQueueNotEmpty) Error() string {
|
||||
// KEYS[2] -> asynq:{<qname>}:active
|
||||
// KEYS[3] -> asynq:{<qname>}:scheduled
|
||||
// KEYS[4] -> asynq:{<qname>}:retry
|
||||
// KEYS[5] -> asynq:{<qname>}:dead
|
||||
// KEYS[5] -> asynq:{<qname>}:archived
|
||||
// KEYS[6] -> asynq:{<qname>}:deadlines
|
||||
var removeQueueForceCmd = redis.NewScript(`
|
||||
local active = redis.call("LLEN", KEYS[2])
|
||||
@@ -690,15 +690,15 @@ return redis.status_reply("OK")`)
|
||||
// KEYS[2] -> asynq:{<qname>}:active
|
||||
// KEYS[3] -> asynq:{<qname>}:scheduled
|
||||
// KEYS[4] -> asynq:{<qname>}:retry
|
||||
// KEYS[5] -> asynq:{<qname>}:dead
|
||||
// KEYS[5] -> asynq:{<qname>}:archived
|
||||
// KEYS[6] -> asynq:{<qname>}:deadlines
|
||||
var removeQueueCmd = redis.NewScript(`
|
||||
local pending = redis.call("LLEN", KEYS[1])
|
||||
local active = redis.call("LLEN", KEYS[2])
|
||||
local scheduled = redis.call("SCARD", KEYS[3])
|
||||
local retry = redis.call("SCARD", KEYS[4])
|
||||
local dead = redis.call("SCARD", KEYS[5])
|
||||
local total = pending + active + scheduled + retry + dead
|
||||
local archived = redis.call("SCARD", KEYS[5])
|
||||
local total = pending + active + scheduled + retry + archived
|
||||
if total > 0 then
|
||||
return redis.error_reply("QUEUE NOT EMPTY")
|
||||
end
|
||||
@@ -735,7 +735,7 @@ func (r *RDB) RemoveQueue(qname string, force bool) error {
|
||||
base.ActiveKey(qname),
|
||||
base.ScheduledKey(qname),
|
||||
base.RetryKey(qname),
|
||||
base.DeadKey(qname),
|
||||
base.ArchivedKey(qname),
|
||||
base.DeadlinesKey(qname),
|
||||
}
|
||||
if err := script.Run(r.client, keys).Err(); err != nil {
|
||||
@@ -745,7 +745,6 @@ func (r *RDB) RemoveQueue(qname string, force bool) error {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return r.client.SRem(base.AllQueues, qname).Err()
|
||||
}
|
||||
|
||||
@@ -854,9 +853,9 @@ func (r *RDB) ListSchedulerEntries() ([]*base.SchedulerEntry, error) {
|
||||
}
|
||||
|
||||
// ListSchedulerEnqueueEvents returns the list of scheduler enqueue events.
|
||||
func (r *RDB) ListSchedulerEnqueueEvents(entryID string) ([]*base.SchedulerEnqueueEvent, error) {
|
||||
func (r *RDB) ListSchedulerEnqueueEvents(entryID string, pgn Pagination) ([]*base.SchedulerEnqueueEvent, error) {
|
||||
key := base.SchedulerHistoryKey(entryID)
|
||||
zs, err := r.client.ZRangeWithScores(key, 0, -1).Result()
|
||||
zs, err := r.client.ZRevRangeWithScores(key, pgn.start(), pgn.stop()).Result()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
File diff suppressed because it is too large
Load Diff
@@ -381,22 +381,22 @@ func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) e
|
||||
}
|
||||
|
||||
const (
|
||||
maxDeadTasks = 10000
|
||||
deadExpirationInDays = 90
|
||||
maxArchiveSize = 10000 // maximum number of tasks in archive
|
||||
archivedExpirationInDays = 90 // number of days before an archived task gets deleted permanently
|
||||
)
|
||||
|
||||
// KEYS[1] -> asynq:{<qname>}:active
|
||||
// KEYS[2] -> asynq:{<qname>}:deadlines
|
||||
// KEYS[3] -> asynq:{<qname>}:dead
|
||||
// KEYS[3] -> asynq:{<qname>}:archived
|
||||
// KEYS[4] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
|
||||
// KEYS[5] -> asynq:{<qname>}:failed:<yyyy-mm-dd>
|
||||
// ARGV[1] -> base.TaskMessage value to remove from base.ActiveQueue queue
|
||||
// ARGV[2] -> base.TaskMessage value to add to Dead queue
|
||||
// ARGV[1] -> base.TaskMessage value to remove
|
||||
// ARGV[2] -> base.TaskMessage value to add
|
||||
// ARGV[3] -> died_at UNIX timestamp
|
||||
// ARGV[4] -> cutoff timestamp (e.g., 90 days ago)
|
||||
// ARGV[5] -> max number of tasks in dead queue (e.g., 100)
|
||||
// ARGV[5] -> max number of tasks in archive (e.g., 100)
|
||||
// ARGV[6] -> stats expiration timestamp
|
||||
var killCmd = redis.NewScript(`
|
||||
var archiveCmd = redis.NewScript(`
|
||||
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
|
||||
return redis.error_reply("NOT FOUND")
|
||||
end
|
||||
@@ -416,10 +416,9 @@ if tonumber(m) == 1 then
|
||||
end
|
||||
return redis.status_reply("OK")`)
|
||||
|
||||
// Kill sends the task to "dead" queue from active queue, assigning
|
||||
// the error message to the task.
|
||||
// It also trims the set by timestamp and set size.
|
||||
func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error {
|
||||
// Archive sends the given task to archive, attaching the error message to the task.
|
||||
// It also trims the archive by timestamp and set size.
|
||||
func (r *RDB) Archive(msg *base.TaskMessage, errMsg string) error {
|
||||
msgToRemove, err := base.EncodeMessage(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -431,13 +430,13 @@ func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error {
|
||||
return err
|
||||
}
|
||||
now := time.Now()
|
||||
limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago
|
||||
limit := now.AddDate(0, 0, -archivedExpirationInDays).Unix() // 90 days ago
|
||||
processedKey := base.ProcessedKey(msg.Queue, now)
|
||||
failedKey := base.FailedKey(msg.Queue, now)
|
||||
expireAt := now.Add(statsTTL)
|
||||
return killCmd.Run(r.client,
|
||||
[]string{base.ActiveKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.DeadKey(msg.Queue), processedKey, failedKey},
|
||||
msgToRemove, msgToAdd, now.Unix(), limit, maxDeadTasks, expireAt.Unix()).Err()
|
||||
return archiveCmd.Run(r.client,
|
||||
[]string{base.ActiveKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.ArchivedKey(msg.Queue), processedKey, failedKey},
|
||||
msgToRemove, msgToAdd, now.Unix(), limit, maxArchiveSize, expireAt.Unix()).Err()
|
||||
}
|
||||
|
||||
// CheckAndEnqueue checks for scheduled/retry tasks for the given queues
|
||||
|
@@ -1008,7 +1008,7 @@ func TestRetry(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestKill(t *testing.T) {
|
||||
func TestArchive(t *testing.T) {
|
||||
r := setup(t)
|
||||
defer r.Close()
|
||||
now := time.Now()
|
||||
@@ -1058,11 +1058,11 @@ func TestKill(t *testing.T) {
|
||||
tests := []struct {
|
||||
inProgress map[string][]*base.TaskMessage
|
||||
deadlines map[string][]base.Z
|
||||
dead map[string][]base.Z
|
||||
target *base.TaskMessage // task to kill
|
||||
archived map[string][]base.Z
|
||||
target *base.TaskMessage // task to archive
|
||||
wantActive map[string][]*base.TaskMessage
|
||||
wantDeadlines map[string][]base.Z
|
||||
wantDead map[string][]base.Z
|
||||
wantArchived map[string][]base.Z
|
||||
}{
|
||||
{
|
||||
inProgress: map[string][]*base.TaskMessage{
|
||||
@@ -1074,7 +1074,7 @@ func TestKill(t *testing.T) {
|
||||
{Message: t2, Score: t2Deadline},
|
||||
},
|
||||
},
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {
|
||||
{Message: t3, Score: now.Add(-time.Hour).Unix()},
|
||||
},
|
||||
@@ -1086,7 +1086,7 @@ func TestKill(t *testing.T) {
|
||||
wantDeadlines: map[string][]base.Z{
|
||||
"default": {{Message: t2, Score: t2Deadline}},
|
||||
},
|
||||
wantDead: map[string][]base.Z{
|
||||
wantArchived: map[string][]base.Z{
|
||||
"default": {
|
||||
{Message: h.TaskMessageWithError(*t1, errMsg), Score: now.Unix()},
|
||||
{Message: t3, Score: now.Add(-time.Hour).Unix()},
|
||||
@@ -1104,7 +1104,7 @@ func TestKill(t *testing.T) {
|
||||
{Message: t3, Score: t3Deadline},
|
||||
},
|
||||
},
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {},
|
||||
},
|
||||
target: t1,
|
||||
@@ -1117,7 +1117,7 @@ func TestKill(t *testing.T) {
|
||||
{Message: t3, Score: t3Deadline},
|
||||
},
|
||||
},
|
||||
wantDead: map[string][]base.Z{
|
||||
wantArchived: map[string][]base.Z{
|
||||
"default": {
|
||||
{Message: h.TaskMessageWithError(*t1, errMsg), Score: now.Unix()},
|
||||
},
|
||||
@@ -1136,7 +1136,7 @@ func TestKill(t *testing.T) {
|
||||
{Message: t4, Score: t4Deadline},
|
||||
},
|
||||
},
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {},
|
||||
"custom": {},
|
||||
},
|
||||
@@ -1149,7 +1149,7 @@ func TestKill(t *testing.T) {
|
||||
"default": {{Message: t1, Score: t1Deadline}},
|
||||
"custom": {},
|
||||
},
|
||||
wantDead: map[string][]base.Z{
|
||||
wantArchived: map[string][]base.Z{
|
||||
"default": {},
|
||||
"custom": {
|
||||
{Message: h.TaskMessageWithError(*t4, errMsg), Score: now.Unix()},
|
||||
@@ -1162,11 +1162,11 @@ func TestKill(t *testing.T) {
|
||||
h.FlushDB(t, r.client) // clean up db before each test case
|
||||
h.SeedAllActiveQueues(t, r.client, tc.inProgress)
|
||||
h.SeedAllDeadlines(t, r.client, tc.deadlines)
|
||||
h.SeedAllDeadQueues(t, r.client, tc.dead)
|
||||
h.SeedAllArchivedQueues(t, r.client, tc.archived)
|
||||
|
||||
err := r.Kill(tc.target, errMsg)
|
||||
err := r.Archive(tc.target, errMsg)
|
||||
if err != nil {
|
||||
t.Errorf("(*RDB).Kill(%v, %v) = %v, want nil", tc.target, errMsg, err)
|
||||
t.Errorf("(*RDB).Archive(%v, %v) = %v, want nil", tc.target, errMsg, err)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -1179,13 +1179,13 @@ func TestKill(t *testing.T) {
|
||||
for queue, want := range tc.wantDeadlines {
|
||||
gotDeadlines := h.GetDeadlinesEntries(t, r.client, queue)
|
||||
if diff := cmp.Diff(want, gotDeadlines, h.SortZSetEntryOpt); diff != "" {
|
||||
t.Errorf("mismatch found in %q after calling (*RDB).Kill: (-want, +got):\n%s", base.DeadlinesKey(queue), diff)
|
||||
t.Errorf("mismatch found in %q after calling (*RDB).Archive: (-want, +got):\n%s", base.DeadlinesKey(queue), diff)
|
||||
}
|
||||
}
|
||||
for queue, want := range tc.wantDead {
|
||||
gotDead := h.GetDeadEntries(t, r.client, queue)
|
||||
if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" {
|
||||
t.Errorf("mismatch found in %q after calling (*RDB).Kill: (-want, +got):\n%s", base.DeadKey(queue), diff)
|
||||
for queue, want := range tc.wantArchived {
|
||||
gotArchived := h.GetArchivedEntries(t, r.client, queue)
|
||||
if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" {
|
||||
t.Errorf("mismatch found in %q after calling (*RDB).Archive: (-want, +got):\n%s", base.ArchivedKey(queue), diff)
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -117,13 +117,13 @@ func (tb *TestBroker) Retry(msg *base.TaskMessage, processAt time.Time, errMsg s
|
||||
return tb.real.Retry(msg, processAt, errMsg)
|
||||
}
|
||||
|
||||
func (tb *TestBroker) Kill(msg *base.TaskMessage, errMsg string) error {
|
||||
func (tb *TestBroker) Archive(msg *base.TaskMessage, errMsg string) error {
|
||||
tb.mu.Lock()
|
||||
defer tb.mu.Unlock()
|
||||
if tb.sleeping {
|
||||
return errRedisDown
|
||||
}
|
||||
return tb.real.Kill(msg, errMsg)
|
||||
return tb.real.Archive(msg, errMsg)
|
||||
}
|
||||
|
||||
func (tb *TestBroker) CheckAndEnqueue(qnames ...string) error {
|
||||
|
53
processor.go
53
processor.go
@@ -6,9 +6,13 @@ package asynq
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"runtime"
|
||||
"runtime/debug"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -29,7 +33,7 @@ type processor struct {
|
||||
// orderedQueues is set only in strict-priority mode.
|
||||
orderedQueues []string
|
||||
|
||||
retryDelayFunc retryDelayFunc
|
||||
retryDelayFunc RetryDelayFunc
|
||||
|
||||
errHandler ErrorHandler
|
||||
|
||||
@@ -63,12 +67,10 @@ type processor struct {
|
||||
finished chan<- *base.TaskMessage
|
||||
}
|
||||
|
||||
type retryDelayFunc func(n int, err error, task *Task) time.Duration
|
||||
|
||||
type processorParams struct {
|
||||
logger *log.Logger
|
||||
broker base.Broker
|
||||
retryDelayFunc retryDelayFunc
|
||||
retryDelayFunc RetryDelayFunc
|
||||
syncCh chan<- *syncRequest
|
||||
cancelations *base.Cancelations
|
||||
concurrency int
|
||||
@@ -203,7 +205,7 @@ func (p *processor) exec() {
|
||||
|
||||
resCh := make(chan error, 1)
|
||||
go func() {
|
||||
resCh <- perform(ctx, NewTask(msg.Type, msg.Payload), p.handler)
|
||||
resCh <- p.perform(ctx, NewTask(msg.Type, msg.Payload))
|
||||
}()
|
||||
|
||||
select {
|
||||
@@ -217,9 +219,9 @@ func (p *processor) exec() {
|
||||
return
|
||||
case resErr := <-resCh:
|
||||
// Note: One of three things should happen.
|
||||
// 1) Done -> Removes the message from Active
|
||||
// 2) Retry -> Removes the message from Active & Adds the message to Retry
|
||||
// 3) Kill -> Removes the message from Active & Adds the message to Dead
|
||||
// 1) Done -> Removes the message from Active
|
||||
// 2) Retry -> Removes the message from Active & Adds the message to Retry
|
||||
// 3) Archive -> Removes the message from Active & Adds the message to archive
|
||||
if resErr != nil {
|
||||
p.retryOrKill(ctx, msg, resErr)
|
||||
return
|
||||
@@ -258,13 +260,17 @@ func (p *processor) markAsDone(ctx context.Context, msg *base.TaskMessage) {
|
||||
}
|
||||
}
|
||||
|
||||
// SkipRetry is used as a return value from Handler.ProcessTask to indicate that
|
||||
// the task should not be retried and should be archived instead.
|
||||
var SkipRetry = errors.New("skip retry for the task")
|
||||
|
||||
func (p *processor) retryOrKill(ctx context.Context, msg *base.TaskMessage, err error) {
|
||||
if p.errHandler != nil {
|
||||
p.errHandler.HandleError(ctx, NewTask(msg.Type, msg.Payload), err)
|
||||
}
|
||||
if msg.Retried >= msg.Retry {
|
||||
if msg.Retried >= msg.Retry || errors.Is(err, SkipRetry) {
|
||||
p.logger.Warnf("Retry exhausted for task id=%s", msg.ID)
|
||||
p.kill(ctx, msg, err)
|
||||
p.archive(ctx, msg, err)
|
||||
} else {
|
||||
p.retry(ctx, msg, err)
|
||||
}
|
||||
@@ -291,10 +297,10 @@ func (p *processor) retry(ctx context.Context, msg *base.TaskMessage, e error) {
|
||||
}
|
||||
}
|
||||
|
||||
func (p *processor) kill(ctx context.Context, msg *base.TaskMessage, e error) {
|
||||
err := p.broker.Kill(msg, e.Error())
|
||||
func (p *processor) archive(ctx context.Context, msg *base.TaskMessage, e error) {
|
||||
err := p.broker.Archive(msg, e.Error())
|
||||
if err != nil {
|
||||
errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.ActiveKey(msg.Queue), base.DeadKey(msg.Queue))
|
||||
errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.ActiveKey(msg.Queue), base.ArchivedKey(msg.Queue))
|
||||
deadline, ok := ctx.Deadline()
|
||||
if !ok {
|
||||
panic("asynq: internal error: missing deadline in context")
|
||||
@@ -302,7 +308,7 @@ func (p *processor) kill(ctx context.Context, msg *base.TaskMessage, e error) {
|
||||
p.logger.Warnf("%s; Will retry syncing", errMsg)
|
||||
p.syncRequestCh <- &syncRequest{
|
||||
fn: func() error {
|
||||
return p.broker.Kill(msg, e.Error())
|
||||
return p.broker.Archive(msg, e.Error())
|
||||
},
|
||||
errMsg: errMsg,
|
||||
deadline: deadline,
|
||||
@@ -340,13 +346,26 @@ func (p *processor) queues() []string {
|
||||
// perform calls the handler with the given task.
|
||||
// If the call returns without panic, it simply returns the value,
|
||||
// otherwise, it recovers from panic and returns an error.
|
||||
func perform(ctx context.Context, task *Task, h Handler) (err error) {
|
||||
func (p *processor) perform(ctx context.Context, task *Task) (err error) {
|
||||
defer func() {
|
||||
if x := recover(); x != nil {
|
||||
err = fmt.Errorf("panic: %v", x)
|
||||
p.logger.Errorf("recovering from panic. See the stack trace below for details:\n%s", string(debug.Stack()))
|
||||
_, file, line, ok := runtime.Caller(1) // skip the first frame (panic itself)
|
||||
if ok && strings.Contains(file, "runtime/") {
|
||||
// The panic came from the runtime, most likely due to incorrect
|
||||
// map/slice usage. The parent frame should have the real trigger.
|
||||
_, file, line, ok = runtime.Caller(2)
|
||||
}
|
||||
|
||||
// Include the file and line number info in the error, if runtime.Caller returned ok.
|
||||
if ok {
|
||||
err = fmt.Errorf("panic [%s:%d]: %v", file, line, x)
|
||||
} else {
|
||||
err = fmt.Errorf("panic: %v", x)
|
||||
}
|
||||
}
|
||||
}()
|
||||
return h.ProcessTask(ctx, task)
|
||||
return p.handler.ProcessTask(ctx, task)
|
||||
}
|
||||
|
||||
// uniq dedupes elements and returns a slice of unique names of length l.
|
||||
|
@@ -96,7 +96,7 @@ func TestProcessorSuccessWithSingleQueue(t *testing.T) {
|
||||
p := newProcessor(processorParams{
|
||||
logger: testLogger,
|
||||
broker: rdbClient,
|
||||
retryDelayFunc: defaultDelayFunc,
|
||||
retryDelayFunc: DefaultRetryDelayFunc,
|
||||
syncCh: syncCh,
|
||||
cancelations: base.NewCancelations(),
|
||||
concurrency: 10,
|
||||
@@ -187,7 +187,7 @@ func TestProcessorSuccessWithMultipleQueues(t *testing.T) {
|
||||
p := newProcessor(processorParams{
|
||||
logger: testLogger,
|
||||
broker: rdbClient,
|
||||
retryDelayFunc: defaultDelayFunc,
|
||||
retryDelayFunc: DefaultRetryDelayFunc,
|
||||
syncCh: syncCh,
|
||||
cancelations: base.NewCancelations(),
|
||||
concurrency: 10,
|
||||
@@ -268,7 +268,7 @@ func TestProcessTasksWithLargeNumberInPayload(t *testing.T) {
|
||||
p := newProcessor(processorParams{
|
||||
logger: testLogger,
|
||||
broker: rdbClient,
|
||||
retryDelayFunc: defaultDelayFunc,
|
||||
retryDelayFunc: DefaultRetryDelayFunc,
|
||||
syncCh: syncCh,
|
||||
cancelations: base.NewCancelations(),
|
||||
concurrency: 10,
|
||||
@@ -307,19 +307,22 @@ func TestProcessorRetry(t *testing.T) {
|
||||
m4 := h.NewTaskMessage("sync", nil)
|
||||
|
||||
errMsg := "something went wrong"
|
||||
wrappedSkipRetry := fmt.Errorf("%s:%w", errMsg, SkipRetry)
|
||||
now := time.Now()
|
||||
|
||||
tests := []struct {
|
||||
desc string // test description
|
||||
pending []*base.TaskMessage // initial default queue state
|
||||
incoming []*base.TaskMessage // tasks to be enqueued during run
|
||||
delay time.Duration // retry delay duration
|
||||
handler Handler // task handler
|
||||
wait time.Duration // wait duration between starting and stopping processor for this test case
|
||||
wantRetry []base.Z // tasks in retry queue at the end
|
||||
wantDead []*base.TaskMessage // tasks in dead queue at the end
|
||||
wantArchived []*base.TaskMessage // tasks in archived queue at the end
|
||||
wantErrCount int // number of times error handler should be called
|
||||
}{
|
||||
{
|
||||
desc: "Should automatically retry errored tasks",
|
||||
pending: []*base.TaskMessage{m1, m2},
|
||||
incoming: []*base.TaskMessage{m3, m4},
|
||||
delay: time.Minute,
|
||||
@@ -332,9 +335,41 @@ func TestProcessorRetry(t *testing.T) {
|
||||
{Message: h.TaskMessageAfterRetry(*m3, errMsg), Score: now.Add(time.Minute).Unix()},
|
||||
{Message: h.TaskMessageAfterRetry(*m4, errMsg), Score: now.Add(time.Minute).Unix()},
|
||||
},
|
||||
wantDead: []*base.TaskMessage{h.TaskMessageWithError(*m1, errMsg)},
|
||||
wantArchived: []*base.TaskMessage{h.TaskMessageWithError(*m1, errMsg)},
|
||||
wantErrCount: 4,
|
||||
},
|
||||
{
|
||||
desc: "Should skip retry errored tasks",
|
||||
pending: []*base.TaskMessage{m1, m2},
|
||||
incoming: []*base.TaskMessage{},
|
||||
delay: time.Minute,
|
||||
handler: HandlerFunc(func(ctx context.Context, task *Task) error {
|
||||
return SkipRetry // return SkipRetry without wrapping
|
||||
}),
|
||||
wait: 2 * time.Second,
|
||||
wantRetry: []base.Z{},
|
||||
wantArchived: []*base.TaskMessage{
|
||||
h.TaskMessageWithError(*m1, SkipRetry.Error()),
|
||||
h.TaskMessageWithError(*m2, SkipRetry.Error()),
|
||||
},
|
||||
wantErrCount: 2, // ErrorHandler should still be called with SkipRetry error
|
||||
},
|
||||
{
|
||||
desc: "Should skip retry errored tasks (with error wrapping)",
|
||||
pending: []*base.TaskMessage{m1, m2},
|
||||
incoming: []*base.TaskMessage{},
|
||||
delay: time.Minute,
|
||||
handler: HandlerFunc(func(ctx context.Context, task *Task) error {
|
||||
return wrappedSkipRetry
|
||||
}),
|
||||
wait: 2 * time.Second,
|
||||
wantRetry: []base.Z{},
|
||||
wantArchived: []*base.TaskMessage{
|
||||
h.TaskMessageWithError(*m1, wrappedSkipRetry.Error()),
|
||||
h.TaskMessageWithError(*m2, wrappedSkipRetry.Error()),
|
||||
},
|
||||
wantErrCount: 2, // ErrorHandler should still be called with SkipRetry error
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
@@ -389,16 +424,16 @@ func TestProcessorRetry(t *testing.T) {
|
||||
cmpOpt := h.EquateInt64Approx(1) // allow up to a second difference in zset score
|
||||
gotRetry := h.GetRetryEntries(t, r, base.DefaultQueueName)
|
||||
if diff := cmp.Diff(tc.wantRetry, gotRetry, h.SortZSetEntryOpt, cmpOpt); diff != "" {
|
||||
t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", base.RetryKey(base.DefaultQueueName), diff)
|
||||
t.Errorf("%s: mismatch found in %q after running processor; (-want, +got)\n%s", tc.desc, base.RetryKey(base.DefaultQueueName), diff)
|
||||
}
|
||||
|
||||
gotDead := h.GetDeadMessages(t, r, base.DefaultQueueName)
|
||||
if diff := cmp.Diff(tc.wantDead, gotDead, h.SortMsgOpt); diff != "" {
|
||||
t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", base.DeadKey(base.DefaultQueueName), diff)
|
||||
gotDead := h.GetArchivedMessages(t, r, base.DefaultQueueName)
|
||||
if diff := cmp.Diff(tc.wantArchived, gotDead, h.SortMsgOpt); diff != "" {
|
||||
t.Errorf("%s: mismatch found in %q after running processor; (-want, +got)\n%s", tc.desc, base.ArchivedKey(base.DefaultQueueName), diff)
|
||||
}
|
||||
|
||||
if l := r.LLen(base.ActiveKey(base.DefaultQueueName)).Val(); l != 0 {
|
||||
t.Errorf("%q has %d tasks, want 0", base.ActiveKey(base.DefaultQueueName), l)
|
||||
t.Errorf("%s: %q has %d tasks, want 0", base.ActiveKey(base.DefaultQueueName), tc.desc, l)
|
||||
}
|
||||
|
||||
if n != tc.wantErrCount {
|
||||
@@ -443,7 +478,7 @@ func TestProcessorQueues(t *testing.T) {
|
||||
p := newProcessor(processorParams{
|
||||
logger: testLogger,
|
||||
broker: nil,
|
||||
retryDelayFunc: defaultDelayFunc,
|
||||
retryDelayFunc: DefaultRetryDelayFunc,
|
||||
syncCh: nil,
|
||||
cancelations: base.NewCancelations(),
|
||||
concurrency: 10,
|
||||
@@ -534,7 +569,7 @@ func TestProcessorWithStrictPriority(t *testing.T) {
|
||||
p := newProcessor(processorParams{
|
||||
logger: testLogger,
|
||||
broker: rdbClient,
|
||||
retryDelayFunc: defaultDelayFunc,
|
||||
retryDelayFunc: DefaultRetryDelayFunc,
|
||||
syncCh: syncCh,
|
||||
cancelations: base.NewCancelations(),
|
||||
concurrency: 1, // Set concurrency to 1 to make sure tasks are processed one at a time.
|
||||
@@ -564,7 +599,7 @@ func TestProcessorWithStrictPriority(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestPerform(t *testing.T) {
|
||||
func TestProcessorPerform(t *testing.T) {
|
||||
tests := []struct {
|
||||
desc string
|
||||
handler HandlerFunc
|
||||
@@ -596,9 +631,16 @@ func TestPerform(t *testing.T) {
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
// Note: We don't need to fully initialize the processor since we are only testing
|
||||
// perform method.
|
||||
p := newProcessor(processorParams{
|
||||
logger: testLogger,
|
||||
queues: defaultQueueConfig,
|
||||
})
|
||||
|
||||
for _, tc := range tests {
|
||||
got := perform(context.Background(), tc.task, tc.handler)
|
||||
p.handler = tc.handler
|
||||
got := p.perform(context.Background(), tc.task)
|
||||
if !tc.wantErr && got != nil {
|
||||
t.Errorf("%s: perform() = %v, want nil", tc.desc, got)
|
||||
continue
|
||||
|
12
recoverer.go
12
recoverer.go
@@ -16,7 +16,7 @@ import (
|
||||
type recoverer struct {
|
||||
logger *log.Logger
|
||||
broker base.Broker
|
||||
retryDelayFunc retryDelayFunc
|
||||
retryDelayFunc RetryDelayFunc
|
||||
|
||||
// channel to communicate back to the long running "recoverer" goroutine.
|
||||
done chan struct{}
|
||||
@@ -33,7 +33,7 @@ type recovererParams struct {
|
||||
broker base.Broker
|
||||
queues []string
|
||||
interval time.Duration
|
||||
retryDelayFunc retryDelayFunc
|
||||
retryDelayFunc RetryDelayFunc
|
||||
}
|
||||
|
||||
func newRecoverer(params recovererParams) *recoverer {
|
||||
@@ -75,7 +75,7 @@ func (r *recoverer) start(wg *sync.WaitGroup) {
|
||||
const errMsg = "deadline exceeded" // TODO: better error message
|
||||
for _, msg := range msgs {
|
||||
if msg.Retried >= msg.Retry {
|
||||
r.kill(msg, errMsg)
|
||||
r.archive(msg, errMsg)
|
||||
} else {
|
||||
r.retry(msg, errMsg)
|
||||
}
|
||||
@@ -94,8 +94,8 @@ func (r *recoverer) retry(msg *base.TaskMessage, errMsg string) {
|
||||
}
|
||||
}
|
||||
|
||||
func (r *recoverer) kill(msg *base.TaskMessage, errMsg string) {
|
||||
if err := r.broker.Kill(msg, errMsg); err != nil {
|
||||
r.logger.Warnf("recoverer: could not move task to dead queue: %v", err)
|
||||
func (r *recoverer) archive(msg *base.TaskMessage, errMsg string) {
|
||||
if err := r.broker.Archive(msg, errMsg); err != nil {
|
||||
r.logger.Warnf("recoverer: could not move task to archive: %v", err)
|
||||
}
|
||||
}
|
||||
|
@@ -37,11 +37,11 @@ func TestRecoverer(t *testing.T) {
|
||||
inProgress map[string][]*base.TaskMessage
|
||||
deadlines map[string][]base.Z
|
||||
retry map[string][]base.Z
|
||||
dead map[string][]base.Z
|
||||
archived map[string][]base.Z
|
||||
wantActive map[string][]*base.TaskMessage
|
||||
wantDeadlines map[string][]base.Z
|
||||
wantRetry map[string][]*base.TaskMessage
|
||||
wantDead map[string][]*base.TaskMessage
|
||||
wantArchived map[string][]*base.TaskMessage
|
||||
}{
|
||||
{
|
||||
desc: "with one active task",
|
||||
@@ -54,7 +54,7 @@ func TestRecoverer(t *testing.T) {
|
||||
retry: map[string][]base.Z{
|
||||
"default": {},
|
||||
},
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {},
|
||||
},
|
||||
wantActive: map[string][]*base.TaskMessage{
|
||||
@@ -66,7 +66,7 @@ func TestRecoverer(t *testing.T) {
|
||||
wantRetry: map[string][]*base.TaskMessage{
|
||||
"default": {h.TaskMessageAfterRetry(*t1, "deadline exceeded")},
|
||||
},
|
||||
wantDead: map[string][]*base.TaskMessage{
|
||||
wantArchived: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
},
|
||||
},
|
||||
@@ -84,7 +84,7 @@ func TestRecoverer(t *testing.T) {
|
||||
"default": {},
|
||||
"critical": {},
|
||||
},
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {},
|
||||
"critical": {},
|
||||
},
|
||||
@@ -100,7 +100,7 @@ func TestRecoverer(t *testing.T) {
|
||||
"default": {},
|
||||
"critical": {},
|
||||
},
|
||||
wantDead: map[string][]*base.TaskMessage{
|
||||
wantArchived: map[string][]*base.TaskMessage{
|
||||
"default": {h.TaskMessageWithError(*t4, "deadline exceeded")},
|
||||
"critical": {},
|
||||
},
|
||||
@@ -124,7 +124,7 @@ func TestRecoverer(t *testing.T) {
|
||||
"default": {},
|
||||
"critical": {},
|
||||
},
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {},
|
||||
"critical": {},
|
||||
},
|
||||
@@ -140,7 +140,7 @@ func TestRecoverer(t *testing.T) {
|
||||
"default": {h.TaskMessageAfterRetry(*t1, "deadline exceeded")},
|
||||
"critical": {},
|
||||
},
|
||||
wantDead: map[string][]*base.TaskMessage{
|
||||
wantArchived: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
"critical": {},
|
||||
},
|
||||
@@ -164,7 +164,7 @@ func TestRecoverer(t *testing.T) {
|
||||
"default": {},
|
||||
"cricial": {},
|
||||
},
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {},
|
||||
"cricial": {},
|
||||
},
|
||||
@@ -179,7 +179,7 @@ func TestRecoverer(t *testing.T) {
|
||||
"default": {h.TaskMessageAfterRetry(*t1, "deadline exceeded")},
|
||||
"critical": {h.TaskMessageAfterRetry(*t3, "deadline exceeded")},
|
||||
},
|
||||
wantDead: map[string][]*base.TaskMessage{
|
||||
wantArchived: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
"critical": {},
|
||||
},
|
||||
@@ -198,7 +198,7 @@ func TestRecoverer(t *testing.T) {
|
||||
"default": {},
|
||||
"critical": {},
|
||||
},
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {},
|
||||
"critical": {},
|
||||
},
|
||||
@@ -214,7 +214,7 @@ func TestRecoverer(t *testing.T) {
|
||||
"default": {},
|
||||
"critical": {},
|
||||
},
|
||||
wantDead: map[string][]*base.TaskMessage{
|
||||
wantArchived: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
"critical": {},
|
||||
},
|
||||
@@ -226,7 +226,7 @@ func TestRecoverer(t *testing.T) {
|
||||
h.SeedAllActiveQueues(t, r, tc.inProgress)
|
||||
h.SeedAllDeadlines(t, r, tc.deadlines)
|
||||
h.SeedAllRetryQueues(t, r, tc.retry)
|
||||
h.SeedAllDeadQueues(t, r, tc.dead)
|
||||
h.SeedAllArchivedQueues(t, r, tc.archived)
|
||||
|
||||
recoverer := newRecoverer(recovererParams{
|
||||
logger: testLogger,
|
||||
@@ -259,10 +259,10 @@ func TestRecoverer(t *testing.T) {
|
||||
t.Errorf("%s; mismatch found in %q: (-want, +got)\n%s", tc.desc, base.RetryKey(qname), diff)
|
||||
}
|
||||
}
|
||||
for qname, want := range tc.wantDead {
|
||||
gotDead := h.GetDeadMessages(t, r, qname)
|
||||
for qname, want := range tc.wantArchived {
|
||||
gotDead := h.GetArchivedMessages(t, r, qname)
|
||||
if diff := cmp.Diff(want, gotDead, h.SortMsgOpt); diff != "" {
|
||||
t.Errorf("%s; mismatch found in %q: (-want, +got)\n%s", tc.desc, base.DeadKey(qname), diff)
|
||||
t.Errorf("%s; mismatch found in %q: (-want, +got)\n%s", tc.desc, base.ArchivedKey(qname), diff)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -115,7 +115,7 @@ func (j *enqueueJob) Run() {
|
||||
}
|
||||
return
|
||||
}
|
||||
j.logger.Infof("scheduler enqueued a task: %+v", res)
|
||||
j.logger.Debugf("scheduler enqueued a task: %+v", res)
|
||||
event := &base.SchedulerEnqueueEvent{
|
||||
TaskID: res.ID,
|
||||
EnqueuedAt: res.EnqueuedAt.In(j.location),
|
||||
|
40
server.go
40
server.go
@@ -20,18 +20,17 @@ import (
|
||||
"github.com/hibiken/asynq/internal/rdb"
|
||||
)
|
||||
|
||||
// Server is responsible for managing the background-task processing.
|
||||
// Server is responsible for managing the task processing.
|
||||
//
|
||||
// Server pulls tasks off queues and processes them.
|
||||
// If the processing of a task is unsuccessful, server will
|
||||
// schedule it for a retry.
|
||||
// If the processing of a task is unsuccessful, server will schedule it for a retry.
|
||||
// A task will be retried until either the task gets processed successfully
|
||||
// or until it reaches its max retry count.
|
||||
//
|
||||
// If a task exhausts its retries, it will be moved to the "dead" queue and
|
||||
// will be kept in the queue for some time until a certain condition is met
|
||||
// (e.g., queue size reaches a certain limit, or the task has been in the
|
||||
// queue for a certain amount of time).
|
||||
// If a task exhausts its retries, it will be moved to the archive and
|
||||
// will be kept in the archive for some time until a certain condition is met
|
||||
// (e.g., archive size reaches a certain limit, or the task has been in the
|
||||
// archive for a certain amount of time).
|
||||
type Server struct {
|
||||
logger *log.Logger
|
||||
|
||||
@@ -61,11 +60,7 @@ type Config struct {
|
||||
// Function to calculate retry delay for a failed task.
|
||||
//
|
||||
// By default, it uses exponential backoff algorithm to calculate the delay.
|
||||
//
|
||||
// n is the number of times the task has been retried.
|
||||
// e is the error returned by the task handler.
|
||||
// t is the task in question.
|
||||
RetryDelayFunc func(n int, e error, t *Task) time.Duration
|
||||
RetryDelayFunc RetryDelayFunc
|
||||
|
||||
// List of queues to process with given priority value. Keys are the names of the
|
||||
// queues and values are associated priority value.
|
||||
@@ -154,6 +149,14 @@ func (fn ErrorHandlerFunc) HandleError(ctx context.Context, task *Task, err erro
|
||||
fn(ctx, task, err)
|
||||
}
|
||||
|
||||
// RetryDelayFunc calculates the retry delay duration for a failed task given
|
||||
// the retry count, error, and the task.
|
||||
//
|
||||
// n is the number of times the task has been retried.
|
||||
// e is the error returned by the task handler.
|
||||
// t is the task in question.
|
||||
type RetryDelayFunc func(n int, e error, t *Task) time.Duration
|
||||
|
||||
// Logger supports logging at various log levels.
|
||||
type Logger interface {
|
||||
// Debug logs a message at Debug level.
|
||||
@@ -254,9 +257,11 @@ func toInternalLogLevel(l LogLevel) log.Level {
|
||||
panic(fmt.Sprintf("asynq: unexpected log level: %v", l))
|
||||
}
|
||||
|
||||
// Formula taken from https://github.com/mperham/sidekiq.
|
||||
func defaultDelayFunc(n int, e error, t *Task) time.Duration {
|
||||
// DefaultRetryDelayFunc is the default RetryDelayFunc used if one is not specified in Config.
|
||||
// It uses exponential back-off strategy to calculate the retry delay.
|
||||
func DefaultRetryDelayFunc(n int, e error, t *Task) time.Duration {
|
||||
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
// Formula taken from https://github.com/mperham/sidekiq.
|
||||
s := int(math.Pow(float64(n), 4)) + 15 + (r.Intn(30) * (n + 1))
|
||||
return time.Duration(s) * time.Second
|
||||
}
|
||||
@@ -280,7 +285,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
|
||||
}
|
||||
delayFunc := cfg.RetryDelayFunc
|
||||
if delayFunc == nil {
|
||||
delayFunc = defaultDelayFunc
|
||||
delayFunc = DefaultRetryDelayFunc
|
||||
}
|
||||
queues := make(map[string]int)
|
||||
for qname, p := range cfg.Queues {
|
||||
@@ -292,7 +297,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
|
||||
queues = defaultQueueConfig
|
||||
}
|
||||
var qnames []string
|
||||
for q, _ := range queues {
|
||||
for q := range queues {
|
||||
qnames = append(qnames, q)
|
||||
}
|
||||
shutdownTimeout := cfg.ShutdownTimeout
|
||||
@@ -392,6 +397,9 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
|
||||
//
|
||||
// If ProcessTask return a non-nil error or panics, the task
|
||||
// will be retried after delay.
|
||||
// One exception to this rule is when ProcessTask returns SkipRetry error.
|
||||
// If the returned error is SkipRetry or the error wraps SkipRetry, retry is
|
||||
// skipped and task will be archived instead.
|
||||
type Handler interface {
|
||||
ProcessTask(context.Context, *Task) error
|
||||
}
|
||||
|
@@ -24,7 +24,7 @@ To view details on any command, use `asynq help <command> <subcommand>`.
|
||||
|
||||
- `asynq stats`
|
||||
- `asynq queue [ls inspect history rm pause unpause]`
|
||||
- `asynq task [ls cancel delete kill run delete-all kill-all run-all]`
|
||||
- `asynq task [ls cancel delete archive run delete-all archive-all run-all]`
|
||||
- `asynq server [ls]`
|
||||
|
||||
### Global flags
|
||||
|
@@ -11,6 +11,7 @@ import (
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
@@ -18,6 +19,8 @@ func init() {
|
||||
rootCmd.AddCommand(cronCmd)
|
||||
cronCmd.AddCommand(cronListCmd)
|
||||
cronCmd.AddCommand(cronHistoryCmd)
|
||||
cronHistoryCmd.Flags().Int("page", 1, "page number")
|
||||
cronHistoryCmd.Flags().Int("size", 30, "page size")
|
||||
}
|
||||
|
||||
var cronCmd = &cobra.Command{
|
||||
@@ -32,16 +35,16 @@ var cronListCmd = &cobra.Command{
|
||||
}
|
||||
|
||||
var cronHistoryCmd = &cobra.Command{
|
||||
Use: "history",
|
||||
Use: "history [ENTRY_ID...]",
|
||||
Short: "Show history of each cron tasks",
|
||||
Args: cobra.MinimumNArgs(1),
|
||||
Run: cronHistory,
|
||||
}
|
||||
|
||||
func cronList(cmd *cobra.Command, args []string) {
|
||||
r := createRDB()
|
||||
inspector := createInspector()
|
||||
|
||||
entries, err := r.ListSchedulerEntries()
|
||||
entries, err := inspector.SchedulerEntries()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
@@ -60,7 +63,7 @@ func cronList(cmd *cobra.Command, args []string) {
|
||||
cols := []string{"EntryID", "Spec", "Type", "Payload", "Options", "Next", "Prev"}
|
||||
printRows := func(w io.Writer, tmpl string) {
|
||||
for _, e := range entries {
|
||||
fmt.Fprintf(w, tmpl, e.ID, e.Spec, e.Type, e.Payload, e.Opts,
|
||||
fmt.Fprintf(w, tmpl, e.ID, e.Spec, e.Task.Type, e.Task.Payload, e.Opts,
|
||||
nextEnqueue(e.Next), prevEnqueue(e.Prev))
|
||||
}
|
||||
}
|
||||
@@ -84,9 +87,18 @@ func prevEnqueue(prevEnqueuedAt time.Time) string {
|
||||
return fmt.Sprintf("%v ago", time.Since(prevEnqueuedAt).Round(time.Second))
|
||||
}
|
||||
|
||||
// TODO: Paginate the result set.
|
||||
func cronHistory(cmd *cobra.Command, args []string) {
|
||||
r := createRDB()
|
||||
pageNum, err := cmd.Flags().GetInt("page")
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
pageSize, err := cmd.Flags().GetInt("size")
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
inspector := createInspector()
|
||||
for i, entryID := range args {
|
||||
if i > 0 {
|
||||
fmt.Printf("\n%s\n", separator)
|
||||
@@ -95,7 +107,8 @@ func cronHistory(cmd *cobra.Command, args []string) {
|
||||
|
||||
fmt.Printf("Entry: %s\n\n", entryID)
|
||||
|
||||
events, err := r.ListSchedulerEnqueueEvents(entryID)
|
||||
events, err := inspector.ListSchedulerEnqueueEvents(
|
||||
entryID, asynq.PageSize(pageSize), asynq.Page(pageNum))
|
||||
if err != nil {
|
||||
fmt.Printf("error: %v\n", err)
|
||||
continue
|
||||
@@ -105,12 +118,6 @@ func cronHistory(cmd *cobra.Command, args []string) {
|
||||
continue
|
||||
}
|
||||
|
||||
// Sort entries by enqueuedAt timestamp.
|
||||
sort.Slice(events, func(i, j int) bool {
|
||||
x, y := events[i], events[j]
|
||||
return x.EnqueuedAt.Unix() > y.EnqueuedAt.Unix()
|
||||
})
|
||||
|
||||
cols := []string{"TaskID", "EnqueuedAt"}
|
||||
printRows := func(w io.Writer, tmpl string) {
|
||||
for _, e := range events {
|
||||
|
@@ -98,7 +98,9 @@ func migrate(cmd *cobra.Command, args []string) {
|
||||
printError(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
if err := partitionZSetMembersByQueue(c, "asynq:dead", base.DeadKey); err != nil {
|
||||
// Note: base.DeadKey function was renamed in v0.14. We define the legacy function here since we need it for this migration script.
|
||||
deadKeyFunc := func(qname string) string { return fmt.Sprintf("asynq:{%s}:dead", qname) }
|
||||
if err := partitionZSetMembersByQueue(c, "asynq:dead", deadKeyFunc); err != nil {
|
||||
printError(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
@@ -113,7 +115,7 @@ func migrate(cmd *cobra.Command, args []string) {
|
||||
|
||||
paused, err := c.SMembers("asynq:paused").Result()
|
||||
if err != nil {
|
||||
printError(fmt.Errorf("command SMEMBERS asynq:paused failed: ", err))
|
||||
printError(fmt.Errorf("command SMEMBERS asynq:paused failed: %v", err))
|
||||
os.Exit(1)
|
||||
}
|
||||
for _, qkey := range paused {
|
||||
@@ -136,6 +138,27 @@ func migrate(cmd *cobra.Command, args []string) {
|
||||
printError(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
/*** Migrate from 0.13 to 0.14 compatible ***/
|
||||
|
||||
// Move all dead tasks to archived ZSET.
|
||||
for _, qname := range allQueues {
|
||||
zs, err := c.ZRangeWithScores(deadKeyFunc(qname), 0, -1).Result()
|
||||
if err != nil {
|
||||
printError(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
for _, z := range zs {
|
||||
if err := c.ZAdd(base.ArchivedKey(qname), &z).Err(); err != nil {
|
||||
printError(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
if err := deleteKey(c, deadKeyFunc(qname)); err != nil {
|
||||
printError(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func backupKey(key string) string {
|
||||
|
@@ -149,9 +149,9 @@ func printQueueStats(s *asynq.QueueStats) {
|
||||
fmt.Printf("Paused: %t\n\n", s.Paused)
|
||||
bold.Println("Task Count by State")
|
||||
printTable(
|
||||
[]string{"active", "pending", "scheduled", "retry", "dead"},
|
||||
[]string{"active", "pending", "scheduled", "retry", "archived"},
|
||||
func(w io.Writer, tmpl string) {
|
||||
fmt.Fprintf(w, tmpl, s.Active, s.Pending, s.Scheduled, s.Retry, s.Dead)
|
||||
fmt.Fprintf(w, tmpl, s.Active, s.Pending, s.Scheduled, s.Retry, s.Archived)
|
||||
},
|
||||
)
|
||||
fmt.Println()
|
||||
|
@@ -57,7 +57,7 @@ type AggregateStats struct {
|
||||
Pending int
|
||||
Scheduled int
|
||||
Retry int
|
||||
Dead int
|
||||
Archived int
|
||||
Processed int
|
||||
Failed int
|
||||
Timestamp time.Time
|
||||
@@ -84,7 +84,7 @@ func stats(cmd *cobra.Command, args []string) {
|
||||
aggStats.Pending += s.Pending
|
||||
aggStats.Scheduled += s.Scheduled
|
||||
aggStats.Retry += s.Retry
|
||||
aggStats.Dead += s.Dead
|
||||
aggStats.Archived += s.Archived
|
||||
aggStats.Processed += s.Processed
|
||||
aggStats.Failed += s.Failed
|
||||
aggStats.Timestamp = s.Timestamp
|
||||
@@ -126,9 +126,9 @@ func stats(cmd *cobra.Command, args []string) {
|
||||
func printStatsByState(s *AggregateStats) {
|
||||
format := strings.Repeat("%v\t", 5) + "\n"
|
||||
tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0)
|
||||
fmt.Fprintf(tw, format, "active", "pending", "scheduled", "retry", "dead")
|
||||
fmt.Fprintf(tw, format, "active", "pending", "scheduled", "retry", "archived")
|
||||
fmt.Fprintf(tw, format, "----------", "--------", "---------", "-----", "----")
|
||||
fmt.Fprintf(tw, format, s.Active, s.Pending, s.Scheduled, s.Retry, s.Dead)
|
||||
fmt.Fprintf(tw, format, s.Active, s.Pending, s.Scheduled, s.Retry, s.Archived)
|
||||
tw.Flush()
|
||||
}
|
||||
|
||||
|
@@ -26,11 +26,11 @@ func init() {
|
||||
|
||||
taskCmd.AddCommand(taskCancelCmd)
|
||||
|
||||
taskCmd.AddCommand(taskKillCmd)
|
||||
taskKillCmd.Flags().StringP("queue", "q", "", "queue to which the task belongs")
|
||||
taskKillCmd.Flags().StringP("key", "k", "", "key of the task")
|
||||
taskKillCmd.MarkFlagRequired("queue")
|
||||
taskKillCmd.MarkFlagRequired("key")
|
||||
taskCmd.AddCommand(taskArchiveCmd)
|
||||
taskArchiveCmd.Flags().StringP("queue", "q", "", "queue to which the task belongs")
|
||||
taskArchiveCmd.Flags().StringP("key", "k", "", "key of the task")
|
||||
taskArchiveCmd.MarkFlagRequired("queue")
|
||||
taskArchiveCmd.MarkFlagRequired("key")
|
||||
|
||||
taskCmd.AddCommand(taskDeleteCmd)
|
||||
taskDeleteCmd.Flags().StringP("queue", "q", "", "queue to which the task belongs")
|
||||
@@ -44,11 +44,11 @@ func init() {
|
||||
taskRunCmd.MarkFlagRequired("queue")
|
||||
taskRunCmd.MarkFlagRequired("key")
|
||||
|
||||
taskCmd.AddCommand(taskKillAllCmd)
|
||||
taskKillAllCmd.Flags().StringP("queue", "q", "", "queue to which the tasks belong")
|
||||
taskKillAllCmd.Flags().StringP("state", "s", "", "state of the tasks")
|
||||
taskKillAllCmd.MarkFlagRequired("queue")
|
||||
taskKillAllCmd.MarkFlagRequired("state")
|
||||
taskCmd.AddCommand(taskArchiveAllCmd)
|
||||
taskArchiveAllCmd.Flags().StringP("queue", "q", "", "queue to which the tasks belong")
|
||||
taskArchiveAllCmd.Flags().StringP("state", "s", "", "state of the tasks")
|
||||
taskArchiveAllCmd.MarkFlagRequired("queue")
|
||||
taskArchiveAllCmd.MarkFlagRequired("state")
|
||||
|
||||
taskCmd.AddCommand(taskDeleteAllCmd)
|
||||
taskDeleteAllCmd.Flags().StringP("queue", "q", "", "queue to which the tasks belong")
|
||||
@@ -78,7 +78,7 @@ The value for the state flag should be one of:
|
||||
- pending
|
||||
- scheduled
|
||||
- retry
|
||||
- dead
|
||||
- archived
|
||||
|
||||
List opeartion paginates the result set.
|
||||
By default, the command fetches the first 30 tasks.
|
||||
@@ -100,9 +100,9 @@ var taskCancelCmd = &cobra.Command{
|
||||
Run: taskCancel,
|
||||
}
|
||||
|
||||
var taskKillCmd = &cobra.Command{
|
||||
Use: "kill --queue=QUEUE --key=KEY",
|
||||
Short: "Kill a task with the given key",
|
||||
var taskArchiveCmd = &cobra.Command{
|
||||
Use: "archive --queue=QUEUE --key=KEY",
|
||||
Short: "Archive a task with the given key",
|
||||
Args: cobra.NoArgs,
|
||||
Run: taskKill,
|
||||
}
|
||||
@@ -121,11 +121,11 @@ var taskRunCmd = &cobra.Command{
|
||||
Run: taskRun,
|
||||
}
|
||||
|
||||
var taskKillAllCmd = &cobra.Command{
|
||||
Use: "kill-all --queue=QUEUE --state=STATE",
|
||||
Short: "Kill all tasks in the given state",
|
||||
var taskArchiveAllCmd = &cobra.Command{
|
||||
Use: "archive-all --queue=QUEUE --state=STATE",
|
||||
Short: "Archive all tasks in the given state",
|
||||
Args: cobra.NoArgs,
|
||||
Run: taskKillAll,
|
||||
Run: taskArchiveAll,
|
||||
}
|
||||
|
||||
var taskDeleteAllCmd = &cobra.Command{
|
||||
@@ -173,8 +173,8 @@ func taskList(cmd *cobra.Command, args []string) {
|
||||
listScheduledTasks(qname, pageNum, pageSize)
|
||||
case "retry":
|
||||
listRetryTasks(qname, pageNum, pageSize)
|
||||
case "dead":
|
||||
listDeadTasks(qname, pageNum, pageSize)
|
||||
case "archived":
|
||||
listArchivedTasks(qname, pageNum, pageSize)
|
||||
default:
|
||||
fmt.Printf("error: state=%q is not supported\n", state)
|
||||
os.Exit(1)
|
||||
@@ -273,15 +273,15 @@ func listRetryTasks(qname string, pageNum, pageSize int) {
|
||||
)
|
||||
}
|
||||
|
||||
func listDeadTasks(qname string, pageNum, pageSize int) {
|
||||
func listArchivedTasks(qname string, pageNum, pageSize int) {
|
||||
i := createInspector()
|
||||
tasks, err := i.ListDeadTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
|
||||
tasks, err := i.ListArchivedTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
if len(tasks) == 0 {
|
||||
fmt.Printf("No dead tasks in %q queue\n", qname)
|
||||
fmt.Printf("No archived tasks in %q queue\n", qname)
|
||||
return
|
||||
}
|
||||
printTable(
|
||||
@@ -318,12 +318,12 @@ func taskKill(cmd *cobra.Command, args []string) {
|
||||
}
|
||||
|
||||
i := createInspector()
|
||||
err = i.KillTaskByKey(qname, key)
|
||||
err = i.ArchiveTaskByKey(qname, key)
|
||||
if err != nil {
|
||||
fmt.Printf("error: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
fmt.Println("task transitioned to dead state")
|
||||
fmt.Println("task transitioned to archived state")
|
||||
}
|
||||
|
||||
func taskDelete(cmd *cobra.Command, args []string) {
|
||||
@@ -368,7 +368,7 @@ func taskRun(cmd *cobra.Command, args []string) {
|
||||
fmt.Println("task transitioned to pending state")
|
||||
}
|
||||
|
||||
func taskKillAll(cmd *cobra.Command, args []string) {
|
||||
func taskArchiveAll(cmd *cobra.Command, args []string) {
|
||||
qname, err := cmd.Flags().GetString("queue")
|
||||
if err != nil {
|
||||
fmt.Printf("error: %v\n", err)
|
||||
@@ -384,9 +384,9 @@ func taskKillAll(cmd *cobra.Command, args []string) {
|
||||
var n int
|
||||
switch state {
|
||||
case "scheduled":
|
||||
n, err = i.KillAllScheduledTasks(qname)
|
||||
n, err = i.ArchiveAllScheduledTasks(qname)
|
||||
case "retry":
|
||||
n, err = i.KillAllRetryTasks(qname)
|
||||
n, err = i.ArchiveAllRetryTasks(qname)
|
||||
default:
|
||||
fmt.Printf("error: unsupported state %q\n", state)
|
||||
os.Exit(1)
|
||||
@@ -395,7 +395,7 @@ func taskKillAll(cmd *cobra.Command, args []string) {
|
||||
fmt.Printf("error: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
fmt.Printf("%d tasks transitioned to dead state\n", n)
|
||||
fmt.Printf("%d tasks transitioned to archived state\n", n)
|
||||
}
|
||||
|
||||
func taskDeleteAll(cmd *cobra.Command, args []string) {
|
||||
@@ -417,8 +417,8 @@ func taskDeleteAll(cmd *cobra.Command, args []string) {
|
||||
n, err = i.DeleteAllScheduledTasks(qname)
|
||||
case "retry":
|
||||
n, err = i.DeleteAllRetryTasks(qname)
|
||||
case "dead":
|
||||
n, err = i.DeleteAllDeadTasks(qname)
|
||||
case "archived":
|
||||
n, err = i.DeleteAllArchivedTasks(qname)
|
||||
default:
|
||||
fmt.Printf("error: unsupported state %q\n", state)
|
||||
os.Exit(1)
|
||||
@@ -449,8 +449,8 @@ func taskRunAll(cmd *cobra.Command, args []string) {
|
||||
n, err = i.RunAllScheduledTasks(qname)
|
||||
case "retry":
|
||||
n, err = i.RunAllRetryTasks(qname)
|
||||
case "dead":
|
||||
n, err = i.RunAllDeadTasks(qname)
|
||||
case "archived":
|
||||
n, err = i.RunAllArchivedTasks(qname)
|
||||
default:
|
||||
fmt.Printf("error: unsupported state %q\n", state)
|
||||
os.Exit(1)
|
||||
|
Reference in New Issue
Block a user