2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-08-19 15:08:55 +08:00

Restructure CLI commands with subcommands

This commit is contained in:
Ken Hibino
2020-08-21 06:00:49 -07:00
parent d6f389e63f
commit f38f94b947
24 changed files with 897 additions and 1185 deletions

View File

@@ -28,6 +28,8 @@ type Stats struct {
// Paused indicates whether the queue is paused.
// If true, tasks in the queue should not be processed.
Paused bool
// Size is the total number of tasks in the queue.
Size int
// Number of tasks in each state.
Enqueued int
InProgress int
@@ -125,20 +127,26 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
Queue: qname,
Timestamp: now,
}
size := 0
for i := 0; i < len(data); i += 2 {
key := cast.ToString(data[i])
val := cast.ToInt(data[i+1])
switch key {
case base.QueueKey(qname):
stats.Enqueued = val
size += val
case base.InProgressKey(qname):
stats.InProgress = val
size += val
case base.ScheduledKey(qname):
stats.Scheduled = val
size += val
case base.RetryKey(qname):
stats.Retry = val
size += val
case base.DeadKey(qname):
stats.Dead = val
size += val
case base.ProcessedKey(qname, now):
stats.Processed = val
case base.FailedKey(qname, now):
@@ -151,6 +159,7 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
}
}
}
stats.Size = size
return stats, nil
}
@@ -168,7 +177,14 @@ return res`)
// HistoricalStats returns a list of stats from the last n days for the given queue.
func (r *RDB) HistoricalStats(qname string, n int) ([]*DailyStats, error) {
if n < 1 {
return []*DailyStats{}, nil
return nil, fmt.Errorf("the number of days must be positive")
}
exists, err := r.client.SIsMember(base.AllQueues, qname).Result()
if err != nil {
return nil, err
}
if !exists {
return nil, &ErrQueueNotFound{qname}
}
const day = 24 * time.Hour
now := time.Now().UTC()
@@ -252,6 +268,9 @@ func (r *RDB) ListEnqueued(qname string, pgn Pagination) ([]*base.TaskMessage, e
// ListInProgress returns all tasks that are currently being processed for the given queue.
func (r *RDB) ListInProgress(qname string, pgn Pagination) ([]*base.TaskMessage, error) {
if !r.client.SIsMember(base.AllQueues, qname).Val() {
return nil, fmt.Errorf("queue %q does not exist", qname)
}
return r.listMessages(base.InProgressKey(qname), pgn)
}
@@ -281,17 +300,26 @@ func (r *RDB) listMessages(key string, pgn Pagination) ([]*base.TaskMessage, err
// ListScheduled returns all tasks from the given queue that are scheduled
// to be processed in the future.
func (r *RDB) ListScheduled(qname string, pgn Pagination) ([]base.Z, error) {
if !r.client.SIsMember(base.AllQueues, qname).Val() {
return nil, fmt.Errorf("queue %q does not exist", qname)
}
return r.listZSetEntries(base.ScheduledKey(qname), pgn)
}
// ListRetry returns all tasks from the given queue that have failed before
// and willl be retried in the future.
func (r *RDB) ListRetry(qname string, pgn Pagination) ([]base.Z, error) {
if !r.client.SIsMember(base.AllQueues, qname).Val() {
return nil, fmt.Errorf("queue %q does not exist", qname)
}
return r.listZSetEntries(base.RetryKey(qname), pgn)
}
// ListDead returns all tasks from the given queue that have exhausted its retry limit.
func (r *RDB) ListDead(qname string, pgn Pagination) ([]base.Z, error) {
if !r.client.SIsMember(base.AllQueues, qname).Val() {
return nil, fmt.Errorf("queue %q does not exist", qname)
}
return r.listZSetEntries(base.DeadKey(qname), pgn)
}
@@ -615,6 +643,15 @@ func (e *ErrQueueNotFound) Error() string {
return fmt.Sprintf("queue %q does not exist", e.qname)
}
// ErrQueueNotEmpty indicates specified queue is not empty.
type ErrQueueNotEmpty struct {
qname string
}
func (e *ErrQueueNotEmpty) Error() string {
return fmt.Sprintf("queue %q is not empty", e.qname)
}
// Only check whether in-progress queue is empty before removing.
// KEYS[1] -> asynq:{<qname>}
// KEYS[2] -> asynq:{<qname>}:in_progress
@@ -650,7 +687,7 @@ local retry = redis.call("SCARD", KEYS[4])
local dead = redis.call("SCARD", KEYS[5])
local total = enqueued + inprogress + scheduled + retry + dead
if total > 0 then
return redis.error_reply("Queue is not empty")
return redis.error_reply("QUEUE NOT EMPTY")
end
redis.call("DEL", KEYS[1])
redis.call("DEL", KEYS[2])
@@ -689,8 +726,13 @@ func (r *RDB) RemoveQueue(qname string, force bool) error {
base.DeadlinesKey(qname),
}
if err := script.Run(r.client, keys).Err(); err != nil {
return err
if err.Error() == "QUEUE NOT EMPTY" {
return &ErrQueueNotEmpty{qname}
} else {
return err
}
}
return r.client.SRem(base.AllQueues, qname).Err()
}

View File

@@ -112,6 +112,7 @@ func TestCurrentStats(t *testing.T) {
want: &Stats{
Queue: "default",
Paused: false,
Size: 4,
Enqueued: 1,
InProgress: 1,
Scheduled: 2,
@@ -166,6 +167,7 @@ func TestCurrentStats(t *testing.T) {
want: &Stats{
Queue: "critical",
Paused: true,
Size: 1,
Enqueued: 1,
InProgress: 0,
Scheduled: 0,
@@ -232,12 +234,13 @@ func TestHistoricalStats(t *testing.T) {
}{
{"default", 90},
{"custom", 7},
{"default", 0},
{"default", 1},
}
for _, tc := range tests {
h.FlushDB(t, r.client)
r.client.SAdd(base.AllQueues, tc.qname)
// populate last n days data
for i := 0; i < tc.n; i++ {
ts := now.Add(-time.Duration(i) * 24 * time.Hour)