2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-25 23:32:17 +08:00

Add processed and failed count to Stats

This commit is contained in:
Ken Hibino 2019-12-25 20:17:00 -08:00
parent 6491f46955
commit cde9d41580
2 changed files with 37 additions and 5 deletions

View File

@ -3,6 +3,7 @@ package rdb
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"strconv"
"strings" "strings"
"time" "time"
@ -18,6 +19,8 @@ type Stats struct {
Scheduled int Scheduled int
Retry int Retry int
Dead int Dead int
Processed int
Failed int
Timestamp time.Time Timestamp time.Time
} }
@ -74,23 +77,36 @@ type DeadTask struct {
// CurrentStats returns a current state of the queues. // CurrentStats returns a current state of the queues.
func (r *RDB) CurrentStats() (*Stats, error) { func (r *RDB) CurrentStats() (*Stats, error) {
now := time.Now()
pipe := r.client.Pipeline() pipe := r.client.Pipeline()
qlen := pipe.LLen(base.DefaultQueue) qlen := pipe.LLen(base.DefaultQueue)
plen := pipe.LLen(base.InProgressQueue) plen := pipe.LLen(base.InProgressQueue)
slen := pipe.ZCard(base.ScheduledQueue) slen := pipe.ZCard(base.ScheduledQueue)
rlen := pipe.ZCard(base.RetryQueue) rlen := pipe.ZCard(base.RetryQueue)
dlen := pipe.ZCard(base.DeadQueue) dlen := pipe.ZCard(base.DeadQueue)
pcount := pipe.Get(base.ProcessedKey(now))
fcount := pipe.Get(base.FailureKey(now))
_, err := pipe.Exec() _, err := pipe.Exec()
if err != nil { if err != nil {
return nil, err return nil, err
} }
p, err := strconv.Atoi(pcount.Val())
if err != nil {
return nil, err
}
f, err := strconv.Atoi(fcount.Val())
if err != nil {
return nil, err
}
return &Stats{ return &Stats{
Enqueued: int(qlen.Val()), Enqueued: int(qlen.Val()),
InProgress: int(plen.Val()), InProgress: int(plen.Val()),
Scheduled: int(slen.Val()), Scheduled: int(slen.Val()),
Retry: int(rlen.Val()), Retry: int(rlen.Val()),
Dead: int(dlen.Val()), Dead: int(dlen.Val()),
Timestamp: time.Now(), Processed: p,
Failed: f,
Timestamp: now,
}, nil }, nil
} }

View File

@ -55,12 +55,16 @@ func TestCurrentStats(t *testing.T) {
m3 := newTaskMessage("gen_thumbnail", map[string]interface{}{"src": "some/path/to/img"}) m3 := newTaskMessage("gen_thumbnail", map[string]interface{}{"src": "some/path/to/img"})
m4 := newTaskMessage("sync", nil) m4 := newTaskMessage("sync", nil)
now := time.Now()
tests := []struct { tests := []struct {
enqueued []*base.TaskMessage enqueued []*base.TaskMessage
inProgress []*base.TaskMessage inProgress []*base.TaskMessage
scheduled []sortedSetEntry scheduled []sortedSetEntry
retry []sortedSetEntry retry []sortedSetEntry
dead []sortedSetEntry dead []sortedSetEntry
processed int
failed int
want *Stats want *Stats
}{ }{
{ {
@ -69,15 +73,19 @@ func TestCurrentStats(t *testing.T) {
scheduled: []sortedSetEntry{ scheduled: []sortedSetEntry{
{m3, time.Now().Add(time.Hour).Unix()}, {m3, time.Now().Add(time.Hour).Unix()},
{m4, time.Now().Unix()}}, {m4, time.Now().Unix()}},
retry: []sortedSetEntry{}, retry: []sortedSetEntry{},
dead: []sortedSetEntry{}, dead: []sortedSetEntry{},
processed: 120,
failed: 2,
want: &Stats{ want: &Stats{
Enqueued: 1, Enqueued: 1,
InProgress: 1, InProgress: 1,
Scheduled: 2, Scheduled: 2,
Retry: 0, Retry: 0,
Dead: 0, Dead: 0,
Timestamp: time.Now(), Processed: 120,
Failed: 2,
Timestamp: now,
}, },
}, },
{ {
@ -90,13 +98,17 @@ func TestCurrentStats(t *testing.T) {
{m1, time.Now().Add(time.Minute).Unix()}}, {m1, time.Now().Add(time.Minute).Unix()}},
dead: []sortedSetEntry{ dead: []sortedSetEntry{
{m2, time.Now().Add(-time.Hour).Unix()}}, {m2, time.Now().Add(-time.Hour).Unix()}},
processed: 90,
failed: 10,
want: &Stats{ want: &Stats{
Enqueued: 0, Enqueued: 0,
InProgress: 0, InProgress: 0,
Scheduled: 2, Scheduled: 2,
Retry: 1, Retry: 1,
Dead: 1, Dead: 1,
Timestamp: time.Now(), Processed: 90,
Failed: 10,
Timestamp: now,
}, },
}, },
} }
@ -108,6 +120,10 @@ func TestCurrentStats(t *testing.T) {
seedScheduledQueue(t, r, tc.scheduled) seedScheduledQueue(t, r, tc.scheduled)
seedRetryQueue(t, r, tc.retry) seedRetryQueue(t, r, tc.retry)
seedDeadQueue(t, r, tc.dead) seedDeadQueue(t, r, tc.dead)
processedKey := base.ProcessedKey(now)
failedKey := base.FailureKey(now)
r.client.Set(processedKey, tc.processed, 0)
r.client.Set(failedKey, tc.failed, 0)
got, err := r.CurrentStats() got, err := r.CurrentStats()
if err != nil { if err != nil {