mirror of
https://github.com/hibiken/asynq.git
synced 2025-09-23 23:36:35 +08:00
create an interface for inspector methods
This commit is contained in:
18
inspector.go
18
inspector.go
@@ -19,7 +19,7 @@ import (
|
||||
// Inspector is a client interface to inspect and mutate the state of
|
||||
// queues and tasks.
|
||||
type Inspector struct {
|
||||
rdb *rdb.RDB
|
||||
rdb base.QueueInspector
|
||||
}
|
||||
|
||||
// New returns a new instance of Inspector.
|
||||
@@ -304,7 +304,7 @@ func (i *Inspector) ListPendingTasks(queue string, opts ...ListOption) ([]*TaskI
|
||||
return nil, fmt.Errorf("asynq: %v", err)
|
||||
}
|
||||
opt := composeListOptions(opts...)
|
||||
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
|
||||
pgn := base.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
|
||||
infos, err := i.rdb.ListPending(queue, pgn)
|
||||
switch {
|
||||
case errors.IsQueueNotFound(err):
|
||||
@@ -332,7 +332,7 @@ func (i *Inspector) ListActiveTasks(queue string, opts ...ListOption) ([]*TaskIn
|
||||
return nil, fmt.Errorf("asynq: %v", err)
|
||||
}
|
||||
opt := composeListOptions(opts...)
|
||||
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
|
||||
pgn := base.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
|
||||
infos, err := i.rdb.ListActive(queue, pgn)
|
||||
switch {
|
||||
case errors.IsQueueNotFound(err):
|
||||
@@ -372,7 +372,7 @@ func (i *Inspector) ListAggregatingTasks(queue, group string, opts ...ListOption
|
||||
return nil, fmt.Errorf("asynq: %v", err)
|
||||
}
|
||||
opt := composeListOptions(opts...)
|
||||
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
|
||||
pgn := base.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
|
||||
infos, err := i.rdb.ListAggregating(queue, group, pgn)
|
||||
switch {
|
||||
case errors.IsQueueNotFound(err):
|
||||
@@ -401,7 +401,7 @@ func (i *Inspector) ListScheduledTasks(queue string, opts ...ListOption) ([]*Tas
|
||||
return nil, fmt.Errorf("asynq: %v", err)
|
||||
}
|
||||
opt := composeListOptions(opts...)
|
||||
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
|
||||
pgn := base.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
|
||||
infos, err := i.rdb.ListScheduled(queue, pgn)
|
||||
switch {
|
||||
case errors.IsQueueNotFound(err):
|
||||
@@ -430,7 +430,7 @@ func (i *Inspector) ListRetryTasks(queue string, opts ...ListOption) ([]*TaskInf
|
||||
return nil, fmt.Errorf("asynq: %v", err)
|
||||
}
|
||||
opt := composeListOptions(opts...)
|
||||
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
|
||||
pgn := base.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
|
||||
infos, err := i.rdb.ListRetry(queue, pgn)
|
||||
switch {
|
||||
case errors.IsQueueNotFound(err):
|
||||
@@ -459,7 +459,7 @@ func (i *Inspector) ListArchivedTasks(queue string, opts ...ListOption) ([]*Task
|
||||
return nil, fmt.Errorf("asynq: %v", err)
|
||||
}
|
||||
opt := composeListOptions(opts...)
|
||||
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
|
||||
pgn := base.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
|
||||
infos, err := i.rdb.ListArchived(queue, pgn)
|
||||
switch {
|
||||
case errors.IsQueueNotFound(err):
|
||||
@@ -488,7 +488,7 @@ func (i *Inspector) ListCompletedTasks(queue string, opts ...ListOption) ([]*Tas
|
||||
return nil, fmt.Errorf("asynq: %v", err)
|
||||
}
|
||||
opt := composeListOptions(opts...)
|
||||
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
|
||||
pgn := base.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
|
||||
infos, err := i.rdb.ListCompleted(queue, pgn)
|
||||
switch {
|
||||
case errors.IsQueueNotFound(err):
|
||||
@@ -998,7 +998,7 @@ type SchedulerEnqueueEvent struct {
|
||||
// By default, it retrieves the first 30 tasks.
|
||||
func (i *Inspector) ListSchedulerEnqueueEvents(entryID string, opts ...ListOption) ([]*SchedulerEnqueueEvent, error) {
|
||||
opt := composeListOptions(opts...)
|
||||
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
|
||||
pgn := base.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
|
||||
data, err := i.rdb.ListSchedulerEnqueueEvents(entryID, pgn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@@ -753,3 +753,81 @@ type Broker interface {
|
||||
|
||||
WriteResult(qname, id string, data []byte) (n int, err error)
|
||||
}
|
||||
|
||||
// Pagination specifies the page size and page number
|
||||
// for the list operation.
|
||||
type Pagination struct {
|
||||
// Number of items in the page.
|
||||
Size int
|
||||
|
||||
// Page number starting from zero.
|
||||
Page int
|
||||
}
|
||||
|
||||
func (p Pagination) Start() int64 {
|
||||
return int64(p.Size * p.Page)
|
||||
}
|
||||
|
||||
func (p Pagination) Stop() int64 {
|
||||
return int64(p.Size*p.Page + p.Size - 1)
|
||||
}
|
||||
|
||||
// QueueInspector is the inspector part of a message broker, it exposes information about task queues.
|
||||
//
|
||||
// See rdb.RDB as a reference implementation.
|
||||
type QueueInspector interface {
|
||||
Close() error
|
||||
|
||||
// Describe task and queues
|
||||
AllQueues() ([]string, error)
|
||||
GetTaskInfo(qname, id string) (*TaskInfo, error)
|
||||
ListPending(qname string, pgn Pagination) ([]*TaskInfo, error)
|
||||
ListActive(qname string, pgn Pagination) ([]*TaskInfo, error)
|
||||
ListScheduled(qname string, pgn Pagination) ([]*TaskInfo, error)
|
||||
ListRetry(qname string, pgn Pagination) ([]*TaskInfo, error)
|
||||
ListArchived(qname string, pgn Pagination) ([]*TaskInfo, error)
|
||||
ListCompleted(qname string, pgn Pagination) ([]*TaskInfo, error)
|
||||
ListAggregating(qname, gname string, pgn Pagination) ([]*TaskInfo, error)
|
||||
|
||||
//Scheduler info
|
||||
ListSchedulerEntries() ([]*SchedulerEntry, error)
|
||||
ListSchedulerEnqueueEvents(entryID string, pgn Pagination) ([]*SchedulerEnqueueEvent, error)
|
||||
|
||||
// Stats
|
||||
GroupStats(qname string) ([]*GroupStat, error)
|
||||
CurrentStats(qname string) (*Stats, error)
|
||||
HistoricalStats(qname string, n int) ([]*DailyStats, error)
|
||||
|
||||
//Server info
|
||||
ListServers() ([]*ServerInfo, error)
|
||||
ListWorkers() ([]*WorkerInfo, error)
|
||||
ClusterKeySlot(qname string) (int64, error)
|
||||
ClusterNodes(qname string) ([]redis.ClusterNode, error)
|
||||
|
||||
//Run tasks
|
||||
RunAllScheduledTasks(qname string) (int64, error)
|
||||
RunAllRetryTasks(qname string) (int64, error)
|
||||
RunAllArchivedTasks(qname string) (int64, error)
|
||||
RunAllAggregatingTasks(qname, gname string) (int64, error)
|
||||
RunTask(qname, id string) error
|
||||
|
||||
// Archive actions
|
||||
ArchiveAllRetryTasks(qname string) (int64, error)
|
||||
ArchiveAllScheduledTasks(qname string) (int64, error)
|
||||
ArchiveAllAggregatingTasks(qname, gname string) (int64, error)
|
||||
ArchiveAllPendingTasks(qname string) (int64, error)
|
||||
ArchiveTask(qname, id string) error
|
||||
|
||||
DeleteTask(qname, id string) error
|
||||
DeleteAllArchivedTasks(qname string) (int64, error)
|
||||
DeleteAllRetryTasks(qname string) (int64, error)
|
||||
DeleteAllScheduledTasks(qname string) (int64, error)
|
||||
DeleteAllCompletedTasks(qname string) (int64, error)
|
||||
DeleteAllAggregatingTasks(qname, gname string) (int64, error)
|
||||
DeleteAllPendingTasks(qname string) (int64, error)
|
||||
|
||||
// Queue actions
|
||||
Pause(qname string) error
|
||||
Unpause(qname string) error
|
||||
RemoveQueue(qname string, force bool) error
|
||||
}
|
||||
|
67
internal/base/stats.go
Normal file
67
internal/base/stats.go
Normal file
@@ -0,0 +1,67 @@
|
||||
package base
|
||||
|
||||
import "time"
|
||||
|
||||
type GroupStat struct {
|
||||
// Name of the group.
|
||||
Group string
|
||||
|
||||
// Size of the group.
|
||||
Size int
|
||||
}
|
||||
|
||||
// Stats represents a state of queues at a certain time.
|
||||
type Stats struct {
|
||||
// Name of the queue (e.g. "default", "critical").
|
||||
Queue string
|
||||
// MemoryUsage is the total number of bytes the queue and its tasks require
|
||||
// to be stored in redis. It is an approximate memory usage value in bytes
|
||||
// since the value is computed by sampling.
|
||||
MemoryUsage int64
|
||||
// Paused indicates whether the queue is paused.
|
||||
// If true, tasks in the queue should not be processed.
|
||||
Paused bool
|
||||
// Size is the total number of tasks in the queue.
|
||||
Size int
|
||||
|
||||
// Groups is the total number of groups in the queue.
|
||||
Groups int
|
||||
|
||||
// Number of tasks in each state.
|
||||
Pending int
|
||||
Active int
|
||||
Scheduled int
|
||||
Retry int
|
||||
Archived int
|
||||
Completed int
|
||||
Aggregating int
|
||||
|
||||
// Number of tasks processed within the current date.
|
||||
// The number includes both succeeded and failed tasks.
|
||||
Processed int
|
||||
// Number of tasks failed within the current date.
|
||||
Failed int
|
||||
|
||||
// Total number of tasks processed (both succeeded and failed) from this queue.
|
||||
ProcessedTotal int
|
||||
// Total number of tasks failed.
|
||||
FailedTotal int
|
||||
|
||||
// Latency of the queue, measured by the oldest pending task in the queue.
|
||||
Latency time.Duration
|
||||
// Time this stats was taken.
|
||||
Timestamp time.Time
|
||||
}
|
||||
|
||||
// DailyStats holds aggregate data for a given day.
|
||||
type DailyStats struct {
|
||||
// Name of the queue (e.g. "default", "critical").
|
||||
Queue string
|
||||
// Total number of tasks processed during the given day.
|
||||
// The number includes both succeeded and failed tasks.
|
||||
Processed int
|
||||
// Total number of tasks failed during the given day.
|
||||
Failed int
|
||||
// Date this stats was taken.
|
||||
Time time.Time
|
||||
}
|
@@ -21,62 +21,6 @@ func (r *RDB) AllQueues() ([]string, error) {
|
||||
return r.client.SMembers(context.Background(), base.AllQueues).Result()
|
||||
}
|
||||
|
||||
// Stats represents a state of queues at a certain time.
|
||||
type Stats struct {
|
||||
// Name of the queue (e.g. "default", "critical").
|
||||
Queue string
|
||||
// MemoryUsage is the total number of bytes the queue and its tasks require
|
||||
// to be stored in redis. It is an approximate memory usage value in bytes
|
||||
// since the value is computed by sampling.
|
||||
MemoryUsage int64
|
||||
// Paused indicates whether the queue is paused.
|
||||
// If true, tasks in the queue should not be processed.
|
||||
Paused bool
|
||||
// Size is the total number of tasks in the queue.
|
||||
Size int
|
||||
|
||||
// Groups is the total number of groups in the queue.
|
||||
Groups int
|
||||
|
||||
// Number of tasks in each state.
|
||||
Pending int
|
||||
Active int
|
||||
Scheduled int
|
||||
Retry int
|
||||
Archived int
|
||||
Completed int
|
||||
Aggregating int
|
||||
|
||||
// Number of tasks processed within the current date.
|
||||
// The number includes both succeeded and failed tasks.
|
||||
Processed int
|
||||
// Number of tasks failed within the current date.
|
||||
Failed int
|
||||
|
||||
// Total number of tasks processed (both succeeded and failed) from this queue.
|
||||
ProcessedTotal int
|
||||
// Total number of tasks failed.
|
||||
FailedTotal int
|
||||
|
||||
// Latency of the queue, measured by the oldest pending task in the queue.
|
||||
Latency time.Duration
|
||||
// Time this stats was taken.
|
||||
Timestamp time.Time
|
||||
}
|
||||
|
||||
// DailyStats holds aggregate data for a given day.
|
||||
type DailyStats struct {
|
||||
// Name of the queue (e.g. "default", "critical").
|
||||
Queue string
|
||||
// Total number of tasks processed during the given day.
|
||||
// The number includes both succeeded and failed tasks.
|
||||
Processed int
|
||||
// Total number of tasks failed during the given day.
|
||||
Failed int
|
||||
// Date this stats was taken.
|
||||
Time time.Time
|
||||
}
|
||||
|
||||
// KEYS[1] -> asynq:<qname>:pending
|
||||
// KEYS[2] -> asynq:<qname>:active
|
||||
// KEYS[3] -> asynq:<qname>:scheduled
|
||||
@@ -137,7 +81,7 @@ table.insert(res, aggregating_count)
|
||||
return res`)
|
||||
|
||||
// CurrentStats returns a current state of the queues.
|
||||
func (r *RDB) CurrentStats(qname string) (*Stats, error) {
|
||||
func (r *RDB) CurrentStats(qname string) (*base.Stats, error) {
|
||||
var op errors.Op = "rdb.CurrentStats"
|
||||
exists, err := r.queueExists(qname)
|
||||
if err != nil {
|
||||
@@ -173,7 +117,7 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
|
||||
if err != nil {
|
||||
return nil, errors.E(op, errors.Internal, "cast error: unexpected return value from Lua script")
|
||||
}
|
||||
stats := &Stats{
|
||||
stats := &base.Stats{
|
||||
Queue: qname,
|
||||
Timestamp: now,
|
||||
}
|
||||
@@ -360,7 +304,7 @@ end
|
||||
return res`)
|
||||
|
||||
// HistoricalStats returns a list of stats from the last n days for the given queue.
|
||||
func (r *RDB) HistoricalStats(qname string, n int) ([]*DailyStats, error) {
|
||||
func (r *RDB) HistoricalStats(qname string, n int) ([]*base.DailyStats, error) {
|
||||
var op errors.Op = "rdb.HistoricalStats"
|
||||
if n < 1 {
|
||||
return nil, errors.E(op, errors.FailedPrecondition, "the number of days must be positive")
|
||||
@@ -390,9 +334,9 @@ func (r *RDB) HistoricalStats(qname string, n int) ([]*DailyStats, error) {
|
||||
if err != nil {
|
||||
return nil, errors.E(op, errors.Internal, fmt.Sprintf("cast error: unexpected return value from Lua script: %v", res))
|
||||
}
|
||||
var stats []*DailyStats
|
||||
var stats []*base.DailyStats
|
||||
for i := 0; i < len(data); i += 2 {
|
||||
stats = append(stats, &DailyStats{
|
||||
stats = append(stats, &base.DailyStats{
|
||||
Queue: qname,
|
||||
Processed: data[i],
|
||||
Failed: data[i+1],
|
||||
@@ -547,14 +491,6 @@ func (r *RDB) GetTaskInfo(qname, id string) (*base.TaskInfo, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
type GroupStat struct {
|
||||
// Name of the group.
|
||||
Group string
|
||||
|
||||
// Size of the group.
|
||||
Size int
|
||||
}
|
||||
|
||||
// KEYS[1] -> asynq:{<qname>}:groups
|
||||
// -------
|
||||
// ARGV[1] -> group key prefix
|
||||
@@ -575,7 +511,7 @@ end
|
||||
return res
|
||||
`)
|
||||
|
||||
func (r *RDB) GroupStats(qname string) ([]*GroupStat, error) {
|
||||
func (r *RDB) GroupStats(qname string) ([]*base.GroupStat, error) {
|
||||
var op errors.Op = "RDB.GroupStats"
|
||||
keys := []string{base.AllGroups(qname)}
|
||||
argv := []interface{}{base.GroupKeyPrefix(qname)}
|
||||
@@ -587,9 +523,9 @@ func (r *RDB) GroupStats(qname string) ([]*GroupStat, error) {
|
||||
if err != nil {
|
||||
return nil, errors.E(op, errors.Internal, "cast error: unexpected return value from Lua script")
|
||||
}
|
||||
var stats []*GroupStat
|
||||
var stats []*base.GroupStat
|
||||
for i := 0; i < len(data); i += 2 {
|
||||
stats = append(stats, &GroupStat{
|
||||
stats = append(stats, &base.GroupStat{
|
||||
Group: data[i].(string),
|
||||
Size: int(data[i+1].(int64)),
|
||||
})
|
||||
@@ -597,26 +533,8 @@ func (r *RDB) GroupStats(qname string) ([]*GroupStat, error) {
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
// Pagination specifies the page size and page number
|
||||
// for the list operation.
|
||||
type Pagination struct {
|
||||
// Number of items in the page.
|
||||
Size int
|
||||
|
||||
// Page number starting from zero.
|
||||
Page int
|
||||
}
|
||||
|
||||
func (p Pagination) start() int64 {
|
||||
return int64(p.Size * p.Page)
|
||||
}
|
||||
|
||||
func (p Pagination) stop() int64 {
|
||||
return int64(p.Size*p.Page + p.Size - 1)
|
||||
}
|
||||
|
||||
// ListPending returns pending tasks that are ready to be processed.
|
||||
func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskInfo, error) {
|
||||
func (r *RDB) ListPending(qname string, pgn base.Pagination) ([]*base.TaskInfo, error) {
|
||||
var op errors.Op = "rdb.ListPending"
|
||||
exists, err := r.queueExists(qname)
|
||||
if err != nil {
|
||||
@@ -633,7 +551,7 @@ func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskInfo, error
|
||||
}
|
||||
|
||||
// ListActive returns all tasks that are currently being processed for the given queue.
|
||||
func (r *RDB) ListActive(qname string, pgn Pagination) ([]*base.TaskInfo, error) {
|
||||
func (r *RDB) ListActive(qname string, pgn base.Pagination) ([]*base.TaskInfo, error) {
|
||||
var op errors.Op = "rdb.ListActive"
|
||||
exists, err := r.queueExists(qname)
|
||||
if err != nil {
|
||||
@@ -666,7 +584,7 @@ return data
|
||||
`)
|
||||
|
||||
// listMessages returns a list of TaskInfo in Redis list with the given key.
|
||||
func (r *RDB) listMessages(qname string, state base.TaskState, pgn Pagination) ([]*base.TaskInfo, error) {
|
||||
func (r *RDB) listMessages(qname string, state base.TaskState, pgn base.Pagination) ([]*base.TaskInfo, error) {
|
||||
var key string
|
||||
switch state {
|
||||
case base.TaskStateActive:
|
||||
@@ -677,9 +595,9 @@ func (r *RDB) listMessages(qname string, state base.TaskState, pgn Pagination) (
|
||||
panic(fmt.Sprintf("unsupported task state: %v", state))
|
||||
}
|
||||
// Note: Because we use LPUSH to redis list, we need to calculate the
|
||||
// correct range and reverse the list to get the tasks with pagination.
|
||||
stop := -pgn.start() - 1
|
||||
start := -pgn.stop() - 1
|
||||
// correct range and reverse the list to get the tasks with base.Pagination.
|
||||
stop := -pgn.Start() - 1
|
||||
start := -pgn.Stop() - 1
|
||||
res, err := listMessagesCmd.Run(context.Background(), r.client,
|
||||
[]string{key}, start, stop, base.TaskKeyPrefix(qname)).Result()
|
||||
if err != nil {
|
||||
@@ -717,7 +635,7 @@ func (r *RDB) listMessages(qname string, state base.TaskState, pgn Pagination) (
|
||||
|
||||
// ListScheduled returns all tasks from the given queue that are scheduled
|
||||
// to be processed in the future.
|
||||
func (r *RDB) ListScheduled(qname string, pgn Pagination) ([]*base.TaskInfo, error) {
|
||||
func (r *RDB) ListScheduled(qname string, pgn base.Pagination) ([]*base.TaskInfo, error) {
|
||||
var op errors.Op = "rdb.ListScheduled"
|
||||
exists, err := r.queueExists(qname)
|
||||
if err != nil {
|
||||
@@ -735,7 +653,7 @@ func (r *RDB) ListScheduled(qname string, pgn Pagination) ([]*base.TaskInfo, err
|
||||
|
||||
// ListRetry returns all tasks from the given queue that have failed before
|
||||
// and willl be retried in the future.
|
||||
func (r *RDB) ListRetry(qname string, pgn Pagination) ([]*base.TaskInfo, error) {
|
||||
func (r *RDB) ListRetry(qname string, pgn base.Pagination) ([]*base.TaskInfo, error) {
|
||||
var op errors.Op = "rdb.ListRetry"
|
||||
exists, err := r.queueExists(qname)
|
||||
if err != nil {
|
||||
@@ -752,7 +670,7 @@ func (r *RDB) ListRetry(qname string, pgn Pagination) ([]*base.TaskInfo, error)
|
||||
}
|
||||
|
||||
// ListArchived returns all tasks from the given queue that have exhausted its retry limit.
|
||||
func (r *RDB) ListArchived(qname string, pgn Pagination) ([]*base.TaskInfo, error) {
|
||||
func (r *RDB) ListArchived(qname string, pgn base.Pagination) ([]*base.TaskInfo, error) {
|
||||
var op errors.Op = "rdb.ListArchived"
|
||||
exists, err := r.queueExists(qname)
|
||||
if err != nil {
|
||||
@@ -769,7 +687,7 @@ func (r *RDB) ListArchived(qname string, pgn Pagination) ([]*base.TaskInfo, erro
|
||||
}
|
||||
|
||||
// ListCompleted returns all tasks from the given queue that have completed successfully.
|
||||
func (r *RDB) ListCompleted(qname string, pgn Pagination) ([]*base.TaskInfo, error) {
|
||||
func (r *RDB) ListCompleted(qname string, pgn base.Pagination) ([]*base.TaskInfo, error) {
|
||||
var op errors.Op = "rdb.ListCompleted"
|
||||
exists, err := r.queueExists(qname)
|
||||
if err != nil {
|
||||
@@ -786,7 +704,7 @@ func (r *RDB) ListCompleted(qname string, pgn Pagination) ([]*base.TaskInfo, err
|
||||
}
|
||||
|
||||
// ListAggregating returns all tasks from the given group.
|
||||
func (r *RDB) ListAggregating(qname, gname string, pgn Pagination) ([]*base.TaskInfo, error) {
|
||||
func (r *RDB) ListAggregating(qname, gname string, pgn base.Pagination) ([]*base.TaskInfo, error) {
|
||||
var op errors.Op = "rdb.ListAggregating"
|
||||
exists, err := r.queueExists(qname)
|
||||
if err != nil {
|
||||
@@ -831,9 +749,9 @@ return data
|
||||
|
||||
// listZSetEntries returns a list of message and score pairs in Redis sorted-set
|
||||
// with the given key.
|
||||
func (r *RDB) listZSetEntries(qname string, state base.TaskState, key string, pgn Pagination) ([]*base.TaskInfo, error) {
|
||||
func (r *RDB) listZSetEntries(qname string, state base.TaskState, key string, pgn base.Pagination) ([]*base.TaskInfo, error) {
|
||||
res, err := listZSetEntriesCmd.Run(context.Background(), r.client, []string{key},
|
||||
pgn.start(), pgn.stop(), base.TaskKeyPrefix(qname)).Result()
|
||||
pgn.Start(), pgn.Stop(), base.TaskKeyPrefix(qname)).Result()
|
||||
if err != nil {
|
||||
return nil, errors.E(errors.Unknown, err)
|
||||
}
|
||||
@@ -1947,9 +1865,9 @@ func (r *RDB) ListSchedulerEntries() ([]*base.SchedulerEntry, error) {
|
||||
}
|
||||
|
||||
// ListSchedulerEnqueueEvents returns the list of scheduler enqueue events.
|
||||
func (r *RDB) ListSchedulerEnqueueEvents(entryID string, pgn Pagination) ([]*base.SchedulerEnqueueEvent, error) {
|
||||
func (r *RDB) ListSchedulerEnqueueEvents(entryID string, pgn base.Pagination) ([]*base.SchedulerEnqueueEvent, error) {
|
||||
key := base.SchedulerHistoryKey(entryID)
|
||||
zs, err := r.client.ZRevRangeWithScores(context.Background(), key, pgn.start(), pgn.stop()).Result()
|
||||
zs, err := r.client.ZRevRangeWithScores(context.Background(), key, pgn.Start(), pgn.Stop()).Result()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@@ -796,8 +796,8 @@ func TestListPending(t *testing.T) {
|
||||
h.FlushDB(t, r.client) // clean up db before each test case
|
||||
h.SeedAllPendingQueues(t, r.client, tc.pending)
|
||||
|
||||
got, err := r.ListPending(tc.qname, Pagination{Size: 20, Page: 0})
|
||||
op := fmt.Sprintf("r.ListPending(%q, Pagination{Size: 20, Page: 0})", tc.qname)
|
||||
got, err := r.ListPending(tc.qname, base.Pagination{Size: 20, Page: 0})
|
||||
op := fmt.Sprintf("r.ListPending(%q, base.Pagination{Size: 20, Page: 0})", tc.qname)
|
||||
if err != nil {
|
||||
t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want)
|
||||
continue
|
||||
@@ -809,7 +809,7 @@ func TestListPending(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestListPendingPagination(t *testing.T) {
|
||||
func TestListPendingbase.Pagination(t *testing.T) {
|
||||
r := setup(t)
|
||||
defer r.Close()
|
||||
var msgs []*base.TaskMessage
|
||||
@@ -846,8 +846,8 @@ func TestListPendingPagination(t *testing.T) {
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
got, err := r.ListPending(tc.qname, Pagination{Size: tc.size, Page: tc.page})
|
||||
op := fmt.Sprintf("r.ListPending(%q, Pagination{Size: %d, Page: %d})", tc.qname, tc.size, tc.page)
|
||||
got, err := r.ListPending(tc.qname, base.Pagination{Size: tc.size, Page: tc.page})
|
||||
op := fmt.Sprintf("r.ListPending(%q, base.Pagination{Size: %d, Page: %d})", tc.qname, tc.size, tc.page)
|
||||
if err != nil {
|
||||
t.Errorf("%s; %s returned error %v", tc.desc, op, err)
|
||||
continue
|
||||
@@ -915,8 +915,8 @@ func TestListActive(t *testing.T) {
|
||||
h.FlushDB(t, r.client) // clean up db before each test case
|
||||
h.SeedAllActiveQueues(t, r.client, tc.inProgress)
|
||||
|
||||
got, err := r.ListActive(tc.qname, Pagination{Size: 20, Page: 0})
|
||||
op := fmt.Sprintf("r.ListActive(%q, Pagination{Size: 20, Page: 0})", tc.qname)
|
||||
got, err := r.ListActive(tc.qname, base.Pagination{Size: 20, Page: 0})
|
||||
op := fmt.Sprintf("r.ListActive(%q, base.Pagination{Size: 20, Page: 0})", tc.qname)
|
||||
if err != nil {
|
||||
t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.inProgress)
|
||||
continue
|
||||
@@ -928,7 +928,7 @@ func TestListActive(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestListActivePagination(t *testing.T) {
|
||||
func TestListActivebase.Pagination(t *testing.T) {
|
||||
r := setup(t)
|
||||
defer r.Close()
|
||||
var msgs []*base.TaskMessage
|
||||
@@ -955,8 +955,8 @@ func TestListActivePagination(t *testing.T) {
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
got, err := r.ListActive(tc.qname, Pagination{Size: tc.size, Page: tc.page})
|
||||
op := fmt.Sprintf("r.ListActive(%q, Pagination{Size: %d, Page: %d})", tc.qname, tc.size, tc.page)
|
||||
got, err := r.ListActive(tc.qname, base.Pagination{Size: tc.size, Page: tc.page})
|
||||
op := fmt.Sprintf("r.ListActive(%q, base.Pagination{Size: %d, Page: %d})", tc.qname, tc.size, tc.page)
|
||||
if err != nil {
|
||||
t.Errorf("%s; %s returned error %v", tc.desc, op, err)
|
||||
continue
|
||||
@@ -1050,8 +1050,8 @@ func TestListScheduled(t *testing.T) {
|
||||
h.FlushDB(t, r.client) // clean up db before each test case
|
||||
h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
|
||||
|
||||
got, err := r.ListScheduled(tc.qname, Pagination{Size: 20, Page: 0})
|
||||
op := fmt.Sprintf("r.ListScheduled(%q, Pagination{Size: 20, Page: 0})", tc.qname)
|
||||
got, err := r.ListScheduled(tc.qname, base.Pagination{Size: 20, Page: 0})
|
||||
op := fmt.Sprintf("r.ListScheduled(%q, base.Pagination{Size: 20, Page: 0})", tc.qname)
|
||||
if err != nil {
|
||||
t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want)
|
||||
continue
|
||||
@@ -1063,7 +1063,7 @@ func TestListScheduled(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestListScheduledPagination(t *testing.T) {
|
||||
func TestListScheduledbase.Pagination(t *testing.T) {
|
||||
r := setup(t)
|
||||
defer r.Close()
|
||||
// create 100 tasks with an increasing number of wait time.
|
||||
@@ -1091,8 +1091,8 @@ func TestListScheduledPagination(t *testing.T) {
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
got, err := r.ListScheduled(tc.qname, Pagination{Size: tc.size, Page: tc.page})
|
||||
op := fmt.Sprintf("r.ListScheduled(%q, Pagination{Size: %d, Page: %d})", tc.qname, tc.size, tc.page)
|
||||
got, err := r.ListScheduled(tc.qname, base.Pagination{Size: tc.size, Page: tc.page})
|
||||
op := fmt.Sprintf("r.ListScheduled(%q, base.Pagination{Size: %d, Page: %d})", tc.qname, tc.size, tc.page)
|
||||
if err != nil {
|
||||
t.Errorf("%s; %s returned error %v", tc.desc, op, err)
|
||||
continue
|
||||
@@ -1204,8 +1204,8 @@ func TestListRetry(t *testing.T) {
|
||||
h.FlushDB(t, r.client) // clean up db before each test case
|
||||
h.SeedAllRetryQueues(t, r.client, tc.retry)
|
||||
|
||||
got, err := r.ListRetry(tc.qname, Pagination{Size: 20, Page: 0})
|
||||
op := fmt.Sprintf("r.ListRetry(%q, Pagination{Size: 20, Page: 0})", tc.qname)
|
||||
got, err := r.ListRetry(tc.qname, base.Pagination{Size: 20, Page: 0})
|
||||
op := fmt.Sprintf("r.ListRetry(%q, base.Pagination{Size: 20, Page: 0})", tc.qname)
|
||||
if err != nil {
|
||||
t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want)
|
||||
continue
|
||||
@@ -1218,7 +1218,7 @@ func TestListRetry(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestListRetryPagination(t *testing.T) {
|
||||
func TestListRetrybase.Pagination(t *testing.T) {
|
||||
r := setup(t)
|
||||
defer r.Close()
|
||||
// create 100 tasks with an increasing number of wait time.
|
||||
@@ -1248,8 +1248,8 @@ func TestListRetryPagination(t *testing.T) {
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
got, err := r.ListRetry(tc.qname, Pagination{Size: tc.size, Page: tc.page})
|
||||
op := fmt.Sprintf("r.ListRetry(%q, Pagination{Size: %d, Page: %d})",
|
||||
got, err := r.ListRetry(tc.qname, base.Pagination{Size: tc.size, Page: tc.page})
|
||||
op := fmt.Sprintf("r.ListRetry(%q, base.Pagination{Size: %d, Page: %d})",
|
||||
tc.qname, tc.size, tc.page)
|
||||
if err != nil {
|
||||
t.Errorf("%s; %s returned error %v", tc.desc, op, err)
|
||||
@@ -1357,8 +1357,8 @@ func TestListArchived(t *testing.T) {
|
||||
h.FlushDB(t, r.client) // clean up db before each test case
|
||||
h.SeedAllArchivedQueues(t, r.client, tc.archived)
|
||||
|
||||
got, err := r.ListArchived(tc.qname, Pagination{Size: 20, Page: 0})
|
||||
op := fmt.Sprintf("r.ListArchived(%q, Pagination{Size: 20, Page: 0})", tc.qname)
|
||||
got, err := r.ListArchived(tc.qname, base.Pagination{Size: 20, Page: 0})
|
||||
op := fmt.Sprintf("r.ListArchived(%q, base.Pagination{Size: 20, Page: 0})", tc.qname)
|
||||
if err != nil {
|
||||
t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want)
|
||||
continue
|
||||
@@ -1371,7 +1371,7 @@ func TestListArchived(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestListArchivedPagination(t *testing.T) {
|
||||
func TestListArchivedbase.Pagination(t *testing.T) {
|
||||
r := setup(t)
|
||||
defer r.Close()
|
||||
var entries []base.Z
|
||||
@@ -1398,8 +1398,8 @@ func TestListArchivedPagination(t *testing.T) {
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
got, err := r.ListArchived(tc.qname, Pagination{Size: tc.size, Page: tc.page})
|
||||
op := fmt.Sprintf("r.ListArchived(Pagination{Size: %d, Page: %d})",
|
||||
got, err := r.ListArchived(tc.qname, base.Pagination{Size: tc.size, Page: tc.page})
|
||||
op := fmt.Sprintf("r.ListArchived(base.Pagination{Size: %d, Page: %d})",
|
||||
tc.size, tc.page)
|
||||
if err != nil {
|
||||
t.Errorf("%s; %s returned error %v", tc.desc, op, err)
|
||||
@@ -1497,8 +1497,8 @@ func TestListCompleted(t *testing.T) {
|
||||
h.FlushDB(t, r.client) // clean up db before each test case
|
||||
h.SeedAllCompletedQueues(t, r.client, tc.completed)
|
||||
|
||||
got, err := r.ListCompleted(tc.qname, Pagination{Size: 20, Page: 0})
|
||||
op := fmt.Sprintf("r.ListCompleted(%q, Pagination{Size: 20, Page: 0})", tc.qname)
|
||||
got, err := r.ListCompleted(tc.qname, base.Pagination{Size: 20, Page: 0})
|
||||
op := fmt.Sprintf("r.ListCompleted(%q, base.Pagination{Size: 20, Page: 0})", tc.qname)
|
||||
if err != nil {
|
||||
t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want)
|
||||
continue
|
||||
@@ -1512,7 +1512,7 @@ func TestListCompleted(t *testing.T) {
|
||||
|
||||
}
|
||||
|
||||
func TestListCompletedPagination(t *testing.T) {
|
||||
func TestListCompletedbase.Pagination(t *testing.T) {
|
||||
r := setup(t)
|
||||
defer r.Close()
|
||||
var entries []base.Z
|
||||
@@ -1539,8 +1539,8 @@ func TestListCompletedPagination(t *testing.T) {
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
got, err := r.ListCompleted(tc.qname, Pagination{Size: tc.size, Page: tc.page})
|
||||
op := fmt.Sprintf("r.ListCompleted(Pagination{Size: %d, Page: %d})",
|
||||
got, err := r.ListCompleted(tc.qname, base.Pagination{Size: tc.size, Page: tc.page})
|
||||
op := fmt.Sprintf("r.ListCompleted(base.Pagination{Size: %d, Page: %d})",
|
||||
tc.size, tc.page)
|
||||
if err != nil {
|
||||
t.Errorf("%s; %s returned error %v", tc.desc, op, err)
|
||||
@@ -1645,7 +1645,7 @@ func TestListAggregating(t *testing.T) {
|
||||
h.SeedRedisZSets(t, r.client, fxt.groups)
|
||||
|
||||
t.Run(tc.desc, func(t *testing.T) {
|
||||
got, err := r.ListAggregating(tc.qname, tc.gname, Pagination{})
|
||||
got, err := r.ListAggregating(tc.qname, tc.gname, base.Pagination{})
|
||||
if err != nil {
|
||||
t.Fatalf("ListAggregating returned error: %v", err)
|
||||
}
|
||||
@@ -1656,7 +1656,7 @@ func TestListAggregating(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestListAggregatingPagination(t *testing.T) {
|
||||
func TestListAggregatingbase.Pagination(t *testing.T) {
|
||||
r := setup(t)
|
||||
defer r.Close()
|
||||
|
||||
@@ -1759,7 +1759,7 @@ func TestListAggregatingPagination(t *testing.T) {
|
||||
h.SeedRedisZSets(t, r.client, fxt.groups)
|
||||
|
||||
t.Run(tc.desc, func(t *testing.T) {
|
||||
got, err := r.ListAggregating(tc.qname, tc.gname, Pagination{Page: tc.page, Size: tc.size})
|
||||
got, err := r.ListAggregating(tc.qname, tc.gname, base.Pagination{Page: tc.page, Size: tc.size})
|
||||
if err != nil {
|
||||
t.Fatalf("ListAggregating returned error: %v", err)
|
||||
}
|
||||
@@ -1802,7 +1802,7 @@ func TestListTasksError(t *testing.T) {
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
pgn := Pagination{Page: 0, Size: 20}
|
||||
pgn := base.Pagination{Page: 0, Size: 20}
|
||||
if _, got := r.ListActive(tc.qname, pgn); !tc.match(got) {
|
||||
t.Errorf("%s: ListActive returned %v", tc.desc, got)
|
||||
}
|
||||
@@ -5418,7 +5418,7 @@ loop:
|
||||
continue loop
|
||||
}
|
||||
}
|
||||
got, err := r.ListSchedulerEnqueueEvents(tc.entryID, Pagination{Size: 20, Page: 0})
|
||||
got, err := r.ListSchedulerEnqueueEvents(tc.entryID, base.Pagination{Size: 20, Page: 0})
|
||||
if err != nil {
|
||||
t.Errorf("ListSchedulerEnqueueEvents(%q) failed: %v", tc.entryID, err)
|
||||
continue
|
||||
@@ -5465,7 +5465,7 @@ func TestRecordSchedulerEnqueueEventTrimsDataSet(t *testing.T) {
|
||||
if n := r.client.ZCard(context.Background(), key).Val(); n != maxEvents {
|
||||
t.Fatalf("unexpected number of events; got %d, want %d", n, maxEvents)
|
||||
}
|
||||
events, err := r.ListSchedulerEnqueueEvents(entryID, Pagination{Size: maxEvents})
|
||||
events, err := r.ListSchedulerEnqueueEvents(entryID, base.Pagination{Size: maxEvents})
|
||||
if err != nil {
|
||||
t.Fatalf("ListSchedulerEnqueueEvents failed: %v", err)
|
||||
}
|
||||
|
Reference in New Issue
Block a user