diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 44cec56..72b6b00 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -28,6 +28,13 @@ type Stats struct { Timestamp time.Time } +// DailyStats holds aggregate data for a given day. +type DailyStats struct { + Processed int + Failed int + Time time.Time +} + // EnqueuedTask is a task in a queue and is ready to be processed. type EnqueuedTask struct { ID xid.ID @@ -131,6 +138,51 @@ func (r *RDB) CurrentStats() (*Stats, error) { }, nil } +// HistoricalStats returns a list of stats from the last n days. +func (r *RDB) HistoricalStats(n int) ([]*DailyStats, error) { + if n < 1 { + return []*DailyStats{}, nil + } + const day = 24 * time.Hour + now := time.Now().UTC() + var days []time.Time + var keys []string + for i := 0; i < n; i++ { + ts := now.Add(-time.Duration(i) * day) + days = append(days, ts) + keys = append(keys, base.ProcessedKey(ts)) + keys = append(keys, base.FailureKey(ts)) + } + script := redis.NewScript(` + local res = {} + for _, key in ipairs(KEYS) do + local n = redis.call("GET", key) + if not n then + n = 0 + end + table.insert(res, tonumber(n)) + end + return res + `) + res, err := script.Run(r.client, keys, len(keys)).Result() + if err != nil { + return nil, err + } + data, err := cast.ToIntSliceE(res) + if err != nil { + return nil, err + } + var stats []*DailyStats + for i := 0; i < len(data); i += 2 { + stats = append(stats, &DailyStats{ + Processed: data[i], + Failed: data[i+1], + Time: days[i/2], + }) + } + return stats, nil +} + // RedisInfo returns a map of redis info. func (r *RDB) RedisInfo() (map[string]string, error) { res, err := r.client.Info().Result() diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 8738708..949d23c 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -166,6 +166,55 @@ func TestCurrentStatsWithoutData(t *testing.T) { } } +func TestHistoricalStats(t *testing.T) { + r := setup(t) + now := time.Now().UTC() + + tests := []struct { + n int // number of days + }{ + {90}, + {7}, + {0}, + } + + for _, tc := range tests { + h.FlushDB(t, r.client) + + // populate last n days data + for i := 0; i < tc.n; i++ { + ts := now.Add(-time.Duration(i) * 24 * time.Hour) + processedKey := base.ProcessedKey(ts) + failedKey := base.FailureKey(ts) + r.client.Set(processedKey, (i+1)*1000, 0) + r.client.Set(failedKey, (i+1)*10, 0) + } + + got, err := r.HistoricalStats(tc.n) + if err != nil { + t.Errorf("RDB.HistoricalStats(%v) returned error: %v", tc.n, err) + continue + } + + if len(got) != tc.n { + t.Errorf("RDB.HistorycalStats(%v) returned %d daily stats, want %d", tc.n, len(got), tc.n) + continue + } + + for i := 0; i < tc.n; i++ { + want := &DailyStats{ + Processed: (i + 1) * 1000, + Failed: (i + 1) * 10, + Time: now.Add(-time.Duration(i) * 24 * time.Hour), + } + if diff := cmp.Diff(want, got[i], timeCmpOpt); diff != "" { + t.Errorf("RDB.HistoricalStats %d days ago data; got %+v, want %+v; (-want,+got):\n%s", i, got[i], want, diff) + } + } + } + +} + func TestRedisInfo(t *testing.T) { r := setup(t) diff --git a/tools/asynqmon/cmd/history.go b/tools/asynqmon/cmd/history.go new file mode 100644 index 0000000..d716020 --- /dev/null +++ b/tools/asynqmon/cmd/history.go @@ -0,0 +1,86 @@ +// Copyright 2020 Kentaro Hibino. All rights reserved. +// Use of this source code is governed by a MIT license +// that can be found in the LICENSE file. + +package cmd + +import ( + "fmt" + "os" + "strconv" + "strings" + "text/tabwriter" + + "github.com/go-redis/redis/v7" + "github.com/hibiken/asynq/internal/rdb" + "github.com/spf13/cobra" +) + +// historyCmd represents the history command +var historyCmd = &cobra.Command{ + Use: "history [num of days]", + Short: "Shows historical aggregate data", + Long: `History (asynqmon history) will show the number of processed tasks +as well as the error rate for the last n days. + +Example: asynqmon history 7 -> Shows stats from the last 7 days`, + Args: cobra.ExactArgs(1), + Run: history, +} + +func init() { + rootCmd.AddCommand(historyCmd) + + // Here you will define your flags and configuration settings. + + // Cobra supports Persistent Flags which will work for this command + // and all subcommands, e.g.: + // historyCmd.PersistentFlags().String("foo", "", "A help for foo") + + // Cobra supports local flags which will only run when this command + // is called directly, e.g.: + // historyCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") +} + +func history(cmd *cobra.Command, args []string) { + n, err := strconv.Atoi(args[0]) + if err != nil { + fmt.Printf(`Error: Invalid argument. Argument has to be an integer. + +Usage: asynqmon history [num of days] +`) + os.Exit(1) + } + if err != nil { + + } + c := redis.NewClient(&redis.Options{ + Addr: uri, + DB: db, + }) + r := rdb.NewRDB(c) + + stats, err := r.HistoricalStats(n) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + printDailyStats(stats) +} + +func printDailyStats(stats []*rdb.DailyStats) { + format := strings.Repeat("%v\t", 4) + "\n" + tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0) + fmt.Fprintf(tw, format, "Date (UTC)", "Processed", "Failed", "Error Rate") + fmt.Fprintf(tw, format, "----------", "---------", "------", "----------") + for _, s := range stats { + var errrate string + if s.Processed == 0 { + errrate = "N/A" + } else { + errrate = fmt.Sprintf("%.2f%%", float64(s.Failed)/float64(s.Processed)*100) + } + fmt.Fprintf(tw, format, s.Time.Format("2006-01-02"), s.Processed, s.Failed, errrate) + } + tw.Flush() +}