mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-26 07:42:17 +08:00
Rename error types
This commit is contained in:
parent
1db516c53c
commit
76486b5cb4
@ -5,6 +5,7 @@
|
|||||||
package inspeq
|
package inspeq
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
@ -134,23 +135,12 @@ func (i *Inspector) History(qname string, n int) ([]*DailyStats, error) {
|
|||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ErrQueueNotFound indicates that the specified queue does not exist.
|
var (
|
||||||
type ErrQueueNotFound struct {
|
// ErrQueueNotFound indicates that the specified queue does not exist.
|
||||||
qname string
|
ErrQueueNotFound = errors.New("queue not found")
|
||||||
}
|
// ErrQueueNotEmpty indicates that the specified queue is not empty.
|
||||||
|
ErrQueueNotEmpty = errors.New("queue is not empty")
|
||||||
func (e *ErrQueueNotFound) Error() string {
|
)
|
||||||
return fmt.Sprintf("queue %q does not exist", e.qname)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ErrQueueNotEmpty indicates that the specified queue is not empty.
|
|
||||||
type ErrQueueNotEmpty struct {
|
|
||||||
qname string
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *ErrQueueNotEmpty) Error() string {
|
|
||||||
return fmt.Sprintf("queue %q is not empty", e.qname)
|
|
||||||
}
|
|
||||||
|
|
||||||
// DeleteQueue removes the specified queue.
|
// DeleteQueue removes the specified queue.
|
||||||
//
|
//
|
||||||
@ -164,11 +154,11 @@ func (e *ErrQueueNotEmpty) Error() string {
|
|||||||
// returns ErrQueueNotEmpty.
|
// returns ErrQueueNotEmpty.
|
||||||
func (i *Inspector) DeleteQueue(qname string, force bool) error {
|
func (i *Inspector) DeleteQueue(qname string, force bool) error {
|
||||||
err := i.rdb.RemoveQueue(qname, force)
|
err := i.rdb.RemoveQueue(qname, force)
|
||||||
if _, ok := err.(*rdb.ErrQueueNotFound); ok {
|
if _, ok := err.(*rdb.QueueNotFoundError); ok {
|
||||||
return &ErrQueueNotFound{qname}
|
return fmt.Errorf("%w: queue=%q", ErrQueueNotFound, qname)
|
||||||
}
|
}
|
||||||
if _, ok := err.(*rdb.ErrQueueNotEmpty); ok {
|
if _, ok := err.(*rdb.QueueNotEmptyError); ok {
|
||||||
return &ErrQueueNotEmpty{qname}
|
return fmt.Errorf("%w: queue=%q", ErrQueueNotEmpty, qname)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -105,7 +105,7 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if !exists {
|
if !exists {
|
||||||
return nil, &ErrQueueNotFound{qname}
|
return nil, &QueueNotFoundError{qname}
|
||||||
}
|
}
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
res, err := currentStatsCmd.Run(r.client, []string{
|
res, err := currentStatsCmd.Run(r.client, []string{
|
||||||
@ -219,7 +219,7 @@ func (r *RDB) HistoricalStats(qname string, n int) ([]*DailyStats, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if !exists {
|
if !exists {
|
||||||
return nil, &ErrQueueNotFound{qname}
|
return nil, &QueueNotFoundError{qname}
|
||||||
}
|
}
|
||||||
const day = 24 * time.Hour
|
const day = 24 * time.Hour
|
||||||
now := time.Now().UTC()
|
now := time.Now().UTC()
|
||||||
@ -594,6 +594,9 @@ func (r *RDB) ArchiveAllPendingTasks(qname string) (int64, error) {
|
|||||||
return n, nil
|
return n, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// archiveTaskCmd is a Lua script that arhives a task given a task id.
|
||||||
|
//
|
||||||
|
// Input:
|
||||||
// KEYS[1] -> task key (asynq:{<qname>}:t:<task_id>)
|
// KEYS[1] -> task key (asynq:{<qname>}:t:<task_id>)
|
||||||
// KEYS[2] -> archived key (asynq:{<qname>}:archived)
|
// KEYS[2] -> archived key (asynq:{<qname>}:archived)
|
||||||
// ARGV[1] -> id of the task to archive
|
// ARGV[1] -> id of the task to archive
|
||||||
@ -601,6 +604,9 @@ func (r *RDB) ArchiveAllPendingTasks(qname string) (int64, error) {
|
|||||||
// ARGV[3] -> cutoff timestamp (e.g., 90 days ago)
|
// ARGV[3] -> cutoff timestamp (e.g., 90 days ago)
|
||||||
// ARGV[4] -> max number of tasks in archived state (e.g., 100)
|
// ARGV[4] -> max number of tasks in archived state (e.g., 100)
|
||||||
// ARGV[5] -> queue key prefix (asynq:{<qname>}:)
|
// ARGV[5] -> queue key prefix (asynq:{<qname>}:)
|
||||||
|
//
|
||||||
|
// Output:
|
||||||
|
// TODO: document return value of the script
|
||||||
var archiveTaskCmd = redis.NewScript(`
|
var archiveTaskCmd = redis.NewScript(`
|
||||||
if redis.call("EXISTS", KEYS[1]) == 0 then
|
if redis.call("EXISTS", KEYS[1]) == 0 then
|
||||||
return 0
|
return 0
|
||||||
@ -826,22 +832,26 @@ func (r *RDB) DeleteAllPendingTasks(qname string) (int64, error) {
|
|||||||
return n, nil
|
return n, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ErrQueueNotFound indicates specified queue does not exist.
|
// QueueNotFoundError indicates specified queue does not exist.
|
||||||
type ErrQueueNotFound struct {
|
type QueueNotFoundError struct {
|
||||||
qname string
|
Name string // name of the queue
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *ErrQueueNotFound) Error() string {
|
func (e *QueueNotFoundError) Unwrap() error { return base.ErrNotFound }
|
||||||
return fmt.Sprintf("queue %q does not exist", e.qname)
|
|
||||||
|
func (e *QueueNotFoundError) Error() string {
|
||||||
|
return fmt.Sprintf("queue %q does not exist", e.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ErrQueueNotEmpty indicates specified queue is not empty.
|
// QueueNotEmptyError indicates specified queue is not empty.
|
||||||
type ErrQueueNotEmpty struct {
|
type QueueNotEmptyError struct {
|
||||||
qname string
|
Name string // name of the queue
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *ErrQueueNotEmpty) Error() string {
|
func (e *QueueNotEmptyError) Unwrap() error { return base.ErrFailedPrecondition }
|
||||||
return fmt.Sprintf("queue %q is not empty", e.qname)
|
|
||||||
|
func (e *QueueNotEmptyError) Error() string {
|
||||||
|
return fmt.Sprintf("queue %q is not empty", e.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Only check whether active queue is empty before removing.
|
// Only check whether active queue is empty before removing.
|
||||||
@ -931,7 +941,7 @@ func (r *RDB) RemoveQueue(qname string, force bool) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if !exists {
|
if !exists {
|
||||||
return &ErrQueueNotFound{qname}
|
return &QueueNotFoundError{qname}
|
||||||
}
|
}
|
||||||
var script *redis.Script
|
var script *redis.Script
|
||||||
if force {
|
if force {
|
||||||
@ -949,7 +959,7 @@ func (r *RDB) RemoveQueue(qname string, force bool) error {
|
|||||||
}
|
}
|
||||||
if err := script.Run(r.client, keys, base.TaskKeyPrefix(qname)).Err(); err != nil {
|
if err := script.Run(r.client, keys, base.TaskKeyPrefix(qname)).Err(); err != nil {
|
||||||
if err.Error() == "QUEUE NOT EMPTY" {
|
if err.Error() == "QUEUE NOT EMPTY" {
|
||||||
return &ErrQueueNotEmpty{qname}
|
return &QueueNotEmptyError{qname}
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -224,7 +224,7 @@ func TestCurrentStatsWithNonExistentQueue(t *testing.T) {
|
|||||||
qname := "non-existent"
|
qname := "non-existent"
|
||||||
got, err := r.CurrentStats(qname)
|
got, err := r.CurrentStats(qname)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Fatalf("r.CurrentStats(%q) = %v, %v, want nil, %v", qname, got, err, &ErrQueueNotFound{qname})
|
t.Fatalf("r.CurrentStats(%q) = %v, %v, want nil, %v", qname, got, err, &QueueNotFoundError{qname})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user