mirror of
https://github.com/hibiken/asynq.git
synced 2025-04-20 15:50:20 +08:00
Merge pull request #25 from hibiken/feature/stats
Record processed and failure daily count
This commit is contained in:
commit
f3d7d020a4
1
asynq.go
1
asynq.go
@ -7,7 +7,6 @@ TODOs:
|
|||||||
- [P0] asynqmon kill <taskID>, asynqmon killall <qname>
|
- [P0] asynqmon kill <taskID>, asynqmon killall <qname>
|
||||||
- [P0] Pagination for `asynqmon ls` command
|
- [P0] Pagination for `asynqmon ls` command
|
||||||
- [P0] Show elapsed time for InProgress tasks (asynqmon ls inprogress)
|
- [P0] Show elapsed time for InProgress tasks (asynqmon ls inprogress)
|
||||||
- [P0] Processed, Failed count for today
|
|
||||||
- [P0] Go docs + CONTRIBUTION.md + Github issue template + License comment
|
- [P0] Go docs + CONTRIBUTION.md + Github issue template + License comment
|
||||||
- [P0] Redis Sentinel support
|
- [P0] Redis Sentinel support
|
||||||
- [P1] Add Support for multiple queues and priority
|
- [P1] Add Support for multiple queues and priority
|
||||||
|
@ -1,11 +1,17 @@
|
|||||||
// Package base defines foundational types and constants used in asynq package.
|
// Package base defines foundational types and constants used in asynq package.
|
||||||
package base
|
package base
|
||||||
|
|
||||||
import "github.com/rs/xid"
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/rs/xid"
|
||||||
|
)
|
||||||
|
|
||||||
// Redis keys
|
// Redis keys
|
||||||
const (
|
const (
|
||||||
QueuePrefix = "asynq:queues:" // LIST - asynq:queues:<qname>
|
processedPrefix = "asynq:processed:" // STRING - asynq:processed:<yyyy-mm-dd>
|
||||||
|
failurePrefix = "asynq:failure:" // STRING - asynq:failure:<yyyy-mm-dd>
|
||||||
|
QueuePrefix = "asynq:queues:" // LIST - asynq:queues:<qname>
|
||||||
DefaultQueue = QueuePrefix + "default" // LIST
|
DefaultQueue = QueuePrefix + "default" // LIST
|
||||||
ScheduledQueue = "asynq:scheduled" // ZSET
|
ScheduledQueue = "asynq:scheduled" // ZSET
|
||||||
RetryQueue = "asynq:retry" // ZSET
|
RetryQueue = "asynq:retry" // ZSET
|
||||||
@ -13,6 +19,18 @@ const (
|
|||||||
InProgressQueue = "asynq:in_progress" // LIST
|
InProgressQueue = "asynq:in_progress" // LIST
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// ProcessedKey returns a redis key string for procesed count
|
||||||
|
// for the given day.
|
||||||
|
func ProcessedKey(t time.Time) string {
|
||||||
|
return processedPrefix + t.UTC().Format("2006-01-02")
|
||||||
|
}
|
||||||
|
|
||||||
|
// FailureKey returns a redis key string for failure count
|
||||||
|
// for the given day.
|
||||||
|
func FailureKey(t time.Time) string {
|
||||||
|
return failurePrefix + t.UTC().Format("2006-01-02")
|
||||||
|
}
|
||||||
|
|
||||||
// TaskMessage is the internal representation of a task with additional metadata fields.
|
// TaskMessage is the internal representation of a task with additional metadata fields.
|
||||||
// Serialized data of this type gets written in redis.
|
// Serialized data of this type gets written in redis.
|
||||||
type TaskMessage struct {
|
type TaskMessage struct {
|
||||||
|
42
internal/base/base_test.go
Normal file
42
internal/base/base_test.go
Normal file
@ -0,0 +1,42 @@
|
|||||||
|
package base
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestProcessedKey(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
input time.Time
|
||||||
|
want string
|
||||||
|
}{
|
||||||
|
{time.Date(2019, 11, 14, 10, 30, 1, 1, time.UTC), "asynq:processed:2019-11-14"},
|
||||||
|
{time.Date(2020, 12, 1, 1, 0, 1, 1, time.UTC), "asynq:processed:2020-12-01"},
|
||||||
|
{time.Date(2020, 1, 6, 15, 02, 1, 1, time.UTC), "asynq:processed:2020-01-06"},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tests {
|
||||||
|
got := ProcessedKey(tc.input)
|
||||||
|
if got != tc.want {
|
||||||
|
t.Errorf("ProcessedKey(%v) = %q, want %q", tc.input, got, tc.want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFailureKey(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
input time.Time
|
||||||
|
want string
|
||||||
|
}{
|
||||||
|
{time.Date(2019, 11, 14, 10, 30, 1, 1, time.UTC), "asynq:failure:2019-11-14"},
|
||||||
|
{time.Date(2020, 12, 1, 1, 0, 1, 1, time.UTC), "asynq:failure:2020-12-01"},
|
||||||
|
{time.Date(2020, 1, 6, 15, 02, 1, 1, time.UTC), "asynq:failure:2020-01-06"},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tests {
|
||||||
|
got := FailureKey(tc.input)
|
||||||
|
if got != tc.want {
|
||||||
|
t.Errorf("FailureKey(%v) = %q, want %q", tc.input, got, tc.want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -9,6 +9,7 @@ import (
|
|||||||
"github.com/go-redis/redis/v7"
|
"github.com/go-redis/redis/v7"
|
||||||
"github.com/hibiken/asynq/internal/base"
|
"github.com/hibiken/asynq/internal/base"
|
||||||
"github.com/rs/xid"
|
"github.com/rs/xid"
|
||||||
|
"github.com/spf13/cast"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Stats represents a state of queues at a certain time.
|
// Stats represents a state of queues at a certain time.
|
||||||
@ -18,6 +19,8 @@ type Stats struct {
|
|||||||
Scheduled int
|
Scheduled int
|
||||||
Retry int
|
Retry int
|
||||||
Dead int
|
Dead int
|
||||||
|
Processed int
|
||||||
|
Failed int
|
||||||
Timestamp time.Time
|
Timestamp time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -74,23 +77,58 @@ type DeadTask struct {
|
|||||||
|
|
||||||
// CurrentStats returns a current state of the queues.
|
// CurrentStats returns a current state of the queues.
|
||||||
func (r *RDB) CurrentStats() (*Stats, error) {
|
func (r *RDB) CurrentStats() (*Stats, error) {
|
||||||
pipe := r.client.Pipeline()
|
// KEYS[1] -> asynq:queues:default
|
||||||
qlen := pipe.LLen(base.DefaultQueue)
|
// KEYS[2] -> asynq:in_progress
|
||||||
plen := pipe.LLen(base.InProgressQueue)
|
// KEYS[3] -> asynq:scheduled
|
||||||
slen := pipe.ZCard(base.ScheduledQueue)
|
// KEYS[4] -> asynq:retry
|
||||||
rlen := pipe.ZCard(base.RetryQueue)
|
// KEYS[5] -> asynq:dead
|
||||||
dlen := pipe.ZCard(base.DeadQueue)
|
// KEYS[6] -> asynq:processed:<yyyy-mm-dd>
|
||||||
_, err := pipe.Exec()
|
// KEYS[7] -> asynq:failure:<yyyy-mm-dd>
|
||||||
|
script := redis.NewScript(`
|
||||||
|
local qlen = redis.call("LLEN", KEYS[1])
|
||||||
|
local plen = redis.call("LLEN", KEYS[2])
|
||||||
|
local slen = redis.call("ZCARD", KEYS[3])
|
||||||
|
local rlen = redis.call("ZCARD", KEYS[4])
|
||||||
|
local dlen = redis.call("ZCARD", KEYS[5])
|
||||||
|
local pcount = 0
|
||||||
|
local p = redis.call("GET", KEYS[6])
|
||||||
|
if p then
|
||||||
|
pcount = tonumber(p)
|
||||||
|
end
|
||||||
|
local fcount = 0
|
||||||
|
local f = redis.call("GET", KEYS[7])
|
||||||
|
if f then
|
||||||
|
fcount = tonumber(f)
|
||||||
|
end
|
||||||
|
return {qlen, plen, slen, rlen, dlen, pcount, fcount}
|
||||||
|
`)
|
||||||
|
|
||||||
|
now := time.Now()
|
||||||
|
res, err := script.Run(r.client, []string{
|
||||||
|
base.DefaultQueue,
|
||||||
|
base.InProgressQueue,
|
||||||
|
base.ScheduledQueue,
|
||||||
|
base.RetryQueue,
|
||||||
|
base.DeadQueue,
|
||||||
|
base.ProcessedKey(now),
|
||||||
|
base.FailureKey(now),
|
||||||
|
}).Result()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
nums, err := cast.ToIntSliceE(res)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &Stats{
|
return &Stats{
|
||||||
Enqueued: int(qlen.Val()),
|
Enqueued: nums[0],
|
||||||
InProgress: int(plen.Val()),
|
InProgress: nums[1],
|
||||||
Scheduled: int(slen.Val()),
|
Scheduled: nums[2],
|
||||||
Retry: int(rlen.Val()),
|
Retry: nums[3],
|
||||||
Dead: int(dlen.Val()),
|
Dead: nums[4],
|
||||||
Timestamp: time.Now(),
|
Processed: nums[5],
|
||||||
|
Failed: nums[6],
|
||||||
|
Timestamp: now,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -55,12 +55,16 @@ func TestCurrentStats(t *testing.T) {
|
|||||||
m3 := newTaskMessage("gen_thumbnail", map[string]interface{}{"src": "some/path/to/img"})
|
m3 := newTaskMessage("gen_thumbnail", map[string]interface{}{"src": "some/path/to/img"})
|
||||||
m4 := newTaskMessage("sync", nil)
|
m4 := newTaskMessage("sync", nil)
|
||||||
|
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
enqueued []*base.TaskMessage
|
enqueued []*base.TaskMessage
|
||||||
inProgress []*base.TaskMessage
|
inProgress []*base.TaskMessage
|
||||||
scheduled []sortedSetEntry
|
scheduled []sortedSetEntry
|
||||||
retry []sortedSetEntry
|
retry []sortedSetEntry
|
||||||
dead []sortedSetEntry
|
dead []sortedSetEntry
|
||||||
|
processed int
|
||||||
|
failed int
|
||||||
want *Stats
|
want *Stats
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
@ -69,15 +73,19 @@ func TestCurrentStats(t *testing.T) {
|
|||||||
scheduled: []sortedSetEntry{
|
scheduled: []sortedSetEntry{
|
||||||
{m3, time.Now().Add(time.Hour).Unix()},
|
{m3, time.Now().Add(time.Hour).Unix()},
|
||||||
{m4, time.Now().Unix()}},
|
{m4, time.Now().Unix()}},
|
||||||
retry: []sortedSetEntry{},
|
retry: []sortedSetEntry{},
|
||||||
dead: []sortedSetEntry{},
|
dead: []sortedSetEntry{},
|
||||||
|
processed: 120,
|
||||||
|
failed: 2,
|
||||||
want: &Stats{
|
want: &Stats{
|
||||||
Enqueued: 1,
|
Enqueued: 1,
|
||||||
InProgress: 1,
|
InProgress: 1,
|
||||||
Scheduled: 2,
|
Scheduled: 2,
|
||||||
Retry: 0,
|
Retry: 0,
|
||||||
Dead: 0,
|
Dead: 0,
|
||||||
Timestamp: time.Now(),
|
Processed: 120,
|
||||||
|
Failed: 2,
|
||||||
|
Timestamp: now,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
@ -90,13 +98,17 @@ func TestCurrentStats(t *testing.T) {
|
|||||||
{m1, time.Now().Add(time.Minute).Unix()}},
|
{m1, time.Now().Add(time.Minute).Unix()}},
|
||||||
dead: []sortedSetEntry{
|
dead: []sortedSetEntry{
|
||||||
{m2, time.Now().Add(-time.Hour).Unix()}},
|
{m2, time.Now().Add(-time.Hour).Unix()}},
|
||||||
|
processed: 90,
|
||||||
|
failed: 10,
|
||||||
want: &Stats{
|
want: &Stats{
|
||||||
Enqueued: 0,
|
Enqueued: 0,
|
||||||
InProgress: 0,
|
InProgress: 0,
|
||||||
Scheduled: 2,
|
Scheduled: 2,
|
||||||
Retry: 1,
|
Retry: 1,
|
||||||
Dead: 1,
|
Dead: 1,
|
||||||
Timestamp: time.Now(),
|
Processed: 90,
|
||||||
|
Failed: 10,
|
||||||
|
Timestamp: now,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
@ -108,6 +120,10 @@ func TestCurrentStats(t *testing.T) {
|
|||||||
seedScheduledQueue(t, r, tc.scheduled)
|
seedScheduledQueue(t, r, tc.scheduled)
|
||||||
seedRetryQueue(t, r, tc.retry)
|
seedRetryQueue(t, r, tc.retry)
|
||||||
seedDeadQueue(t, r, tc.dead)
|
seedDeadQueue(t, r, tc.dead)
|
||||||
|
processedKey := base.ProcessedKey(now)
|
||||||
|
failedKey := base.FailureKey(now)
|
||||||
|
r.client.Set(processedKey, tc.processed, 0)
|
||||||
|
r.client.Set(failedKey, tc.failed, 0)
|
||||||
|
|
||||||
got, err := r.CurrentStats()
|
got, err := r.CurrentStats()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -122,6 +138,30 @@ func TestCurrentStats(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCurrentStatsWithoutData(t *testing.T) {
|
||||||
|
r := setup(t)
|
||||||
|
|
||||||
|
want := &Stats{
|
||||||
|
Enqueued: 0,
|
||||||
|
InProgress: 0,
|
||||||
|
Scheduled: 0,
|
||||||
|
Retry: 0,
|
||||||
|
Dead: 0,
|
||||||
|
Processed: 0,
|
||||||
|
Failed: 0,
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
got, err := r.CurrentStats()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("r.CurrentStats() = %v, %v, want %+v, nil", got, err, want)
|
||||||
|
}
|
||||||
|
|
||||||
|
if diff := cmp.Diff(want, got, timeCmpOpt); diff != "" {
|
||||||
|
t.Errorf("r.CurrentStats() = %v, %v, want %+v, nil; (-want, +got)\n%s", got, err, want, diff)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestRedisInfo(t *testing.T) {
|
func TestRedisInfo(t *testing.T) {
|
||||||
r := setup(t)
|
r := setup(t)
|
||||||
|
|
||||||
|
@ -19,6 +19,8 @@ var (
|
|||||||
ErrTaskNotFound = errors.New("could not find a task")
|
ErrTaskNotFound = errors.New("could not find a task")
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const statsTTL = 90 * 24 * time.Hour // 90 days
|
||||||
|
|
||||||
// RDB is a client interface to query and mutate task queues.
|
// RDB is a client interface to query and mutate task queues.
|
||||||
type RDB struct {
|
type RDB struct {
|
||||||
client *redis.Client
|
client *redis.Client
|
||||||
@ -76,12 +78,26 @@ func (r *RDB) Done(msg *base.TaskMessage) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not marshal %+v to json: %v", msg, err)
|
return fmt.Errorf("could not marshal %+v to json: %v", msg, err)
|
||||||
}
|
}
|
||||||
// NOTE: count ZERO means "remove all elements equal to val"
|
// Note: LREM count ZERO means "remove all elements equal to val"
|
||||||
err = r.client.LRem(base.InProgressQueue, 0, string(bytes)).Err()
|
// KEYS[1] -> asynq:in_progress
|
||||||
if err != nil {
|
// KEYS[2] -> asynq:processed:<yyyy-mm-dd>
|
||||||
return fmt.Errorf("command `LREM %s 0 %s` failed: %v", base.InProgressQueue, string(bytes), err)
|
// ARGV[1] -> base.TaskMessage value
|
||||||
}
|
// ARGV[2] -> stats expiration timestamp
|
||||||
return nil
|
script := redis.NewScript(`
|
||||||
|
redis.call("LREM", KEYS[1], 0, ARGV[1])
|
||||||
|
local n = redis.call("INCR", KEYS[2])
|
||||||
|
if tonumber(n) == 1 then
|
||||||
|
redis.call("EXPIREAT", KEYS[2], ARGV[2])
|
||||||
|
end
|
||||||
|
return redis.status_reply("OK")
|
||||||
|
`)
|
||||||
|
now := time.Now()
|
||||||
|
processedKey := base.ProcessedKey(now)
|
||||||
|
expireAt := now.Add(statsTTL)
|
||||||
|
_, err = script.Run(r.client,
|
||||||
|
[]string{base.InProgressQueue, processedKey},
|
||||||
|
string(bytes), expireAt.Unix()).Result()
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Requeue moves the task from in-progress queue to the default
|
// Requeue moves the task from in-progress queue to the default
|
||||||
@ -132,18 +148,34 @@ func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) e
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not marshal %+v to json: %v", modified, err)
|
return fmt.Errorf("could not marshal %+v to json: %v", modified, err)
|
||||||
}
|
}
|
||||||
// KEYS[1] -> asynq:in_progress
|
// KEYS[1] -> asynq:in_progress
|
||||||
// KEYS[2] -> asynq:retry
|
// KEYS[2] -> asynq:retry
|
||||||
// ARGV[1] -> base.TaskMessage value to remove from base.InProgressQueue queue
|
// KEYS[3] -> asynq:processed:<yyyy-mm-dd>
|
||||||
// ARGV[2] -> base.TaskMessage value to add to Retry queue
|
// KEYS[4] -> asynq:failure:<yyyy-mm-dd>
|
||||||
// ARGV[3] -> retry_at UNIX timestamp
|
// ARGV[1] -> base.TaskMessage value to remove from base.InProgressQueue queue
|
||||||
|
// ARGV[2] -> base.TaskMessage value to add to Retry queue
|
||||||
|
// ARGV[3] -> retry_at UNIX timestamp
|
||||||
|
// ARGV[4] -> stats expiration timestamp
|
||||||
script := redis.NewScript(`
|
script := redis.NewScript(`
|
||||||
redis.call("LREM", KEYS[1], 0, ARGV[1])
|
redis.call("LREM", KEYS[1], 0, ARGV[1])
|
||||||
redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2])
|
redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2])
|
||||||
|
local n = redis.call("INCR", KEYS[3])
|
||||||
|
if tonumber(n) == 1 then
|
||||||
|
redis.call("EXPIREAT", KEYS[3], ARGV[4])
|
||||||
|
end
|
||||||
|
local m = redis.call("INCR", KEYS[4])
|
||||||
|
if tonumber(m) == 1 then
|
||||||
|
redis.call("EXPIREAT", KEYS[4], ARGV[4])
|
||||||
|
end
|
||||||
return redis.status_reply("OK")
|
return redis.status_reply("OK")
|
||||||
`)
|
`)
|
||||||
_, err = script.Run(r.client, []string{base.InProgressQueue, base.RetryQueue},
|
now := time.Now()
|
||||||
string(bytesToRemove), string(bytesToAdd), processAt.Unix()).Result()
|
processedKey := base.ProcessedKey(now)
|
||||||
|
failureKey := base.FailureKey(now)
|
||||||
|
expireAt := now.Add(statsTTL)
|
||||||
|
_, err = script.Run(r.client,
|
||||||
|
[]string{base.InProgressQueue, base.RetryQueue, processedKey, failureKey},
|
||||||
|
string(bytesToRemove), string(bytesToAdd), processAt.Unix(), expireAt.Unix()).Result()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -151,7 +183,7 @@ func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) e
|
|||||||
// the error message to the task.
|
// the error message to the task.
|
||||||
// It also trims the set by timestamp and set size.
|
// It also trims the set by timestamp and set size.
|
||||||
func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error {
|
func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error {
|
||||||
const maxDeadTask = 10
|
const maxDeadTask = 100
|
||||||
const deadExpirationInDays = 90
|
const deadExpirationInDays = 90
|
||||||
bytesToRemove, err := json.Marshal(msg)
|
bytesToRemove, err := json.Marshal(msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -165,22 +197,37 @@ func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error {
|
|||||||
}
|
}
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago
|
limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago
|
||||||
|
processedKey := base.ProcessedKey(now)
|
||||||
|
failureKey := base.FailureKey(now)
|
||||||
|
expireAt := now.Add(statsTTL)
|
||||||
// KEYS[1] -> asynq:in_progress
|
// KEYS[1] -> asynq:in_progress
|
||||||
// KEYS[2] -> asynq:dead
|
// KEYS[2] -> asynq:dead
|
||||||
|
// KEYS[3] -> asynq:processed:<yyyy-mm-dd>
|
||||||
|
// KEYS[4] -> asynq.failure:<yyyy-mm-dd>
|
||||||
// ARGV[1] -> base.TaskMessage value to remove from base.InProgressQueue queue
|
// ARGV[1] -> base.TaskMessage value to remove from base.InProgressQueue queue
|
||||||
// ARGV[2] -> base.TaskMessage value to add to Dead queue
|
// ARGV[2] -> base.TaskMessage value to add to Dead queue
|
||||||
// ARGV[3] -> died_at UNIX timestamp
|
// ARGV[3] -> died_at UNIX timestamp
|
||||||
// ARGV[4] -> cutoff timestamp (e.g., 90 days ago)
|
// ARGV[4] -> cutoff timestamp (e.g., 90 days ago)
|
||||||
// ARGV[5] -> max number of tasks in dead queue (e.g., 100)
|
// ARGV[5] -> max number of tasks in dead queue (e.g., 100)
|
||||||
|
// ARGV[6] -> stats expiration timestamp
|
||||||
script := redis.NewScript(`
|
script := redis.NewScript(`
|
||||||
redis.call("LREM", KEYS[1], 0, ARGV[1])
|
redis.call("LREM", KEYS[1], 0, ARGV[1])
|
||||||
redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2])
|
redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2])
|
||||||
redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[4])
|
redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[4])
|
||||||
redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[5])
|
redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[5])
|
||||||
|
local n = redis.call("INCR", KEYS[3])
|
||||||
|
if tonumber(n) == 1 then
|
||||||
|
redis.call("EXPIREAT", KEYS[3], ARGV[6])
|
||||||
|
end
|
||||||
|
local m = redis.call("INCR", KEYS[4])
|
||||||
|
if tonumber(m) == 1 then
|
||||||
|
redis.call("EXPIREAT", KEYS[4], ARGV[6])
|
||||||
|
end
|
||||||
return redis.status_reply("OK")
|
return redis.status_reply("OK")
|
||||||
`)
|
`)
|
||||||
_, err = script.Run(r.client, []string{base.InProgressQueue, base.DeadQueue},
|
_, err = script.Run(r.client,
|
||||||
string(bytesToRemove), string(bytesToAdd), now.Unix(), limit, maxDeadTask).Result()
|
[]string{base.InProgressQueue, base.DeadQueue, processedKey, failureKey},
|
||||||
|
string(bytesToRemove), string(bytesToAdd), now.Unix(), limit, maxDeadTask, expireAt.Unix()).Result()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -110,6 +110,17 @@ func TestDone(t *testing.T) {
|
|||||||
t.Errorf("mismatch found in %q after calling (*RDB).Done: (-want, +got):\n%s", base.InProgressQueue, diff)
|
t.Errorf("mismatch found in %q after calling (*RDB).Done: (-want, +got):\n%s", base.InProgressQueue, diff)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
processedKey := base.ProcessedKey(time.Now())
|
||||||
|
gotProcessed := r.client.Get(processedKey).Val()
|
||||||
|
if gotProcessed != "1" {
|
||||||
|
t.Errorf("GET %q = %q, want 1", processedKey, gotProcessed)
|
||||||
|
}
|
||||||
|
|
||||||
|
gotTTL := r.client.TTL(processedKey).Val()
|
||||||
|
if gotTTL > statsTTL {
|
||||||
|
t.Errorf("TTL %q = %v, want less than or equal to %v", processedKey, gotTTL, statsTTL)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -244,6 +255,26 @@ func TestKill(t *testing.T) {
|
|||||||
if diff := cmp.Diff(tc.wantDead, gotDead, cmpOpt, sortZSetEntryOpt); diff != "" {
|
if diff := cmp.Diff(tc.wantDead, gotDead, cmpOpt, sortZSetEntryOpt); diff != "" {
|
||||||
t.Errorf("mismatch found in %q after calling (*RDB).Kill: (-want, +got):\n%s", base.DeadQueue, diff)
|
t.Errorf("mismatch found in %q after calling (*RDB).Kill: (-want, +got):\n%s", base.DeadQueue, diff)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
processedKey := base.ProcessedKey(time.Now())
|
||||||
|
gotProcessed := r.client.Get(processedKey).Val()
|
||||||
|
if gotProcessed != "1" {
|
||||||
|
t.Errorf("GET %q = %q, want 1", processedKey, gotProcessed)
|
||||||
|
}
|
||||||
|
gotTTL := r.client.TTL(processedKey).Val()
|
||||||
|
if gotTTL > statsTTL {
|
||||||
|
t.Errorf("TTL %q = %v, want less than or equal to %v", processedKey, gotTTL, statsTTL)
|
||||||
|
}
|
||||||
|
|
||||||
|
failureKey := base.FailureKey(time.Now())
|
||||||
|
gotFailure := r.client.Get(failureKey).Val()
|
||||||
|
if gotFailure != "1" {
|
||||||
|
t.Errorf("GET %q = %q, want 1", failureKey, gotFailure)
|
||||||
|
}
|
||||||
|
gotTTL = r.client.TTL(processedKey).Val()
|
||||||
|
if gotTTL > statsTTL {
|
||||||
|
t.Errorf("TTL %q = %v, want less than or equal to %v", failureKey, gotTTL, statsTTL)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -489,5 +520,25 @@ func TestRetry(t *testing.T) {
|
|||||||
if diff := cmp.Diff(tc.wantRetry, gotRetry, cmpOpt, sortZSetEntryOpt); diff != "" {
|
if diff := cmp.Diff(tc.wantRetry, gotRetry, cmpOpt, sortZSetEntryOpt); diff != "" {
|
||||||
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.RetryQueue, diff)
|
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.RetryQueue, diff)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
processedKey := base.ProcessedKey(time.Now())
|
||||||
|
gotProcessed := r.client.Get(processedKey).Val()
|
||||||
|
if gotProcessed != "1" {
|
||||||
|
t.Errorf("GET %q = %q, want 1", processedKey, gotProcessed)
|
||||||
|
}
|
||||||
|
gotTTL := r.client.TTL(processedKey).Val()
|
||||||
|
if gotTTL > statsTTL {
|
||||||
|
t.Errorf("TTL %q = %v, want less than or equal to %v", processedKey, gotTTL, statsTTL)
|
||||||
|
}
|
||||||
|
|
||||||
|
failureKey := base.FailureKey(time.Now())
|
||||||
|
gotFailure := r.client.Get(failureKey).Val()
|
||||||
|
if gotFailure != "1" {
|
||||||
|
t.Errorf("GET %q = %q, want 1", failureKey, gotFailure)
|
||||||
|
}
|
||||||
|
gotTTL = r.client.TTL(processedKey).Val()
|
||||||
|
if gotTTL > statsTTL {
|
||||||
|
t.Errorf("TTL %q = %v, want less than or equal to %v", failureKey, gotTTL, statsTTL)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -137,13 +137,12 @@ func (p *processor) exec() {
|
|||||||
if resErr != nil {
|
if resErr != nil {
|
||||||
if msg.Retried >= msg.Retry {
|
if msg.Retried >= msg.Retry {
|
||||||
p.kill(msg, resErr.Error())
|
p.kill(msg, resErr.Error())
|
||||||
return
|
} else {
|
||||||
|
p.retry(msg, resErr.Error())
|
||||||
}
|
}
|
||||||
p.retry(msg, resErr.Error())
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
p.markAsDone(msg)
|
p.markAsDone(msg)
|
||||||
return
|
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
@ -58,14 +58,19 @@ func stats(cmd *cobra.Command, args []string) {
|
|||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
fmt.Println("QUEUES")
|
fmt.Println("QUEUES")
|
||||||
|
printQueues(stats)
|
||||||
|
fmt.Println()
|
||||||
|
|
||||||
|
fmt.Printf("STATS FOR %s UTC\n", stats.Timestamp.UTC().Format("2006-01-02"))
|
||||||
printStats(stats)
|
printStats(stats)
|
||||||
fmt.Println()
|
fmt.Println()
|
||||||
|
|
||||||
fmt.Println("REDIS INFO")
|
fmt.Println("REDIS INFO")
|
||||||
printInfo(info)
|
printInfo(info)
|
||||||
fmt.Println()
|
fmt.Println()
|
||||||
}
|
}
|
||||||
|
|
||||||
func printStats(s *rdb.Stats) {
|
func printQueues(s *rdb.Stats) {
|
||||||
format := strings.Repeat("%v\t", 5) + "\n"
|
format := strings.Repeat("%v\t", 5) + "\n"
|
||||||
tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0)
|
tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0)
|
||||||
fmt.Fprintf(tw, format, "InProgress", "Enqueued", "Scheduled", "Retry", "Dead")
|
fmt.Fprintf(tw, format, "InProgress", "Enqueued", "Scheduled", "Retry", "Dead")
|
||||||
@ -74,6 +79,21 @@ func printStats(s *rdb.Stats) {
|
|||||||
tw.Flush()
|
tw.Flush()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func printStats(s *rdb.Stats) {
|
||||||
|
format := strings.Repeat("%v\t", 3) + "\n"
|
||||||
|
tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0)
|
||||||
|
fmt.Fprintf(tw, format, "Processed", "Failed", "Error Rate")
|
||||||
|
fmt.Fprintf(tw, format, "---------", "------", "----------")
|
||||||
|
var errrate string
|
||||||
|
if s.Processed == 0 {
|
||||||
|
errrate = "N/A"
|
||||||
|
} else {
|
||||||
|
errrate = fmt.Sprintf("%.2f%%", float64(s.Failed)/float64(s.Processed)*100)
|
||||||
|
}
|
||||||
|
fmt.Fprintf(tw, format, s.Processed, s.Failed, errrate)
|
||||||
|
tw.Flush()
|
||||||
|
}
|
||||||
|
|
||||||
func printInfo(info map[string]string) {
|
func printInfo(info map[string]string) {
|
||||||
format := strings.Repeat("%v\t", 5) + "\n"
|
format := strings.Repeat("%v\t", 5) + "\n"
|
||||||
tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0)
|
tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user