mirror of
https://github.com/hibiken/asynq.git
synced 2025-10-20 21:26:14 +08:00
Compare commits
24 Commits
v0.11.0
...
v0.10.0.rc
Author | SHA1 | Date | |
---|---|---|---|
|
04702ddfd2 | ||
|
6705f7c27a | ||
|
e27ae0d33a | ||
|
6cd0ab65a3 | ||
|
83c9d5ae94 | ||
|
7eebbf181e | ||
|
7b1770da96 | ||
|
e2c5882368 | ||
|
50df107ace | ||
|
9699d196e5 | ||
|
1c5f7a791b | ||
|
232efe8279 | ||
|
ef4a4a8334 | ||
|
65e17a3469 | ||
|
88d94a2a9d | ||
|
7433b94aac | ||
|
08ac7793ab | ||
|
02b653df72 | ||
|
bee784c052 | ||
|
4ea58052f8 | ||
|
5afb4861a5 | ||
|
68e6b379fc | ||
|
0e70a14899 | ||
|
f01c7b8e66 |
10
CHANGELOG.md
10
CHANGELOG.md
@@ -7,16 +7,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
## [0.11.0] - 2020-07-28
|
||||
|
||||
### Added
|
||||
|
||||
- `Inspector` type was added to monitor and mutate state of queues and tasks.
|
||||
- `HealthCheckFunc` and `HealthCheckInterval` fields were added to `Config` to allow user to specify a callback
|
||||
function to check for broker connection.
|
||||
|
||||
## [0.10.0] - 2020-07-06
|
||||
|
||||
### Changed
|
||||
|
||||
- All tasks now requires timeout or deadline. By default, timeout is set to 30 mins.
|
||||
|
@@ -34,7 +34,6 @@ A system can consist of multiple worker servers and brokers, giving way to high
|
||||
- Scheduling of tasks
|
||||
- Durability since tasks are written to Redis
|
||||
- [Retries](https://github.com/hibiken/asynq/wiki/Task-Retry) of failed tasks
|
||||
- Automatic recovery of tasks in the event of a worker crash
|
||||
- [Weighted priority queues](https://github.com/hibiken/asynq/wiki/Priority-Queues#weighted-priority-queues)
|
||||
- [Strict priority queues](https://github.com/hibiken/asynq/wiki/Priority-Queues#strict-priority-queues)
|
||||
- Low latency to add a task since writes are fast in Redis
|
||||
|
@@ -36,7 +36,7 @@ func TestClientEnqueueAt(t *testing.T) {
|
||||
opts []Option
|
||||
wantRes *Result
|
||||
wantEnqueued map[string][]*base.TaskMessage
|
||||
wantScheduled []base.Z
|
||||
wantScheduled []h.ZSetEntry
|
||||
}{
|
||||
{
|
||||
desc: "Process task immediately",
|
||||
@@ -75,9 +75,9 @@ func TestClientEnqueueAt(t *testing.T) {
|
||||
Deadline: noDeadline,
|
||||
},
|
||||
wantEnqueued: nil, // db is flushed in setup so list does not exist hence nil
|
||||
wantScheduled: []base.Z{
|
||||
wantScheduled: []h.ZSetEntry{
|
||||
{
|
||||
Message: &base.TaskMessage{
|
||||
Msg: &base.TaskMessage{
|
||||
Type: task.Type,
|
||||
Payload: task.Payload.data,
|
||||
Retry: defaultMaxRetry,
|
||||
@@ -85,7 +85,7 @@ func TestClientEnqueueAt(t *testing.T) {
|
||||
Timeout: int64(defaultTimeout.Seconds()),
|
||||
Deadline: noDeadline.Unix(),
|
||||
},
|
||||
Score: oneHourLater.Unix(),
|
||||
Score: float64(oneHourLater.Unix()),
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -376,7 +376,7 @@ func TestClientEnqueueIn(t *testing.T) {
|
||||
opts []Option
|
||||
wantRes *Result
|
||||
wantEnqueued map[string][]*base.TaskMessage
|
||||
wantScheduled []base.Z
|
||||
wantScheduled []h.ZSetEntry
|
||||
}{
|
||||
{
|
||||
desc: "schedule a task to be enqueued in one hour",
|
||||
@@ -390,9 +390,9 @@ func TestClientEnqueueIn(t *testing.T) {
|
||||
Deadline: noDeadline,
|
||||
},
|
||||
wantEnqueued: nil, // db is flushed in setup so list does not exist hence nil
|
||||
wantScheduled: []base.Z{
|
||||
wantScheduled: []h.ZSetEntry{
|
||||
{
|
||||
Message: &base.TaskMessage{
|
||||
Msg: &base.TaskMessage{
|
||||
Type: task.Type,
|
||||
Payload: task.Payload.data,
|
||||
Retry: defaultMaxRetry,
|
||||
@@ -400,7 +400,7 @@ func TestClientEnqueueIn(t *testing.T) {
|
||||
Timeout: int64(defaultTimeout.Seconds()),
|
||||
Deadline: noDeadline.Unix(),
|
||||
},
|
||||
Score: time.Now().Add(time.Hour).Unix(),
|
||||
Score: float64(time.Now().Add(time.Hour).Unix()),
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@@ -1,80 +0,0 @@
|
||||
// Copyright 2020 Kentaro Hibino. All rights reserved.
|
||||
// Use of this source code is governed by a MIT license
|
||||
// that can be found in the LICENSE file.
|
||||
|
||||
package asynq
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/hibiken/asynq/internal/base"
|
||||
"github.com/hibiken/asynq/internal/log"
|
||||
)
|
||||
|
||||
// healthchecker is responsible for pinging broker periodically
|
||||
// and call user provided HeathCheckFunc with the ping result.
|
||||
type healthchecker struct {
|
||||
logger *log.Logger
|
||||
broker base.Broker
|
||||
|
||||
// channel to communicate back to the long running "healthchecker" goroutine.
|
||||
done chan struct{}
|
||||
|
||||
// interval between healthchecks.
|
||||
interval time.Duration
|
||||
|
||||
// function to call periodically.
|
||||
healthcheckFunc func(error)
|
||||
}
|
||||
|
||||
type healthcheckerParams struct {
|
||||
logger *log.Logger
|
||||
broker base.Broker
|
||||
interval time.Duration
|
||||
healthcheckFunc func(error)
|
||||
}
|
||||
|
||||
func newHealthChecker(params healthcheckerParams) *healthchecker {
|
||||
return &healthchecker{
|
||||
logger: params.logger,
|
||||
broker: params.broker,
|
||||
done: make(chan struct{}),
|
||||
interval: params.interval,
|
||||
healthcheckFunc: params.healthcheckFunc,
|
||||
}
|
||||
}
|
||||
|
||||
func (hc *healthchecker) terminate() {
|
||||
if hc.healthcheckFunc == nil {
|
||||
return
|
||||
}
|
||||
|
||||
hc.logger.Debug("Healthchecker shutting down...")
|
||||
// Signal the healthchecker goroutine to stop.
|
||||
hc.done <- struct{}{}
|
||||
}
|
||||
|
||||
func (hc *healthchecker) start(wg *sync.WaitGroup) {
|
||||
if hc.healthcheckFunc == nil {
|
||||
return
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
timer := time.NewTimer(hc.interval)
|
||||
for {
|
||||
select {
|
||||
case <-hc.done:
|
||||
hc.logger.Debug("Healthchecker done")
|
||||
timer.Stop()
|
||||
return
|
||||
case <-timer.C:
|
||||
err := hc.broker.Ping()
|
||||
hc.healthcheckFunc(err)
|
||||
timer.Reset(hc.interval)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
@@ -1,101 +0,0 @@
|
||||
// Copyright 2020 Kentaro Hibino. All rights reserved.
|
||||
// Use of this source code is governed by a MIT license
|
||||
// that can be found in the LICENSE file.
|
||||
|
||||
package asynq
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hibiken/asynq/internal/rdb"
|
||||
"github.com/hibiken/asynq/internal/testbroker"
|
||||
)
|
||||
|
||||
func TestHealthChecker(t *testing.T) {
|
||||
r := setup(t)
|
||||
rdbClient := rdb.NewRDB(r)
|
||||
|
||||
var (
|
||||
// mu guards called and e variables.
|
||||
mu sync.Mutex
|
||||
called int
|
||||
e error
|
||||
)
|
||||
checkFn := func(err error) {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
called++
|
||||
e = err
|
||||
}
|
||||
|
||||
hc := newHealthChecker(healthcheckerParams{
|
||||
logger: testLogger,
|
||||
broker: rdbClient,
|
||||
interval: 1 * time.Second,
|
||||
healthcheckFunc: checkFn,
|
||||
})
|
||||
|
||||
hc.start(&sync.WaitGroup{})
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
mu.Lock()
|
||||
if called == 0 {
|
||||
t.Errorf("Healthchecker did not call the provided HealthCheckFunc")
|
||||
}
|
||||
if e != nil {
|
||||
t.Errorf("HealthCheckFunc was called with non-nil error: %v", e)
|
||||
}
|
||||
mu.Unlock()
|
||||
|
||||
hc.terminate()
|
||||
}
|
||||
|
||||
func TestHealthCheckerWhenRedisDown(t *testing.T) {
|
||||
// Make sure that healthchecker goroutine doesn't panic
|
||||
// if it cannot connect to redis.
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
t.Errorf("panic occurred: %v", r)
|
||||
}
|
||||
}()
|
||||
r := rdb.NewRDB(setup(t))
|
||||
testBroker := testbroker.NewTestBroker(r)
|
||||
var (
|
||||
// mu guards called and e variables.
|
||||
mu sync.Mutex
|
||||
called int
|
||||
e error
|
||||
)
|
||||
checkFn := func(err error) {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
called++
|
||||
e = err
|
||||
}
|
||||
|
||||
hc := newHealthChecker(healthcheckerParams{
|
||||
logger: testLogger,
|
||||
broker: testBroker,
|
||||
interval: 1 * time.Second,
|
||||
healthcheckFunc: checkFn,
|
||||
})
|
||||
|
||||
testBroker.Sleep()
|
||||
hc.start(&sync.WaitGroup{})
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
mu.Lock()
|
||||
if called == 0 {
|
||||
t.Errorf("Healthchecker did not call the provided HealthCheckFunc")
|
||||
}
|
||||
if e == nil {
|
||||
t.Errorf("HealthCheckFunc was called with nil; want non-nil error")
|
||||
}
|
||||
mu.Unlock()
|
||||
|
||||
hc.terminate()
|
||||
}
|
500
inspector.go
500
inspector.go
@@ -1,500 +0,0 @@
|
||||
// Copyright 2020 Kentaro Hibino. All rights reserved.
|
||||
// Use of this source code is governed by a MIT license
|
||||
// that can be found in the LICENSE file.
|
||||
|
||||
package asynq
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/hibiken/asynq/internal/rdb"
|
||||
)
|
||||
|
||||
// Inspector is a client interface to inspect and mutate the state of
|
||||
// queues and tasks.
|
||||
type Inspector struct {
|
||||
rdb *rdb.RDB
|
||||
}
|
||||
|
||||
// New returns a new instance of Inspector.
|
||||
func NewInspector(r RedisConnOpt) *Inspector {
|
||||
return &Inspector{
|
||||
rdb: rdb.NewRDB(createRedisClient(r)),
|
||||
}
|
||||
}
|
||||
|
||||
// Stats represents a state of queues at a certain time.
|
||||
type Stats struct {
|
||||
Enqueued int
|
||||
InProgress int
|
||||
Scheduled int
|
||||
Retry int
|
||||
Dead int
|
||||
Processed int
|
||||
Failed int
|
||||
Queues []*QueueInfo
|
||||
Timestamp time.Time
|
||||
}
|
||||
|
||||
// QueueInfo holds information about a queue.
|
||||
type QueueInfo struct {
|
||||
// Name of the queue (e.g. "default", "critical").
|
||||
// Note: It doesn't include the prefix "asynq:queues:".
|
||||
Name string
|
||||
|
||||
// Paused indicates whether the queue is paused.
|
||||
// If true, tasks in the queue should not be processed.
|
||||
Paused bool
|
||||
|
||||
// Size is the number of tasks in the queue.
|
||||
Size int
|
||||
}
|
||||
|
||||
// CurrentStats returns a current stats of the queues.
|
||||
func (i *Inspector) CurrentStats() (*Stats, error) {
|
||||
stats, err := i.rdb.CurrentStats()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var qs []*QueueInfo
|
||||
for _, q := range stats.Queues {
|
||||
qs = append(qs, (*QueueInfo)(q))
|
||||
}
|
||||
return &Stats{
|
||||
Enqueued: stats.Enqueued,
|
||||
InProgress: stats.InProgress,
|
||||
Scheduled: stats.Scheduled,
|
||||
Retry: stats.Retry,
|
||||
Dead: stats.Dead,
|
||||
Processed: stats.Processed,
|
||||
Failed: stats.Failed,
|
||||
Queues: qs,
|
||||
Timestamp: stats.Timestamp,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// DailyStats holds aggregate data for a given day.
|
||||
type DailyStats struct {
|
||||
Processed int
|
||||
Failed int
|
||||
Date time.Time
|
||||
}
|
||||
|
||||
// History returns a list of stats from the last n days.
|
||||
func (i *Inspector) History(n int) ([]*DailyStats, error) {
|
||||
stats, err := i.rdb.HistoricalStats(n)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var res []*DailyStats
|
||||
for _, s := range stats {
|
||||
res = append(res, &DailyStats{
|
||||
Processed: s.Processed,
|
||||
Failed: s.Failed,
|
||||
Date: s.Time,
|
||||
})
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// EnqueuedTask is a task in a queue and is ready to be processed.
|
||||
type EnqueuedTask struct {
|
||||
*Task
|
||||
ID string
|
||||
Queue string
|
||||
}
|
||||
|
||||
// InProgressTask is a task that's currently being processed.
|
||||
type InProgressTask struct {
|
||||
*Task
|
||||
ID string
|
||||
}
|
||||
|
||||
// ScheduledTask is a task scheduled to be processed in the future.
|
||||
type ScheduledTask struct {
|
||||
*Task
|
||||
ID string
|
||||
Queue string
|
||||
NextEnqueueAt time.Time
|
||||
|
||||
score int64
|
||||
}
|
||||
|
||||
// RetryTask is a task scheduled to be retried in the future.
|
||||
type RetryTask struct {
|
||||
*Task
|
||||
ID string
|
||||
Queue string
|
||||
NextEnqueueAt time.Time
|
||||
MaxRetry int
|
||||
Retried int
|
||||
ErrorMsg string
|
||||
// TODO: LastFailedAt time.Time
|
||||
|
||||
score int64
|
||||
}
|
||||
|
||||
// DeadTask is a task exhausted its retries.
|
||||
// DeadTask won't be retried automatically.
|
||||
type DeadTask struct {
|
||||
*Task
|
||||
ID string
|
||||
Queue string
|
||||
MaxRetry int
|
||||
Retried int
|
||||
LastFailedAt time.Time
|
||||
ErrorMsg string
|
||||
|
||||
score int64
|
||||
}
|
||||
|
||||
// Key returns a key used to delete, enqueue, and kill the task.
|
||||
func (t *ScheduledTask) Key() string {
|
||||
return fmt.Sprintf("s:%v:%v", t.ID, t.score)
|
||||
}
|
||||
|
||||
// Key returns a key used to delete, enqueue, and kill the task.
|
||||
func (t *RetryTask) Key() string {
|
||||
return fmt.Sprintf("r:%v:%v", t.ID, t.score)
|
||||
}
|
||||
|
||||
// Key returns a key used to delete, enqueue, and kill the task.
|
||||
func (t *DeadTask) Key() string {
|
||||
return fmt.Sprintf("d:%v:%v", t.ID, t.score)
|
||||
}
|
||||
|
||||
// parseTaskKey parses a key string and returns each part of key with proper
|
||||
// type if valid, otherwise it reports an error.
|
||||
func parseTaskKey(key string) (id uuid.UUID, score int64, qtype string, err error) {
|
||||
parts := strings.Split(key, ":")
|
||||
if len(parts) != 3 {
|
||||
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
|
||||
}
|
||||
id, err = uuid.Parse(parts[1])
|
||||
if err != nil {
|
||||
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
|
||||
}
|
||||
score, err = strconv.ParseInt(parts[2], 10, 64)
|
||||
if err != nil {
|
||||
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
|
||||
}
|
||||
qtype = parts[0]
|
||||
if len(qtype) != 1 || !strings.Contains("srd", qtype) {
|
||||
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
|
||||
}
|
||||
return id, score, qtype, nil
|
||||
}
|
||||
|
||||
// ListOption specifies behavior of list operation.
|
||||
type ListOption interface{}
|
||||
|
||||
// Internal list option representations.
|
||||
type (
|
||||
pageSizeOpt int
|
||||
pageNumOpt int
|
||||
)
|
||||
|
||||
type listOption struct {
|
||||
pageSize int
|
||||
pageNum int
|
||||
}
|
||||
|
||||
const (
|
||||
// Page size used by default in list operation.
|
||||
defaultPageSize = 30
|
||||
|
||||
// Page number used by default in list operation.
|
||||
defaultPageNum = 1
|
||||
)
|
||||
|
||||
func composeListOptions(opts ...ListOption) listOption {
|
||||
res := listOption{
|
||||
pageSize: defaultPageSize,
|
||||
pageNum: defaultPageNum,
|
||||
}
|
||||
for _, opt := range opts {
|
||||
switch opt := opt.(type) {
|
||||
case pageSizeOpt:
|
||||
res.pageSize = int(opt)
|
||||
case pageNumOpt:
|
||||
res.pageNum = int(opt)
|
||||
default:
|
||||
// ignore unexpected option
|
||||
}
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
// PageSize returns an option to specify the page size for list operation.
|
||||
//
|
||||
// Negative page size is treated as zero.
|
||||
func PageSize(n int) ListOption {
|
||||
if n < 0 {
|
||||
n = 0
|
||||
}
|
||||
return pageSizeOpt(n)
|
||||
}
|
||||
|
||||
// Page returns an option to specify the page number for list operation.
|
||||
// The value 1 fetches the first page.
|
||||
//
|
||||
// Negative page number is treated as one.
|
||||
func Page(n int) ListOption {
|
||||
if n < 0 {
|
||||
n = 1
|
||||
}
|
||||
return pageNumOpt(n)
|
||||
}
|
||||
|
||||
// ListScheduledTasks retrieves tasks in the specified queue.
|
||||
//
|
||||
// By default, it retrieves the first 30 tasks.
|
||||
func (i *Inspector) ListEnqueuedTasks(qname string, opts ...ListOption) ([]*EnqueuedTask, error) {
|
||||
opt := composeListOptions(opts...)
|
||||
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
|
||||
msgs, err := i.rdb.ListEnqueued(qname, pgn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var tasks []*EnqueuedTask
|
||||
for _, m := range msgs {
|
||||
tasks = append(tasks, &EnqueuedTask{
|
||||
Task: NewTask(m.Type, m.Payload),
|
||||
ID: m.ID.String(),
|
||||
Queue: m.Queue,
|
||||
})
|
||||
}
|
||||
return tasks, err
|
||||
}
|
||||
|
||||
// ListScheduledTasks retrieves tasks currently being processed.
|
||||
//
|
||||
// By default, it retrieves the first 30 tasks.
|
||||
func (i *Inspector) ListInProgressTasks(opts ...ListOption) ([]*InProgressTask, error) {
|
||||
opt := composeListOptions(opts...)
|
||||
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
|
||||
msgs, err := i.rdb.ListInProgress(pgn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var tasks []*InProgressTask
|
||||
for _, m := range msgs {
|
||||
tasks = append(tasks, &InProgressTask{
|
||||
Task: NewTask(m.Type, m.Payload),
|
||||
ID: m.ID.String(),
|
||||
})
|
||||
}
|
||||
return tasks, err
|
||||
}
|
||||
|
||||
// ListScheduledTasks retrieves tasks in scheduled state.
|
||||
// Tasks are sorted by NextEnqueueAt field in ascending order.
|
||||
//
|
||||
// By default, it retrieves the first 30 tasks.
|
||||
func (i *Inspector) ListScheduledTasks(opts ...ListOption) ([]*ScheduledTask, error) {
|
||||
opt := composeListOptions(opts...)
|
||||
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
|
||||
zs, err := i.rdb.ListScheduled(pgn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var tasks []*ScheduledTask
|
||||
for _, z := range zs {
|
||||
enqueueAt := time.Unix(z.Score, 0)
|
||||
t := NewTask(z.Message.Type, z.Message.Payload)
|
||||
tasks = append(tasks, &ScheduledTask{
|
||||
Task: t,
|
||||
ID: z.Message.ID.String(),
|
||||
Queue: z.Message.Queue,
|
||||
NextEnqueueAt: enqueueAt,
|
||||
score: z.Score,
|
||||
})
|
||||
}
|
||||
return tasks, nil
|
||||
}
|
||||
|
||||
// ListScheduledTasks retrieves tasks in retry state.
|
||||
// Tasks are sorted by NextEnqueueAt field in ascending order.
|
||||
//
|
||||
// By default, it retrieves the first 30 tasks.
|
||||
func (i *Inspector) ListRetryTasks(opts ...ListOption) ([]*RetryTask, error) {
|
||||
opt := composeListOptions(opts...)
|
||||
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
|
||||
zs, err := i.rdb.ListRetry(pgn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var tasks []*RetryTask
|
||||
for _, z := range zs {
|
||||
enqueueAt := time.Unix(z.Score, 0)
|
||||
t := NewTask(z.Message.Type, z.Message.Payload)
|
||||
tasks = append(tasks, &RetryTask{
|
||||
Task: t,
|
||||
ID: z.Message.ID.String(),
|
||||
Queue: z.Message.Queue,
|
||||
NextEnqueueAt: enqueueAt,
|
||||
MaxRetry: z.Message.Retry,
|
||||
Retried: z.Message.Retried,
|
||||
// TODO: LastFailedAt: z.Message.LastFailedAt
|
||||
ErrorMsg: z.Message.ErrorMsg,
|
||||
score: z.Score,
|
||||
})
|
||||
}
|
||||
return tasks, nil
|
||||
}
|
||||
|
||||
// ListScheduledTasks retrieves tasks in retry state.
|
||||
// Tasks are sorted by LastFailedAt field in descending order.
|
||||
//
|
||||
// By default, it retrieves the first 30 tasks.
|
||||
func (i *Inspector) ListDeadTasks(opts ...ListOption) ([]*DeadTask, error) {
|
||||
opt := composeListOptions(opts...)
|
||||
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
|
||||
zs, err := i.rdb.ListDead(pgn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var tasks []*DeadTask
|
||||
for _, z := range zs {
|
||||
failedAt := time.Unix(z.Score, 0)
|
||||
t := NewTask(z.Message.Type, z.Message.Payload)
|
||||
tasks = append(tasks, &DeadTask{
|
||||
Task: t,
|
||||
ID: z.Message.ID.String(),
|
||||
Queue: z.Message.Queue,
|
||||
MaxRetry: z.Message.Retry,
|
||||
Retried: z.Message.Retried,
|
||||
LastFailedAt: failedAt,
|
||||
ErrorMsg: z.Message.ErrorMsg,
|
||||
score: z.Score,
|
||||
})
|
||||
}
|
||||
return tasks, nil
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// DeleteAllScheduledTasks deletes all tasks in scheduled state,
|
||||
// and reports the number tasks deleted.
|
||||
func (i *Inspector) DeleteAllScheduledTasks() (int, error) {
|
||||
n, err := i.rdb.DeleteAllScheduledTasks()
|
||||
return int(n), err
|
||||
}
|
||||
|
||||
// DeleteAllRetryTasks deletes all tasks in retry state,
|
||||
// and reports the number tasks deleted.
|
||||
func (i *Inspector) DeleteAllRetryTasks() (int, error) {
|
||||
n, err := i.rdb.DeleteAllRetryTasks()
|
||||
return int(n), err
|
||||
}
|
||||
|
||||
// DeleteAllDeadTasks deletes all tasks in dead state,
|
||||
// and reports the number tasks deleted.
|
||||
func (i *Inspector) DeleteAllDeadTasks() (int, error) {
|
||||
n, err := i.rdb.DeleteAllDeadTasks()
|
||||
return int(n), err
|
||||
}
|
||||
|
||||
// DeleteTaskByKey deletes a task with the given key.
|
||||
func (i *Inspector) DeleteTaskByKey(key string) error {
|
||||
id, score, qtype, err := parseTaskKey(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
switch qtype {
|
||||
case "s":
|
||||
return i.rdb.DeleteScheduledTask(id, score)
|
||||
case "r":
|
||||
return i.rdb.DeleteRetryTask(id, score)
|
||||
case "d":
|
||||
return i.rdb.DeleteDeadTask(id, score)
|
||||
default:
|
||||
return fmt.Errorf("invalid key")
|
||||
}
|
||||
}
|
||||
|
||||
// EnqueueAllScheduledTasks enqueues all tasks in the scheduled state,
|
||||
// and reports the number of tasks enqueued.
|
||||
func (i *Inspector) EnqueueAllScheduledTasks() (int, error) {
|
||||
n, err := i.rdb.EnqueueAllScheduledTasks()
|
||||
return int(n), err
|
||||
}
|
||||
|
||||
// EnqueueAllRetryTasks enqueues all tasks in the retry state,
|
||||
// and reports the number of tasks enqueued.
|
||||
func (i *Inspector) EnqueueAllRetryTasks() (int, error) {
|
||||
n, err := i.rdb.EnqueueAllRetryTasks()
|
||||
return int(n), err
|
||||
}
|
||||
|
||||
// EnqueueAllDeadTasks enqueues all tasks in the dead state,
|
||||
// and reports the number of tasks enqueued.
|
||||
func (i *Inspector) EnqueueAllDeadTasks() (int, error) {
|
||||
n, err := i.rdb.EnqueueAllDeadTasks()
|
||||
return int(n), err
|
||||
}
|
||||
|
||||
// EnqueueTaskByKey enqueues a task with the given key.
|
||||
func (i *Inspector) EnqueueTaskByKey(key string) error {
|
||||
id, score, qtype, err := parseTaskKey(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
switch qtype {
|
||||
case "s":
|
||||
return i.rdb.EnqueueScheduledTask(id, score)
|
||||
case "r":
|
||||
return i.rdb.EnqueueRetryTask(id, score)
|
||||
case "d":
|
||||
return i.rdb.EnqueueDeadTask(id, score)
|
||||
default:
|
||||
return fmt.Errorf("invalid key")
|
||||
}
|
||||
}
|
||||
|
||||
// KillAllScheduledTasks kills all tasks in scheduled state,
|
||||
// and reports the number of tasks killed.
|
||||
func (i *Inspector) KillAllScheduledTasks() (int, error) {
|
||||
n, err := i.rdb.KillAllScheduledTasks()
|
||||
return int(n), err
|
||||
}
|
||||
|
||||
// KillAllRetryTasks kills all tasks in retry state,
|
||||
// and reports the number of tasks killed.
|
||||
func (i *Inspector) KillAllRetryTasks() (int, error) {
|
||||
n, err := i.rdb.KillAllRetryTasks()
|
||||
return int(n), err
|
||||
}
|
||||
|
||||
// KillTaskByKey kills a task with the given key.
|
||||
func (i *Inspector) KillTaskByKey(key string) error {
|
||||
id, score, qtype, err := parseTaskKey(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
switch qtype {
|
||||
case "s":
|
||||
return i.rdb.KillScheduledTask(id, score)
|
||||
case "r":
|
||||
return i.rdb.KillRetryTask(id, score)
|
||||
case "d":
|
||||
return fmt.Errorf("task already dead")
|
||||
default:
|
||||
return fmt.Errorf("invalid key")
|
||||
}
|
||||
}
|
||||
|
||||
// PauseQueue pauses task processing on the specified queue.
|
||||
// If the queue is already paused, it will return a non-nil error.
|
||||
func (i *Inspector) PauseQueue(qname string) error {
|
||||
return i.rdb.Pause(qname)
|
||||
}
|
||||
|
||||
// UnpauseQueue resumes task processing on the specified queue.
|
||||
// If the queue is not paused, it will return a non-nil error.
|
||||
func (i *Inspector) UnpauseQueue(qname string) error {
|
||||
return i.rdb.Unpause(qname)
|
||||
}
|
1641
inspector_test.go
1641
inspector_test.go
File diff suppressed because it is too large
Load Diff
@@ -17,6 +17,12 @@ import (
|
||||
"github.com/hibiken/asynq/internal/base"
|
||||
)
|
||||
|
||||
// ZSetEntry is an entry in redis sorted set.
|
||||
type ZSetEntry struct {
|
||||
Msg *base.TaskMessage
|
||||
Score float64
|
||||
}
|
||||
|
||||
// SortMsgOpt is a cmp.Option to sort base.TaskMessage for comparing slice of task messages.
|
||||
var SortMsgOpt = cmp.Transformer("SortTaskMessages", func(in []*base.TaskMessage) []*base.TaskMessage {
|
||||
out := append([]*base.TaskMessage(nil), in...) // Copy input to avoid mutating it
|
||||
@@ -27,10 +33,10 @@ var SortMsgOpt = cmp.Transformer("SortTaskMessages", func(in []*base.TaskMessage
|
||||
})
|
||||
|
||||
// SortZSetEntryOpt is an cmp.Option to sort ZSetEntry for comparing slice of zset entries.
|
||||
var SortZSetEntryOpt = cmp.Transformer("SortZSetEntries", func(in []base.Z) []base.Z {
|
||||
out := append([]base.Z(nil), in...) // Copy input to avoid mutating it
|
||||
var SortZSetEntryOpt = cmp.Transformer("SortZSetEntries", func(in []ZSetEntry) []ZSetEntry {
|
||||
out := append([]ZSetEntry(nil), in...) // Copy input to avoid mutating it
|
||||
sort.Slice(out, func(i, j int) bool {
|
||||
return out[i].Message.ID.String() < out[j].Message.ID.String()
|
||||
return out[i].Msg.ID.String() < out[j].Msg.ID.String()
|
||||
})
|
||||
return out
|
||||
})
|
||||
@@ -171,15 +177,6 @@ func SeedEnqueuedQueue(tb testing.TB, r *redis.Client, msgs []*base.TaskMessage,
|
||||
seedRedisList(tb, r, queue, msgs)
|
||||
}
|
||||
|
||||
// SeedAllEnqueuedQueues initializes all of the specified queues with the given messages.
|
||||
//
|
||||
// enqueued maps a queue name to a list of messages.
|
||||
func SeedAllEnqueuedQueues(tb testing.TB, r *redis.Client, enqueued map[string][]*base.TaskMessage) {
|
||||
for q, msgs := range enqueued {
|
||||
SeedEnqueuedQueue(tb, r, msgs, q)
|
||||
}
|
||||
}
|
||||
|
||||
// SeedInProgressQueue initializes the in-progress queue with the given messages.
|
||||
func SeedInProgressQueue(tb testing.TB, r *redis.Client, msgs []*base.TaskMessage) {
|
||||
tb.Helper()
|
||||
@@ -187,25 +184,25 @@ func SeedInProgressQueue(tb testing.TB, r *redis.Client, msgs []*base.TaskMessag
|
||||
}
|
||||
|
||||
// SeedScheduledQueue initializes the scheduled queue with the given messages.
|
||||
func SeedScheduledQueue(tb testing.TB, r *redis.Client, entries []base.Z) {
|
||||
func SeedScheduledQueue(tb testing.TB, r *redis.Client, entries []ZSetEntry) {
|
||||
tb.Helper()
|
||||
seedRedisZSet(tb, r, base.ScheduledQueue, entries)
|
||||
}
|
||||
|
||||
// SeedRetryQueue initializes the retry queue with the given messages.
|
||||
func SeedRetryQueue(tb testing.TB, r *redis.Client, entries []base.Z) {
|
||||
func SeedRetryQueue(tb testing.TB, r *redis.Client, entries []ZSetEntry) {
|
||||
tb.Helper()
|
||||
seedRedisZSet(tb, r, base.RetryQueue, entries)
|
||||
}
|
||||
|
||||
// SeedDeadQueue initializes the dead queue with the given messages.
|
||||
func SeedDeadQueue(tb testing.TB, r *redis.Client, entries []base.Z) {
|
||||
func SeedDeadQueue(tb testing.TB, r *redis.Client, entries []ZSetEntry) {
|
||||
tb.Helper()
|
||||
seedRedisZSet(tb, r, base.DeadQueue, entries)
|
||||
}
|
||||
|
||||
// SeedDeadlines initializes the deadlines set with the given entries.
|
||||
func SeedDeadlines(tb testing.TB, r *redis.Client, entries []base.Z) {
|
||||
func SeedDeadlines(tb testing.TB, r *redis.Client, entries []ZSetEntry) {
|
||||
tb.Helper()
|
||||
seedRedisZSet(tb, r, base.KeyDeadlines, entries)
|
||||
}
|
||||
@@ -219,9 +216,9 @@ func seedRedisList(tb testing.TB, c *redis.Client, key string, msgs []*base.Task
|
||||
}
|
||||
}
|
||||
|
||||
func seedRedisZSet(tb testing.TB, c *redis.Client, key string, items []base.Z) {
|
||||
func seedRedisZSet(tb testing.TB, c *redis.Client, key string, items []ZSetEntry) {
|
||||
for _, item := range items {
|
||||
z := &redis.Z{Member: MustMarshal(tb, item.Message), Score: float64(item.Score)}
|
||||
z := &redis.Z{Member: MustMarshal(tb, item.Msg), Score: float64(item.Score)}
|
||||
if err := c.ZAdd(key, z).Err(); err != nil {
|
||||
tb.Fatal(err)
|
||||
}
|
||||
@@ -265,25 +262,25 @@ func GetDeadMessages(tb testing.TB, r *redis.Client) []*base.TaskMessage {
|
||||
}
|
||||
|
||||
// GetScheduledEntries returns all task messages and its score in the scheduled queue.
|
||||
func GetScheduledEntries(tb testing.TB, r *redis.Client) []base.Z {
|
||||
func GetScheduledEntries(tb testing.TB, r *redis.Client) []ZSetEntry {
|
||||
tb.Helper()
|
||||
return getZSetEntries(tb, r, base.ScheduledQueue)
|
||||
}
|
||||
|
||||
// GetRetryEntries returns all task messages and its score in the retry queue.
|
||||
func GetRetryEntries(tb testing.TB, r *redis.Client) []base.Z {
|
||||
func GetRetryEntries(tb testing.TB, r *redis.Client) []ZSetEntry {
|
||||
tb.Helper()
|
||||
return getZSetEntries(tb, r, base.RetryQueue)
|
||||
}
|
||||
|
||||
// GetDeadEntries returns all task messages and its score in the dead queue.
|
||||
func GetDeadEntries(tb testing.TB, r *redis.Client) []base.Z {
|
||||
func GetDeadEntries(tb testing.TB, r *redis.Client) []ZSetEntry {
|
||||
tb.Helper()
|
||||
return getZSetEntries(tb, r, base.DeadQueue)
|
||||
}
|
||||
|
||||
// GetDeadlinesEntries returns all task messages and its score in the deadlines set.
|
||||
func GetDeadlinesEntries(tb testing.TB, r *redis.Client) []base.Z {
|
||||
func GetDeadlinesEntries(tb testing.TB, r *redis.Client) []ZSetEntry {
|
||||
tb.Helper()
|
||||
return getZSetEntries(tb, r, base.KeyDeadlines)
|
||||
}
|
||||
@@ -298,13 +295,13 @@ func getZSetMessages(tb testing.TB, r *redis.Client, zset string) []*base.TaskMe
|
||||
return MustUnmarshalSlice(tb, data)
|
||||
}
|
||||
|
||||
func getZSetEntries(tb testing.TB, r *redis.Client, zset string) []base.Z {
|
||||
func getZSetEntries(tb testing.TB, r *redis.Client, zset string) []ZSetEntry {
|
||||
data := r.ZRangeWithScores(zset, 0, -1).Val()
|
||||
var entries []base.Z
|
||||
var entries []ZSetEntry
|
||||
for _, z := range data {
|
||||
entries = append(entries, base.Z{
|
||||
Message: MustUnmarshal(tb, z.Member.(string)),
|
||||
Score: int64(z.Score),
|
||||
entries = append(entries, ZSetEntry{
|
||||
Msg: MustUnmarshal(tb, z.Member.(string)),
|
||||
Score: z.Score,
|
||||
})
|
||||
}
|
||||
return entries
|
||||
|
@@ -133,12 +133,6 @@ func DecodeMessage(s string) (*TaskMessage, error) {
|
||||
return &msg, nil
|
||||
}
|
||||
|
||||
// Z represents sorted set member.
|
||||
type Z struct {
|
||||
Message *TaskMessage
|
||||
Score int64
|
||||
}
|
||||
|
||||
// ServerStatus represents status of a server.
|
||||
// ServerStatus methods are concurrency safe.
|
||||
type ServerStatus struct {
|
||||
@@ -263,7 +257,6 @@ func (c *Cancelations) Get(id string) (fn context.CancelFunc, ok bool) {
|
||||
//
|
||||
// See rdb.RDB as a reference implementation.
|
||||
type Broker interface {
|
||||
Ping() error
|
||||
Enqueue(msg *TaskMessage) error
|
||||
EnqueueUnique(msg *TaskMessage, ttl time.Duration) error
|
||||
Dequeue(qnames ...string) (*TaskMessage, time.Time, error)
|
||||
|
@@ -51,6 +51,56 @@ type DailyStats struct {
|
||||
Time time.Time
|
||||
}
|
||||
|
||||
// EnqueuedTask is a task in a queue and is ready to be processed.
|
||||
type EnqueuedTask struct {
|
||||
ID uuid.UUID
|
||||
Type string
|
||||
Payload map[string]interface{}
|
||||
Queue string
|
||||
}
|
||||
|
||||
// InProgressTask is a task that's currently being processed.
|
||||
type InProgressTask struct {
|
||||
ID uuid.UUID
|
||||
Type string
|
||||
Payload map[string]interface{}
|
||||
}
|
||||
|
||||
// ScheduledTask is a task that's scheduled to be processed in the future.
|
||||
type ScheduledTask struct {
|
||||
ID uuid.UUID
|
||||
Type string
|
||||
Payload map[string]interface{}
|
||||
ProcessAt time.Time
|
||||
Score int64
|
||||
Queue string
|
||||
}
|
||||
|
||||
// RetryTask is a task that's in retry queue because worker failed to process the task.
|
||||
type RetryTask struct {
|
||||
ID uuid.UUID
|
||||
Type string
|
||||
Payload map[string]interface{}
|
||||
// TODO(hibiken): add LastFailedAt time.Time
|
||||
ProcessAt time.Time
|
||||
ErrorMsg string
|
||||
Retried int
|
||||
Retry int
|
||||
Score int64
|
||||
Queue string
|
||||
}
|
||||
|
||||
// DeadTask is a task in that has exhausted all retries.
|
||||
type DeadTask struct {
|
||||
ID uuid.UUID
|
||||
Type string
|
||||
Payload map[string]interface{}
|
||||
LastFailedAt time.Time
|
||||
ErrorMsg string
|
||||
Score int64
|
||||
Queue string
|
||||
}
|
||||
|
||||
// KEYS[1] -> asynq:queues
|
||||
// KEYS[2] -> asynq:in_progress
|
||||
// KEYS[3] -> asynq:scheduled
|
||||
@@ -239,79 +289,158 @@ func (p Pagination) stop() int64 {
|
||||
}
|
||||
|
||||
// ListEnqueued returns enqueued tasks that are ready to be processed.
|
||||
func (r *RDB) ListEnqueued(qname string, pgn Pagination) ([]*base.TaskMessage, error) {
|
||||
func (r *RDB) ListEnqueued(qname string, pgn Pagination) ([]*EnqueuedTask, error) {
|
||||
qkey := base.QueueKey(qname)
|
||||
if !r.client.SIsMember(base.AllQueues, qkey).Val() {
|
||||
return nil, fmt.Errorf("queue %q does not exist", qname)
|
||||
}
|
||||
return r.listMessages(qkey, pgn)
|
||||
}
|
||||
|
||||
// ListInProgress returns all tasks that are currently being processed.
|
||||
func (r *RDB) ListInProgress(pgn Pagination) ([]*base.TaskMessage, error) {
|
||||
return r.listMessages(base.InProgressQueue, pgn)
|
||||
}
|
||||
|
||||
// listMessages returns a list of TaskMessage in Redis list with the given key.
|
||||
func (r *RDB) listMessages(key string, pgn Pagination) ([]*base.TaskMessage, error) {
|
||||
// Note: Because we use LPUSH to redis list, we need to calculate the
|
||||
// correct range and reverse the list to get the tasks with pagination.
|
||||
stop := -pgn.start() - 1
|
||||
start := -pgn.stop() - 1
|
||||
data, err := r.client.LRange(key, start, stop).Result()
|
||||
data, err := r.client.LRange(qkey, start, stop).Result()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
reverse(data)
|
||||
var msgs []*base.TaskMessage
|
||||
var tasks []*EnqueuedTask
|
||||
for _, s := range data {
|
||||
m, err := base.DecodeMessage(s)
|
||||
var msg base.TaskMessage
|
||||
err := json.Unmarshal([]byte(s), &msg)
|
||||
if err != nil {
|
||||
continue // bad data, ignore and continue
|
||||
}
|
||||
msgs = append(msgs, m)
|
||||
tasks = append(tasks, &EnqueuedTask{
|
||||
ID: msg.ID,
|
||||
Type: msg.Type,
|
||||
Payload: msg.Payload,
|
||||
Queue: msg.Queue,
|
||||
})
|
||||
}
|
||||
return msgs, nil
|
||||
return tasks, nil
|
||||
}
|
||||
|
||||
// ListInProgress returns all tasks that are currently being processed.
|
||||
func (r *RDB) ListInProgress(pgn Pagination) ([]*InProgressTask, error) {
|
||||
// Note: Because we use LPUSH to redis list, we need to calculate the
|
||||
// correct range and reverse the list to get the tasks with pagination.
|
||||
stop := -pgn.start() - 1
|
||||
start := -pgn.stop() - 1
|
||||
data, err := r.client.LRange(base.InProgressQueue, start, stop).Result()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
reverse(data)
|
||||
var tasks []*InProgressTask
|
||||
for _, s := range data {
|
||||
var msg base.TaskMessage
|
||||
err := json.Unmarshal([]byte(s), &msg)
|
||||
if err != nil {
|
||||
continue // bad data, ignore and continue
|
||||
}
|
||||
tasks = append(tasks, &InProgressTask{
|
||||
ID: msg.ID,
|
||||
Type: msg.Type,
|
||||
Payload: msg.Payload,
|
||||
})
|
||||
}
|
||||
return tasks, nil
|
||||
}
|
||||
|
||||
// ListScheduled returns all tasks that are scheduled to be processed
|
||||
// in the future.
|
||||
func (r *RDB) ListScheduled(pgn Pagination) ([]base.Z, error) {
|
||||
return r.listZSetEntries(base.ScheduledQueue, pgn)
|
||||
}
|
||||
|
||||
// ListRetry returns all tasks that have failed before and willl be retried
|
||||
// in the future.
|
||||
func (r *RDB) ListRetry(pgn Pagination) ([]base.Z, error) {
|
||||
return r.listZSetEntries(base.RetryQueue, pgn)
|
||||
}
|
||||
|
||||
// ListDead returns all tasks that have exhausted its retry limit.
|
||||
func (r *RDB) ListDead(pgn Pagination) ([]base.Z, error) {
|
||||
return r.listZSetEntries(base.DeadQueue, pgn)
|
||||
}
|
||||
|
||||
// listZSetEntries returns a list of message and score pairs in Redis sorted-set
|
||||
// with the given key.
|
||||
func (r *RDB) listZSetEntries(key string, pgn Pagination) ([]base.Z, error) {
|
||||
data, err := r.client.ZRangeWithScores(key, pgn.start(), pgn.stop()).Result()
|
||||
func (r *RDB) ListScheduled(pgn Pagination) ([]*ScheduledTask, error) {
|
||||
data, err := r.client.ZRangeWithScores(base.ScheduledQueue, pgn.start(), pgn.stop()).Result()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var res []base.Z
|
||||
var tasks []*ScheduledTask
|
||||
for _, z := range data {
|
||||
s, ok := z.Member.(string)
|
||||
if !ok {
|
||||
continue // bad data, ignore and continue
|
||||
}
|
||||
msg, err := base.DecodeMessage(s)
|
||||
var msg base.TaskMessage
|
||||
err := json.Unmarshal([]byte(s), &msg)
|
||||
if err != nil {
|
||||
continue // bad data, ignore and continue
|
||||
}
|
||||
res = append(res, base.Z{msg, int64(z.Score)})
|
||||
processAt := time.Unix(int64(z.Score), 0)
|
||||
tasks = append(tasks, &ScheduledTask{
|
||||
ID: msg.ID,
|
||||
Type: msg.Type,
|
||||
Payload: msg.Payload,
|
||||
Queue: msg.Queue,
|
||||
ProcessAt: processAt,
|
||||
Score: int64(z.Score),
|
||||
})
|
||||
}
|
||||
return res, nil
|
||||
return tasks, nil
|
||||
}
|
||||
|
||||
// ListRetry returns all tasks that have failed before and willl be retried
|
||||
// in the future.
|
||||
func (r *RDB) ListRetry(pgn Pagination) ([]*RetryTask, error) {
|
||||
data, err := r.client.ZRangeWithScores(base.RetryQueue, pgn.start(), pgn.stop()).Result()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var tasks []*RetryTask
|
||||
for _, z := range data {
|
||||
s, ok := z.Member.(string)
|
||||
if !ok {
|
||||
continue // bad data, ignore and continue
|
||||
}
|
||||
var msg base.TaskMessage
|
||||
err := json.Unmarshal([]byte(s), &msg)
|
||||
if err != nil {
|
||||
continue // bad data, ignore and continue
|
||||
}
|
||||
processAt := time.Unix(int64(z.Score), 0)
|
||||
tasks = append(tasks, &RetryTask{
|
||||
ID: msg.ID,
|
||||
Type: msg.Type,
|
||||
Payload: msg.Payload,
|
||||
ErrorMsg: msg.ErrorMsg,
|
||||
Retry: msg.Retry,
|
||||
Retried: msg.Retried,
|
||||
Queue: msg.Queue,
|
||||
ProcessAt: processAt,
|
||||
Score: int64(z.Score),
|
||||
})
|
||||
}
|
||||
return tasks, nil
|
||||
}
|
||||
|
||||
// ListDead returns all tasks that have exhausted its retry limit.
|
||||
func (r *RDB) ListDead(pgn Pagination) ([]*DeadTask, error) {
|
||||
data, err := r.client.ZRangeWithScores(base.DeadQueue, pgn.start(), pgn.stop()).Result()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var tasks []*DeadTask
|
||||
for _, z := range data {
|
||||
s, ok := z.Member.(string)
|
||||
if !ok {
|
||||
continue // bad data, ignore and continue
|
||||
}
|
||||
var msg base.TaskMessage
|
||||
err := json.Unmarshal([]byte(s), &msg)
|
||||
if err != nil {
|
||||
continue // bad data, ignore and continue
|
||||
}
|
||||
lastFailedAt := time.Unix(int64(z.Score), 0)
|
||||
tasks = append(tasks, &DeadTask{
|
||||
ID: msg.ID,
|
||||
Type: msg.Type,
|
||||
Payload: msg.Payload,
|
||||
ErrorMsg: msg.ErrorMsg,
|
||||
Queue: msg.Queue,
|
||||
LastFailedAt: lastFailedAt,
|
||||
Score: int64(z.Score),
|
||||
})
|
||||
}
|
||||
return tasks, nil
|
||||
}
|
||||
|
||||
// EnqueueDeadTask finds a task that matches the given id and score from dead queue
|
||||
@@ -575,40 +704,19 @@ func (r *RDB) deleteTask(zset, id string, score float64) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// KEYS[1] -> queue to delete
|
||||
var deleteAllCmd = redis.NewScript(`
|
||||
local n = redis.call("ZCARD", KEYS[1])
|
||||
redis.call("DEL", KEYS[1])
|
||||
return n`)
|
||||
|
||||
// DeleteAllDeadTasks deletes all tasks from the dead queue
|
||||
// and returns the number of tasks deleted.
|
||||
func (r *RDB) DeleteAllDeadTasks() (int64, error) {
|
||||
return r.deleteAll(base.DeadQueue)
|
||||
// DeleteAllDeadTasks deletes all tasks from the dead queue.
|
||||
func (r *RDB) DeleteAllDeadTasks() error {
|
||||
return r.client.Del(base.DeadQueue).Err()
|
||||
}
|
||||
|
||||
// DeleteAllRetryTasks deletes all tasks from the dead queue
|
||||
// and returns the number of tasks deleted.
|
||||
func (r *RDB) DeleteAllRetryTasks() (int64, error) {
|
||||
return r.deleteAll(base.RetryQueue)
|
||||
// DeleteAllRetryTasks deletes all tasks from the dead queue.
|
||||
func (r *RDB) DeleteAllRetryTasks() error {
|
||||
return r.client.Del(base.RetryQueue).Err()
|
||||
}
|
||||
|
||||
// DeleteAllScheduledTasks deletes all tasks from the dead queue
|
||||
// and returns the number of tasks deleted.
|
||||
func (r *RDB) DeleteAllScheduledTasks() (int64, error) {
|
||||
return r.deleteAll(base.ScheduledQueue)
|
||||
}
|
||||
|
||||
func (r *RDB) deleteAll(key string) (int64, error) {
|
||||
res, err := deleteAllCmd.Run(r.client, []string{key}).Result()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
n, ok := res.(int64)
|
||||
if !ok {
|
||||
return 0, fmt.Errorf("could not cast %v to int64", res)
|
||||
}
|
||||
return n, nil
|
||||
// DeleteAllScheduledTasks deletes all tasks from the dead queue.
|
||||
func (r *RDB) DeleteAllScheduledTasks() error {
|
||||
return r.client.Del(base.ScheduledQueue).Err()
|
||||
}
|
||||
|
||||
// ErrQueueNotFound indicates specified queue does not exist.
|
||||
|
File diff suppressed because it is too large
Load Diff
@@ -45,11 +45,6 @@ func (r *RDB) Close() error {
|
||||
return r.client.Close()
|
||||
}
|
||||
|
||||
// Ping checks the connection with redis server.
|
||||
func (r *RDB) Ping() error {
|
||||
return r.client.Ping().Err()
|
||||
}
|
||||
|
||||
// KEYS[1] -> asynq:queues:<qname>
|
||||
// KEYS[2] -> asynq:queues
|
||||
// ARGV[1] -> task message data
|
||||
|
@@ -148,7 +148,7 @@ func TestDequeue(t *testing.T) {
|
||||
err error
|
||||
wantEnqueued map[string][]*base.TaskMessage
|
||||
wantInProgress []*base.TaskMessage
|
||||
wantDeadlines []base.Z
|
||||
wantDeadlines []h.ZSetEntry
|
||||
}{
|
||||
{
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
@@ -162,10 +162,10 @@ func TestDequeue(t *testing.T) {
|
||||
"default": {},
|
||||
},
|
||||
wantInProgress: []*base.TaskMessage{t1},
|
||||
wantDeadlines: []base.Z{
|
||||
wantDeadlines: []h.ZSetEntry{
|
||||
{
|
||||
Message: t1,
|
||||
Score: t1Deadline,
|
||||
Msg: t1,
|
||||
Score: float64(t1Deadline),
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -181,7 +181,7 @@ func TestDequeue(t *testing.T) {
|
||||
"default": {},
|
||||
},
|
||||
wantInProgress: []*base.TaskMessage{},
|
||||
wantDeadlines: []base.Z{},
|
||||
wantDeadlines: []h.ZSetEntry{},
|
||||
},
|
||||
{
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
@@ -199,10 +199,10 @@ func TestDequeue(t *testing.T) {
|
||||
"low": {t3},
|
||||
},
|
||||
wantInProgress: []*base.TaskMessage{t2},
|
||||
wantDeadlines: []base.Z{
|
||||
wantDeadlines: []h.ZSetEntry{
|
||||
{
|
||||
Message: t2,
|
||||
Score: t2Deadline,
|
||||
Msg: t2,
|
||||
Score: float64(t2Deadline),
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -222,10 +222,10 @@ func TestDequeue(t *testing.T) {
|
||||
"low": {t2, t1},
|
||||
},
|
||||
wantInProgress: []*base.TaskMessage{t3},
|
||||
wantDeadlines: []base.Z{
|
||||
wantDeadlines: []h.ZSetEntry{
|
||||
{
|
||||
Message: t3,
|
||||
Score: t3Deadline,
|
||||
Msg: t3,
|
||||
Score: float64(t3Deadline),
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -245,7 +245,7 @@ func TestDequeue(t *testing.T) {
|
||||
"low": {},
|
||||
},
|
||||
wantInProgress: []*base.TaskMessage{},
|
||||
wantDeadlines: []base.Z{},
|
||||
wantDeadlines: []h.ZSetEntry{},
|
||||
},
|
||||
}
|
||||
|
||||
@@ -412,70 +412,70 @@ func TestDone(t *testing.T) {
|
||||
|
||||
tests := []struct {
|
||||
inProgress []*base.TaskMessage // initial state of the in-progress list
|
||||
deadlines []base.Z // initial state of deadlines set
|
||||
deadlines []h.ZSetEntry // initial state of deadlines set
|
||||
target *base.TaskMessage // task to remove
|
||||
wantInProgress []*base.TaskMessage // final state of the in-progress list
|
||||
wantDeadlines []base.Z // final state of the deadline set
|
||||
wantDeadlines []h.ZSetEntry // final state of the deadline set
|
||||
}{
|
||||
{
|
||||
inProgress: []*base.TaskMessage{t1, t2},
|
||||
deadlines: []base.Z{
|
||||
deadlines: []h.ZSetEntry{
|
||||
{
|
||||
Message: t1,
|
||||
Score: t1Deadline,
|
||||
Msg: t1,
|
||||
Score: float64(t1Deadline),
|
||||
},
|
||||
{
|
||||
Message: t2,
|
||||
Score: t2Deadline,
|
||||
Msg: t2,
|
||||
Score: float64(t2Deadline),
|
||||
},
|
||||
},
|
||||
target: t1,
|
||||
wantInProgress: []*base.TaskMessage{t2},
|
||||
wantDeadlines: []base.Z{
|
||||
wantDeadlines: []h.ZSetEntry{
|
||||
{
|
||||
Message: t2,
|
||||
Score: t2Deadline,
|
||||
Msg: t2,
|
||||
Score: float64(t2Deadline),
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
inProgress: []*base.TaskMessage{t1},
|
||||
deadlines: []base.Z{
|
||||
deadlines: []h.ZSetEntry{
|
||||
{
|
||||
Message: t1,
|
||||
Score: t1Deadline,
|
||||
Msg: t1,
|
||||
Score: float64(t1Deadline),
|
||||
},
|
||||
},
|
||||
target: t1,
|
||||
wantInProgress: []*base.TaskMessage{},
|
||||
wantDeadlines: []base.Z{},
|
||||
wantDeadlines: []h.ZSetEntry{},
|
||||
},
|
||||
{
|
||||
inProgress: []*base.TaskMessage{t1, t2, t3},
|
||||
deadlines: []base.Z{
|
||||
deadlines: []h.ZSetEntry{
|
||||
{
|
||||
Message: t1,
|
||||
Score: t1Deadline,
|
||||
Msg: t1,
|
||||
Score: float64(t1Deadline),
|
||||
},
|
||||
{
|
||||
Message: t2,
|
||||
Score: t2Deadline,
|
||||
Msg: t2,
|
||||
Score: float64(t2Deadline),
|
||||
},
|
||||
{
|
||||
Message: t3,
|
||||
Score: t3Deadline,
|
||||
Msg: t3,
|
||||
Score: float64(t3Deadline),
|
||||
},
|
||||
},
|
||||
target: t3,
|
||||
wantInProgress: []*base.TaskMessage{t1, t2},
|
||||
wantDeadlines: []base.Z{
|
||||
wantDeadlines: []h.ZSetEntry{
|
||||
{
|
||||
Message: t1,
|
||||
Score: t1Deadline,
|
||||
Msg: t1,
|
||||
Score: float64(t1Deadline),
|
||||
},
|
||||
{
|
||||
Message: t2,
|
||||
Score: t2Deadline,
|
||||
Msg: t2,
|
||||
Score: float64(t2Deadline),
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -560,28 +560,28 @@ func TestRequeue(t *testing.T) {
|
||||
tests := []struct {
|
||||
enqueued map[string][]*base.TaskMessage // initial state of queues
|
||||
inProgress []*base.TaskMessage // initial state of the in-progress list
|
||||
deadlines []base.Z // initial state of the deadlines set
|
||||
deadlines []h.ZSetEntry // initial state of the deadlines set
|
||||
target *base.TaskMessage // task to requeue
|
||||
wantEnqueued map[string][]*base.TaskMessage // final state of queues
|
||||
wantInProgress []*base.TaskMessage // final state of the in-progress list
|
||||
wantDeadlines []base.Z // final state of the deadlines set
|
||||
wantDeadlines []h.ZSetEntry // final state of the deadlines set
|
||||
}{
|
||||
{
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
base.DefaultQueueName: {},
|
||||
},
|
||||
inProgress: []*base.TaskMessage{t1, t2},
|
||||
deadlines: []base.Z{
|
||||
{Message: t1, Score: t1Deadline},
|
||||
{Message: t2, Score: t2Deadline},
|
||||
deadlines: []h.ZSetEntry{
|
||||
{Msg: t1, Score: float64(t1Deadline)},
|
||||
{Msg: t2, Score: float64(t2Deadline)},
|
||||
},
|
||||
target: t1,
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
base.DefaultQueueName: {t1},
|
||||
},
|
||||
wantInProgress: []*base.TaskMessage{t2},
|
||||
wantDeadlines: []base.Z{
|
||||
{Message: t2, Score: t2Deadline},
|
||||
wantDeadlines: []h.ZSetEntry{
|
||||
{Msg: t2, Score: float64(t2Deadline)},
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -589,15 +589,15 @@ func TestRequeue(t *testing.T) {
|
||||
base.DefaultQueueName: {t1},
|
||||
},
|
||||
inProgress: []*base.TaskMessage{t2},
|
||||
deadlines: []base.Z{
|
||||
{Message: t2, Score: t2Deadline},
|
||||
deadlines: []h.ZSetEntry{
|
||||
{Msg: t2, Score: float64(t2Deadline)},
|
||||
},
|
||||
target: t2,
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
base.DefaultQueueName: {t1, t2},
|
||||
},
|
||||
wantInProgress: []*base.TaskMessage{},
|
||||
wantDeadlines: []base.Z{},
|
||||
wantDeadlines: []h.ZSetEntry{},
|
||||
},
|
||||
{
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
@@ -605,9 +605,9 @@ func TestRequeue(t *testing.T) {
|
||||
"critical": {},
|
||||
},
|
||||
inProgress: []*base.TaskMessage{t2, t3},
|
||||
deadlines: []base.Z{
|
||||
{Message: t2, Score: t2Deadline},
|
||||
{Message: t3, Score: t3Deadline},
|
||||
deadlines: []h.ZSetEntry{
|
||||
{Msg: t2, Score: float64(t2Deadline)},
|
||||
{Msg: t3, Score: float64(t3Deadline)},
|
||||
},
|
||||
target: t3,
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
@@ -615,8 +615,8 @@ func TestRequeue(t *testing.T) {
|
||||
"critical": {t3},
|
||||
},
|
||||
wantInProgress: []*base.TaskMessage{t2},
|
||||
wantDeadlines: []base.Z{
|
||||
{Message: t2, Score: t2Deadline},
|
||||
wantDeadlines: []h.ZSetEntry{
|
||||
{Msg: t2, Score: float64(t2Deadline)},
|
||||
},
|
||||
},
|
||||
}
|
||||
@@ -765,42 +765,42 @@ func TestRetry(t *testing.T) {
|
||||
|
||||
tests := []struct {
|
||||
inProgress []*base.TaskMessage
|
||||
deadlines []base.Z
|
||||
retry []base.Z
|
||||
deadlines []h.ZSetEntry
|
||||
retry []h.ZSetEntry
|
||||
msg *base.TaskMessage
|
||||
processAt time.Time
|
||||
errMsg string
|
||||
wantInProgress []*base.TaskMessage
|
||||
wantDeadlines []base.Z
|
||||
wantRetry []base.Z
|
||||
wantDeadlines []h.ZSetEntry
|
||||
wantRetry []h.ZSetEntry
|
||||
}{
|
||||
{
|
||||
inProgress: []*base.TaskMessage{t1, t2},
|
||||
deadlines: []base.Z{
|
||||
{Message: t1, Score: t1Deadline},
|
||||
{Message: t2, Score: t2Deadline},
|
||||
deadlines: []h.ZSetEntry{
|
||||
{Msg: t1, Score: float64(t1Deadline)},
|
||||
{Msg: t2, Score: float64(t2Deadline)},
|
||||
},
|
||||
retry: []base.Z{
|
||||
retry: []h.ZSetEntry{
|
||||
{
|
||||
Message: t3,
|
||||
Score: now.Add(time.Minute).Unix(),
|
||||
Msg: t3,
|
||||
Score: float64(now.Add(time.Minute).Unix()),
|
||||
},
|
||||
},
|
||||
msg: t1,
|
||||
processAt: now.Add(5 * time.Minute),
|
||||
errMsg: errMsg,
|
||||
wantInProgress: []*base.TaskMessage{t2},
|
||||
wantDeadlines: []base.Z{
|
||||
{Message: t2, Score: t2Deadline},
|
||||
wantDeadlines: []h.ZSetEntry{
|
||||
{Msg: t2, Score: float64(t2Deadline)},
|
||||
},
|
||||
wantRetry: []base.Z{
|
||||
wantRetry: []h.ZSetEntry{
|
||||
{
|
||||
Message: h.TaskMessageAfterRetry(*t1, errMsg),
|
||||
Score: now.Add(5 * time.Minute).Unix(),
|
||||
Msg: h.TaskMessageAfterRetry(*t1, errMsg),
|
||||
Score: float64(now.Add(5 * time.Minute).Unix()),
|
||||
},
|
||||
{
|
||||
Message: t3,
|
||||
Score: now.Add(time.Minute).Unix(),
|
||||
Msg: t3,
|
||||
Score: float64(now.Add(time.Minute).Unix()),
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -891,59 +891,59 @@ func TestKill(t *testing.T) {
|
||||
// TODO(hibiken): add test cases for trimming
|
||||
tests := []struct {
|
||||
inProgress []*base.TaskMessage
|
||||
deadlines []base.Z
|
||||
dead []base.Z
|
||||
deadlines []h.ZSetEntry
|
||||
dead []h.ZSetEntry
|
||||
target *base.TaskMessage // task to kill
|
||||
wantInProgress []*base.TaskMessage
|
||||
wantDeadlines []base.Z
|
||||
wantDead []base.Z
|
||||
wantDeadlines []h.ZSetEntry
|
||||
wantDead []h.ZSetEntry
|
||||
}{
|
||||
{
|
||||
inProgress: []*base.TaskMessage{t1, t2},
|
||||
deadlines: []base.Z{
|
||||
{Message: t1, Score: t1Deadline},
|
||||
{Message: t2, Score: t2Deadline},
|
||||
deadlines: []h.ZSetEntry{
|
||||
{Msg: t1, Score: float64(t1Deadline)},
|
||||
{Msg: t2, Score: float64(t2Deadline)},
|
||||
},
|
||||
dead: []base.Z{
|
||||
dead: []h.ZSetEntry{
|
||||
{
|
||||
Message: t3,
|
||||
Score: now.Add(-time.Hour).Unix(),
|
||||
Msg: t3,
|
||||
Score: float64(now.Add(-time.Hour).Unix()),
|
||||
},
|
||||
},
|
||||
target: t1,
|
||||
wantInProgress: []*base.TaskMessage{t2},
|
||||
wantDeadlines: []base.Z{
|
||||
{Message: t2, Score: t2Deadline},
|
||||
wantDeadlines: []h.ZSetEntry{
|
||||
{Msg: t2, Score: float64(t2Deadline)},
|
||||
},
|
||||
wantDead: []base.Z{
|
||||
wantDead: []h.ZSetEntry{
|
||||
{
|
||||
Message: h.TaskMessageWithError(*t1, errMsg),
|
||||
Score: now.Unix(),
|
||||
Msg: h.TaskMessageWithError(*t1, errMsg),
|
||||
Score: float64(now.Unix()),
|
||||
},
|
||||
{
|
||||
Message: t3,
|
||||
Score: now.Add(-time.Hour).Unix(),
|
||||
Msg: t3,
|
||||
Score: float64(now.Add(-time.Hour).Unix()),
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
inProgress: []*base.TaskMessage{t1, t2, t3},
|
||||
deadlines: []base.Z{
|
||||
{Message: t1, Score: t1Deadline},
|
||||
{Message: t2, Score: t2Deadline},
|
||||
{Message: t3, Score: t3Deadline},
|
||||
deadlines: []h.ZSetEntry{
|
||||
{Msg: t1, Score: float64(t1Deadline)},
|
||||
{Msg: t2, Score: float64(t2Deadline)},
|
||||
{Msg: t3, Score: float64(t3Deadline)},
|
||||
},
|
||||
dead: []base.Z{},
|
||||
dead: []h.ZSetEntry{},
|
||||
target: t1,
|
||||
wantInProgress: []*base.TaskMessage{t2, t3},
|
||||
wantDeadlines: []base.Z{
|
||||
{Message: t2, Score: t2Deadline},
|
||||
{Message: t3, Score: t3Deadline},
|
||||
wantDeadlines: []h.ZSetEntry{
|
||||
{Msg: t2, Score: float64(t2Deadline)},
|
||||
{Msg: t3, Score: float64(t3Deadline)},
|
||||
},
|
||||
wantDead: []base.Z{
|
||||
wantDead: []h.ZSetEntry{
|
||||
{
|
||||
Message: h.TaskMessageWithError(*t1, errMsg),
|
||||
Score: now.Unix(),
|
||||
Msg: h.TaskMessageWithError(*t1, errMsg),
|
||||
Score: float64(now.Unix()),
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -1009,19 +1009,19 @@ func TestCheckAndEnqueue(t *testing.T) {
|
||||
hourFromNow := time.Now().Add(time.Hour)
|
||||
|
||||
tests := []struct {
|
||||
scheduled []base.Z
|
||||
retry []base.Z
|
||||
scheduled []h.ZSetEntry
|
||||
retry []h.ZSetEntry
|
||||
wantEnqueued map[string][]*base.TaskMessage
|
||||
wantScheduled []*base.TaskMessage
|
||||
wantRetry []*base.TaskMessage
|
||||
}{
|
||||
{
|
||||
scheduled: []base.Z{
|
||||
{Message: t1, Score: secondAgo.Unix()},
|
||||
{Message: t2, Score: secondAgo.Unix()},
|
||||
scheduled: []h.ZSetEntry{
|
||||
{Msg: t1, Score: float64(secondAgo.Unix())},
|
||||
{Msg: t2, Score: float64(secondAgo.Unix())},
|
||||
},
|
||||
retry: []base.Z{
|
||||
{Message: t3, Score: secondAgo.Unix()}},
|
||||
retry: []h.ZSetEntry{
|
||||
{Msg: t3, Score: float64(secondAgo.Unix())}},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
"default": {t1, t2, t3},
|
||||
},
|
||||
@@ -1029,11 +1029,11 @@ func TestCheckAndEnqueue(t *testing.T) {
|
||||
wantRetry: []*base.TaskMessage{},
|
||||
},
|
||||
{
|
||||
scheduled: []base.Z{
|
||||
{Message: t1, Score: hourFromNow.Unix()},
|
||||
{Message: t2, Score: secondAgo.Unix()}},
|
||||
retry: []base.Z{
|
||||
{Message: t3, Score: secondAgo.Unix()}},
|
||||
scheduled: []h.ZSetEntry{
|
||||
{Msg: t1, Score: float64(hourFromNow.Unix())},
|
||||
{Msg: t2, Score: float64(secondAgo.Unix())}},
|
||||
retry: []h.ZSetEntry{
|
||||
{Msg: t3, Score: float64(secondAgo.Unix())}},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
"default": {t2, t3},
|
||||
},
|
||||
@@ -1041,11 +1041,11 @@ func TestCheckAndEnqueue(t *testing.T) {
|
||||
wantRetry: []*base.TaskMessage{},
|
||||
},
|
||||
{
|
||||
scheduled: []base.Z{
|
||||
{Message: t1, Score: hourFromNow.Unix()},
|
||||
{Message: t2, Score: hourFromNow.Unix()}},
|
||||
retry: []base.Z{
|
||||
{Message: t3, Score: hourFromNow.Unix()}},
|
||||
scheduled: []h.ZSetEntry{
|
||||
{Msg: t1, Score: float64(hourFromNow.Unix())},
|
||||
{Msg: t2, Score: float64(hourFromNow.Unix())}},
|
||||
retry: []h.ZSetEntry{
|
||||
{Msg: t3, Score: float64(hourFromNow.Unix())}},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
},
|
||||
@@ -1053,12 +1053,12 @@ func TestCheckAndEnqueue(t *testing.T) {
|
||||
wantRetry: []*base.TaskMessage{t3},
|
||||
},
|
||||
{
|
||||
scheduled: []base.Z{
|
||||
{Message: t1, Score: secondAgo.Unix()},
|
||||
{Message: t4, Score: secondAgo.Unix()},
|
||||
scheduled: []h.ZSetEntry{
|
||||
{Msg: t1, Score: float64(secondAgo.Unix())},
|
||||
{Msg: t4, Score: float64(secondAgo.Unix())},
|
||||
},
|
||||
retry: []base.Z{
|
||||
{Message: t5, Score: secondAgo.Unix()}},
|
||||
retry: []h.ZSetEntry{
|
||||
{Msg: t5, Score: float64(secondAgo.Unix())}},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
"default": {t1},
|
||||
"critical": {t4},
|
||||
@@ -1112,41 +1112,41 @@ func TestListDeadlineExceeded(t *testing.T) {
|
||||
|
||||
tests := []struct {
|
||||
desc string
|
||||
deadlines []base.Z
|
||||
deadlines []h.ZSetEntry
|
||||
t time.Time
|
||||
want []*base.TaskMessage
|
||||
}{
|
||||
{
|
||||
desc: "with one task in-progress",
|
||||
deadlines: []base.Z{
|
||||
{Message: t1, Score: fiveMinutesAgo.Unix()},
|
||||
deadlines: []h.ZSetEntry{
|
||||
{Msg: t1, Score: float64(fiveMinutesAgo.Unix())},
|
||||
},
|
||||
t: time.Now(),
|
||||
want: []*base.TaskMessage{t1},
|
||||
},
|
||||
{
|
||||
desc: "with multiple tasks in-progress, and one expired",
|
||||
deadlines: []base.Z{
|
||||
{Message: t1, Score: oneHourAgo.Unix()},
|
||||
{Message: t2, Score: fiveMinutesFromNow.Unix()},
|
||||
{Message: t3, Score: oneHourFromNow.Unix()},
|
||||
deadlines: []h.ZSetEntry{
|
||||
{Msg: t1, Score: float64(oneHourAgo.Unix())},
|
||||
{Msg: t2, Score: float64(fiveMinutesFromNow.Unix())},
|
||||
{Msg: t3, Score: float64(oneHourFromNow.Unix())},
|
||||
},
|
||||
t: time.Now(),
|
||||
want: []*base.TaskMessage{t1},
|
||||
},
|
||||
{
|
||||
desc: "with multiple expired tasks in-progress",
|
||||
deadlines: []base.Z{
|
||||
{Message: t1, Score: oneHourAgo.Unix()},
|
||||
{Message: t2, Score: fiveMinutesAgo.Unix()},
|
||||
{Message: t3, Score: oneHourFromNow.Unix()},
|
||||
deadlines: []h.ZSetEntry{
|
||||
{Msg: t1, Score: float64(oneHourAgo.Unix())},
|
||||
{Msg: t2, Score: float64(fiveMinutesAgo.Unix())},
|
||||
{Msg: t3, Score: float64(oneHourFromNow.Unix())},
|
||||
},
|
||||
t: time.Now(),
|
||||
want: []*base.TaskMessage{t1, t2},
|
||||
},
|
||||
{
|
||||
desc: "with empty in-progress queue",
|
||||
deadlines: []base.Z{},
|
||||
deadlines: []h.ZSetEntry{},
|
||||
t: time.Now(),
|
||||
want: []*base.TaskMessage{},
|
||||
},
|
||||
|
@@ -180,15 +180,6 @@ func (tb *TestBroker) PublishCancelation(id string) error {
|
||||
return tb.real.PublishCancelation(id)
|
||||
}
|
||||
|
||||
func (tb *TestBroker) Ping() error {
|
||||
tb.mu.Lock()
|
||||
defer tb.mu.Unlock()
|
||||
if tb.sleeping {
|
||||
return errRedisDown
|
||||
}
|
||||
return tb.real.Ping()
|
||||
}
|
||||
|
||||
func (tb *TestBroker) Close() error {
|
||||
tb.mu.Lock()
|
||||
defer tb.mu.Unlock()
|
||||
|
23
processor.go
23
processor.go
@@ -191,19 +191,9 @@ func (p *processor) exec() {
|
||||
p.cancelations.Delete(msg.ID.String())
|
||||
}()
|
||||
|
||||
// check context before starting a worker goroutine.
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// already canceled (e.g. deadline exceeded).
|
||||
p.retryOrKill(ctx, msg, ctx.Err())
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
resCh := make(chan error, 1)
|
||||
go func() {
|
||||
resCh <- perform(ctx, NewTask(msg.Type, msg.Payload), p.handler)
|
||||
}()
|
||||
task := NewTask(msg.Type, msg.Payload)
|
||||
go func() { resCh <- perform(ctx, task, p.handler) }()
|
||||
|
||||
select {
|
||||
case <-p.abort:
|
||||
@@ -212,6 +202,7 @@ func (p *processor) exec() {
|
||||
p.requeue(msg)
|
||||
return
|
||||
case <-ctx.Done():
|
||||
p.logger.Debugf("Retrying task. task id=%s", msg.ID) // TODO: Improve this log message and above
|
||||
p.retryOrKill(ctx, msg, ctx.Err())
|
||||
return
|
||||
case resErr := <-resCh:
|
||||
@@ -220,6 +211,9 @@ func (p *processor) exec() {
|
||||
// 2) Retry -> Removes the message from InProgress & Adds the message to Retry
|
||||
// 3) Kill -> Removes the message from InProgress & Adds the message to Dead
|
||||
if resErr != nil {
|
||||
if p.errHandler != nil {
|
||||
p.errHandler.HandleError(ctx, task, resErr)
|
||||
}
|
||||
p.retryOrKill(ctx, msg, resErr)
|
||||
return
|
||||
}
|
||||
@@ -258,11 +252,7 @@ func (p *processor) markAsDone(ctx context.Context, msg *base.TaskMessage) {
|
||||
}
|
||||
|
||||
func (p *processor) retryOrKill(ctx context.Context, msg *base.TaskMessage, err error) {
|
||||
if p.errHandler != nil {
|
||||
p.errHandler.HandleError(ctx, NewTask(msg.Type, msg.Payload), err)
|
||||
}
|
||||
if msg.Retried >= msg.Retry {
|
||||
p.logger.Warnf("Retry exhausted for task id=%s", msg.ID)
|
||||
p.kill(ctx, msg, err)
|
||||
} else {
|
||||
p.retry(ctx, msg, err)
|
||||
@@ -291,6 +281,7 @@ func (p *processor) retry(ctx context.Context, msg *base.TaskMessage, e error) {
|
||||
}
|
||||
|
||||
func (p *processor) kill(ctx context.Context, msg *base.TaskMessage, e error) {
|
||||
p.logger.Warnf("Retry exhausted for task id=%s", msg.ID)
|
||||
err := p.broker.Kill(msg, e.Error())
|
||||
if err != nil {
|
||||
errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.DeadQueue)
|
||||
|
@@ -223,7 +223,7 @@ func TestProcessorRetry(t *testing.T) {
|
||||
delay time.Duration // retry delay duration
|
||||
handler Handler // task handler
|
||||
wait time.Duration // wait duration between starting and stopping processor for this test case
|
||||
wantRetry []base.Z // tasks in retry queue at the end
|
||||
wantRetry []h.ZSetEntry // tasks in retry queue at the end
|
||||
wantDead []*base.TaskMessage // tasks in dead queue at the end
|
||||
wantErrCount int // number of times error handler should be called
|
||||
}{
|
||||
@@ -235,10 +235,10 @@ func TestProcessorRetry(t *testing.T) {
|
||||
return fmt.Errorf(errMsg)
|
||||
}),
|
||||
wait: 2 * time.Second,
|
||||
wantRetry: []base.Z{
|
||||
{Message: h.TaskMessageAfterRetry(*m2, errMsg), Score: now.Add(time.Minute).Unix()},
|
||||
{Message: h.TaskMessageAfterRetry(*m3, errMsg), Score: now.Add(time.Minute).Unix()},
|
||||
{Message: h.TaskMessageAfterRetry(*m4, errMsg), Score: now.Add(time.Minute).Unix()},
|
||||
wantRetry: []h.ZSetEntry{
|
||||
{Msg: h.TaskMessageAfterRetry(*m2, errMsg), Score: float64(now.Add(time.Minute).Unix())},
|
||||
{Msg: h.TaskMessageAfterRetry(*m3, errMsg), Score: float64(now.Add(time.Minute).Unix())},
|
||||
{Msg: h.TaskMessageAfterRetry(*m4, errMsg), Score: float64(now.Add(time.Minute).Unix())},
|
||||
},
|
||||
wantDead: []*base.TaskMessage{h.TaskMessageWithError(*m1, errMsg)},
|
||||
wantErrCount: 4,
|
||||
|
@@ -34,24 +34,24 @@ func TestRecoverer(t *testing.T) {
|
||||
tests := []struct {
|
||||
desc string
|
||||
inProgress []*base.TaskMessage
|
||||
deadlines []base.Z
|
||||
retry []base.Z
|
||||
dead []base.Z
|
||||
deadlines []h.ZSetEntry
|
||||
retry []h.ZSetEntry
|
||||
dead []h.ZSetEntry
|
||||
wantInProgress []*base.TaskMessage
|
||||
wantDeadlines []base.Z
|
||||
wantDeadlines []h.ZSetEntry
|
||||
wantRetry []*base.TaskMessage
|
||||
wantDead []*base.TaskMessage
|
||||
}{
|
||||
{
|
||||
desc: "with one task in-progress",
|
||||
inProgress: []*base.TaskMessage{t1},
|
||||
deadlines: []base.Z{
|
||||
{Message: t1, Score: fiveMinutesAgo.Unix()},
|
||||
deadlines: []h.ZSetEntry{
|
||||
{Msg: t1, Score: float64(fiveMinutesAgo.Unix())},
|
||||
},
|
||||
retry: []base.Z{},
|
||||
dead: []base.Z{},
|
||||
retry: []h.ZSetEntry{},
|
||||
dead: []h.ZSetEntry{},
|
||||
wantInProgress: []*base.TaskMessage{},
|
||||
wantDeadlines: []base.Z{},
|
||||
wantDeadlines: []h.ZSetEntry{},
|
||||
wantRetry: []*base.TaskMessage{
|
||||
h.TaskMessageAfterRetry(*t1, "deadline exceeded"),
|
||||
},
|
||||
@@ -60,30 +60,30 @@ func TestRecoverer(t *testing.T) {
|
||||
{
|
||||
desc: "with a task with max-retry reached",
|
||||
inProgress: []*base.TaskMessage{t4},
|
||||
deadlines: []base.Z{
|
||||
{Message: t4, Score: fiveMinutesAgo.Unix()},
|
||||
deadlines: []h.ZSetEntry{
|
||||
{Msg: t4, Score: float64(fiveMinutesAgo.Unix())},
|
||||
},
|
||||
retry: []base.Z{},
|
||||
dead: []base.Z{},
|
||||
retry: []h.ZSetEntry{},
|
||||
dead: []h.ZSetEntry{},
|
||||
wantInProgress: []*base.TaskMessage{},
|
||||
wantDeadlines: []base.Z{},
|
||||
wantDeadlines: []h.ZSetEntry{},
|
||||
wantRetry: []*base.TaskMessage{},
|
||||
wantDead: []*base.TaskMessage{h.TaskMessageWithError(*t4, "deadline exceeded")},
|
||||
},
|
||||
{
|
||||
desc: "with multiple tasks in-progress, and one expired",
|
||||
inProgress: []*base.TaskMessage{t1, t2, t3},
|
||||
deadlines: []base.Z{
|
||||
{Message: t1, Score: oneHourAgo.Unix()},
|
||||
{Message: t2, Score: fiveMinutesFromNow.Unix()},
|
||||
{Message: t3, Score: oneHourFromNow.Unix()},
|
||||
deadlines: []h.ZSetEntry{
|
||||
{Msg: t1, Score: float64(oneHourAgo.Unix())},
|
||||
{Msg: t2, Score: float64(fiveMinutesFromNow.Unix())},
|
||||
{Msg: t3, Score: float64(oneHourFromNow.Unix())},
|
||||
},
|
||||
retry: []base.Z{},
|
||||
dead: []base.Z{},
|
||||
retry: []h.ZSetEntry{},
|
||||
dead: []h.ZSetEntry{},
|
||||
wantInProgress: []*base.TaskMessage{t2, t3},
|
||||
wantDeadlines: []base.Z{
|
||||
{Message: t2, Score: fiveMinutesFromNow.Unix()},
|
||||
{Message: t3, Score: oneHourFromNow.Unix()},
|
||||
wantDeadlines: []h.ZSetEntry{
|
||||
{Msg: t2, Score: float64(fiveMinutesFromNow.Unix())},
|
||||
{Msg: t3, Score: float64(oneHourFromNow.Unix())},
|
||||
},
|
||||
wantRetry: []*base.TaskMessage{
|
||||
h.TaskMessageAfterRetry(*t1, "deadline exceeded"),
|
||||
@@ -93,16 +93,16 @@ func TestRecoverer(t *testing.T) {
|
||||
{
|
||||
desc: "with multiple expired tasks in-progress",
|
||||
inProgress: []*base.TaskMessage{t1, t2, t3},
|
||||
deadlines: []base.Z{
|
||||
{Message: t1, Score: oneHourAgo.Unix()},
|
||||
{Message: t2, Score: fiveMinutesAgo.Unix()},
|
||||
{Message: t3, Score: oneHourFromNow.Unix()},
|
||||
deadlines: []h.ZSetEntry{
|
||||
{Msg: t1, Score: float64(oneHourAgo.Unix())},
|
||||
{Msg: t2, Score: float64(fiveMinutesAgo.Unix())},
|
||||
{Msg: t3, Score: float64(oneHourFromNow.Unix())},
|
||||
},
|
||||
retry: []base.Z{},
|
||||
dead: []base.Z{},
|
||||
retry: []h.ZSetEntry{},
|
||||
dead: []h.ZSetEntry{},
|
||||
wantInProgress: []*base.TaskMessage{t3},
|
||||
wantDeadlines: []base.Z{
|
||||
{Message: t3, Score: oneHourFromNow.Unix()},
|
||||
wantDeadlines: []h.ZSetEntry{
|
||||
{Msg: t3, Score: float64(oneHourFromNow.Unix())},
|
||||
},
|
||||
wantRetry: []*base.TaskMessage{
|
||||
h.TaskMessageAfterRetry(*t1, "deadline exceeded"),
|
||||
@@ -113,11 +113,11 @@ func TestRecoverer(t *testing.T) {
|
||||
{
|
||||
desc: "with empty in-progress queue",
|
||||
inProgress: []*base.TaskMessage{},
|
||||
deadlines: []base.Z{},
|
||||
retry: []base.Z{},
|
||||
dead: []base.Z{},
|
||||
deadlines: []h.ZSetEntry{},
|
||||
retry: []h.ZSetEntry{},
|
||||
dead: []h.ZSetEntry{},
|
||||
wantInProgress: []*base.TaskMessage{},
|
||||
wantDeadlines: []base.Z{},
|
||||
wantDeadlines: []h.ZSetEntry{},
|
||||
wantRetry: []*base.TaskMessage{},
|
||||
wantDead: []*base.TaskMessage{},
|
||||
},
|
||||
|
@@ -31,8 +31,8 @@ func TestScheduler(t *testing.T) {
|
||||
now := time.Now()
|
||||
|
||||
tests := []struct {
|
||||
initScheduled []base.Z // scheduled queue initial state
|
||||
initRetry []base.Z // retry queue initial state
|
||||
initScheduled []h.ZSetEntry // scheduled queue initial state
|
||||
initRetry []h.ZSetEntry // retry queue initial state
|
||||
initQueue []*base.TaskMessage // default queue initial state
|
||||
wait time.Duration // wait duration before checking for final state
|
||||
wantScheduled []*base.TaskMessage // schedule queue final state
|
||||
@@ -40,12 +40,12 @@ func TestScheduler(t *testing.T) {
|
||||
wantQueue []*base.TaskMessage // default queue final state
|
||||
}{
|
||||
{
|
||||
initScheduled: []base.Z{
|
||||
{Message: t1, Score: now.Add(time.Hour).Unix()},
|
||||
{Message: t2, Score: now.Add(-2 * time.Second).Unix()},
|
||||
initScheduled: []h.ZSetEntry{
|
||||
{Msg: t1, Score: float64(now.Add(time.Hour).Unix())},
|
||||
{Msg: t2, Score: float64(now.Add(-2 * time.Second).Unix())},
|
||||
},
|
||||
initRetry: []base.Z{
|
||||
{Message: t3, Score: time.Now().Add(-500 * time.Millisecond).Unix()},
|
||||
initRetry: []h.ZSetEntry{
|
||||
{Msg: t3, Score: float64(time.Now().Add(-500 * time.Millisecond).Unix())},
|
||||
},
|
||||
initQueue: []*base.TaskMessage{t4},
|
||||
wait: pollInterval * 2,
|
||||
@@ -54,12 +54,12 @@ func TestScheduler(t *testing.T) {
|
||||
wantQueue: []*base.TaskMessage{t2, t3, t4},
|
||||
},
|
||||
{
|
||||
initScheduled: []base.Z{
|
||||
{Message: t1, Score: now.Unix()},
|
||||
{Message: t2, Score: now.Add(-2 * time.Second).Unix()},
|
||||
{Message: t3, Score: now.Add(-500 * time.Millisecond).Unix()},
|
||||
initScheduled: []h.ZSetEntry{
|
||||
{Msg: t1, Score: float64(now.Unix())},
|
||||
{Msg: t2, Score: float64(now.Add(-2 * time.Second).Unix())},
|
||||
{Msg: t3, Score: float64(now.Add(-500 * time.Millisecond).Unix())},
|
||||
},
|
||||
initRetry: []base.Z{},
|
||||
initRetry: []h.ZSetEntry{},
|
||||
initQueue: []*base.TaskMessage{t4},
|
||||
wait: pollInterval * 2,
|
||||
wantScheduled: []*base.TaskMessage{},
|
||||
|
63
server.go
63
server.go
@@ -40,14 +40,13 @@ type Server struct {
|
||||
status *base.ServerStatus
|
||||
|
||||
// wait group to wait for all goroutines to finish.
|
||||
wg sync.WaitGroup
|
||||
scheduler *scheduler
|
||||
processor *processor
|
||||
syncer *syncer
|
||||
heartbeater *heartbeater
|
||||
subscriber *subscriber
|
||||
recoverer *recoverer
|
||||
healthchecker *healthchecker
|
||||
wg sync.WaitGroup
|
||||
scheduler *scheduler
|
||||
processor *processor
|
||||
syncer *syncer
|
||||
heartbeater *heartbeater
|
||||
subscriber *subscriber
|
||||
recoverer *recoverer
|
||||
}
|
||||
|
||||
// Config specifies the server's background-task processing behavior.
|
||||
@@ -124,18 +123,9 @@ type Config struct {
|
||||
//
|
||||
// If unset or zero, default timeout of 8 seconds is used.
|
||||
ShutdownTimeout time.Duration
|
||||
|
||||
// HealthCheckFunc is called periodically with any errors encountered during ping to the
|
||||
// connected redis server.
|
||||
HealthCheckFunc func(error)
|
||||
|
||||
// HealthCheckInterval specifies the interval between healthchecks.
|
||||
//
|
||||
// If unset or zero, the interval is set to 15 seconds.
|
||||
HealthCheckInterval time.Duration
|
||||
}
|
||||
|
||||
// An ErrorHandler handles an error occured during task processing.
|
||||
// An ErrorHandler handles errors returned by the task handler.
|
||||
type ErrorHandler interface {
|
||||
HandleError(ctx context.Context, task *Task, err error)
|
||||
}
|
||||
@@ -260,11 +250,7 @@ var defaultQueueConfig = map[string]int{
|
||||
base.DefaultQueueName: 1,
|
||||
}
|
||||
|
||||
const (
|
||||
defaultShutdownTimeout = 8 * time.Second
|
||||
|
||||
defaultHealthCheckInterval = 15 * time.Second
|
||||
)
|
||||
const defaultShutdownTimeout = 8 * time.Second
|
||||
|
||||
// NewServer returns a new Server given a redis connection option
|
||||
// and background processing configuration.
|
||||
@@ -290,10 +276,6 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
|
||||
if shutdownTimeout == 0 {
|
||||
shutdownTimeout = defaultShutdownTimeout
|
||||
}
|
||||
healthcheckInterval := cfg.HealthCheckInterval
|
||||
if healthcheckInterval == 0 {
|
||||
healthcheckInterval = defaultHealthCheckInterval
|
||||
}
|
||||
logger := log.NewLogger(cfg.Logger)
|
||||
loglevel := cfg.LogLevel
|
||||
if loglevel == level_unspecified {
|
||||
@@ -354,23 +336,16 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
|
||||
retryDelayFunc: delayFunc,
|
||||
interval: 1 * time.Minute,
|
||||
})
|
||||
healthchecker := newHealthChecker(healthcheckerParams{
|
||||
logger: logger,
|
||||
broker: rdb,
|
||||
interval: healthcheckInterval,
|
||||
healthcheckFunc: cfg.HealthCheckFunc,
|
||||
})
|
||||
return &Server{
|
||||
logger: logger,
|
||||
broker: rdb,
|
||||
status: status,
|
||||
scheduler: scheduler,
|
||||
processor: processor,
|
||||
syncer: syncer,
|
||||
heartbeater: heartbeater,
|
||||
subscriber: subscriber,
|
||||
recoverer: recoverer,
|
||||
healthchecker: healthchecker,
|
||||
logger: logger,
|
||||
broker: rdb,
|
||||
status: status,
|
||||
scheduler: scheduler,
|
||||
processor: processor,
|
||||
syncer: syncer,
|
||||
heartbeater: heartbeater,
|
||||
subscriber: subscriber,
|
||||
recoverer: recoverer,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -438,7 +413,6 @@ func (srv *Server) Start(handler Handler) error {
|
||||
srv.logger.Info("Starting processing")
|
||||
|
||||
srv.heartbeater.start(&srv.wg)
|
||||
srv.healthchecker.start(&srv.wg)
|
||||
srv.subscriber.start(&srv.wg)
|
||||
srv.syncer.start(&srv.wg)
|
||||
srv.recoverer.start(&srv.wg)
|
||||
@@ -468,7 +442,6 @@ func (srv *Server) Stop() {
|
||||
srv.recoverer.terminate()
|
||||
srv.syncer.terminate()
|
||||
srv.subscriber.terminate()
|
||||
srv.healthchecker.terminate()
|
||||
srv.heartbeater.terminate()
|
||||
|
||||
srv.wg.Wait()
|
||||
|
@@ -8,14 +8,15 @@ import (
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/go-redis/redis/v7"
|
||||
"github.com/hibiken/asynq/internal/rdb"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/viper"
|
||||
)
|
||||
|
||||
// delCmd represents the del command
|
||||
var delCmd = &cobra.Command{
|
||||
Use: "del [task key]",
|
||||
Use: "del [task id]",
|
||||
Short: "Deletes a task given an identifier",
|
||||
Long: `Del (asynq del) will delete a task given an identifier.
|
||||
|
||||
@@ -43,12 +44,27 @@ func init() {
|
||||
}
|
||||
|
||||
func del(cmd *cobra.Command, args []string) {
|
||||
i := asynq.NewInspector(asynq.RedisClientOpt{
|
||||
id, score, qtype, err := parseQueryID(args[0])
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
r := rdb.NewRDB(redis.NewClient(&redis.Options{
|
||||
Addr: viper.GetString("uri"),
|
||||
DB: viper.GetInt("db"),
|
||||
Password: viper.GetString("password"),
|
||||
})
|
||||
err := i.DeleteTaskByKey(args[0])
|
||||
}))
|
||||
switch qtype {
|
||||
case "s":
|
||||
err = r.DeleteScheduledTask(id, score)
|
||||
case "r":
|
||||
err = r.DeleteRetryTask(id, score)
|
||||
case "d":
|
||||
err = r.DeleteDeadTask(id, score)
|
||||
default:
|
||||
fmt.Println("invalid argument")
|
||||
os.Exit(1)
|
||||
}
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
|
@@ -8,7 +8,8 @@ import (
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/go-redis/redis/v7"
|
||||
"github.com/hibiken/asynq/internal/rdb"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/viper"
|
||||
)
|
||||
@@ -44,22 +45,20 @@ func init() {
|
||||
}
|
||||
|
||||
func delall(cmd *cobra.Command, args []string) {
|
||||
i := asynq.NewInspector(asynq.RedisClientOpt{
|
||||
c := redis.NewClient(&redis.Options{
|
||||
Addr: viper.GetString("uri"),
|
||||
DB: viper.GetInt("db"),
|
||||
Password: viper.GetString("password"),
|
||||
})
|
||||
var (
|
||||
n int
|
||||
err error
|
||||
)
|
||||
r := rdb.NewRDB(c)
|
||||
var err error
|
||||
switch args[0] {
|
||||
case "scheduled":
|
||||
n, err = i.DeleteAllScheduledTasks()
|
||||
err = r.DeleteAllScheduledTasks()
|
||||
case "retry":
|
||||
n, err = i.DeleteAllRetryTasks()
|
||||
err = r.DeleteAllRetryTasks()
|
||||
case "dead":
|
||||
n, err = i.DeleteAllDeadTasks()
|
||||
err = r.DeleteAllDeadTasks()
|
||||
default:
|
||||
fmt.Printf("error: `asynq delall [state]` only accepts %v as the argument.\n", delallValidArgs)
|
||||
os.Exit(1)
|
||||
@@ -68,5 +67,5 @@ func delall(cmd *cobra.Command, args []string) {
|
||||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
fmt.Printf("Deleted all %d tasks in %q state\n", n, args[0])
|
||||
fmt.Printf("Deleted all tasks in %q state\n", args[0])
|
||||
}
|
||||
|
@@ -8,14 +8,15 @@ import (
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/go-redis/redis/v7"
|
||||
"github.com/hibiken/asynq/internal/rdb"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/viper"
|
||||
)
|
||||
|
||||
// enqCmd represents the enq command
|
||||
var enqCmd = &cobra.Command{
|
||||
Use: "enq [task key]",
|
||||
Use: "enq [task id]",
|
||||
Short: "Enqueues a task given an identifier",
|
||||
Long: `Enq (asynq enq) will enqueue a task given an identifier.
|
||||
|
||||
@@ -46,12 +47,27 @@ func init() {
|
||||
}
|
||||
|
||||
func enq(cmd *cobra.Command, args []string) {
|
||||
i := asynq.NewInspector(asynq.RedisClientOpt{
|
||||
id, score, qtype, err := parseQueryID(args[0])
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
r := rdb.NewRDB(redis.NewClient(&redis.Options{
|
||||
Addr: viper.GetString("uri"),
|
||||
DB: viper.GetInt("db"),
|
||||
Password: viper.GetString("password"),
|
||||
})
|
||||
err := i.EnqueueTaskByKey(args[0])
|
||||
}))
|
||||
switch qtype {
|
||||
case "s":
|
||||
err = r.EnqueueScheduledTask(id, score)
|
||||
case "r":
|
||||
err = r.EnqueueRetryTask(id, score)
|
||||
case "d":
|
||||
err = r.EnqueueDeadTask(id, score)
|
||||
default:
|
||||
fmt.Println("invalid argument")
|
||||
os.Exit(1)
|
||||
}
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
|
@@ -8,7 +8,8 @@ import (
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/go-redis/redis/v7"
|
||||
"github.com/hibiken/asynq/internal/rdb"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/viper"
|
||||
)
|
||||
@@ -47,22 +48,21 @@ func init() {
|
||||
}
|
||||
|
||||
func enqall(cmd *cobra.Command, args []string) {
|
||||
i := asynq.NewInspector(asynq.RedisClientOpt{
|
||||
c := redis.NewClient(&redis.Options{
|
||||
Addr: viper.GetString("uri"),
|
||||
DB: viper.GetInt("db"),
|
||||
Password: viper.GetString("password"),
|
||||
})
|
||||
var (
|
||||
n int
|
||||
err error
|
||||
)
|
||||
r := rdb.NewRDB(c)
|
||||
var n int64
|
||||
var err error
|
||||
switch args[0] {
|
||||
case "scheduled":
|
||||
n, err = i.EnqueueAllScheduledTasks()
|
||||
n, err = r.EnqueueAllScheduledTasks()
|
||||
case "retry":
|
||||
n, err = i.EnqueueAllRetryTasks()
|
||||
n, err = r.EnqueueAllRetryTasks()
|
||||
case "dead":
|
||||
n, err = i.EnqueueAllDeadTasks()
|
||||
n, err = r.EnqueueAllDeadTasks()
|
||||
default:
|
||||
fmt.Printf("error: `asynq enqall [state]` only accepts %v as the argument.\n", enqallValidArgs)
|
||||
os.Exit(1)
|
||||
|
@@ -10,7 +10,8 @@ import (
|
||||
"strings"
|
||||
"text/tabwriter"
|
||||
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/go-redis/redis/v7"
|
||||
"github.com/hibiken/asynq/internal/rdb"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/viper"
|
||||
)
|
||||
@@ -37,13 +38,14 @@ func init() {
|
||||
}
|
||||
|
||||
func history(cmd *cobra.Command, args []string) {
|
||||
i := asynq.NewInspector(asynq.RedisClientOpt{
|
||||
c := redis.NewClient(&redis.Options{
|
||||
Addr: viper.GetString("uri"),
|
||||
DB: viper.GetInt("db"),
|
||||
Password: viper.GetString("password"),
|
||||
})
|
||||
r := rdb.NewRDB(c)
|
||||
|
||||
stats, err := i.History(days)
|
||||
stats, err := r.HistoricalStats(days)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
@@ -51,7 +53,7 @@ func history(cmd *cobra.Command, args []string) {
|
||||
printDailyStats(stats)
|
||||
}
|
||||
|
||||
func printDailyStats(stats []*asynq.DailyStats) {
|
||||
func printDailyStats(stats []*rdb.DailyStats) {
|
||||
format := strings.Repeat("%v\t", 4) + "\n"
|
||||
tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0)
|
||||
fmt.Fprintf(tw, format, "Date (UTC)", "Processed", "Failed", "Error Rate")
|
||||
@@ -63,7 +65,7 @@ func printDailyStats(stats []*asynq.DailyStats) {
|
||||
} else {
|
||||
errrate = fmt.Sprintf("%.2f%%", float64(s.Failed)/float64(s.Processed)*100)
|
||||
}
|
||||
fmt.Fprintf(tw, format, s.Date.Format("2006-01-02"), s.Processed, s.Failed, errrate)
|
||||
fmt.Fprintf(tw, format, s.Time.Format("2006-01-02"), s.Processed, s.Failed, errrate)
|
||||
}
|
||||
tw.Flush()
|
||||
}
|
||||
|
@@ -8,14 +8,15 @@ import (
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/go-redis/redis/v7"
|
||||
"github.com/hibiken/asynq/internal/rdb"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/viper"
|
||||
)
|
||||
|
||||
// killCmd represents the kill command
|
||||
var killCmd = &cobra.Command{
|
||||
Use: "kill [task key]",
|
||||
Use: "kill [task id]",
|
||||
Short: "Kills a task given an identifier",
|
||||
Long: `Kill (asynq kill) will put a task in dead state given an identifier.
|
||||
|
||||
@@ -43,12 +44,25 @@ func init() {
|
||||
}
|
||||
|
||||
func kill(cmd *cobra.Command, args []string) {
|
||||
i := asynq.NewInspector(asynq.RedisClientOpt{
|
||||
id, score, qtype, err := parseQueryID(args[0])
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
r := rdb.NewRDB(redis.NewClient(&redis.Options{
|
||||
Addr: viper.GetString("uri"),
|
||||
DB: viper.GetInt("db"),
|
||||
Password: viper.GetString("password"),
|
||||
})
|
||||
err := i.KillTaskByKey(args[0])
|
||||
}))
|
||||
switch qtype {
|
||||
case "s":
|
||||
err = r.KillScheduledTask(id, score)
|
||||
case "r":
|
||||
err = r.KillRetryTask(id, score)
|
||||
default:
|
||||
fmt.Println("invalid argument")
|
||||
os.Exit(1)
|
||||
}
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
|
@@ -8,7 +8,8 @@ import (
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/go-redis/redis/v7"
|
||||
"github.com/hibiken/asynq/internal/rdb"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/viper"
|
||||
)
|
||||
@@ -44,20 +45,19 @@ func init() {
|
||||
}
|
||||
|
||||
func killall(cmd *cobra.Command, args []string) {
|
||||
i := asynq.NewInspector(asynq.RedisClientOpt{
|
||||
c := redis.NewClient(&redis.Options{
|
||||
Addr: viper.GetString("uri"),
|
||||
DB: viper.GetInt("db"),
|
||||
Password: viper.GetString("password"),
|
||||
})
|
||||
var (
|
||||
n int
|
||||
err error
|
||||
)
|
||||
r := rdb.NewRDB(c)
|
||||
var n int64
|
||||
var err error
|
||||
switch args[0] {
|
||||
case "scheduled":
|
||||
n, err = i.KillAllScheduledTasks()
|
||||
n, err = r.KillAllScheduledTasks()
|
||||
case "retry":
|
||||
n, err = i.KillAllRetryTasks()
|
||||
n, err = r.KillAllRetryTasks()
|
||||
default:
|
||||
fmt.Printf("error: `asynq killall [state]` only accepts %v as the argument.\n", killallValidArgs)
|
||||
os.Exit(1)
|
||||
|
@@ -8,10 +8,13 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/go-redis/redis/v7"
|
||||
"github.com/google/uuid"
|
||||
"github.com/hibiken/asynq/internal/rdb"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/viper"
|
||||
)
|
||||
@@ -59,11 +62,12 @@ func ls(cmd *cobra.Command, args []string) {
|
||||
fmt.Println("page number cannot be negative.")
|
||||
os.Exit(1)
|
||||
}
|
||||
i := asynq.NewInspector(asynq.RedisClientOpt{
|
||||
c := redis.NewClient(&redis.Options{
|
||||
Addr: viper.GetString("uri"),
|
||||
DB: viper.GetInt("db"),
|
||||
Password: viper.GetString("password"),
|
||||
})
|
||||
r := rdb.NewRDB(c)
|
||||
parts := strings.Split(args[0], ":")
|
||||
switch parts[0] {
|
||||
case "enqueued":
|
||||
@@ -71,23 +75,54 @@ func ls(cmd *cobra.Command, args []string) {
|
||||
fmt.Printf("error: Missing queue name\n`asynq ls enqueued:[queue name]`\n")
|
||||
os.Exit(1)
|
||||
}
|
||||
listEnqueued(i, parts[1])
|
||||
listEnqueued(r, parts[1])
|
||||
case "inprogress":
|
||||
listInProgress(i)
|
||||
listInProgress(r)
|
||||
case "scheduled":
|
||||
listScheduled(i)
|
||||
listScheduled(r)
|
||||
case "retry":
|
||||
listRetry(i)
|
||||
listRetry(r)
|
||||
case "dead":
|
||||
listDead(i)
|
||||
listDead(r)
|
||||
default:
|
||||
fmt.Printf("error: `asynq ls [state]`\nonly accepts %v as the argument.\n", lsValidArgs)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
func listEnqueued(i *asynq.Inspector, qname string) {
|
||||
tasks, err := i.ListEnqueuedTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
|
||||
// queryID returns an identifier used for "enq" command.
|
||||
// score is the zset score and queryType should be one
|
||||
// of "s", "r" or "d" (scheduled, retry, dead respectively).
|
||||
func queryID(id uuid.UUID, score int64, qtype string) string {
|
||||
const format = "%v:%v:%v"
|
||||
return fmt.Sprintf(format, qtype, score, id)
|
||||
}
|
||||
|
||||
// parseQueryID is a reverse operation of queryID function.
|
||||
// It takes a queryID and return each part of id with proper
|
||||
// type if valid, otherwise it reports an error.
|
||||
func parseQueryID(queryID string) (id uuid.UUID, score int64, qtype string, err error) {
|
||||
parts := strings.Split(queryID, ":")
|
||||
if len(parts) != 3 {
|
||||
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
|
||||
}
|
||||
id, err = uuid.Parse(parts[2])
|
||||
if err != nil {
|
||||
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
|
||||
}
|
||||
score, err = strconv.ParseInt(parts[1], 10, 64)
|
||||
if err != nil {
|
||||
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
|
||||
}
|
||||
qtype = parts[0]
|
||||
if len(qtype) != 1 || !strings.Contains("srd", qtype) {
|
||||
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
|
||||
}
|
||||
return id, score, qtype, nil
|
||||
}
|
||||
|
||||
func listEnqueued(r *rdb.RDB, qname string) {
|
||||
tasks, err := r.ListEnqueued(qname, rdb.Pagination{Size: pageSize, Page: pageNum})
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
@@ -97,16 +132,17 @@ func listEnqueued(i *asynq.Inspector, qname string) {
|
||||
return
|
||||
}
|
||||
cols := []string{"ID", "Type", "Payload", "Queue"}
|
||||
printTable(cols, func(w io.Writer, tmpl string) {
|
||||
printRows := func(w io.Writer, tmpl string) {
|
||||
for _, t := range tasks {
|
||||
fmt.Fprintf(w, tmpl, t.ID, t.Type, t.Payload, t.Queue)
|
||||
}
|
||||
})
|
||||
}
|
||||
printTable(cols, printRows)
|
||||
fmt.Printf("\nShowing %d tasks from page %d\n", len(tasks), pageNum)
|
||||
}
|
||||
|
||||
func listInProgress(i *asynq.Inspector) {
|
||||
tasks, err := i.ListInProgressTasks(asynq.PageSize(pageSize), asynq.Page(pageNum))
|
||||
func listInProgress(r *rdb.RDB) {
|
||||
tasks, err := r.ListInProgress(rdb.Pagination{Size: pageSize, Page: pageNum})
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
@@ -116,16 +152,17 @@ func listInProgress(i *asynq.Inspector) {
|
||||
return
|
||||
}
|
||||
cols := []string{"ID", "Type", "Payload"}
|
||||
printTable(cols, func(w io.Writer, tmpl string) {
|
||||
printRows := func(w io.Writer, tmpl string) {
|
||||
for _, t := range tasks {
|
||||
fmt.Fprintf(w, tmpl, t.ID, t.Type, t.Payload)
|
||||
}
|
||||
})
|
||||
}
|
||||
printTable(cols, printRows)
|
||||
fmt.Printf("\nShowing %d tasks from page %d\n", len(tasks), pageNum)
|
||||
}
|
||||
|
||||
func listScheduled(i *asynq.Inspector) {
|
||||
tasks, err := i.ListScheduledTasks(asynq.PageSize(pageSize), asynq.Page(pageNum))
|
||||
func listScheduled(r *rdb.RDB) {
|
||||
tasks, err := r.ListScheduled(rdb.Pagination{Size: pageSize, Page: pageNum})
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
@@ -134,19 +171,19 @@ func listScheduled(i *asynq.Inspector) {
|
||||
fmt.Println("No scheduled tasks")
|
||||
return
|
||||
}
|
||||
cols := []string{"Key", "Type", "Payload", "Process In", "Queue"}
|
||||
printTable(cols, func(w io.Writer, tmpl string) {
|
||||
cols := []string{"ID", "Type", "Payload", "Process In", "Queue"}
|
||||
printRows := func(w io.Writer, tmpl string) {
|
||||
for _, t := range tasks {
|
||||
processIn := fmt.Sprintf("%.0f seconds",
|
||||
t.NextEnqueueAt.Sub(time.Now()).Seconds())
|
||||
fmt.Fprintf(w, tmpl, t.Key(), t.Type, t.Payload, processIn, t.Queue)
|
||||
processIn := fmt.Sprintf("%.0f seconds", t.ProcessAt.Sub(time.Now()).Seconds())
|
||||
fmt.Fprintf(w, tmpl, queryID(t.ID, t.Score, "s"), t.Type, t.Payload, processIn, t.Queue)
|
||||
}
|
||||
})
|
||||
}
|
||||
printTable(cols, printRows)
|
||||
fmt.Printf("\nShowing %d tasks from page %d\n", len(tasks), pageNum)
|
||||
}
|
||||
|
||||
func listRetry(i *asynq.Inspector) {
|
||||
tasks, err := i.ListRetryTasks(asynq.PageSize(pageSize), asynq.Page(pageNum))
|
||||
func listRetry(r *rdb.RDB) {
|
||||
tasks, err := r.ListRetry(rdb.Pagination{Size: pageSize, Page: pageNum})
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
@@ -155,23 +192,24 @@ func listRetry(i *asynq.Inspector) {
|
||||
fmt.Println("No retry tasks")
|
||||
return
|
||||
}
|
||||
cols := []string{"Key", "Type", "Payload", "Next Retry", "Last Error", "Retried", "Max Retry", "Queue"}
|
||||
printTable(cols, func(w io.Writer, tmpl string) {
|
||||
cols := []string{"ID", "Type", "Payload", "Next Retry", "Last Error", "Retried", "Max Retry", "Queue"}
|
||||
printRows := func(w io.Writer, tmpl string) {
|
||||
for _, t := range tasks {
|
||||
var nextRetry string
|
||||
if d := t.NextEnqueueAt.Sub(time.Now()); d > 0 {
|
||||
if d := t.ProcessAt.Sub(time.Now()); d > 0 {
|
||||
nextRetry = fmt.Sprintf("in %v", d.Round(time.Second))
|
||||
} else {
|
||||
nextRetry = "right now"
|
||||
}
|
||||
fmt.Fprintf(w, tmpl, t.Key(), t.Type, t.Payload, nextRetry, t.ErrorMsg, t.Retried, t.MaxRetry, t.Queue)
|
||||
fmt.Fprintf(w, tmpl, queryID(t.ID, t.Score, "r"), t.Type, t.Payload, nextRetry, t.ErrorMsg, t.Retried, t.Retry, t.Queue)
|
||||
}
|
||||
})
|
||||
}
|
||||
printTable(cols, printRows)
|
||||
fmt.Printf("\nShowing %d tasks from page %d\n", len(tasks), pageNum)
|
||||
}
|
||||
|
||||
func listDead(i *asynq.Inspector) {
|
||||
tasks, err := i.ListDeadTasks(asynq.PageSize(pageSize), asynq.Page(pageNum))
|
||||
func listDead(r *rdb.RDB) {
|
||||
tasks, err := r.ListDead(rdb.Pagination{Size: pageSize, Page: pageNum})
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
@@ -180,11 +218,12 @@ func listDead(i *asynq.Inspector) {
|
||||
fmt.Println("No dead tasks")
|
||||
return
|
||||
}
|
||||
cols := []string{"Key", "Type", "Payload", "Last Failed", "Last Error", "Queue"}
|
||||
printTable(cols, func(w io.Writer, tmpl string) {
|
||||
cols := []string{"ID", "Type", "Payload", "Last Failed", "Last Error", "Queue"}
|
||||
printRows := func(w io.Writer, tmpl string) {
|
||||
for _, t := range tasks {
|
||||
fmt.Fprintf(w, tmpl, t.Key(), t.Type, t.Payload, t.LastFailedAt, t.ErrorMsg, t.Queue)
|
||||
fmt.Fprintf(w, tmpl, queryID(t.ID, t.Score, "d"), t.Type, t.Payload, t.LastFailedAt, t.ErrorMsg, t.Queue)
|
||||
}
|
||||
})
|
||||
}
|
||||
printTable(cols, printRows)
|
||||
fmt.Printf("\nShowing %d tasks from page %d\n", len(tasks), pageNum)
|
||||
}
|
||||
|
@@ -166,7 +166,6 @@ golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7w
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
|
||||
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
|
||||
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZenJ2O330aBsf7JfSUXmQ=
|
||||
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
|
Reference in New Issue
Block a user