Compare commits

...

18 Commits

Author SHA1 Message Date
Ken Hibino
bc77f6fe14 v0.15.0 2021-01-31 06:11:17 -08:00
Ken Hibino
efe197a47b Use db13 for inspeq package testing 2021-01-31 06:09:40 -08:00
Ken Hibino
97b5516183 Update RedisConnOpt interface 2021-01-31 06:09:40 -08:00
Ken Hibino
8eafa03ca7 Fix doc indentation 2021-01-31 06:09:40 -08:00
Ken Hibino
430b01c9aa Fix CLI build 2021-01-31 06:09:40 -08:00
Ken Hibino
14c381dc40 Update documentation for inspeq package 2021-01-31 06:09:40 -08:00
Ken Hibino
e13122723a Move all inspector related code to subpackage inspeq 2021-01-31 06:09:40 -08:00
Ken Hibino
eba7c4e085 Record deadline within WorkerInfo 2021-01-31 06:09:40 -08:00
Ken Hibino
bfde0b6283 Add Retry and LastError fields to inspector tasks 2021-01-31 06:09:40 -08:00
Ken Hibino
afde6a7266 Add MemoryUsage field to QueueStats 2021-01-31 06:09:40 -08:00
Ken Hibino
6529a1e0b1 Fix scheduler
* Delete scheduler history data when scheduler stops

* Fix history trimming bug
2021-01-31 06:09:40 -08:00
Ken Hibino
c9a6ab8ae1 Support delete and archive actions on PendingTask
* Add `DeleteAllPendingTasks`, `ArchiveAllPendingTasks` to `Inspector`

* `DeleteTaskByKey` and `ArchiveTaskByKey` now supports deleting/archiving PendingTask

* Updated `asynq task` command with support for deleting/archiving pending tasks
2021-01-31 06:09:40 -08:00
Ken Hibino
557c1a5044 Remove Travis CI files 2021-01-29 23:01:20 -08:00
Ken Hibino
0236eb9a1c Add benchstat workflow 2021-01-29 22:59:28 -08:00
Ken Hibino
3c2b2cf4a3 Update build status badge 2021-01-29 15:03:27 -08:00
Ken Hibino
04df71198d Create go github action 2021-01-29 14:52:55 -08:00
Ken Hibino
2884044e75 v0.14.1 2021-01-19 06:22:54 -08:00
Ken Hibino
3719fad396 Update asynq version in go.mod for toolings 2021-01-19 06:20:39 -08:00
28 changed files with 1701 additions and 681 deletions

82
.github/workflows/benchstat.yml vendored Normal file
View File

@@ -0,0 +1,82 @@
# This workflow runs benchmarks against the current branch,
# compares them to benchmarks against master,
# and uploads the results as an artifact.
name: benchstat
on: [pull_request]
jobs:
incoming:
runs-on: ubuntu-latest
services:
redis:
image: redis
ports:
- 6379:6379
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.15.x
- name: Benchmark
run: go test -run=^$ -bench=. -count=5 -timeout=60m ./... | tee -a new.txt
- name: Upload Benchmark
uses: actions/upload-artifact@v2
with:
name: bench-incoming
path: new.txt
current:
runs-on: ubuntu-latest
services:
redis:
image: redis
ports:
- 6379:6379
steps:
- name: Checkout
uses: actions/checkout@v2
with:
ref: master
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.15.x
- name: Benchmark
run: go test -run=^$ -bench=. -count=5 -timeout=60m ./... | tee -a old.txt
- name: Upload Benchmark
uses: actions/upload-artifact@v2
with:
name: bench-current
path: old.txt
benchstat:
needs: [incoming, current]
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.15.x
- name: Install benchstat
run: go get -u golang.org/x/perf/cmd/benchstat
- name: Download Incoming
uses: actions/download-artifact@v2
with:
name: bench-incoming
- name: Download Current
uses: actions/download-artifact@v2
with:
name: bench-current
- name: Benchstat Results
run: benchstat old.txt new.txt | tee -a benchstat.txt
- name: Upload benchstat results
uses: actions/upload-artifact@v2
with:
name: benchstat
path: benchstat.txt

35
.github/workflows/build.yml vendored Normal file
View File

@@ -0,0 +1,35 @@
name: build
on: [push, pull_request]
jobs:
build:
strategy:
matrix:
os: [ubuntu-latest]
go-version: [1.13.x, 1.14.x, 1.15.x]
runs-on: ${{ matrix.os }}
services:
redis:
image: redis
ports:
- 6379:6379
steps:
- uses: actions/checkout@v2
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: ${{ matrix.go-version }}
- name: Build
run: go build -v ./...
- name: Test
run: go test -race -v -coverprofile=coverage.txt -covermode=atomic ./...
- name: Benchmark Test
run: go test -run=^$ -bench=. -loglevel=debug ./...
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v1

View File

@@ -1,13 +0,0 @@
language: go
go_import_path: github.com/hibiken/asynq
git:
depth: 1
go: [1.13.x, 1.14.x, 1.15.x]
script:
- go test -race -v -coverprofile=coverage.txt -covermode=atomic ./...
- go test -run=^$ -bench=. -loglevel=debug ./...
services:
- redis-server
after_success:
- travis_wait 60 bash ./.travis/benchstat.sh
- bash <(curl -s https://codecov.io/bash)

View File

@@ -1,20 +0,0 @@
if [ "${TRAVIS_PULL_REQUEST_BRANCH:-$TRAVIS_BRANCH}" != "master" ]; then
REMOTE_URL="$(git config --get remote.origin.url)";
cd ${TRAVIS_BUILD_DIR}/.. && \
git clone ${REMOTE_URL} "${TRAVIS_REPO_SLUG}-bench" && \
# turn the detached message off
git config --global advice.detachedHead false && \
cd "${TRAVIS_REPO_SLUG}-bench" && \
# Benchmark master
git checkout master && \
go test -run=^$ -bench=. -count=5 -timeout=60m -benchmem ./... > master.txt && \
# Benchmark feature branch
git checkout ${TRAVIS_COMMIT} && \
go test -run=^$ -bench=. -count=5 -timeout=60m -benchmem ./... > feature.txt && \
# compare two benchmarks
go get -u golang.org/x/perf/cmd/benchstat && \
benchstat master.txt feature.txt;
fi

View File

@@ -7,6 +7,31 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
## [0.15.0] - 2021-01-31
**IMPORTATNT**: All `Inspector` related code are moved to subpackage "github.com/hibiken/asynq/inspeq"
### Changed
- `Inspector` related code are moved to subpackage "github.com/hibken/asynq/inspeq".
- `RedisConnOpt` interface has changed slightly. If you have been passing `RedisClientOpt`, `RedisFailoverClientOpt`, or `RedisClusterClientOpt` as a pointer,
update your code to pass as a value.
- `ErrorMsg` field in `RetryTask` and `ArchivedTask` was renamed to `LastError`.
### Added
- `MaxRetry`, `Retried`, `LastError` fields were added to all task types returned from `Inspector`.
- `MemoryUsage` field was added to `QueueStats`.
- `DeleteAllPendingTasks`, `ArchiveAllPendingTasks` were added to `Inspector`
- `DeleteTaskByKey` and `ArchiveTaskByKey` now supports deleting/archiving `PendingTask`.
- asynq CLI now supports deleting/archiving pending tasks.
## [0.14.1] - 2021-01-19
### Fixed
- `go.mod` file for CLI
## [0.14.0] - 2021-01-14
**IMPORTATNT**: Please run `asynq migrate` command to migrate from the previous versions.

View File

@@ -1,6 +1,6 @@
# Asynq
[![Build Status](https://travis-ci.com/hibiken/asynq.svg?token=paqzfpSkF4p23s5Ux39b&branch=master)](https://travis-ci.com/hibiken/asynq)
![Build Status](https://github.com/hibiken/asynq/workflows/build/badge.svg)
[![GoDoc](https://godoc.org/github.com/hibiken/asynq?status.svg)](https://godoc.org/github.com/hibiken/asynq)
[![Go Report Card](https://goreportcard.com/badge/github.com/hibiken/asynq)](https://goreportcard.com/report/github.com/hibiken/asynq)
[![License: MIT](https://img.shields.io/badge/license-MIT-green.svg)](https://opensource.org/licenses/MIT)

109
asynq.go
View File

@@ -40,7 +40,11 @@ func NewTask(typename string, payload map[string]interface{}) *Task {
// - RedisClientOpt
// - RedisFailoverClientOpt
// - RedisClusterClientOpt
type RedisConnOpt interface{}
type RedisConnOpt interface {
// MakeRedisClient returns a new redis client instance.
// Return value is intentionally opaque to hide the implementation detail of redis client.
MakeRedisClient() interface{}
}
// RedisClientOpt is used to create a redis client that connects
// to a redis server directly.
@@ -73,6 +77,18 @@ type RedisClientOpt struct {
TLSConfig *tls.Config
}
func (opt RedisClientOpt) MakeRedisClient() interface{} {
return redis.NewClient(&redis.Options{
Network: opt.Network,
Addr: opt.Addr,
Username: opt.Username,
Password: opt.Password,
DB: opt.DB,
PoolSize: opt.PoolSize,
TLSConfig: opt.TLSConfig,
})
}
// RedisFailoverClientOpt is used to creates a redis client that talks
// to redis sentinels for service discovery and has an automatic failover
// capability.
@@ -109,6 +125,19 @@ type RedisFailoverClientOpt struct {
TLSConfig *tls.Config
}
func (opt RedisFailoverClientOpt) MakeRedisClient() interface{} {
return redis.NewFailoverClient(&redis.FailoverOptions{
MasterName: opt.MasterName,
SentinelAddrs: opt.SentinelAddrs,
SentinelPassword: opt.SentinelPassword,
Username: opt.Username,
Password: opt.Password,
DB: opt.DB,
PoolSize: opt.PoolSize,
TLSConfig: opt.TLSConfig,
})
}
// RedisFailoverClientOpt is used to creates a redis client that connects to
// redis cluster.
type RedisClusterClientOpt struct {
@@ -133,6 +162,16 @@ type RedisClusterClientOpt struct {
TLSConfig *tls.Config
}
func (opt RedisClusterClientOpt) MakeRedisClient() interface{} {
return redis.NewClusterClient(&redis.ClusterOptions{
Addrs: opt.Addrs,
MaxRedirects: opt.MaxRedirects,
Username: opt.Username,
Password: opt.Password,
TLSConfig: opt.TLSConfig,
})
}
// ParseRedisURI parses redis uri string and returns RedisConnOpt if uri is valid.
// It returns a non-nil error if uri cannot be parsed.
//
@@ -205,71 +244,3 @@ func parseRedisSentinelURI(u *url.URL) (RedisConnOpt, error) {
}
return RedisFailoverClientOpt{MasterName: master, SentinelAddrs: addrs, Password: password}, nil
}
// createRedisClient returns a redis client given a redis connection configuration.
//
// Passing an unexpected type as a RedisConnOpt argument will cause panic.
func createRedisClient(r RedisConnOpt) redis.UniversalClient {
switch r := r.(type) {
case *RedisClientOpt:
return redis.NewClient(&redis.Options{
Network: r.Network,
Addr: r.Addr,
Username: r.Username,
Password: r.Password,
DB: r.DB,
PoolSize: r.PoolSize,
TLSConfig: r.TLSConfig,
})
case RedisClientOpt:
return redis.NewClient(&redis.Options{
Network: r.Network,
Addr: r.Addr,
Username: r.Username,
Password: r.Password,
DB: r.DB,
PoolSize: r.PoolSize,
TLSConfig: r.TLSConfig,
})
case *RedisFailoverClientOpt:
return redis.NewFailoverClient(&redis.FailoverOptions{
MasterName: r.MasterName,
SentinelAddrs: r.SentinelAddrs,
SentinelPassword: r.SentinelPassword,
Username: r.Username,
Password: r.Password,
DB: r.DB,
PoolSize: r.PoolSize,
TLSConfig: r.TLSConfig,
})
case RedisFailoverClientOpt:
return redis.NewFailoverClient(&redis.FailoverOptions{
MasterName: r.MasterName,
SentinelAddrs: r.SentinelAddrs,
SentinelPassword: r.SentinelPassword,
Username: r.Username,
Password: r.Password,
DB: r.DB,
PoolSize: r.PoolSize,
TLSConfig: r.TLSConfig,
})
case RedisClusterClientOpt:
return redis.NewClusterClient(&redis.ClusterOptions{
Addrs: r.Addrs,
MaxRedirects: r.MaxRedirects,
Username: r.Username,
Password: r.Password,
TLSConfig: r.TLSConfig,
})
case *RedisClusterClientOpt:
return redis.NewClusterClient(&redis.ClusterOptions{
Addrs: r.Addrs,
MaxRedirects: r.MaxRedirects,
Username: r.Username,
Password: r.Password,
TLSConfig: r.TLSConfig,
})
default:
panic(fmt.Sprintf("asynq: unexpected type %T for RedisConnOpt", r))
}
}

View File

@@ -7,11 +7,11 @@ package asynq
import (
"errors"
"fmt"
"strconv"
"strings"
"sync"
"time"
"github.com/go-redis/redis/v7"
"github.com/google/uuid"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/rdb"
@@ -31,7 +31,11 @@ type Client struct {
// NewClient returns a new Client instance given a redis connection option.
func NewClient(r RedisConnOpt) *Client {
rdb := rdb.NewRDB(createRedisClient(r))
c, ok := r.MakeRedisClient().(redis.UniversalClient)
if !ok {
panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
}
rdb := rdb.NewRDB(c)
return &Client{
opts: make(map[string][]Option),
rdb: rdb,
@@ -172,73 +176,6 @@ func (d processInOption) String() string { return fmt.Sprintf("ProcessIn(%v)
func (d processInOption) Type() OptionType { return ProcessInOpt }
func (d processInOption) Value() interface{} { return time.Duration(d) }
// parseOption interprets a string s as an Option and returns the Option if parsing is successful,
// otherwise returns non-nil error.
func parseOption(s string) (Option, error) {
fn, arg := parseOptionFunc(s), parseOptionArg(s)
switch fn {
case "Queue":
qname, err := strconv.Unquote(arg)
if err != nil {
return nil, err
}
return Queue(qname), nil
case "MaxRetry":
n, err := strconv.Atoi(arg)
if err != nil {
return nil, err
}
return MaxRetry(n), nil
case "Timeout":
d, err := time.ParseDuration(arg)
if err != nil {
return nil, err
}
return Timeout(d), nil
case "Deadline":
t, err := time.Parse(time.UnixDate, arg)
if err != nil {
return nil, err
}
return Deadline(t), nil
case "Unique":
d, err := time.ParseDuration(arg)
if err != nil {
return nil, err
}
return Unique(d), nil
case "ProcessAt":
t, err := time.Parse(time.UnixDate, arg)
if err != nil {
return nil, err
}
return ProcessAt(t), nil
case "ProcessIn":
d, err := time.ParseDuration(arg)
if err != nil {
return nil, err
}
return ProcessIn(d), nil
default:
return nil, fmt.Errorf("cannot not parse option string %q", s)
}
}
func parseOptionFunc(s string) string {
i := strings.Index(s, "(")
return s[:i]
}
func parseOptionArg(s string) string {
i := strings.Index(s, "(")
if i >= 0 {
j := strings.Index(s, ")")
if j > i {
return s[i+1 : j]
}
}
return ""
}
// ErrDuplicateTask indicates that the given task could not be enqueued since it's a duplicate of another task.
//
@@ -272,7 +209,7 @@ func composeOptions(opts ...Option) (option, error) {
res.retry = int(opt)
case queueOption:
trimmed := strings.TrimSpace(string(opt))
if err := validateQueueName(trimmed); err != nil {
if err := base.ValidateQueueName(trimmed); err != nil {
return option{}, err
}
res.queue = trimmed
@@ -293,13 +230,6 @@ func composeOptions(opts ...Option) (option, error) {
return res, nil
}
func validateQueueName(qname string) error {
if len(qname) == 0 {
return fmt.Errorf("queue name must contain one or more characters")
}
return nil
}
const (
// Default max retry count used if nothing is specified.
defaultMaxRetry = 25

View File

@@ -775,70 +775,3 @@ func TestClientEnqueueUniqueWithProcessAtOption(t *testing.T) {
}
}
func TestParseOption(t *testing.T) {
oneHourFromNow := time.Now().Add(1 * time.Hour)
tests := []struct {
s string
wantType OptionType
wantVal interface{}
}{
{`MaxRetry(10)`, MaxRetryOpt, 10},
{`Queue("email")`, QueueOpt, "email"},
{`Timeout(3m)`, TimeoutOpt, 3 * time.Minute},
{Deadline(oneHourFromNow).String(), DeadlineOpt, oneHourFromNow},
{`Unique(1h)`, UniqueOpt, 1 * time.Hour},
{ProcessAt(oneHourFromNow).String(), ProcessAtOpt, oneHourFromNow},
{`ProcessIn(10m)`, ProcessInOpt, 10 * time.Minute},
}
for _, tc := range tests {
t.Run(tc.s, func(t *testing.T) {
got, err := parseOption(tc.s)
if err != nil {
t.Fatalf("returned error: %v", err)
}
if got == nil {
t.Fatal("returned nil")
}
if got.Type() != tc.wantType {
t.Fatalf("got type %v, want type %v ", got.Type(), tc.wantType)
}
switch tc.wantType {
case QueueOpt:
gotVal, ok := got.Value().(string)
if !ok {
t.Fatal("returned Option with non-string value")
}
if gotVal != tc.wantVal.(string) {
t.Fatalf("got value %v, want %v", gotVal, tc.wantVal)
}
case MaxRetryOpt:
gotVal, ok := got.Value().(int)
if !ok {
t.Fatal("returned Option with non-int value")
}
if gotVal != tc.wantVal.(int) {
t.Fatalf("got value %v, want %v", gotVal, tc.wantVal)
}
case TimeoutOpt, UniqueOpt, ProcessInOpt:
gotVal, ok := got.Value().(time.Duration)
if !ok {
t.Fatal("returned Option with non duration value")
}
if gotVal != tc.wantVal.(time.Duration) {
t.Fatalf("got value %v, want %v", gotVal, tc.wantVal)
}
case DeadlineOpt, ProcessAtOpt:
gotVal, ok := got.Value().(time.Time)
if !ok {
t.Fatal("returned Option with non time value")
}
if cmp.Equal(gotVal, tc.wantVal.(time.Time)) {
t.Fatalf("got value %v, want %v", gotVal, tc.wantVal)
}
default:
t.Fatalf("returned Option with unexpected type: %v", got.Type())
}
})
}
}

View File

@@ -38,13 +38,13 @@ type heartbeater struct {
// heartbeater goroutine. In other words, confine these variables
// to this goroutine only.
started time.Time
workers map[string]workerStat
workers map[string]*workerInfo
// status is shared with other goroutine but is concurrency safe.
status *base.ServerStatus
// channels to receive updates on active workers.
starting <-chan *base.TaskMessage
starting <-chan *workerInfo
finished <-chan *base.TaskMessage
}
@@ -56,7 +56,7 @@ type heartbeaterParams struct {
queues map[string]int
strictPriority bool
status *base.ServerStatus
starting <-chan *base.TaskMessage
starting <-chan *workerInfo
finished <-chan *base.TaskMessage
}
@@ -80,7 +80,7 @@ func newHeartbeater(params heartbeaterParams) *heartbeater {
strictPriority: params.strictPriority,
status: params.status,
workers: make(map[string]workerStat),
workers: make(map[string]*workerInfo),
starting: params.starting,
finished: params.finished,
}
@@ -92,11 +92,14 @@ func (h *heartbeater) terminate() {
h.done <- struct{}{}
}
// A workerStat records the message a worker is working on
// and the time the worker has started processing the message.
type workerStat struct {
// A workerInfo holds an active worker information.
type workerInfo struct {
// the task message the worker is processing.
msg *base.TaskMessage
// the time the worker has started processing the message.
started time.Time
msg *base.TaskMessage
// deadline the worker has to finish processing the task by.
deadline time.Time
}
func (h *heartbeater) start(wg *sync.WaitGroup) {
@@ -121,8 +124,8 @@ func (h *heartbeater) start(wg *sync.WaitGroup) {
h.beat()
timer.Reset(h.interval)
case msg := <-h.starting:
h.workers[msg.ID.String()] = workerStat{time.Now(), msg}
case w := <-h.starting:
h.workers[w.msg.ID.String()] = w
case msg := <-h.finished:
delete(h.workers, msg.ID.String())
@@ -145,16 +148,17 @@ func (h *heartbeater) beat() {
}
var ws []*base.WorkerInfo
for id, stat := range h.workers {
for id, w := range h.workers {
ws = append(ws, &base.WorkerInfo{
Host: h.host,
PID: h.pid,
ServerID: h.serverID,
ID: id,
Type: stat.msg.Type,
Queue: stat.msg.Queue,
Payload: stat.msg.Payload,
Started: stat.started,
Type: w.msg.Type,
Queue: w.msg.Queue,
Payload: w.msg.Payload,
Started: w.started,
Deadline: w.deadline,
})
}

View File

@@ -47,7 +47,7 @@ func TestHeartbeater(t *testing.T) {
queues: tc.queues,
strictPriority: false,
status: status,
starting: make(chan *base.TaskMessage),
starting: make(chan *workerInfo),
finished: make(chan *base.TaskMessage),
})
@@ -139,7 +139,7 @@ func TestHeartbeaterWithRedisDown(t *testing.T) {
queues: map[string]int{"default": 1},
strictPriority: false,
status: base.NewServerStatus(base.StatusRunning),
starting: make(chan *base.TaskMessage),
starting: make(chan *workerInfo),
finished: make(chan *base.TaskMessage),
})

22
inspeq/doc.go Normal file
View File

@@ -0,0 +1,22 @@
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
/*
Package inspeq provides helper types and functions to inspect queues and tasks managed by Asynq.
Inspector is used to query and mutate the state of queues and tasks.
Example:
inspector := inspeq.New(asynq.RedisClientOpt{Addr: "localhost:6379"})
tasks, err := inspector.ListArchivedTasks("my-queue")
for _, t := range tasks {
if err := inspector.DeleteTaskByKey(t.Key()); err != nil {
// handle error
}
}
*/
package inspeq

View File

@@ -2,7 +2,7 @@
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
package asynq
package inspeq
import (
"fmt"
@@ -10,7 +10,10 @@ import (
"strings"
"time"
"github.com/go-redis/redis/v7"
"github.com/google/uuid"
"github.com/hibiken/asynq"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/rdb"
)
@@ -20,10 +23,14 @@ type Inspector struct {
rdb *rdb.RDB
}
// NewInspector returns a new instance of Inspector.
func NewInspector(r RedisConnOpt) *Inspector {
// New returns a new instance of Inspector.
func New(r asynq.RedisConnOpt) *Inspector {
c, ok := r.MakeRedisClient().(redis.UniversalClient)
if !ok {
panic(fmt.Sprintf("inspeq: unsupported RedisConnOpt type %T", r))
}
return &Inspector{
rdb: rdb.NewRDB(createRedisClient(r)),
rdb: rdb.NewRDB(c),
}
}
@@ -41,6 +48,8 @@ func (i *Inspector) Queues() ([]string, error) {
type QueueStats struct {
// Name of the queue.
Queue string
// Total number of bytes that the queue and its tasks require to be stored in redis.
MemoryUsage int64
// Size is the total number of tasks in the queue.
// The value is the sum of Pending, Active, Scheduled, Retry, and Archived.
Size int
@@ -68,7 +77,7 @@ type QueueStats struct {
// CurrentStats returns a current stats of the given queue.
func (i *Inspector) CurrentStats(qname string) (*QueueStats, error) {
if err := validateQueueName(qname); err != nil {
if err := base.ValidateQueueName(qname); err != nil {
return nil, err
}
stats, err := i.rdb.CurrentStats(qname)
@@ -76,17 +85,18 @@ func (i *Inspector) CurrentStats(qname string) (*QueueStats, error) {
return nil, err
}
return &QueueStats{
Queue: stats.Queue,
Size: stats.Size,
Pending: stats.Pending,
Active: stats.Active,
Scheduled: stats.Scheduled,
Retry: stats.Retry,
Archived: stats.Archived,
Processed: stats.Processed,
Failed: stats.Failed,
Paused: stats.Paused,
Timestamp: stats.Timestamp,
Queue: stats.Queue,
MemoryUsage: stats.MemoryUsage,
Size: stats.Size,
Pending: stats.Pending,
Active: stats.Active,
Scheduled: stats.Scheduled,
Retry: stats.Retry,
Archived: stats.Archived,
Processed: stats.Processed,
Failed: stats.Failed,
Paused: stats.Paused,
Timestamp: stats.Timestamp,
}, nil
}
@@ -105,7 +115,7 @@ type DailyStats struct {
// History returns a list of stats from the last n days.
func (i *Inspector) History(qname string, n int) ([]*DailyStats, error) {
if err := validateQueueName(qname); err != nil {
if err := base.ValidateQueueName(qname); err != nil {
return nil, err
}
stats, err := i.rdb.HistoricalStats(qname, n)
@@ -165,23 +175,32 @@ func (i *Inspector) DeleteQueue(qname string, force bool) error {
// PendingTask is a task in a queue and is ready to be processed.
type PendingTask struct {
*Task
ID string
Queue string
*asynq.Task
ID string
Queue string
MaxRetry int
Retried int
LastError string
}
// ActiveTask is a task that's currently being processed.
type ActiveTask struct {
*Task
ID string
Queue string
*asynq.Task
ID string
Queue string
MaxRetry int
Retried int
LastError string
}
// ScheduledTask is a task scheduled to be processed in the future.
type ScheduledTask struct {
*Task
*asynq.Task
ID string
Queue string
MaxRetry int
Retried int
LastError string
NextProcessAt time.Time
score int64
@@ -189,13 +208,13 @@ type ScheduledTask struct {
// RetryTask is a task scheduled to be retried in the future.
type RetryTask struct {
*Task
*asynq.Task
ID string
Queue string
NextProcessAt time.Time
MaxRetry int
Retried int
ErrorMsg string
LastError string
// TODO: LastFailedAt time.Time
score int64
@@ -206,52 +225,73 @@ type RetryTask struct {
// A task can be archived when the task exhausts its retry counts or manually
// archived by a user via the CLI or Inspector.
type ArchivedTask struct {
*Task
*asynq.Task
ID string
Queue string
MaxRetry int
Retried int
LastFailedAt time.Time
ErrorMsg string
LastError string
score int64
}
// Key returns a key used to delete, run, and archive the task.
// Format string used for task key.
// Format is <prefix>:<uuid>:<score>.
const taskKeyFormat = "%s:%v:%v"
// Prefix used for task key.
const (
keyPrefixPending = "p"
keyPrefixScheduled = "s"
keyPrefixRetry = "r"
keyPrefixArchived = "a"
allKeyPrefixes = keyPrefixPending + keyPrefixScheduled + keyPrefixRetry + keyPrefixArchived
)
// Key returns a key used to delete, and archive the pending task.
func (t *PendingTask) Key() string {
// Note: Pending tasks are stored in redis LIST, therefore no score.
// Use zero for the score to use the same key format.
return fmt.Sprintf(taskKeyFormat, keyPrefixPending, t.ID, 0)
}
// Key returns a key used to delete, run, and archive the scheduled task.
func (t *ScheduledTask) Key() string {
return fmt.Sprintf("s:%v:%v", t.ID, t.score)
return fmt.Sprintf(taskKeyFormat, keyPrefixScheduled, t.ID, t.score)
}
// Key returns a key used to delete, run, and archive the task.
// Key returns a key used to delete, run, and archive the retry task.
func (t *RetryTask) Key() string {
return fmt.Sprintf("r:%v:%v", t.ID, t.score)
return fmt.Sprintf(taskKeyFormat, keyPrefixRetry, t.ID, t.score)
}
// Key returns a key used to delete, run, and archive the task.
// Key returns a key used to delete and run the archived task.
func (t *ArchivedTask) Key() string {
return fmt.Sprintf("a:%v:%v", t.ID, t.score)
return fmt.Sprintf(taskKeyFormat, keyPrefixArchived, t.ID, t.score)
}
// parseTaskKey parses a key string and returns each part of key with proper
// type if valid, otherwise it reports an error.
func parseTaskKey(key string) (id uuid.UUID, score int64, state string, err error) {
func parseTaskKey(key string) (prefix string, id uuid.UUID, score int64, err error) {
parts := strings.Split(key, ":")
if len(parts) != 3 {
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
return "", uuid.Nil, 0, fmt.Errorf("invalid id")
}
id, err = uuid.Parse(parts[1])
if err != nil {
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
return "", uuid.Nil, 0, fmt.Errorf("invalid id")
}
score, err = strconv.ParseInt(parts[2], 10, 64)
if err != nil {
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
return "", uuid.Nil, 0, fmt.Errorf("invalid id")
}
state = parts[0]
if len(state) != 1 || !strings.Contains("sra", state) {
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
prefix = parts[0]
if len(prefix) != 1 || !strings.Contains(allKeyPrefixes, prefix) {
return "", uuid.Nil, 0, fmt.Errorf("invalid id")
}
return id, score, state, nil
return prefix, id, score, nil
}
// ListOption specifies behavior of list operation.
@@ -319,7 +359,7 @@ func Page(n int) ListOption {
//
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*PendingTask, error) {
if err := validateQueueName(qname); err != nil {
if err := base.ValidateQueueName(qname); err != nil {
return nil, err
}
opt := composeListOptions(opts...)
@@ -331,9 +371,12 @@ func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*Pendi
var tasks []*PendingTask
for _, m := range msgs {
tasks = append(tasks, &PendingTask{
Task: NewTask(m.Type, m.Payload),
ID: m.ID.String(),
Queue: m.Queue,
Task: asynq.NewTask(m.Type, m.Payload),
ID: m.ID.String(),
Queue: m.Queue,
MaxRetry: m.Retry,
Retried: m.Retried,
LastError: m.ErrorMsg,
})
}
return tasks, err
@@ -343,7 +386,7 @@ func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*Pendi
//
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*ActiveTask, error) {
if err := validateQueueName(qname); err != nil {
if err := base.ValidateQueueName(qname); err != nil {
return nil, err
}
opt := composeListOptions(opts...)
@@ -354,10 +397,14 @@ func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*Active
}
var tasks []*ActiveTask
for _, m := range msgs {
tasks = append(tasks, &ActiveTask{
Task: NewTask(m.Type, m.Payload),
ID: m.ID.String(),
Queue: m.Queue,
Task: asynq.NewTask(m.Type, m.Payload),
ID: m.ID.String(),
Queue: m.Queue,
MaxRetry: m.Retry,
Retried: m.Retried,
LastError: m.ErrorMsg,
})
}
return tasks, err
@@ -368,7 +415,7 @@ func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*Active
//
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*ScheduledTask, error) {
if err := validateQueueName(qname); err != nil {
if err := base.ValidateQueueName(qname); err != nil {
return nil, err
}
opt := composeListOptions(opts...)
@@ -380,11 +427,14 @@ func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*Sch
var tasks []*ScheduledTask
for _, z := range zs {
processAt := time.Unix(z.Score, 0)
t := NewTask(z.Message.Type, z.Message.Payload)
t := asynq.NewTask(z.Message.Type, z.Message.Payload)
tasks = append(tasks, &ScheduledTask{
Task: t,
ID: z.Message.ID.String(),
Queue: z.Message.Queue,
MaxRetry: z.Message.Retry,
Retried: z.Message.Retried,
LastError: z.Message.ErrorMsg,
NextProcessAt: processAt,
score: z.Score,
})
@@ -397,7 +447,7 @@ func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*Sch
//
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*RetryTask, error) {
if err := validateQueueName(qname); err != nil {
if err := base.ValidateQueueName(qname); err != nil {
return nil, err
}
opt := composeListOptions(opts...)
@@ -409,7 +459,7 @@ func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*RetryTa
var tasks []*RetryTask
for _, z := range zs {
processAt := time.Unix(z.Score, 0)
t := NewTask(z.Message.Type, z.Message.Payload)
t := asynq.NewTask(z.Message.Type, z.Message.Payload)
tasks = append(tasks, &RetryTask{
Task: t,
ID: z.Message.ID.String(),
@@ -418,8 +468,8 @@ func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*RetryTa
MaxRetry: z.Message.Retry,
Retried: z.Message.Retried,
// TODO: LastFailedAt: z.Message.LastFailedAt
ErrorMsg: z.Message.ErrorMsg,
score: z.Score,
LastError: z.Message.ErrorMsg,
score: z.Score,
})
}
return tasks, nil
@@ -430,7 +480,7 @@ func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*RetryTa
//
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListArchivedTasks(qname string, opts ...ListOption) ([]*ArchivedTask, error) {
if err := validateQueueName(qname); err != nil {
if err := base.ValidateQueueName(qname); err != nil {
return nil, err
}
opt := composeListOptions(opts...)
@@ -442,7 +492,7 @@ func (i *Inspector) ListArchivedTasks(qname string, opts ...ListOption) ([]*Arch
var tasks []*ArchivedTask
for _, z := range zs {
failedAt := time.Unix(z.Score, 0)
t := NewTask(z.Message.Type, z.Message.Payload)
t := asynq.NewTask(z.Message.Type, z.Message.Payload)
tasks = append(tasks, &ArchivedTask{
Task: t,
ID: z.Message.ID.String(),
@@ -450,17 +500,27 @@ func (i *Inspector) ListArchivedTasks(qname string, opts ...ListOption) ([]*Arch
MaxRetry: z.Message.Retry,
Retried: z.Message.Retried,
LastFailedAt: failedAt,
ErrorMsg: z.Message.ErrorMsg,
LastError: z.Message.ErrorMsg,
score: z.Score,
})
}
return tasks, nil
}
// DeleteAllPendingTasks deletes all pending tasks from the specified queue,
// and reports the number tasks deleted.
func (i *Inspector) DeleteAllPendingTasks(qname string) (int, error) {
if err := base.ValidateQueueName(qname); err != nil {
return 0, err
}
n, err := i.rdb.DeleteAllPendingTasks(qname)
return int(n), err
}
// DeleteAllScheduledTasks deletes all scheduled tasks from the specified queue,
// and reports the number tasks deleted.
func (i *Inspector) DeleteAllScheduledTasks(qname string) (int, error) {
if err := validateQueueName(qname); err != nil {
if err := base.ValidateQueueName(qname); err != nil {
return 0, err
}
n, err := i.rdb.DeleteAllScheduledTasks(qname)
@@ -470,7 +530,7 @@ func (i *Inspector) DeleteAllScheduledTasks(qname string) (int, error) {
// DeleteAllRetryTasks deletes all retry tasks from the specified queue,
// and reports the number tasks deleted.
func (i *Inspector) DeleteAllRetryTasks(qname string) (int, error) {
if err := validateQueueName(qname); err != nil {
if err := base.ValidateQueueName(qname); err != nil {
return 0, err
}
n, err := i.rdb.DeleteAllRetryTasks(qname)
@@ -480,7 +540,7 @@ func (i *Inspector) DeleteAllRetryTasks(qname string) (int, error) {
// DeleteAllArchivedTasks deletes all archived tasks from the specified queue,
// and reports the number tasks deleted.
func (i *Inspector) DeleteAllArchivedTasks(qname string) (int, error) {
if err := validateQueueName(qname); err != nil {
if err := base.ValidateQueueName(qname); err != nil {
return 0, err
}
n, err := i.rdb.DeleteAllArchivedTasks(qname)
@@ -489,49 +549,51 @@ func (i *Inspector) DeleteAllArchivedTasks(qname string) (int, error) {
// DeleteTaskByKey deletes a task with the given key from the given queue.
func (i *Inspector) DeleteTaskByKey(qname, key string) error {
if err := validateQueueName(qname); err != nil {
if err := base.ValidateQueueName(qname); err != nil {
return err
}
id, score, state, err := parseTaskKey(key)
prefix, id, score, err := parseTaskKey(key)
if err != nil {
return err
}
switch state {
case "s":
switch prefix {
case keyPrefixPending:
return i.rdb.DeletePendingTask(qname, id)
case keyPrefixScheduled:
return i.rdb.DeleteScheduledTask(qname, id, score)
case "r":
case keyPrefixRetry:
return i.rdb.DeleteRetryTask(qname, id, score)
case "a":
case keyPrefixArchived:
return i.rdb.DeleteArchivedTask(qname, id, score)
default:
return fmt.Errorf("invalid key")
}
}
// RunAllScheduledTasks transition all scheduled tasks to pending state within the given queue,
// RunAllScheduledTasks transition all scheduled tasks to pending state from the given queue,
// and reports the number of tasks transitioned.
func (i *Inspector) RunAllScheduledTasks(qname string) (int, error) {
if err := validateQueueName(qname); err != nil {
if err := base.ValidateQueueName(qname); err != nil {
return 0, err
}
n, err := i.rdb.RunAllScheduledTasks(qname)
return int(n), err
}
// RunAllRetryTasks transition all retry tasks to pending state within the given queue,
// RunAllRetryTasks transition all retry tasks to pending state from the given queue,
// and reports the number of tasks transitioned.
func (i *Inspector) RunAllRetryTasks(qname string) (int, error) {
if err := validateQueueName(qname); err != nil {
if err := base.ValidateQueueName(qname); err != nil {
return 0, err
}
n, err := i.rdb.RunAllRetryTasks(qname)
return int(n), err
}
// RunAllArchivedTasks transition all archived tasks to pending state within the given queue,
// RunAllArchivedTasks transition all archived tasks to pending state from the given queue,
// and reports the number of tasks transitioned.
func (i *Inspector) RunAllArchivedTasks(qname string) (int, error) {
if err := validateQueueName(qname); err != nil {
if err := base.ValidateQueueName(qname); err != nil {
return 0, err
}
n, err := i.rdb.RunAllArchivedTasks(qname)
@@ -540,39 +602,51 @@ func (i *Inspector) RunAllArchivedTasks(qname string) (int, error) {
// RunTaskByKey transition a task to pending state given task key and queue name.
func (i *Inspector) RunTaskByKey(qname, key string) error {
if err := validateQueueName(qname); err != nil {
if err := base.ValidateQueueName(qname); err != nil {
return err
}
id, score, state, err := parseTaskKey(key)
prefix, id, score, err := parseTaskKey(key)
if err != nil {
return err
}
switch state {
case "s":
switch prefix {
case keyPrefixScheduled:
return i.rdb.RunScheduledTask(qname, id, score)
case "r":
case keyPrefixRetry:
return i.rdb.RunRetryTask(qname, id, score)
case "a":
case keyPrefixArchived:
return i.rdb.RunArchivedTask(qname, id, score)
case keyPrefixPending:
return fmt.Errorf("task is already pending for run")
default:
return fmt.Errorf("invalid key")
}
}
// ArchiveAllScheduledTasks archives all scheduled tasks within the given queue,
// ArchiveAllPendingTasks archives all pending tasks from the given queue,
// and reports the number of tasks archived.
func (i *Inspector) ArchiveAllPendingTasks(qname string) (int, error) {
if err := base.ValidateQueueName(qname); err != nil {
return 0, err
}
n, err := i.rdb.ArchiveAllPendingTasks(qname)
return int(n), err
}
// ArchiveAllScheduledTasks archives all scheduled tasks from the given queue,
// and reports the number of tasks archiveed.
func (i *Inspector) ArchiveAllScheduledTasks(qname string) (int, error) {
if err := validateQueueName(qname); err != nil {
if err := base.ValidateQueueName(qname); err != nil {
return 0, err
}
n, err := i.rdb.ArchiveAllScheduledTasks(qname)
return int(n), err
}
// ArchiveAllRetryTasks archives all retry tasks within the given queue,
// ArchiveAllRetryTasks archives all retry tasks from the given queue,
// and reports the number of tasks archiveed.
func (i *Inspector) ArchiveAllRetryTasks(qname string) (int, error) {
if err := validateQueueName(qname); err != nil {
if err := base.ValidateQueueName(qname); err != nil {
return 0, err
}
n, err := i.rdb.ArchiveAllRetryTasks(qname)
@@ -581,20 +655,22 @@ func (i *Inspector) ArchiveAllRetryTasks(qname string) (int, error) {
// ArchiveTaskByKey archives a task with the given key in the given queue.
func (i *Inspector) ArchiveTaskByKey(qname, key string) error {
if err := validateQueueName(qname); err != nil {
if err := base.ValidateQueueName(qname); err != nil {
return err
}
id, score, state, err := parseTaskKey(key)
prefix, id, score, err := parseTaskKey(key)
if err != nil {
return err
}
switch state {
case "s":
switch prefix {
case keyPrefixPending:
return i.rdb.ArchivePendingTask(qname, id)
case keyPrefixScheduled:
return i.rdb.ArchiveScheduledTask(qname, id, score)
case "r":
case keyPrefixRetry:
return i.rdb.ArchiveRetryTask(qname, id, score)
case "a":
return fmt.Errorf("task already archived")
case keyPrefixArchived:
return fmt.Errorf("task is already archived")
default:
return fmt.Errorf("invalid key")
}
@@ -611,7 +687,7 @@ func (i *Inspector) CancelActiveTask(id string) error {
// PauseQueue pauses task processing on the specified queue.
// If the queue is already paused, it will return a non-nil error.
func (i *Inspector) PauseQueue(qname string) error {
if err := validateQueueName(qname); err != nil {
if err := base.ValidateQueueName(qname); err != nil {
return err
}
return i.rdb.Pause(qname)
@@ -620,7 +696,7 @@ func (i *Inspector) PauseQueue(qname string) error {
// UnpauseQueue resumes task processing on the specified queue.
// If the queue is not paused, it will return a non-nil error.
func (i *Inspector) UnpauseQueue(qname string) error {
if err := validateQueueName(qname); err != nil {
if err := base.ValidateQueueName(qname); err != nil {
return err
}
return i.rdb.Unpause(qname)
@@ -656,9 +732,10 @@ func (i *Inspector) Servers() ([]*ServerInfo, error) {
continue
}
wrkInfo := &WorkerInfo{
Started: w.Started,
Started: w.Started,
Deadline: w.Deadline,
Task: &ActiveTask{
Task: NewTask(w.Type, w.Payload),
Task: asynq.NewTask(w.Type, w.Payload),
ID: w.ID,
Queue: w.Queue,
},
@@ -702,6 +779,8 @@ type WorkerInfo struct {
Task *ActiveTask
// Time the worker started processing the task.
Started time.Time
// Time the worker needs to finish processing the task by.
Deadline time.Time
}
// ClusterKeySlot returns an integer identifying the hash slot the given queue hashes to.
@@ -740,10 +819,10 @@ type SchedulerEntry struct {
Spec string
// Periodic Task registered for this entry.
Task *Task
Task *asynq.Task
// Opts is the options for the periodic task.
Opts []Option
Opts []asynq.Option
// Next shows the next time the task will be enqueued.
Next time.Time
@@ -762,8 +841,8 @@ func (i *Inspector) SchedulerEntries() ([]*SchedulerEntry, error) {
return nil, err
}
for _, e := range res {
task := NewTask(e.Type, e.Payload)
var opts []Option
task := asynq.NewTask(e.Type, e.Payload)
var opts []asynq.Option
for _, s := range e.Opts {
if o, err := parseOption(s); err == nil {
// ignore bad data
@@ -782,6 +861,74 @@ func (i *Inspector) SchedulerEntries() ([]*SchedulerEntry, error) {
return entries, nil
}
// parseOption interprets a string s as an Option and returns the Option if parsing is successful,
// otherwise returns non-nil error.
func parseOption(s string) (asynq.Option, error) {
fn, arg := parseOptionFunc(s), parseOptionArg(s)
switch fn {
case "Queue":
qname, err := strconv.Unquote(arg)
if err != nil {
return nil, err
}
return asynq.Queue(qname), nil
case "MaxRetry":
n, err := strconv.Atoi(arg)
if err != nil {
return nil, err
}
return asynq.MaxRetry(n), nil
case "Timeout":
d, err := time.ParseDuration(arg)
if err != nil {
return nil, err
}
return asynq.Timeout(d), nil
case "Deadline":
t, err := time.Parse(time.UnixDate, arg)
if err != nil {
return nil, err
}
return asynq.Deadline(t), nil
case "Unique":
d, err := time.ParseDuration(arg)
if err != nil {
return nil, err
}
return asynq.Unique(d), nil
case "ProcessAt":
t, err := time.Parse(time.UnixDate, arg)
if err != nil {
return nil, err
}
return asynq.ProcessAt(t), nil
case "ProcessIn":
d, err := time.ParseDuration(arg)
if err != nil {
return nil, err
}
return asynq.ProcessIn(d), nil
default:
return nil, fmt.Errorf("cannot not parse option string %q", s)
}
}
func parseOptionFunc(s string) string {
i := strings.Index(s, "(")
return s[:i]
}
func parseOptionArg(s string) string {
i := strings.Index(s, "(")
if i >= 0 {
j := strings.Index(s, ")")
if j > i {
return s[i+1 : j]
}
}
return ""
}
// SchedulerEnqueueEvent holds information about an enqueue event by a scheduler.
type SchedulerEnqueueEvent struct {
// ID of the task that was enqueued.

File diff suppressed because it is too large Load Diff

View File

@@ -19,7 +19,7 @@ import (
)
// Version of asynq library and CLI.
const Version = "0.14.0"
const Version = "0.15.0"
// DefaultQueueName is the queue name used if none are specified by user.
const DefaultQueueName = "default"
@@ -36,6 +36,15 @@ const (
CancelChannel = "asynq:cancel" // PubSub channel
)
// ValidateQueueName validates a given qname to be used as a queue name.
// Returns nil if valid, otherwise returns non-nil error.
func ValidateQueueName(qname string) error {
if len(qname) == 0 {
return fmt.Errorf("queue name must contain one or more characters")
}
return nil
}
// QueueKey returns a redis key for the given queue name.
func QueueKey(qname string) string {
return fmt.Sprintf("asynq:{%s}", qname)
@@ -283,6 +292,7 @@ type WorkerInfo struct {
Queue string
Payload map[string]interface{}
Started time.Time
Deadline time.Time
}
// SchedulerEntry holds information about a periodic task registered with a scheduler.

View File

@@ -25,6 +25,9 @@ func (r *RDB) AllQueues() ([]string, error) {
type Stats struct {
// Name of the queue (e.g. "default", "critical").
Queue string
// MemoryUsage is the total number of bytes the queue and its tasks require
// to be stored in redis.
MemoryUsage int64
// Paused indicates whether the queue is paused.
// If true, tasks in the queue should not be processed.
Paused bool
@@ -160,9 +163,30 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
}
}
stats.Size = size
memusg, err := r.memoryUsage(qname)
if err != nil {
return nil, err
}
stats.MemoryUsage = memusg
return stats, nil
}
func (r *RDB) memoryUsage(qname string) (int64, error) {
keys, err := r.client.Keys(fmt.Sprintf("asynq:{%s}*", qname)).Result()
if err != nil {
return 0, err
}
var usg int64
for _, k := range keys {
n, err := r.client.MemoryUsage(k).Result()
if err != nil {
return 0, err
}
usg += n
}
return usg, nil
}
var historicalStatsCmd = redis.NewScript(`
local res = {}
for _, key in ipairs(KEYS) do
@@ -488,6 +512,66 @@ func (r *RDB) ArchiveScheduledTask(qname string, id uuid.UUID, score int64) erro
return nil
}
// KEYS[1] -> asynq:{<qname>}
// KEYS[2] -> asynq:{<qname>}:archived
// ARGV[1] -> task message to archive
// ARGV[2] -> current timestamp
// ARGV[3] -> cutoff timestamp (e.g., 90 days ago)
// ARGV[4] -> max number of tasks in archive (e.g., 100)
var archivePendingCmd = redis.NewScript(`
local x = redis.call("LREM", KEYS[1], 1, ARGV[1])
if x == 0 then
return 0
end
redis.call("ZADD", KEYS[2], ARGV[2], ARGV[1])
redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[3])
redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[4])
return 1
`)
func (r *RDB) archivePending(qname, msg string) (int64, error) {
keys := []string{base.QueueKey(qname), base.ArchivedKey(qname)}
now := time.Now()
limit := now.AddDate(0, 0, -archivedExpirationInDays).Unix() // 90 days ago
args := []interface{}{msg, now.Unix(), limit, maxArchiveSize}
res, err := archivePendingCmd.Run(r.client, keys, args...).Result()
if err != nil {
return 0, err
}
n, ok := res.(int64)
if !ok {
return 0, fmt.Errorf("could not cast %v to int64", res)
}
return n, nil
}
// ArchivePendingTask finds a pending task that matches the given id from the given queue
// and archives it. If a task that maches the id does not exist, it returns ErrTaskNotFound.
func (r *RDB) ArchivePendingTask(qname string, id uuid.UUID) error {
qkey := base.QueueKey(qname)
data, err := r.client.LRange(qkey, 0, -1).Result()
if err != nil {
return err
}
for _, s := range data {
msg, err := base.DecodeMessage(s)
if err != nil {
return err
}
if msg.ID == id {
n, err := r.archivePending(qname, s)
if err != nil {
return err
}
if n == 0 {
return ErrTaskNotFound
}
return nil
}
}
return ErrTaskNotFound
}
// ArchiveAllRetryTasks archives all retry tasks from the given queue and
// returns the number of tasks that were moved.
func (r *RDB) ArchiveAllRetryTasks(qname string) (int64, error) {
@@ -500,6 +584,39 @@ func (r *RDB) ArchiveAllScheduledTasks(qname string) (int64, error) {
return r.removeAndArchiveAll(base.ScheduledKey(qname), base.ArchivedKey(qname))
}
// KEYS[1] -> asynq:{<qname>}
// KEYS[2] -> asynq:{<qname>}:archived
// ARGV[1] -> current timestamp
// ARGV[2] -> cutoff timestamp (e.g., 90 days ago)
// ARGV[3] -> max number of tasks in archive (e.g., 100)
var archiveAllPendingCmd = redis.NewScript(`
local msgs = redis.call("LRANGE", KEYS[1], 0, -1)
for _, msg in ipairs(msgs) do
redis.call("ZADD", KEYS[2], ARGV[1], msg)
redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[2])
redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[3])
end
redis.call("DEL", KEYS[1])
return table.getn(msgs)`)
// ArchiveAllPendingTasks archives all pending tasks from the given queue and
// returns the number of tasks that were moved.
func (r *RDB) ArchiveAllPendingTasks(qname string) (int64, error) {
keys := []string{base.QueueKey(qname), base.ArchivedKey(qname)}
now := time.Now()
limit := now.AddDate(0, 0, -archivedExpirationInDays).Unix() // 90 days ago
args := []interface{}{now.Unix(), limit, maxArchiveSize}
res, err := archiveAllPendingCmd.Run(r.client, keys, args...).Result()
if err != nil {
return 0, err
}
n, ok := res.(int64)
if !ok {
return 0, fmt.Errorf("could not cast %v to int64", res)
}
return n, nil
}
// KEYS[1] -> ZSET to move task from (e.g., retry queue)
// KEYS[2] -> asynq:{<qname>}:archived
// ARGV[1] -> score of the task to archive
@@ -585,6 +702,33 @@ func (r *RDB) DeleteScheduledTask(qname string, id uuid.UUID, score int64) error
return r.deleteTask(base.ScheduledKey(qname), id.String(), float64(score))
}
// DeletePendingTask deletes a pending tasks that matches the given id from the given queue.
// If a task that matches the id does not exist, it returns ErrTaskNotFound.
func (r *RDB) DeletePendingTask(qname string, id uuid.UUID) error {
qkey := base.QueueKey(qname)
data, err := r.client.LRange(qkey, 0, -1).Result()
if err != nil {
return err
}
for _, s := range data {
msg, err := base.DecodeMessage(s)
if err != nil {
return err
}
if msg.ID == id {
n, err := r.client.LRem(qkey, 1, s).Result()
if err != nil {
return err
}
if n == 0 {
return ErrTaskNotFound
}
return nil
}
}
return ErrTaskNotFound
}
var deleteTaskCmd = redis.NewScript(`
local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], ARGV[1], ARGV[1])
for _, msg in ipairs(msgs) do
@@ -647,6 +791,26 @@ func (r *RDB) deleteAll(key string) (int64, error) {
return n, nil
}
// KEYS[1] -> asynq:{<qname>}
var deleteAllPendingCmd = redis.NewScript(`
local n = redis.call("LLEN", KEYS[1])
redis.call("DEL", KEYS[1])
return n`)
// DeleteAllPendingTasks deletes all pending tasks from the given queue
// and returns the number of tasks deleted.
func (r *RDB) DeleteAllPendingTasks(qname string) (int64, error) {
res, err := deleteAllPendingCmd.Run(r.client, []string{base.QueueKey(qname)}).Result()
if err != nil {
return 0, err
}
n, ok := res.(int64)
if !ok {
return 0, fmt.Errorf("could not cast %v to int64", res)
}
return n, nil
}
// ErrQueueNotFound indicates specified queue does not exist.
type ErrQueueNotFound struct {
qname string
@@ -741,9 +905,8 @@ func (r *RDB) RemoveQueue(qname string, force bool) error {
if err := script.Run(r.client, keys).Err(); err != nil {
if err.Error() == "QUEUE NOT EMPTY" {
return &ErrQueueNotEmpty{qname}
} else {
return err
}
return err
}
return r.client.SRem(base.AllQueues, qname).Err()
}
@@ -811,7 +974,6 @@ func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error) {
continue // skip bad data
}
workers = append(workers, &w)
}
}
return workers, nil

View File

@@ -209,7 +209,8 @@ func TestCurrentStats(t *testing.T) {
continue
}
if diff := cmp.Diff(tc.want, got, timeCmpOpt); diff != "" {
ignoreMemUsg := cmpopts.IgnoreFields(Stats{}, "MemoryUsage")
if diff := cmp.Diff(tc.want, got, timeCmpOpt, ignoreMemUsg); diff != "" {
t.Errorf("r.CurrentStats(%q) = %v, %v, want %v, nil; (-want, +got)\n%s", tc.qname, got, err, tc.want, diff)
continue
}
@@ -2965,6 +2966,7 @@ func TestListWorkers(t *testing.T) {
Queue: m1.Queue,
Payload: m1.Payload,
Started: time.Now().Add(-1 * time.Second),
Deadline: time.Now().Add(30 * time.Second),
},
{
Host: host,
@@ -2975,6 +2977,7 @@ func TestListWorkers(t *testing.T) {
Queue: m2.Queue,
Payload: m2.Payload,
Started: time.Now().Add(-5 * time.Second),
Deadline: time.Now().Add(10 * time.Minute),
},
{
Host: host,
@@ -2985,6 +2988,7 @@ func TestListWorkers(t *testing.T) {
Queue: m3.Queue,
Payload: m3.Payload,
Started: time.Now().Add(-30 * time.Second),
Deadline: time.Now().Add(30 * time.Minute),
},
},
},
@@ -3017,7 +3021,7 @@ func TestWriteListClearSchedulerEntries(t *testing.T) {
schedulerID := "127.0.0.1:9876:abc123"
data := []*base.SchedulerEntry{
&base.SchedulerEntry{
{
Spec: "* * * * *",
Type: "foo",
Payload: nil,
@@ -3025,7 +3029,7 @@ func TestWriteListClearSchedulerEntries(t *testing.T) {
Next: now.Add(5 * time.Hour),
Prev: now.Add(-2 * time.Hour),
},
&base.SchedulerEntry{
{
Spec: "@every 20m",
Type: "bar",
Payload: map[string]interface{}{"fiz": "baz"},
@@ -3067,12 +3071,6 @@ func TestSchedulerEnqueueEvents(t *testing.T) {
oneHourAgo = now.Add(-1 * time.Hour)
)
type event struct {
entryID string
taskID string
enqueuedAt time.Time
}
tests := []struct {
entryID string
events []*base.SchedulerEnqueueEvent
@@ -3121,6 +3119,53 @@ loop:
}
}
func TestRecordSchedulerEnqueueEventTrimsDataSet(t *testing.T) {
r := setup(t)
var (
entryID = "entry123"
now = time.Now()
key = base.SchedulerHistoryKey(entryID)
)
// Record maximum number of events.
for i := 1; i <= maxEvents; i++ {
event := base.SchedulerEnqueueEvent{
TaskID: fmt.Sprintf("task%d", i),
EnqueuedAt: now.Add(-time.Duration(i) * time.Second),
}
if err := r.RecordSchedulerEnqueueEvent(entryID, &event); err != nil {
t.Fatalf("RecordSchedulerEnqueueEvent failed: %v", err)
}
}
// Make sure the set is full.
if n := r.client.ZCard(key).Val(); n != maxEvents {
t.Fatalf("unexpected number of events; got %d, want %d", n, maxEvents)
}
// Record one more event, should evict the oldest event.
event := base.SchedulerEnqueueEvent{
TaskID: "latest",
EnqueuedAt: now,
}
if err := r.RecordSchedulerEnqueueEvent(entryID, &event); err != nil {
t.Fatalf("RecordSchedulerEnqueueEvent failed: %v", err)
}
if n := r.client.ZCard(key).Val(); n != maxEvents {
t.Fatalf("unexpected number of events; got %d, want %d", n, maxEvents)
}
events, err := r.ListSchedulerEnqueueEvents(entryID, Pagination{Size: maxEvents})
if err != nil {
t.Fatalf("ListSchedulerEnqueueEvents failed: %v", err)
}
if first := events[0]; first.TaskID != "latest" {
t.Errorf("unexpected first event; got %q, want %q", first.TaskID, "latest")
}
if last := events[maxEvents-1]; last.TaskID != fmt.Sprintf("task%d", maxEvents-1) {
t.Errorf("unexpected last event; got %q, want %q", last.TaskID, fmt.Sprintf("task%d", maxEvents-1))
}
}
func TestPause(t *testing.T) {
r := setup(t)

View File

@@ -634,12 +634,12 @@ func (r *RDB) PublishCancelation(id string) error {
// ARGV[2] -> serialized SchedulerEnqueueEvent data
// ARGV[3] -> max number of events to be persisted
var recordSchedulerEnqueueEventCmd = redis.NewScript(`
redis.call("ZREMRANGEBYRANK", KEYS[1], 0, -ARGV[3])
redis.call("ZADD", KEYS[1], ARGV[1], ARGV[2])
redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", ARGV[3])
return redis.status_reply("OK")`)
// Maximum number of enqueue events to store per entry.
const maxEvents = 10000
const maxEvents = 1000
// RecordSchedulerEnqueueEvent records the time when the given task was enqueued.
func (r *RDB) RecordSchedulerEnqueueEvent(entryID string, event *base.SchedulerEnqueueEvent) error {
@@ -651,3 +651,9 @@ func (r *RDB) RecordSchedulerEnqueueEvent(entryID string, event *base.SchedulerE
return recordSchedulerEnqueueEventCmd.Run(
r.client, []string{key}, event.EnqueuedAt.Unix(), data, maxEvents).Err()
}
// ClearSchedulerHistory deletes the enqueue event history for the given scheduler entry.
func (r *RDB) ClearSchedulerHistory(entryID string) error {
key := base.SchedulerHistoryKey(entryID)
return r.client.Del(key).Err()
}

View File

@@ -63,7 +63,7 @@ type processor struct {
// cancelations is a set of cancel functions for all active tasks.
cancelations *base.Cancelations
starting chan<- *base.TaskMessage
starting chan<- *workerInfo
finished chan<- *base.TaskMessage
}
@@ -78,7 +78,7 @@ type processorParams struct {
strictPriority bool
errHandler ErrorHandler
shutdownTimeout time.Duration
starting chan<- *base.TaskMessage
starting chan<- *workerInfo
finished chan<- *base.TaskMessage
}
@@ -180,7 +180,7 @@ func (p *processor) exec() {
return
}
p.starting <- msg
p.starting <- &workerInfo{msg, time.Now(), deadline}
go func() {
defer func() {
p.finished <- msg

View File

@@ -20,7 +20,7 @@ import (
)
// fakeHeartbeater receives from starting and finished channels and do nothing.
func fakeHeartbeater(starting, finished <-chan *base.TaskMessage, done <-chan struct{}) {
func fakeHeartbeater(starting <-chan *workerInfo, finished <-chan *base.TaskMessage, done <-chan struct{}) {
for {
select {
case <-starting:
@@ -86,7 +86,7 @@ func TestProcessorSuccessWithSingleQueue(t *testing.T) {
processed = append(processed, task)
return nil
}
starting := make(chan *base.TaskMessage)
starting := make(chan *workerInfo)
finished := make(chan *base.TaskMessage)
syncCh := make(chan *syncRequest)
done := make(chan struct{})
@@ -177,7 +177,7 @@ func TestProcessorSuccessWithMultipleQueues(t *testing.T) {
processed = append(processed, task)
return nil
}
starting := make(chan *base.TaskMessage)
starting := make(chan *workerInfo)
finished := make(chan *base.TaskMessage)
syncCh := make(chan *syncRequest)
done := make(chan struct{})
@@ -258,7 +258,7 @@ func TestProcessTasksWithLargeNumberInPayload(t *testing.T) {
processed = append(processed, task)
return nil
}
starting := make(chan *base.TaskMessage)
starting := make(chan *workerInfo)
finished := make(chan *base.TaskMessage)
syncCh := make(chan *syncRequest)
done := make(chan struct{})
@@ -389,7 +389,7 @@ func TestProcessorRetry(t *testing.T) {
defer mu.Unlock()
n++
}
starting := make(chan *base.TaskMessage)
starting := make(chan *workerInfo)
finished := make(chan *base.TaskMessage)
done := make(chan struct{})
defer func() { close(done) }()
@@ -470,7 +470,7 @@ func TestProcessorQueues(t *testing.T) {
}
for _, tc := range tests {
starting := make(chan *base.TaskMessage)
starting := make(chan *workerInfo)
finished := make(chan *base.TaskMessage)
done := make(chan struct{})
defer func() { close(done) }()
@@ -559,7 +559,7 @@ func TestProcessorWithStrictPriority(t *testing.T) {
"critical": 3,
"low": 1,
}
starting := make(chan *base.TaskMessage)
starting := make(chan *workerInfo)
finished := make(chan *base.TaskMessage)
syncCh := make(chan *syncRequest)
done := make(chan struct{})

View File

@@ -10,6 +10,7 @@ import (
"sync"
"time"
"github.com/go-redis/redis/v7"
"github.com/google/uuid"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/log"
@@ -34,6 +35,10 @@ type Scheduler struct {
// NewScheduler returns a new Scheduler instance given the redis connection option.
// The parameter opts is optional, defaults will be used if opts is set to nil
func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
c, ok := r.MakeRedisClient().(redis.UniversalClient)
if !ok {
panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
}
if opts == nil {
opts = &SchedulerOpts{}
}
@@ -55,7 +60,7 @@ func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
status: base.NewServerStatus(base.StatusIdle),
logger: logger,
client: NewClient(r),
rdb: rdb.NewRDB(createRedisClient(r)),
rdb: rdb.NewRDB(c),
cron: cron.New(cron.WithLocation(loc)),
location: loc,
done: make(chan struct{}),
@@ -186,6 +191,7 @@ func (s *Scheduler) Stop() error {
<-ctx.Done()
s.wg.Wait()
s.clearHistory()
s.client.Close()
s.rdb.Close()
s.status.Set(base.StatusStopped)
@@ -237,3 +243,12 @@ func stringifyOptions(opts []Option) []string {
}
return res
}
func (s *Scheduler) clearHistory() {
for _, entry := range s.cron.Entries() {
job := entry.Job.(*enqueueJob)
if err := s.rdb.ClearSchedulerHistory(job.id.String()); err != nil {
s.logger.Warnf("Could not clear scheduler history for entry %q: %v", job.id.String(), err)
}
}
}

View File

@@ -15,6 +15,7 @@ import (
"sync"
"time"
"github.com/go-redis/redis/v7"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/log"
"github.com/hibiken/asynq/internal/rdb"
@@ -279,6 +280,10 @@ const (
// NewServer returns a new Server given a redis connection option
// and background processing configuration.
func NewServer(r RedisConnOpt, cfg Config) *Server {
c, ok := r.MakeRedisClient().(redis.UniversalClient)
if !ok {
panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
}
n := cfg.Concurrency
if n < 1 {
n = runtime.NumCPU()
@@ -315,8 +320,8 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
}
logger.SetLevel(toInternalLogLevel(loglevel))
rdb := rdb.NewRDB(createRedisClient(r))
starting := make(chan *base.TaskMessage)
rdb := rdb.NewRDB(c)
starting := make(chan *workerInfo)
finished := make(chan *base.TaskMessage)
syncCh := make(chan *syncRequest)
status := base.NewServerStatus(base.StatusIdle)

View File

@@ -11,7 +11,7 @@ import (
"sort"
"time"
"github.com/hibiken/asynq"
"github.com/hibiken/asynq/inspeq"
"github.com/spf13/cobra"
)
@@ -108,7 +108,7 @@ func cronHistory(cmd *cobra.Command, args []string) {
fmt.Printf("Entry: %s\n\n", entryID)
events, err := inspector.ListSchedulerEnqueueEvents(
entryID, asynq.PageSize(pageSize), asynq.Page(pageNum))
entryID, inspeq.PageSize(pageSize), inspeq.Page(pageNum))
if err != nil {
fmt.Printf("error: %v\n", err)
continue

View File

@@ -10,7 +10,7 @@ import (
"os"
"github.com/fatih/color"
"github.com/hibiken/asynq"
"github.com/hibiken/asynq/inspeq"
"github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra"
)
@@ -82,7 +82,7 @@ func queueList(cmd *cobra.Command, args []string) {
type queueInfo struct {
name string
keyslot int64
nodes []asynq.ClusterNode
nodes []inspeq.ClusterNode
}
inspector := createInspector()
queues, err := inspector.Queues()
@@ -141,7 +141,7 @@ func queueInspect(cmd *cobra.Command, args []string) {
}
}
func printQueueStats(s *asynq.QueueStats) {
func printQueueStats(s *inspeq.QueueStats) {
bold := color.New(color.Bold)
bold.Println("Queue Info")
fmt.Printf("Name: %s\n", s.Queue)
@@ -191,7 +191,7 @@ func queueHistory(cmd *cobra.Command, args []string) {
}
}
func printDailyStats(stats []*asynq.DailyStats) {
func printDailyStats(stats []*inspeq.DailyStats) {
printTable(
[]string{"date (UTC)", "processed", "failed", "error rate"},
func(w io.Writer, tmpl string) {

View File

@@ -14,6 +14,7 @@ import (
"github.com/go-redis/redis/v7"
"github.com/hibiken/asynq"
"github.com/hibiken/asynq/inspeq"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra"
@@ -135,7 +136,7 @@ func createRDB() *rdb.RDB {
}
// createRDB creates a Inspector instance using flag values and returns it.
func createInspector() *asynq.Inspector {
func createInspector() *inspeq.Inspector {
var connOpt asynq.RedisConnOpt
if useRedisCluster {
addrs := strings.Split(viper.GetString("cluster_addrs"), ",")
@@ -152,7 +153,7 @@ func createInspector() *asynq.Inspector {
TLSConfig: getTLSConfig(),
}
}
return asynq.NewInspector(connOpt)
return inspeq.New(connOpt)
}
func getTLSConfig() *tls.Config {

View File

@@ -10,7 +10,7 @@ import (
"os"
"time"
"github.com/hibiken/asynq"
"github.com/hibiken/asynq/inspeq"
"github.com/spf13/cobra"
)
@@ -104,7 +104,7 @@ var taskArchiveCmd = &cobra.Command{
Use: "archive --queue=QUEUE --key=KEY",
Short: "Archive a task with the given key",
Args: cobra.NoArgs,
Run: taskKill,
Run: taskArchive,
}
var taskDeleteCmd = &cobra.Command{
@@ -183,7 +183,7 @@ func taskList(cmd *cobra.Command, args []string) {
func listActiveTasks(qname string, pageNum, pageSize int) {
i := createInspector()
tasks, err := i.ListActiveTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
tasks, err := i.ListActiveTasks(qname, inspeq.PageSize(pageSize), inspeq.Page(pageNum))
if err != nil {
fmt.Println(err)
os.Exit(1)
@@ -204,7 +204,7 @@ func listActiveTasks(qname string, pageNum, pageSize int) {
func listPendingTasks(qname string, pageNum, pageSize int) {
i := createInspector()
tasks, err := i.ListPendingTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
tasks, err := i.ListPendingTasks(qname, inspeq.PageSize(pageSize), inspeq.Page(pageNum))
if err != nil {
fmt.Println(err)
os.Exit(1)
@@ -214,10 +214,10 @@ func listPendingTasks(qname string, pageNum, pageSize int) {
return
}
printTable(
[]string{"ID", "Type", "Payload"},
[]string{"Key", "Type", "Payload"},
func(w io.Writer, tmpl string) {
for _, t := range tasks {
fmt.Fprintf(w, tmpl, t.ID, t.Type, t.Payload)
fmt.Fprintf(w, tmpl, t.Key(), t.Type, t.Payload)
}
},
)
@@ -225,7 +225,7 @@ func listPendingTasks(qname string, pageNum, pageSize int) {
func listScheduledTasks(qname string, pageNum, pageSize int) {
i := createInspector()
tasks, err := i.ListScheduledTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
tasks, err := i.ListScheduledTasks(qname, inspeq.PageSize(pageSize), inspeq.Page(pageNum))
if err != nil {
fmt.Println(err)
os.Exit(1)
@@ -248,7 +248,7 @@ func listScheduledTasks(qname string, pageNum, pageSize int) {
func listRetryTasks(qname string, pageNum, pageSize int) {
i := createInspector()
tasks, err := i.ListRetryTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
tasks, err := i.ListRetryTasks(qname, inspeq.PageSize(pageSize), inspeq.Page(pageNum))
if err != nil {
fmt.Println(err)
os.Exit(1)
@@ -267,7 +267,7 @@ func listRetryTasks(qname string, pageNum, pageSize int) {
} else {
nextRetry = "right now"
}
fmt.Fprintf(w, tmpl, t.Key(), t.Type, t.Payload, nextRetry, t.ErrorMsg, t.Retried, t.MaxRetry)
fmt.Fprintf(w, tmpl, t.Key(), t.Type, t.Payload, nextRetry, t.LastError, t.Retried, t.MaxRetry)
}
},
)
@@ -275,7 +275,7 @@ func listRetryTasks(qname string, pageNum, pageSize int) {
func listArchivedTasks(qname string, pageNum, pageSize int) {
i := createInspector()
tasks, err := i.ListArchivedTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
tasks, err := i.ListArchivedTasks(qname, inspeq.PageSize(pageSize), inspeq.Page(pageNum))
if err != nil {
fmt.Println(err)
os.Exit(1)
@@ -288,7 +288,7 @@ func listArchivedTasks(qname string, pageNum, pageSize int) {
[]string{"Key", "Type", "Payload", "Last Failed", "Last Error"},
func(w io.Writer, tmpl string) {
for _, t := range tasks {
fmt.Fprintf(w, tmpl, t.Key(), t.Type, t.Payload, t.LastFailedAt, t.ErrorMsg)
fmt.Fprintf(w, tmpl, t.Key(), t.Type, t.Payload, t.LastFailedAt, t.LastError)
}
})
}
@@ -305,7 +305,7 @@ func taskCancel(cmd *cobra.Command, args []string) {
}
}
func taskKill(cmd *cobra.Command, args []string) {
func taskArchive(cmd *cobra.Command, args []string) {
qname, err := cmd.Flags().GetString("queue")
if err != nil {
fmt.Printf("error: %v\n", err)
@@ -323,7 +323,7 @@ func taskKill(cmd *cobra.Command, args []string) {
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
fmt.Println("task transitioned to archived state")
fmt.Println("task archived")
}
func taskDelete(cmd *cobra.Command, args []string) {
@@ -365,7 +365,7 @@ func taskRun(cmd *cobra.Command, args []string) {
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
fmt.Println("task transitioned to pending state")
fmt.Println("task is now pending")
}
func taskArchiveAll(cmd *cobra.Command, args []string) {
@@ -383,6 +383,8 @@ func taskArchiveAll(cmd *cobra.Command, args []string) {
i := createInspector()
var n int
switch state {
case "pending":
n, err = i.ArchiveAllPendingTasks(qname)
case "scheduled":
n, err = i.ArchiveAllScheduledTasks(qname)
case "retry":
@@ -395,7 +397,7 @@ func taskArchiveAll(cmd *cobra.Command, args []string) {
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
fmt.Printf("%d tasks transitioned to archived state\n", n)
fmt.Printf("%d tasks archived\n", n)
}
func taskDeleteAll(cmd *cobra.Command, args []string) {
@@ -413,6 +415,8 @@ func taskDeleteAll(cmd *cobra.Command, args []string) {
i := createInspector()
var n int
switch state {
case "pending":
n, err = i.DeleteAllPendingTasks(qname)
case "scheduled":
n, err = i.DeleteAllScheduledTasks(qname)
case "retry":
@@ -459,5 +463,5 @@ func taskRunAll(cmd *cobra.Command, args []string) {
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
fmt.Printf("%d tasks transitioned to pending state\n", n)
fmt.Printf("%d tasks are now pending\n", n)
}

View File

@@ -3,17 +3,20 @@ module github.com/hibiken/asynq/tools
go 1.13
require (
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6 // indirect
github.com/coreos/go-etcd v2.0.0+incompatible // indirect
github.com/cpuguy83/go-md2man v1.0.10 // indirect
github.com/fatih/color v1.9.0
github.com/go-redis/redis/v7 v7.4.0
github.com/google/uuid v1.1.1
github.com/hibiken/asynq v0.4.0
github.com/hibiken/asynq v0.14.0
github.com/mitchellh/go-homedir v1.1.0
github.com/spf13/cast v1.3.1
github.com/spf13/cobra v1.0.0
github.com/spf13/viper v1.6.2
github.com/spf13/cobra v1.1.1
github.com/spf13/viper v1.7.0
github.com/ugorji/go v1.1.4 // indirect
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8 // indirect
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77 // indirect
)
replace github.com/hibiken/asynq => ./..

View File

@@ -1,31 +1,54 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
cloud.google.com/go v0.44.1/go.mod h1:iSa0KzasP4Uvy3f1mN/7PiObzGgflwredwwASm/v6AU=
cloud.google.com/go v0.44.2/go.mod h1:60680Gw3Yr4ikxnPRS/oxxkBccT6SA1yMk63TGekxKY=
cloud.google.com/go v0.45.1/go.mod h1:RpBamKRgapWJb87xiFSdk4g1CME7QZg3uwTez+TSTjc=
cloud.google.com/go v0.46.3/go.mod h1:a6bKKbmY7er1mI7TEI4lsAkts/mkhTSZK8w33B4RAg0=
cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o=
cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE=
cloud.google.com/go/firestore v1.1.0/go.mod h1:ulACoGHTpvq5r8rxGJ4ddJZBZqakUQqClKRT5SZwBmk=
cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I=
cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE=
github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.9.0 h1:8xPHl4/q1VyqGIPif1F+1V3Y3lSmrq01EabUW3CoW5s=
github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
@@ -39,29 +62,61 @@ github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zV
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q=
github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU=
github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU=
github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4=
github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ=
github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I=
github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
@@ -77,16 +132,27 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4=
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-colorable v0.1.4 h1:snbPLB8fVfU9iwbbo30TPtbLRzwWu6aJS6Xh4eaaviA=
github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.11 h1:FxPOTFNqGkuDUGi3H/qkUbQO4ZiBa2brKq5r0l8TGeM=
github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI=
github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS42BGNg=
github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY=
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
@@ -94,11 +160,14 @@ github.com/onsi/ginkgo v1.10.1 h1:q/mM8GF/n0shIN8SaAZ0V+jnLPzen6WIVZdiwrRlMlo=
github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME=
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
@@ -111,8 +180,11 @@ github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40T
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
@@ -130,17 +202,25 @@ github.com/spf13/cobra v0.0.5 h1:f0B+LkLX6DtmRH1isoNA9VTtNUK9K8xYd28JNNfOv/s=
github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU=
github.com/spf13/cobra v1.0.0 h1:6m/oheQuQ13N9ks4hubMG6BnvwOeaJrqSPLahSnczz8=
github.com/spf13/cobra v1.0.0/go.mod h1:/6GTrnGXV9HjY+aR4k0oJ5tcvakLuG6EuKReYlHNrgE=
github.com/spf13/cobra v1.1.1 h1:KfztREH0tPxJJ+geloSLaAkaPkr4ki2Er5quFV1TDo4=
github.com/spf13/cobra v1.1.1/go.mod h1:WnodtKOvamDL/PwE2M4iKs8aMDBZ5Q5klgD3qfVJQMI=
github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk=
github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE=
github.com/spf13/viper v1.6.2 h1:7aKfF+e8/k68gda3LOjo5RxiUqddoFxVq4BKBPrxk5E=
github.com/spf13/viper v1.6.2/go.mod h1:t3iDnF5Jlj76alVNuyFBk5oUMCvsrkbvZK0WQdfDi5k=
github.com/spf13/viper v1.7.0 h1:xVKxvI7ouOI5I+U9s2eeiUfMaWBVoXA3AWskkrqK0VM=
github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
@@ -149,60 +229,137 @@ github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljT
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/goleak v0.10.0/go.mod h1:VCZuO8V8mFPlL0F5J5GK1rtHV3DrFcQ1R8ryq7FK0aI=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
golang.org/x/exp v0.0.0-20190829153037-c13cbed26979/go.mod h1:86+5VVa7VpoJ4kLfm080zCjGlMRFzhUhsZKEZO7MGek=
golang.org/x/exp v0.0.0-20191030013958-a1ab85dbe136/go.mod h1:JXzH8nQsPlswgeRAPE3MuO9GYsAcnJvJ4vnMwN/5qkY=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/lint v0.0.0-20190409202823-959b441ac422/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/lint v0.0.0-20190909230951-414d861bb4ac/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE=
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190501004415-9ce7a6920f09/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190522155817-f3200d17e092/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190923162816-aa69164e4478 h1:l5EDrHhldLYb3ZRHDUhXF7Om7MvYXnkV9/iQNo1lX6g=
golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191010194322-b09406accb47/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e h1:9vRrk9YW2BTzLP0VCB9ZDjU4cPqkg+IDWL7XgxA1yxQ=
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZenJ2O330aBsf7JfSUXmQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190312151545-0bb0c0a6e846/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190312170243-e65039ee4138/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/tools v0.0.0-20190506145303-2d16b83fe98c/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/tools v0.0.0-20190606124116-d0a3d012864b/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
golang.org/x/tools v0.0.0-20190628153133-6cdbf07be9d0/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
golang.org/x/tools v0.0.0-20190816200558-6889da9d5479/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20190911174233-4f2ddba30aff/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M=
google.golang.org/api v0.8.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg=
google.golang.org/api v0.9.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg=
google.golang.org/api v0.13.0/go.mod h1:iLdEw5Ide6rF15KTC1Kkl0iskquN2gFfn9o9XIsbkAI=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
google.golang.org/genproto v0.0.0-20190418145605-e7d98fc518a7/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
google.golang.org/genproto v0.0.0-20190502173448-54afdca5d873/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
google.golang.org/genproto v0.0.0-20190801165951-fa694d86fc64/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20190911173649-1774047e7e51/go.mod h1:IbNlFCBrqXvoKpeg0TB2l7cyZUmoaFKYIwrEpbDKLA8=
google.golang.org/genproto v0.0.0-20191108220845-16a3f7862a1a/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/ini.v1 v1.51.0 h1:AQvPpx3LzTDM0AjnIRlVFwFFGC+npRopjZxLJj6gdno=
@@ -216,4 +373,10 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo=
gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=