From cde9d41580dd6610087059717381ba6484a5f049 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Wed, 25 Dec 2019 20:17:00 -0800 Subject: [PATCH] Add processed and failed count to Stats --- internal/rdb/inspect.go | 18 +++++++++++++++++- internal/rdb/inspect_test.go | 24 ++++++++++++++++++++---- 2 files changed, 37 insertions(+), 5 deletions(-) diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 2fa478b..5bfee07 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -3,6 +3,7 @@ package rdb import ( "encoding/json" "fmt" + "strconv" "strings" "time" @@ -18,6 +19,8 @@ type Stats struct { Scheduled int Retry int Dead int + Processed int + Failed int Timestamp time.Time } @@ -74,23 +77,36 @@ type DeadTask struct { // CurrentStats returns a current state of the queues. func (r *RDB) CurrentStats() (*Stats, error) { + now := time.Now() pipe := r.client.Pipeline() qlen := pipe.LLen(base.DefaultQueue) plen := pipe.LLen(base.InProgressQueue) slen := pipe.ZCard(base.ScheduledQueue) rlen := pipe.ZCard(base.RetryQueue) dlen := pipe.ZCard(base.DeadQueue) + pcount := pipe.Get(base.ProcessedKey(now)) + fcount := pipe.Get(base.FailureKey(now)) _, err := pipe.Exec() if err != nil { return nil, err } + p, err := strconv.Atoi(pcount.Val()) + if err != nil { + return nil, err + } + f, err := strconv.Atoi(fcount.Val()) + if err != nil { + return nil, err + } return &Stats{ Enqueued: int(qlen.Val()), InProgress: int(plen.Val()), Scheduled: int(slen.Val()), Retry: int(rlen.Val()), Dead: int(dlen.Val()), - Timestamp: time.Now(), + Processed: p, + Failed: f, + Timestamp: now, }, nil } diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 3374bef..8584c25 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -55,12 +55,16 @@ func TestCurrentStats(t *testing.T) { m3 := newTaskMessage("gen_thumbnail", map[string]interface{}{"src": "some/path/to/img"}) m4 := newTaskMessage("sync", nil) + now := time.Now() + tests := []struct { enqueued []*base.TaskMessage inProgress []*base.TaskMessage scheduled []sortedSetEntry retry []sortedSetEntry dead []sortedSetEntry + processed int + failed int want *Stats }{ { @@ -69,15 +73,19 @@ func TestCurrentStats(t *testing.T) { scheduled: []sortedSetEntry{ {m3, time.Now().Add(time.Hour).Unix()}, {m4, time.Now().Unix()}}, - retry: []sortedSetEntry{}, - dead: []sortedSetEntry{}, + retry: []sortedSetEntry{}, + dead: []sortedSetEntry{}, + processed: 120, + failed: 2, want: &Stats{ Enqueued: 1, InProgress: 1, Scheduled: 2, Retry: 0, Dead: 0, - Timestamp: time.Now(), + Processed: 120, + Failed: 2, + Timestamp: now, }, }, { @@ -90,13 +98,17 @@ func TestCurrentStats(t *testing.T) { {m1, time.Now().Add(time.Minute).Unix()}}, dead: []sortedSetEntry{ {m2, time.Now().Add(-time.Hour).Unix()}}, + processed: 90, + failed: 10, want: &Stats{ Enqueued: 0, InProgress: 0, Scheduled: 2, Retry: 1, Dead: 1, - Timestamp: time.Now(), + Processed: 90, + Failed: 10, + Timestamp: now, }, }, } @@ -108,6 +120,10 @@ func TestCurrentStats(t *testing.T) { seedScheduledQueue(t, r, tc.scheduled) seedRetryQueue(t, r, tc.retry) seedDeadQueue(t, r, tc.dead) + processedKey := base.ProcessedKey(now) + failedKey := base.FailureKey(now) + r.client.Set(processedKey, tc.processed, 0) + r.client.Set(failedKey, tc.failed, 0) got, err := r.CurrentStats() if err != nil {