mirror of
https://github.com/hibiken/asynq.git
synced 2025-08-19 15:08:55 +08:00
Rename DeadTask to ArchivedTask and action "kill" to "archive"
This commit is contained in:
@@ -220,11 +220,11 @@ func SeedRetryQueue(tb testing.TB, r redis.UniversalClient, entries []base.Z, qn
|
||||
seedRedisZSet(tb, r, base.RetryKey(qname), entries)
|
||||
}
|
||||
|
||||
// SeedDeadQueue initializes the dead queue with the given messages.
|
||||
func SeedDeadQueue(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) {
|
||||
// SeedArchivedQueue initializes the archived queue with the given messages.
|
||||
func SeedArchivedQueue(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) {
|
||||
tb.Helper()
|
||||
r.SAdd(base.AllQueues, qname)
|
||||
seedRedisZSet(tb, r, base.DeadKey(qname), entries)
|
||||
seedRedisZSet(tb, r, base.ArchivedKey(qname), entries)
|
||||
}
|
||||
|
||||
// SeedDeadlines initializes the deadlines set with the given entries.
|
||||
@@ -264,10 +264,10 @@ func SeedAllRetryQueues(tb testing.TB, r redis.UniversalClient, retry map[string
|
||||
}
|
||||
}
|
||||
|
||||
// SeedAllDeadQueues initializes all of the specified dead queues with the given entries.
|
||||
func SeedAllDeadQueues(tb testing.TB, r redis.UniversalClient, dead map[string][]base.Z) {
|
||||
for q, entries := range dead {
|
||||
SeedDeadQueue(tb, r, entries, q)
|
||||
// SeedAllArchivedQueues initializes all of the specified archived queues with the given entries.
|
||||
func SeedAllArchivedQueues(tb testing.TB, r redis.UniversalClient, archived map[string][]base.Z) {
|
||||
for q, entries := range archived {
|
||||
SeedArchivedQueue(tb, r, entries, q)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -320,10 +320,10 @@ func GetRetryMessages(tb testing.TB, r redis.UniversalClient, qname string) []*b
|
||||
return getZSetMessages(tb, r, base.RetryKey(qname))
|
||||
}
|
||||
|
||||
// GetDeadMessages returns all dead messages in the given queue.
|
||||
func GetDeadMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage {
|
||||
// GetArchivedMessages returns all archived messages in the given queue.
|
||||
func GetArchivedMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage {
|
||||
tb.Helper()
|
||||
return getZSetMessages(tb, r, base.DeadKey(qname))
|
||||
return getZSetMessages(tb, r, base.ArchivedKey(qname))
|
||||
}
|
||||
|
||||
// GetScheduledEntries returns all scheduled messages and its score in the given queue.
|
||||
@@ -338,10 +338,10 @@ func GetRetryEntries(tb testing.TB, r redis.UniversalClient, qname string) []bas
|
||||
return getZSetEntries(tb, r, base.RetryKey(qname))
|
||||
}
|
||||
|
||||
// GetDeadEntries returns all dead messages and its score in the given queue.
|
||||
func GetDeadEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z {
|
||||
// GetArchivedEntries returns all archived messages and its score in the given queue.
|
||||
func GetArchivedEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z {
|
||||
tb.Helper()
|
||||
return getZSetEntries(tb, r, base.DeadKey(qname))
|
||||
return getZSetEntries(tb, r, base.ArchivedKey(qname))
|
||||
}
|
||||
|
||||
// GetDeadlinesEntries returns all task messages and its score in the deadlines set for the given queue.
|
||||
|
@@ -56,9 +56,9 @@ func RetryKey(qname string) string {
|
||||
return fmt.Sprintf("asynq:{%s}:retry", qname)
|
||||
}
|
||||
|
||||
// DeadKey returns a redis key for the dead tasks.
|
||||
func DeadKey(qname string) string {
|
||||
return fmt.Sprintf("asynq:{%s}:dead", qname)
|
||||
// ArchivedKey returns a redis key for the archived tasks.
|
||||
func ArchivedKey(qname string) string {
|
||||
return fmt.Sprintf("asynq:{%s}:archived", qname)
|
||||
}
|
||||
|
||||
// DeadlinesKey returns a redis key for the deadlines.
|
||||
@@ -156,7 +156,7 @@ type TaskMessage struct {
|
||||
|
||||
// Timeout specifies timeout in seconds.
|
||||
// If task processing doesn't complete within the timeout, the task will be retried
|
||||
// if retry count is remaining. Otherwise it will be moved to the dead queue.
|
||||
// if retry count is remaining. Otherwise it will be moved to the archive.
|
||||
//
|
||||
// Use zero to indicate no timeout.
|
||||
Timeout int64
|
||||
@@ -164,7 +164,7 @@ type TaskMessage struct {
|
||||
// Deadline specifies the deadline for the task in Unix time,
|
||||
// the number of seconds elapsed since January 1, 1970 UTC.
|
||||
// If task processing doesn't complete before the deadline, the task will be retried
|
||||
// if retry count is remaining. Otherwise it will be moved to the dead queue.
|
||||
// if retry count is remaining. Otherwise it will be moved to the archive.
|
||||
//
|
||||
// Use zero to indicate no deadline.
|
||||
Deadline int64
|
||||
@@ -369,7 +369,7 @@ type Broker interface {
|
||||
Schedule(msg *TaskMessage, processAt time.Time) error
|
||||
ScheduleUnique(msg *TaskMessage, processAt time.Time, ttl time.Duration) error
|
||||
Retry(msg *TaskMessage, processAt time.Time, errMsg string) error
|
||||
Kill(msg *TaskMessage, errMsg string) error
|
||||
Archive(msg *TaskMessage, errMsg string) error
|
||||
CheckAndEnqueue(qnames ...string) error
|
||||
ListDeadlineExceeded(deadline time.Time, qnames ...string) ([]*TaskMessage, error)
|
||||
WriteServerState(info *ServerInfo, workers []*WorkerInfo, ttl time.Duration) error
|
||||
|
@@ -100,19 +100,19 @@ func TestRetryKey(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeadKey(t *testing.T) {
|
||||
func TestArchivedKey(t *testing.T) {
|
||||
tests := []struct {
|
||||
qname string
|
||||
want string
|
||||
}{
|
||||
{"default", "asynq:{default}:dead"},
|
||||
{"custom", "asynq:{custom}:dead"},
|
||||
{"default", "asynq:{default}:archived"},
|
||||
{"custom", "asynq:{custom}:archived"},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
got := DeadKey(tc.qname)
|
||||
got := ArchivedKey(tc.qname)
|
||||
if got != tc.want {
|
||||
t.Errorf("DeadKey(%q) = %q, want %q", tc.qname, got, tc.want)
|
||||
t.Errorf("ArchivedKey(%q) = %q, want %q", tc.qname, got, tc.want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -35,7 +35,7 @@ type Stats struct {
|
||||
Active int
|
||||
Scheduled int
|
||||
Retry int
|
||||
Dead int
|
||||
Archived int
|
||||
// Total number of tasks processed during the current date.
|
||||
// The number includes both succeeded and failed tasks.
|
||||
Processed int
|
||||
@@ -62,7 +62,7 @@ type DailyStats struct {
|
||||
// KEYS[2] -> asynq:<qname>:active
|
||||
// KEYS[3] -> asynq:<qname>:scheduled
|
||||
// KEYS[4] -> asynq:<qname>:retry
|
||||
// KEYS[5] -> asynq:<qname>:dead
|
||||
// KEYS[5] -> asynq:<qname>:archived
|
||||
// KEYS[6] -> asynq:<qname>:processed:<yyyy-mm-dd>
|
||||
// KEYS[7] -> asynq:<qname>:failed:<yyyy-mm-dd>
|
||||
// KEYS[8] -> asynq:<qname>:paused
|
||||
@@ -111,7 +111,7 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
|
||||
base.ActiveKey(qname),
|
||||
base.ScheduledKey(qname),
|
||||
base.RetryKey(qname),
|
||||
base.DeadKey(qname),
|
||||
base.ArchivedKey(qname),
|
||||
base.ProcessedKey(qname, now),
|
||||
base.FailedKey(qname, now),
|
||||
base.PausedKey(qname),
|
||||
@@ -144,8 +144,8 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
|
||||
case base.RetryKey(qname):
|
||||
stats.Retry = val
|
||||
size += val
|
||||
case base.DeadKey(qname):
|
||||
stats.Dead = val
|
||||
case base.ArchivedKey(qname):
|
||||
stats.Archived = val
|
||||
size += val
|
||||
case base.ProcessedKey(qname, now):
|
||||
stats.Processed = val
|
||||
@@ -328,12 +328,12 @@ func (r *RDB) ListRetry(qname string, pgn Pagination) ([]base.Z, error) {
|
||||
return r.listZSetEntries(base.RetryKey(qname), pgn)
|
||||
}
|
||||
|
||||
// ListDead returns all tasks from the given queue that have exhausted its retry limit.
|
||||
func (r *RDB) ListDead(qname string, pgn Pagination) ([]base.Z, error) {
|
||||
// ListArchived returns all tasks from the given queue that have exhausted its retry limit.
|
||||
func (r *RDB) ListArchived(qname string, pgn Pagination) ([]base.Z, error) {
|
||||
if !r.client.SIsMember(base.AllQueues, qname).Val() {
|
||||
return nil, fmt.Errorf("queue %q does not exist", qname)
|
||||
}
|
||||
return r.listZSetEntries(base.DeadKey(qname), pgn)
|
||||
return r.listZSetEntries(base.ArchivedKey(qname), pgn)
|
||||
}
|
||||
|
||||
// listZSetEntries returns a list of message and score pairs in Redis sorted-set
|
||||
@@ -358,11 +358,11 @@ func (r *RDB) listZSetEntries(key string, pgn Pagination) ([]base.Z, error) {
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// RunDeadTask finds a dead task that matches the given id and score from
|
||||
// RunArchivedTask finds an archived task that matches the given id and score from
|
||||
// the given queue and enqueues it for processing.
|
||||
//If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
|
||||
func (r *RDB) RunDeadTask(qname string, id uuid.UUID, score int64) error {
|
||||
n, err := r.removeAndRun(base.DeadKey(qname), base.QueueKey(qname), id.String(), float64(score))
|
||||
// If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
|
||||
func (r *RDB) RunArchivedTask(qname string, id uuid.UUID, score int64) error {
|
||||
n, err := r.removeAndRun(base.ArchivedKey(qname), base.QueueKey(qname), id.String(), float64(score))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -412,10 +412,10 @@ func (r *RDB) RunAllRetryTasks(qname string) (int64, error) {
|
||||
return r.removeAndRunAll(base.RetryKey(qname), base.QueueKey(qname))
|
||||
}
|
||||
|
||||
// RunAllDeadTasks enqueues all tasks from dead queue
|
||||
// RunAllArchivedTasks enqueues all archived tasks from the given queue
|
||||
// and returns the number of tasks enqueued.
|
||||
func (r *RDB) RunAllDeadTasks(qname string) (int64, error) {
|
||||
return r.removeAndRunAll(base.DeadKey(qname), base.QueueKey(qname))
|
||||
func (r *RDB) RunAllArchivedTasks(qname string) (int64, error) {
|
||||
return r.removeAndRunAll(base.ArchivedKey(qname), base.QueueKey(qname))
|
||||
}
|
||||
|
||||
var removeAndRunCmd = redis.NewScript(`
|
||||
@@ -462,10 +462,10 @@ func (r *RDB) removeAndRunAll(zset, qkey string) (int64, error) {
|
||||
return n, nil
|
||||
}
|
||||
|
||||
// KillRetryTask finds a retry task that matches the given id and score from the given queue
|
||||
// and kills it. If a task that maches the id and score does not exist, it returns ErrTaskNotFound.
|
||||
func (r *RDB) KillRetryTask(qname string, id uuid.UUID, score int64) error {
|
||||
n, err := r.removeAndKill(base.RetryKey(qname), base.DeadKey(qname), id.String(), float64(score))
|
||||
// ArchiveRetryTask finds a retry task that matches the given id and score from the given queue
|
||||
// and archives it. If a task that maches the id and score does not exist, it returns ErrTaskNotFound.
|
||||
func (r *RDB) ArchiveRetryTask(qname string, id uuid.UUID, score int64) error {
|
||||
n, err := r.removeAndArchive(base.RetryKey(qname), base.ArchivedKey(qname), id.String(), float64(score))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -475,10 +475,10 @@ func (r *RDB) KillRetryTask(qname string, id uuid.UUID, score int64) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// KillScheduledTask finds a scheduled task that matches the given id and score from the given queue
|
||||
// and kills it. If a task that maches the id and score does not exist, it returns ErrTaskNotFound.
|
||||
func (r *RDB) KillScheduledTask(qname string, id uuid.UUID, score int64) error {
|
||||
n, err := r.removeAndKill(base.ScheduledKey(qname), base.DeadKey(qname), id.String(), float64(score))
|
||||
// ArchiveScheduledTask finds a scheduled task that matches the given id and score from the given queue
|
||||
// and archives it. If a task that maches the id and score does not exist, it returns ErrTaskNotFound.
|
||||
func (r *RDB) ArchiveScheduledTask(qname string, id uuid.UUID, score int64) error {
|
||||
n, err := r.removeAndArchive(base.ScheduledKey(qname), base.ArchivedKey(qname), id.String(), float64(score))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -488,26 +488,26 @@ func (r *RDB) KillScheduledTask(qname string, id uuid.UUID, score int64) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// KillAllRetryTasks kills all retry tasks from the given queue and
|
||||
// ArchiveAllRetryTasks archives all retry tasks from the given queue and
|
||||
// returns the number of tasks that were moved.
|
||||
func (r *RDB) KillAllRetryTasks(qname string) (int64, error) {
|
||||
return r.removeAndKillAll(base.RetryKey(qname), base.DeadKey(qname))
|
||||
func (r *RDB) ArchiveAllRetryTasks(qname string) (int64, error) {
|
||||
return r.removeAndArchiveAll(base.RetryKey(qname), base.ArchivedKey(qname))
|
||||
}
|
||||
|
||||
// KillAllScheduledTasks kills all scheduled tasks from the given queue and
|
||||
// ArchiveAllScheduledTasks archives all scheduled tasks from the given queue and
|
||||
// returns the number of tasks that were moved.
|
||||
func (r *RDB) KillAllScheduledTasks(qname string) (int64, error) {
|
||||
return r.removeAndKillAll(base.ScheduledKey(qname), base.DeadKey(qname))
|
||||
func (r *RDB) ArchiveAllScheduledTasks(qname string) (int64, error) {
|
||||
return r.removeAndArchiveAll(base.ScheduledKey(qname), base.ArchivedKey(qname))
|
||||
}
|
||||
|
||||
// KEYS[1] -> ZSET to move task from (e.g., retry queue)
|
||||
// KEYS[2] -> asynq:{<qname>}:dead
|
||||
// ARGV[1] -> score of the task to kill
|
||||
// ARGV[2] -> id of the task to kill
|
||||
// KEYS[2] -> asynq:{<qname>}:archived
|
||||
// ARGV[1] -> score of the task to archive
|
||||
// ARGV[2] -> id of the task to archive
|
||||
// ARGV[3] -> current timestamp
|
||||
// ARGV[4] -> cutoff timestamp (e.g., 90 days ago)
|
||||
// ARGV[5] -> max number of tasks in dead queue (e.g., 100)
|
||||
var removeAndKillCmd = redis.NewScript(`
|
||||
// ARGV[5] -> max number of tasks in archived state (e.g., 100)
|
||||
var removeAndArchiveCmd = redis.NewScript(`
|
||||
local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], ARGV[1], ARGV[1])
|
||||
for _, msg in ipairs(msgs) do
|
||||
local decoded = cjson.decode(msg)
|
||||
@@ -521,12 +521,12 @@ for _, msg in ipairs(msgs) do
|
||||
end
|
||||
return 0`)
|
||||
|
||||
func (r *RDB) removeAndKill(src, dst, id string, score float64) (int64, error) {
|
||||
func (r *RDB) removeAndArchive(src, dst, id string, score float64) (int64, error) {
|
||||
now := time.Now()
|
||||
limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago
|
||||
res, err := removeAndKillCmd.Run(r.client,
|
||||
limit := now.AddDate(0, 0, -archivedExpirationInDays).Unix() // 90 days ago
|
||||
res, err := removeAndArchiveCmd.Run(r.client,
|
||||
[]string{src, dst},
|
||||
score, id, now.Unix(), limit, maxDeadTasks).Result()
|
||||
score, id, now.Unix(), limit, maxArchiveSize).Result()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
@@ -538,11 +538,11 @@ func (r *RDB) removeAndKill(src, dst, id string, score float64) (int64, error) {
|
||||
}
|
||||
|
||||
// KEYS[1] -> ZSET to move task from (e.g., retry queue)
|
||||
// KEYS[2] -> asynq:{<qname>}:dead
|
||||
// KEYS[2] -> asynq:{<qname>}:archived
|
||||
// ARGV[1] -> current timestamp
|
||||
// ARGV[2] -> cutoff timestamp (e.g., 90 days ago)
|
||||
// ARGV[3] -> max number of tasks in dead queue (e.g., 100)
|
||||
var removeAndKillAllCmd = redis.NewScript(`
|
||||
// ARGV[3] -> max number of tasks in archive (e.g., 100)
|
||||
var removeAndArchiveAllCmd = redis.NewScript(`
|
||||
local msgs = redis.call("ZRANGE", KEYS[1], 0, -1)
|
||||
for _, msg in ipairs(msgs) do
|
||||
redis.call("ZADD", KEYS[2], ARGV[1], msg)
|
||||
@@ -552,11 +552,11 @@ for _, msg in ipairs(msgs) do
|
||||
end
|
||||
return table.getn(msgs)`)
|
||||
|
||||
func (r *RDB) removeAndKillAll(src, dst string) (int64, error) {
|
||||
func (r *RDB) removeAndArchiveAll(src, dst string) (int64, error) {
|
||||
now := time.Now()
|
||||
limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago
|
||||
res, err := removeAndKillAllCmd.Run(r.client, []string{src, dst},
|
||||
now.Unix(), limit, maxDeadTasks).Result()
|
||||
limit := now.AddDate(0, 0, -archivedExpirationInDays).Unix() // 90 days ago
|
||||
res, err := removeAndArchiveAllCmd.Run(r.client, []string{src, dst},
|
||||
now.Unix(), limit, maxArchiveSize).Result()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
@@ -567,10 +567,10 @@ func (r *RDB) removeAndKillAll(src, dst string) (int64, error) {
|
||||
return n, nil
|
||||
}
|
||||
|
||||
// DeleteDeadTask deletes a dead task that matches the given id and score from the given queue.
|
||||
// DeleteArchivedTask deletes an archived task that matches the given id and score from the given queue.
|
||||
// If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
|
||||
func (r *RDB) DeleteDeadTask(qname string, id uuid.UUID, score int64) error {
|
||||
return r.deleteTask(base.DeadKey(qname), id.String(), float64(score))
|
||||
func (r *RDB) DeleteArchivedTask(qname string, id uuid.UUID, score int64) error {
|
||||
return r.deleteTask(base.ArchivedKey(qname), id.String(), float64(score))
|
||||
}
|
||||
|
||||
// DeleteRetryTask deletes a retry task that matches the given id and score from the given queue.
|
||||
@@ -617,10 +617,10 @@ local n = redis.call("ZCARD", KEYS[1])
|
||||
redis.call("DEL", KEYS[1])
|
||||
return n`)
|
||||
|
||||
// DeleteAllDeadTasks deletes all dead tasks from the given queue
|
||||
// DeleteAllArchivedTasks deletes all archived tasks from the given queue
|
||||
// and returns the number of tasks deleted.
|
||||
func (r *RDB) DeleteAllDeadTasks(qname string) (int64, error) {
|
||||
return r.deleteAll(base.DeadKey(qname))
|
||||
func (r *RDB) DeleteAllArchivedTasks(qname string) (int64, error) {
|
||||
return r.deleteAll(base.ArchivedKey(qname))
|
||||
}
|
||||
|
||||
// DeleteAllRetryTasks deletes all retry tasks from the given queue
|
||||
@@ -670,7 +670,7 @@ func (e *ErrQueueNotEmpty) Error() string {
|
||||
// KEYS[2] -> asynq:{<qname>}:active
|
||||
// KEYS[3] -> asynq:{<qname>}:scheduled
|
||||
// KEYS[4] -> asynq:{<qname>}:retry
|
||||
// KEYS[5] -> asynq:{<qname>}:dead
|
||||
// KEYS[5] -> asynq:{<qname>}:archived
|
||||
// KEYS[6] -> asynq:{<qname>}:deadlines
|
||||
var removeQueueForceCmd = redis.NewScript(`
|
||||
local active = redis.call("LLEN", KEYS[2])
|
||||
@@ -690,15 +690,15 @@ return redis.status_reply("OK")`)
|
||||
// KEYS[2] -> asynq:{<qname>}:active
|
||||
// KEYS[3] -> asynq:{<qname>}:scheduled
|
||||
// KEYS[4] -> asynq:{<qname>}:retry
|
||||
// KEYS[5] -> asynq:{<qname>}:dead
|
||||
// KEYS[5] -> asynq:{<qname>}:archived
|
||||
// KEYS[6] -> asynq:{<qname>}:deadlines
|
||||
var removeQueueCmd = redis.NewScript(`
|
||||
local pending = redis.call("LLEN", KEYS[1])
|
||||
local active = redis.call("LLEN", KEYS[2])
|
||||
local scheduled = redis.call("SCARD", KEYS[3])
|
||||
local retry = redis.call("SCARD", KEYS[4])
|
||||
local dead = redis.call("SCARD", KEYS[5])
|
||||
local total = pending + active + scheduled + retry + dead
|
||||
local archived = redis.call("SCARD", KEYS[5])
|
||||
local total = pending + active + scheduled + retry + archived
|
||||
if total > 0 then
|
||||
return redis.error_reply("QUEUE NOT EMPTY")
|
||||
end
|
||||
@@ -735,7 +735,7 @@ func (r *RDB) RemoveQueue(qname string, force bool) error {
|
||||
base.ActiveKey(qname),
|
||||
base.ScheduledKey(qname),
|
||||
base.RetryKey(qname),
|
||||
base.DeadKey(qname),
|
||||
base.ArchivedKey(qname),
|
||||
base.DeadlinesKey(qname),
|
||||
}
|
||||
if err := script.Run(r.client, keys).Err(); err != nil {
|
||||
|
@@ -63,7 +63,7 @@ func TestCurrentStats(t *testing.T) {
|
||||
inProgress map[string][]*base.TaskMessage
|
||||
scheduled map[string][]base.Z
|
||||
retry map[string][]base.Z
|
||||
dead map[string][]base.Z
|
||||
archived map[string][]base.Z
|
||||
processed map[string]int
|
||||
failed map[string]int
|
||||
paused []string
|
||||
@@ -94,7 +94,7 @@ func TestCurrentStats(t *testing.T) {
|
||||
"critical": {},
|
||||
"low": {},
|
||||
},
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {},
|
||||
"critical": {},
|
||||
"low": {},
|
||||
@@ -119,7 +119,7 @@ func TestCurrentStats(t *testing.T) {
|
||||
Active: 1,
|
||||
Scheduled: 2,
|
||||
Retry: 0,
|
||||
Dead: 0,
|
||||
Archived: 0,
|
||||
Processed: 120,
|
||||
Failed: 2,
|
||||
Timestamp: now,
|
||||
@@ -149,7 +149,7 @@ func TestCurrentStats(t *testing.T) {
|
||||
"critical": {},
|
||||
"low": {},
|
||||
},
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {},
|
||||
"critical": {},
|
||||
"low": {},
|
||||
@@ -174,7 +174,7 @@ func TestCurrentStats(t *testing.T) {
|
||||
Active: 0,
|
||||
Scheduled: 0,
|
||||
Retry: 0,
|
||||
Dead: 0,
|
||||
Archived: 0,
|
||||
Processed: 100,
|
||||
Failed: 0,
|
||||
Timestamp: now,
|
||||
@@ -193,7 +193,7 @@ func TestCurrentStats(t *testing.T) {
|
||||
h.SeedAllActiveQueues(t, r.client, tc.inProgress)
|
||||
h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
|
||||
h.SeedAllRetryQueues(t, r.client, tc.retry)
|
||||
h.SeedAllDeadQueues(t, r.client, tc.dead)
|
||||
h.SeedAllArchivedQueues(t, r.client, tc.archived)
|
||||
for qname, n := range tc.processed {
|
||||
processedKey := base.ProcessedKey(qname, now)
|
||||
r.client.Set(processedKey, n, 0)
|
||||
@@ -869,12 +869,12 @@ func TestListDead(t *testing.T) {
|
||||
f3 := time.Now().Add(-4 * time.Hour)
|
||||
|
||||
tests := []struct {
|
||||
dead map[string][]base.Z
|
||||
qname string
|
||||
want []base.Z
|
||||
archived map[string][]base.Z
|
||||
qname string
|
||||
want []base.Z
|
||||
}{
|
||||
{
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {
|
||||
{Message: m1, Score: f1.Unix()},
|
||||
{Message: m2, Score: f2.Unix()},
|
||||
@@ -890,7 +890,7 @@ func TestListDead(t *testing.T) {
|
||||
},
|
||||
},
|
||||
{
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {
|
||||
{Message: m1, Score: f1.Unix()},
|
||||
{Message: m2, Score: f2.Unix()},
|
||||
@@ -905,7 +905,7 @@ func TestListDead(t *testing.T) {
|
||||
},
|
||||
},
|
||||
{
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {},
|
||||
},
|
||||
qname: "default",
|
||||
@@ -915,9 +915,9 @@ func TestListDead(t *testing.T) {
|
||||
|
||||
for _, tc := range tests {
|
||||
h.FlushDB(t, r.client) // clean up db before each test case
|
||||
h.SeedAllDeadQueues(t, r.client, tc.dead)
|
||||
h.SeedAllArchivedQueues(t, r.client, tc.archived)
|
||||
|
||||
got, err := r.ListDead(tc.qname, Pagination{Size: 20, Page: 0})
|
||||
got, err := r.ListArchived(tc.qname, Pagination{Size: 20, Page: 0})
|
||||
op := fmt.Sprintf("r.ListDead(%q, Pagination{Size: 20, Page: 0})", tc.qname)
|
||||
if err != nil {
|
||||
t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want)
|
||||
@@ -939,7 +939,7 @@ func TestListDeadPagination(t *testing.T) {
|
||||
msg := h.NewTaskMessage(fmt.Sprintf("task %d", i), nil)
|
||||
entries = append(entries, base.Z{Message: msg, Score: int64(i)})
|
||||
}
|
||||
h.SeedDeadQueue(t, r.client, entries, "default")
|
||||
h.SeedArchivedQueue(t, r.client, entries, "default")
|
||||
|
||||
tests := []struct {
|
||||
desc string
|
||||
@@ -958,7 +958,7 @@ func TestListDeadPagination(t *testing.T) {
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
got, err := r.ListDead(tc.qname, Pagination{Size: tc.size, Page: tc.page})
|
||||
got, err := r.ListArchived(tc.qname, Pagination{Size: tc.size, Page: tc.page})
|
||||
op := fmt.Sprintf("r.ListDead(Pagination{Size: %d, Page: %d})",
|
||||
tc.size, tc.page)
|
||||
if err != nil {
|
||||
@@ -1005,16 +1005,16 @@ func TestRunDeadTask(t *testing.T) {
|
||||
s2 := time.Now().Add(-time.Hour).Unix()
|
||||
|
||||
tests := []struct {
|
||||
dead map[string][]base.Z
|
||||
qname string
|
||||
score int64
|
||||
id uuid.UUID
|
||||
want error // expected return value from calling RunDeadTask
|
||||
wantDead map[string][]*base.TaskMessage
|
||||
wantPending map[string][]*base.TaskMessage
|
||||
archived map[string][]base.Z
|
||||
qname string
|
||||
score int64
|
||||
id uuid.UUID
|
||||
want error // expected return value from calling RunDeadTask
|
||||
wantArchived map[string][]*base.TaskMessage
|
||||
wantPending map[string][]*base.TaskMessage
|
||||
}{
|
||||
{
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {
|
||||
{Message: t1, Score: s1},
|
||||
{Message: t2, Score: s2},
|
||||
@@ -1024,7 +1024,7 @@ func TestRunDeadTask(t *testing.T) {
|
||||
score: s2,
|
||||
id: t2.ID,
|
||||
want: nil,
|
||||
wantDead: map[string][]*base.TaskMessage{
|
||||
wantArchived: map[string][]*base.TaskMessage{
|
||||
"default": {t1},
|
||||
},
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
@@ -1032,7 +1032,7 @@ func TestRunDeadTask(t *testing.T) {
|
||||
},
|
||||
},
|
||||
{
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {
|
||||
{Message: t1, Score: s1},
|
||||
{Message: t2, Score: s2},
|
||||
@@ -1042,7 +1042,7 @@ func TestRunDeadTask(t *testing.T) {
|
||||
score: 123,
|
||||
id: t2.ID,
|
||||
want: ErrTaskNotFound,
|
||||
wantDead: map[string][]*base.TaskMessage{
|
||||
wantArchived: map[string][]*base.TaskMessage{
|
||||
"default": {t1, t2},
|
||||
},
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
@@ -1050,7 +1050,7 @@ func TestRunDeadTask(t *testing.T) {
|
||||
},
|
||||
},
|
||||
{
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {
|
||||
{Message: t1, Score: s1},
|
||||
{Message: t2, Score: s2},
|
||||
@@ -1063,7 +1063,7 @@ func TestRunDeadTask(t *testing.T) {
|
||||
score: s1,
|
||||
id: t3.ID,
|
||||
want: nil,
|
||||
wantDead: map[string][]*base.TaskMessage{
|
||||
wantArchived: map[string][]*base.TaskMessage{
|
||||
"default": {t1, t2},
|
||||
"critical": {},
|
||||
},
|
||||
@@ -1076,9 +1076,9 @@ func TestRunDeadTask(t *testing.T) {
|
||||
|
||||
for _, tc := range tests {
|
||||
h.FlushDB(t, r.client) // clean up db before each test case
|
||||
h.SeedAllDeadQueues(t, r.client, tc.dead)
|
||||
h.SeedAllArchivedQueues(t, r.client, tc.archived)
|
||||
|
||||
got := r.RunDeadTask(tc.qname, tc.id, tc.score)
|
||||
got := r.RunArchivedTask(tc.qname, tc.id, tc.score)
|
||||
if got != tc.want {
|
||||
t.Errorf("r.RunDeadTask(%q, %s, %d) = %v, want %v", tc.qname, tc.id, tc.score, got, tc.want)
|
||||
continue
|
||||
@@ -1091,10 +1091,10 @@ func TestRunDeadTask(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
for qname, want := range tc.wantDead {
|
||||
gotDead := h.GetDeadMessages(t, r.client, qname)
|
||||
for qname, want := range tc.wantArchived {
|
||||
gotDead := h.GetArchivedMessages(t, r.client, qname)
|
||||
if diff := cmp.Diff(want, gotDead, h.SortMsgOpt); diff != "" {
|
||||
t.Errorf("mismatch found in %q, (-want, +got)\n%s", base.DeadKey(qname), diff)
|
||||
t.Errorf("mismatch found in %q, (-want, +got)\n%s", base.ArchivedKey(qname), diff)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1532,16 +1532,16 @@ func TestRunAllDeadTasks(t *testing.T) {
|
||||
t5 := h.NewTaskMessageWithQueue("minor_notification", nil, "custom")
|
||||
|
||||
tests := []struct {
|
||||
desc string
|
||||
dead map[string][]base.Z
|
||||
qname string
|
||||
want int64
|
||||
wantPending map[string][]*base.TaskMessage
|
||||
wantDead map[string][]*base.TaskMessage
|
||||
desc string
|
||||
archived map[string][]base.Z
|
||||
qname string
|
||||
want int64
|
||||
wantPending map[string][]*base.TaskMessage
|
||||
wantArchived map[string][]*base.TaskMessage
|
||||
}{
|
||||
{
|
||||
desc: "with tasks in dead queue",
|
||||
dead: map[string][]base.Z{
|
||||
desc: "with tasks in archived queue",
|
||||
archived: map[string][]base.Z{
|
||||
"default": {
|
||||
{Message: t1, Score: time.Now().Add(-time.Minute).Unix()},
|
||||
{Message: t2, Score: time.Now().Add(-time.Minute).Unix()},
|
||||
@@ -1553,13 +1553,13 @@ func TestRunAllDeadTasks(t *testing.T) {
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {t1, t2, t3},
|
||||
},
|
||||
wantDead: map[string][]*base.TaskMessage{
|
||||
wantArchived: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "with empty dead queue",
|
||||
dead: map[string][]base.Z{
|
||||
desc: "with empty archived queue",
|
||||
archived: map[string][]base.Z{
|
||||
"default": {},
|
||||
},
|
||||
qname: "default",
|
||||
@@ -1567,13 +1567,13 @@ func TestRunAllDeadTasks(t *testing.T) {
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
},
|
||||
wantDead: map[string][]*base.TaskMessage{
|
||||
wantArchived: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "with custom queues",
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {
|
||||
{Message: t1, Score: time.Now().Add(-time.Minute).Unix()},
|
||||
{Message: t2, Score: time.Now().Add(-time.Minute).Unix()},
|
||||
@@ -1590,7 +1590,7 @@ func TestRunAllDeadTasks(t *testing.T) {
|
||||
"default": {},
|
||||
"custom": {t4, t5},
|
||||
},
|
||||
wantDead: map[string][]*base.TaskMessage{
|
||||
wantArchived: map[string][]*base.TaskMessage{
|
||||
"default": {t1, t2, t3},
|
||||
"custom": {},
|
||||
},
|
||||
@@ -1599,9 +1599,9 @@ func TestRunAllDeadTasks(t *testing.T) {
|
||||
|
||||
for _, tc := range tests {
|
||||
h.FlushDB(t, r.client) // clean up db before each test case
|
||||
h.SeedAllDeadQueues(t, r.client, tc.dead)
|
||||
h.SeedAllArchivedQueues(t, r.client, tc.archived)
|
||||
|
||||
got, err := r.RunAllDeadTasks(tc.qname)
|
||||
got, err := r.RunAllArchivedTasks(tc.qname)
|
||||
if err != nil {
|
||||
t.Errorf("%s; r.RunAllDeadTasks(%q) = %v, %v; want %v, nil",
|
||||
tc.desc, tc.qname, got, err, tc.want)
|
||||
@@ -1619,10 +1619,10 @@ func TestRunAllDeadTasks(t *testing.T) {
|
||||
t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.desc, base.QueueKey(qname), diff)
|
||||
}
|
||||
}
|
||||
for qname, want := range tc.wantDead {
|
||||
gotDead := h.GetDeadMessages(t, r.client, qname)
|
||||
for qname, want := range tc.wantArchived {
|
||||
gotDead := h.GetArchivedMessages(t, r.client, qname)
|
||||
if diff := cmp.Diff(want, gotDead, h.SortMsgOpt); diff != "" {
|
||||
t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.desc, base.DeadKey(qname), diff)
|
||||
t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.desc, base.ArchivedKey(qname), diff)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1641,14 +1641,14 @@ func TestKillRetryTask(t *testing.T) {
|
||||
t4 := time.Now().Add(3 * time.Hour)
|
||||
|
||||
tests := []struct {
|
||||
retry map[string][]base.Z
|
||||
dead map[string][]base.Z
|
||||
qname string
|
||||
id uuid.UUID
|
||||
score int64
|
||||
want error
|
||||
wantRetry map[string][]base.Z
|
||||
wantDead map[string][]base.Z
|
||||
retry map[string][]base.Z
|
||||
archived map[string][]base.Z
|
||||
qname string
|
||||
id uuid.UUID
|
||||
score int64
|
||||
want error
|
||||
wantRetry map[string][]base.Z
|
||||
wantArchived map[string][]base.Z
|
||||
}{
|
||||
{
|
||||
retry: map[string][]base.Z{
|
||||
@@ -1657,7 +1657,7 @@ func TestKillRetryTask(t *testing.T) {
|
||||
{Message: m2, Score: t2.Unix()},
|
||||
},
|
||||
},
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {},
|
||||
},
|
||||
qname: "default",
|
||||
@@ -1667,7 +1667,7 @@ func TestKillRetryTask(t *testing.T) {
|
||||
wantRetry: map[string][]base.Z{
|
||||
"default": {{Message: m2, Score: t2.Unix()}},
|
||||
},
|
||||
wantDead: map[string][]base.Z{
|
||||
wantArchived: map[string][]base.Z{
|
||||
"default": {{Message: m1, Score: time.Now().Unix()}},
|
||||
},
|
||||
},
|
||||
@@ -1675,7 +1675,7 @@ func TestKillRetryTask(t *testing.T) {
|
||||
retry: map[string][]base.Z{
|
||||
"default": {{Message: m1, Score: t1.Unix()}},
|
||||
},
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {{Message: m2, Score: t2.Unix()}},
|
||||
},
|
||||
qname: "default",
|
||||
@@ -1685,7 +1685,7 @@ func TestKillRetryTask(t *testing.T) {
|
||||
wantRetry: map[string][]base.Z{
|
||||
"default": {{Message: m1, Score: t1.Unix()}},
|
||||
},
|
||||
wantDead: map[string][]base.Z{
|
||||
wantArchived: map[string][]base.Z{
|
||||
"default": {{Message: m2, Score: t2.Unix()}},
|
||||
},
|
||||
},
|
||||
@@ -1700,7 +1700,7 @@ func TestKillRetryTask(t *testing.T) {
|
||||
{Message: m4, Score: t4.Unix()},
|
||||
},
|
||||
},
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {},
|
||||
"custom": {},
|
||||
},
|
||||
@@ -1717,7 +1717,7 @@ func TestKillRetryTask(t *testing.T) {
|
||||
{Message: m4, Score: t4.Unix()},
|
||||
},
|
||||
},
|
||||
wantDead: map[string][]base.Z{
|
||||
wantArchived: map[string][]base.Z{
|
||||
"default": {},
|
||||
"custom": {{Message: m3, Score: time.Now().Unix()}},
|
||||
},
|
||||
@@ -1727,9 +1727,9 @@ func TestKillRetryTask(t *testing.T) {
|
||||
for _, tc := range tests {
|
||||
h.FlushDB(t, r.client)
|
||||
h.SeedAllRetryQueues(t, r.client, tc.retry)
|
||||
h.SeedAllDeadQueues(t, r.client, tc.dead)
|
||||
h.SeedAllArchivedQueues(t, r.client, tc.archived)
|
||||
|
||||
got := r.KillRetryTask(tc.qname, tc.id, tc.score)
|
||||
got := r.ArchiveRetryTask(tc.qname, tc.id, tc.score)
|
||||
if got != tc.want {
|
||||
t.Errorf("(*RDB).KillRetryTask(%q, %v, %v) = %v, want %v",
|
||||
tc.qname, tc.id, tc.score, got, tc.want)
|
||||
@@ -1744,11 +1744,11 @@ func TestKillRetryTask(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
for qname, want := range tc.wantDead {
|
||||
gotDead := h.GetDeadEntries(t, r.client, qname)
|
||||
for qname, want := range tc.wantArchived {
|
||||
gotDead := h.GetArchivedEntries(t, r.client, qname)
|
||||
if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" {
|
||||
t.Errorf("mismatch found in %q; (-want,+got)\n%s",
|
||||
base.DeadKey(qname), diff)
|
||||
base.ArchivedKey(qname), diff)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1768,13 +1768,13 @@ func TestKillScheduledTask(t *testing.T) {
|
||||
|
||||
tests := []struct {
|
||||
scheduled map[string][]base.Z
|
||||
dead map[string][]base.Z
|
||||
archived map[string][]base.Z
|
||||
qname string
|
||||
id uuid.UUID
|
||||
score int64
|
||||
want error
|
||||
wantScheduled map[string][]base.Z
|
||||
wantDead map[string][]base.Z
|
||||
wantArchived map[string][]base.Z
|
||||
}{
|
||||
{
|
||||
scheduled: map[string][]base.Z{
|
||||
@@ -1783,7 +1783,7 @@ func TestKillScheduledTask(t *testing.T) {
|
||||
{Message: m2, Score: t2.Unix()},
|
||||
},
|
||||
},
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {},
|
||||
},
|
||||
qname: "default",
|
||||
@@ -1793,7 +1793,7 @@ func TestKillScheduledTask(t *testing.T) {
|
||||
wantScheduled: map[string][]base.Z{
|
||||
"default": {{Message: m2, Score: t2.Unix()}},
|
||||
},
|
||||
wantDead: map[string][]base.Z{
|
||||
wantArchived: map[string][]base.Z{
|
||||
"default": {{Message: m1, Score: time.Now().Unix()}},
|
||||
},
|
||||
},
|
||||
@@ -1801,7 +1801,7 @@ func TestKillScheduledTask(t *testing.T) {
|
||||
scheduled: map[string][]base.Z{
|
||||
"default": {{Message: m1, Score: t1.Unix()}},
|
||||
},
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {{Message: m2, Score: t2.Unix()}},
|
||||
},
|
||||
qname: "default",
|
||||
@@ -1811,7 +1811,7 @@ func TestKillScheduledTask(t *testing.T) {
|
||||
wantScheduled: map[string][]base.Z{
|
||||
"default": {{Message: m1, Score: t1.Unix()}},
|
||||
},
|
||||
wantDead: map[string][]base.Z{
|
||||
wantArchived: map[string][]base.Z{
|
||||
"default": {{Message: m2, Score: t2.Unix()}},
|
||||
},
|
||||
},
|
||||
@@ -1826,7 +1826,7 @@ func TestKillScheduledTask(t *testing.T) {
|
||||
{Message: m4, Score: t4.Unix()},
|
||||
},
|
||||
},
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {},
|
||||
"custom": {},
|
||||
},
|
||||
@@ -1843,7 +1843,7 @@ func TestKillScheduledTask(t *testing.T) {
|
||||
{Message: m4, Score: t4.Unix()},
|
||||
},
|
||||
},
|
||||
wantDead: map[string][]base.Z{
|
||||
wantArchived: map[string][]base.Z{
|
||||
"default": {},
|
||||
"custom": {{Message: m3, Score: time.Now().Unix()}},
|
||||
},
|
||||
@@ -1853,9 +1853,9 @@ func TestKillScheduledTask(t *testing.T) {
|
||||
for _, tc := range tests {
|
||||
h.FlushDB(t, r.client)
|
||||
h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
|
||||
h.SeedAllDeadQueues(t, r.client, tc.dead)
|
||||
h.SeedAllArchivedQueues(t, r.client, tc.archived)
|
||||
|
||||
got := r.KillScheduledTask(tc.qname, tc.id, tc.score)
|
||||
got := r.ArchiveScheduledTask(tc.qname, tc.id, tc.score)
|
||||
if got != tc.want {
|
||||
t.Errorf("(*RDB).KillScheduledTask(%q, %v, %v) = %v, want %v",
|
||||
tc.qname, tc.id, tc.score, got, tc.want)
|
||||
@@ -1870,11 +1870,11 @@ func TestKillScheduledTask(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
for qname, want := range tc.wantDead {
|
||||
gotDead := h.GetDeadEntries(t, r.client, qname)
|
||||
for qname, want := range tc.wantArchived {
|
||||
gotDead := h.GetArchivedEntries(t, r.client, qname)
|
||||
if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" {
|
||||
t.Errorf("mismatch found in %q; (-want,+got)\n%s",
|
||||
base.DeadKey(qname), diff)
|
||||
base.ArchivedKey(qname), diff)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1893,12 +1893,12 @@ func TestKillAllRetryTasks(t *testing.T) {
|
||||
t4 := time.Now().Add(3 * time.Hour)
|
||||
|
||||
tests := []struct {
|
||||
retry map[string][]base.Z
|
||||
dead map[string][]base.Z
|
||||
qname string
|
||||
want int64
|
||||
wantRetry map[string][]base.Z
|
||||
wantDead map[string][]base.Z
|
||||
retry map[string][]base.Z
|
||||
archived map[string][]base.Z
|
||||
qname string
|
||||
want int64
|
||||
wantRetry map[string][]base.Z
|
||||
wantArchived map[string][]base.Z
|
||||
}{
|
||||
{
|
||||
retry: map[string][]base.Z{
|
||||
@@ -1907,7 +1907,7 @@ func TestKillAllRetryTasks(t *testing.T) {
|
||||
{Message: m2, Score: t2.Unix()},
|
||||
},
|
||||
},
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {},
|
||||
},
|
||||
qname: "default",
|
||||
@@ -1915,7 +1915,7 @@ func TestKillAllRetryTasks(t *testing.T) {
|
||||
wantRetry: map[string][]base.Z{
|
||||
"default": {},
|
||||
},
|
||||
wantDead: map[string][]base.Z{
|
||||
wantArchived: map[string][]base.Z{
|
||||
"default": {
|
||||
{Message: m1, Score: time.Now().Unix()},
|
||||
{Message: m2, Score: time.Now().Unix()},
|
||||
@@ -1926,7 +1926,7 @@ func TestKillAllRetryTasks(t *testing.T) {
|
||||
retry: map[string][]base.Z{
|
||||
"default": {{Message: m1, Score: t1.Unix()}},
|
||||
},
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {{Message: m2, Score: t2.Unix()}},
|
||||
},
|
||||
qname: "default",
|
||||
@@ -1934,7 +1934,7 @@ func TestKillAllRetryTasks(t *testing.T) {
|
||||
wantRetry: map[string][]base.Z{
|
||||
"default": {},
|
||||
},
|
||||
wantDead: map[string][]base.Z{
|
||||
wantArchived: map[string][]base.Z{
|
||||
"default": {
|
||||
{Message: m1, Score: time.Now().Unix()},
|
||||
{Message: m2, Score: t2.Unix()},
|
||||
@@ -1945,7 +1945,7 @@ func TestKillAllRetryTasks(t *testing.T) {
|
||||
retry: map[string][]base.Z{
|
||||
"default": {},
|
||||
},
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {
|
||||
{Message: m1, Score: t1.Unix()},
|
||||
{Message: m2, Score: t2.Unix()},
|
||||
@@ -1956,7 +1956,7 @@ func TestKillAllRetryTasks(t *testing.T) {
|
||||
wantRetry: map[string][]base.Z{
|
||||
"default": {},
|
||||
},
|
||||
wantDead: map[string][]base.Z{
|
||||
wantArchived: map[string][]base.Z{
|
||||
"default": {
|
||||
{Message: m1, Score: t1.Unix()},
|
||||
{Message: m2, Score: t2.Unix()},
|
||||
@@ -1974,7 +1974,7 @@ func TestKillAllRetryTasks(t *testing.T) {
|
||||
{Message: m4, Score: t4.Unix()},
|
||||
},
|
||||
},
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {},
|
||||
"custom": {},
|
||||
},
|
||||
@@ -1987,7 +1987,7 @@ func TestKillAllRetryTasks(t *testing.T) {
|
||||
},
|
||||
"custom": {},
|
||||
},
|
||||
wantDead: map[string][]base.Z{
|
||||
wantArchived: map[string][]base.Z{
|
||||
"default": {},
|
||||
"custom": {
|
||||
{Message: m3, Score: time.Now().Unix()},
|
||||
@@ -2000,9 +2000,9 @@ func TestKillAllRetryTasks(t *testing.T) {
|
||||
for _, tc := range tests {
|
||||
h.FlushDB(t, r.client)
|
||||
h.SeedAllRetryQueues(t, r.client, tc.retry)
|
||||
h.SeedAllDeadQueues(t, r.client, tc.dead)
|
||||
h.SeedAllArchivedQueues(t, r.client, tc.archived)
|
||||
|
||||
got, err := r.KillAllRetryTasks(tc.qname)
|
||||
got, err := r.ArchiveAllRetryTasks(tc.qname)
|
||||
if got != tc.want || err != nil {
|
||||
t.Errorf("(*RDB).KillAllRetryTasks(%q) = %v, %v; want %v, nil",
|
||||
tc.qname, got, err, tc.want)
|
||||
@@ -2017,11 +2017,11 @@ func TestKillAllRetryTasks(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
for qname, want := range tc.wantDead {
|
||||
gotDead := h.GetDeadEntries(t, r.client, qname)
|
||||
for qname, want := range tc.wantArchived {
|
||||
gotDead := h.GetArchivedEntries(t, r.client, qname)
|
||||
if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" {
|
||||
t.Errorf("mismatch found in %q; (-want,+got)\n%s",
|
||||
base.DeadKey(qname), diff)
|
||||
base.ArchivedKey(qname), diff)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2041,11 +2041,11 @@ func TestKillAllScheduledTasks(t *testing.T) {
|
||||
|
||||
tests := []struct {
|
||||
scheduled map[string][]base.Z
|
||||
dead map[string][]base.Z
|
||||
archived map[string][]base.Z
|
||||
qname string
|
||||
want int64
|
||||
wantScheduled map[string][]base.Z
|
||||
wantDead map[string][]base.Z
|
||||
wantArchived map[string][]base.Z
|
||||
}{
|
||||
{
|
||||
scheduled: map[string][]base.Z{
|
||||
@@ -2054,7 +2054,7 @@ func TestKillAllScheduledTasks(t *testing.T) {
|
||||
{Message: m2, Score: t2.Unix()},
|
||||
},
|
||||
},
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {},
|
||||
},
|
||||
qname: "default",
|
||||
@@ -2062,7 +2062,7 @@ func TestKillAllScheduledTasks(t *testing.T) {
|
||||
wantScheduled: map[string][]base.Z{
|
||||
"default": {},
|
||||
},
|
||||
wantDead: map[string][]base.Z{
|
||||
wantArchived: map[string][]base.Z{
|
||||
"default": {
|
||||
{Message: m1, Score: time.Now().Unix()},
|
||||
{Message: m2, Score: time.Now().Unix()},
|
||||
@@ -2073,7 +2073,7 @@ func TestKillAllScheduledTasks(t *testing.T) {
|
||||
scheduled: map[string][]base.Z{
|
||||
"default": {{Message: m1, Score: t1.Unix()}},
|
||||
},
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {{Message: m2, Score: t2.Unix()}},
|
||||
},
|
||||
qname: "default",
|
||||
@@ -2081,7 +2081,7 @@ func TestKillAllScheduledTasks(t *testing.T) {
|
||||
wantScheduled: map[string][]base.Z{
|
||||
"default": {},
|
||||
},
|
||||
wantDead: map[string][]base.Z{
|
||||
wantArchived: map[string][]base.Z{
|
||||
"default": {
|
||||
{Message: m1, Score: time.Now().Unix()},
|
||||
{Message: m2, Score: t2.Unix()},
|
||||
@@ -2092,7 +2092,7 @@ func TestKillAllScheduledTasks(t *testing.T) {
|
||||
scheduled: map[string][]base.Z{
|
||||
"default": {},
|
||||
},
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {
|
||||
{Message: m1, Score: t1.Unix()},
|
||||
{Message: m2, Score: t2.Unix()},
|
||||
@@ -2103,7 +2103,7 @@ func TestKillAllScheduledTasks(t *testing.T) {
|
||||
wantScheduled: map[string][]base.Z{
|
||||
"default": {},
|
||||
},
|
||||
wantDead: map[string][]base.Z{
|
||||
wantArchived: map[string][]base.Z{
|
||||
"default": {
|
||||
{Message: m1, Score: t1.Unix()},
|
||||
{Message: m2, Score: t2.Unix()},
|
||||
@@ -2121,7 +2121,7 @@ func TestKillAllScheduledTasks(t *testing.T) {
|
||||
{Message: m4, Score: t4.Unix()},
|
||||
},
|
||||
},
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {},
|
||||
"custom": {},
|
||||
},
|
||||
@@ -2134,7 +2134,7 @@ func TestKillAllScheduledTasks(t *testing.T) {
|
||||
},
|
||||
"custom": {},
|
||||
},
|
||||
wantDead: map[string][]base.Z{
|
||||
wantArchived: map[string][]base.Z{
|
||||
"default": {},
|
||||
"custom": {
|
||||
{Message: m3, Score: time.Now().Unix()},
|
||||
@@ -2147,9 +2147,9 @@ func TestKillAllScheduledTasks(t *testing.T) {
|
||||
for _, tc := range tests {
|
||||
h.FlushDB(t, r.client)
|
||||
h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
|
||||
h.SeedAllDeadQueues(t, r.client, tc.dead)
|
||||
h.SeedAllArchivedQueues(t, r.client, tc.archived)
|
||||
|
||||
got, err := r.KillAllScheduledTasks(tc.qname)
|
||||
got, err := r.ArchiveAllScheduledTasks(tc.qname)
|
||||
if got != tc.want || err != nil {
|
||||
t.Errorf("(*RDB).KillAllScheduledTasks(%q) = %v, %v; want %v, nil",
|
||||
tc.qname, got, err, tc.want)
|
||||
@@ -2164,11 +2164,11 @@ func TestKillAllScheduledTasks(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
for qname, want := range tc.wantDead {
|
||||
gotDead := h.GetDeadEntries(t, r.client, qname)
|
||||
for qname, want := range tc.wantArchived {
|
||||
gotDead := h.GetArchivedEntries(t, r.client, qname)
|
||||
if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" {
|
||||
t.Errorf("mismatch found in %q; (-want,+got)\n%s",
|
||||
base.DeadKey(qname), diff)
|
||||
base.ArchivedKey(qname), diff)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2185,15 +2185,15 @@ func TestDeleteDeadTask(t *testing.T) {
|
||||
t3 := time.Now().Add(-time.Hour)
|
||||
|
||||
tests := []struct {
|
||||
dead map[string][]base.Z
|
||||
qname string
|
||||
id uuid.UUID
|
||||
score int64
|
||||
want error
|
||||
wantDead map[string][]*base.TaskMessage
|
||||
archived map[string][]base.Z
|
||||
qname string
|
||||
id uuid.UUID
|
||||
score int64
|
||||
want error
|
||||
wantArchived map[string][]*base.TaskMessage
|
||||
}{
|
||||
{
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {
|
||||
{Message: m1, Score: t1.Unix()},
|
||||
{Message: m2, Score: t2.Unix()},
|
||||
@@ -2203,12 +2203,12 @@ func TestDeleteDeadTask(t *testing.T) {
|
||||
id: m1.ID,
|
||||
score: t1.Unix(),
|
||||
want: nil,
|
||||
wantDead: map[string][]*base.TaskMessage{
|
||||
wantArchived: map[string][]*base.TaskMessage{
|
||||
"default": {m2},
|
||||
},
|
||||
},
|
||||
{
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {
|
||||
{Message: m1, Score: t1.Unix()},
|
||||
{Message: m2, Score: t2.Unix()},
|
||||
@@ -2221,13 +2221,13 @@ func TestDeleteDeadTask(t *testing.T) {
|
||||
id: m3.ID,
|
||||
score: t3.Unix(),
|
||||
want: nil,
|
||||
wantDead: map[string][]*base.TaskMessage{
|
||||
wantArchived: map[string][]*base.TaskMessage{
|
||||
"default": {m1, m2},
|
||||
"custom": {},
|
||||
},
|
||||
},
|
||||
{
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {
|
||||
{Message: m1, Score: t1.Unix()},
|
||||
{Message: m2, Score: t2.Unix()},
|
||||
@@ -2237,19 +2237,19 @@ func TestDeleteDeadTask(t *testing.T) {
|
||||
id: m1.ID,
|
||||
score: t2.Unix(), // id and score mismatch
|
||||
want: ErrTaskNotFound,
|
||||
wantDead: map[string][]*base.TaskMessage{
|
||||
wantArchived: map[string][]*base.TaskMessage{
|
||||
"default": {m1, m2},
|
||||
},
|
||||
},
|
||||
{
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {},
|
||||
},
|
||||
qname: "default",
|
||||
id: m1.ID,
|
||||
score: t1.Unix(),
|
||||
want: ErrTaskNotFound,
|
||||
wantDead: map[string][]*base.TaskMessage{
|
||||
wantArchived: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
},
|
||||
},
|
||||
@@ -2257,18 +2257,18 @@ func TestDeleteDeadTask(t *testing.T) {
|
||||
|
||||
for _, tc := range tests {
|
||||
h.FlushDB(t, r.client) // clean up db before each test case
|
||||
h.SeedAllDeadQueues(t, r.client, tc.dead)
|
||||
h.SeedAllArchivedQueues(t, r.client, tc.archived)
|
||||
|
||||
got := r.DeleteDeadTask(tc.qname, tc.id, tc.score)
|
||||
got := r.DeleteArchivedTask(tc.qname, tc.id, tc.score)
|
||||
if got != tc.want {
|
||||
t.Errorf("r.DeleteDeadTask(%q, %v, %v) = %v, want %v", tc.qname, tc.id, tc.score, got, tc.want)
|
||||
continue
|
||||
}
|
||||
|
||||
for qname, want := range tc.wantDead {
|
||||
gotDead := h.GetDeadMessages(t, r.client, qname)
|
||||
for qname, want := range tc.wantArchived {
|
||||
gotDead := h.GetArchivedMessages(t, r.client, qname)
|
||||
if diff := cmp.Diff(want, gotDead, h.SortMsgOpt); diff != "" {
|
||||
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.DeadKey(qname), diff)
|
||||
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.ArchivedKey(qname), diff)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2452,13 +2452,13 @@ func TestDeleteAllDeadTasks(t *testing.T) {
|
||||
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
|
||||
|
||||
tests := []struct {
|
||||
dead map[string][]base.Z
|
||||
qname string
|
||||
want int64
|
||||
wantDead map[string][]*base.TaskMessage
|
||||
archived map[string][]base.Z
|
||||
qname string
|
||||
want int64
|
||||
wantArchived map[string][]*base.TaskMessage
|
||||
}{
|
||||
{
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {
|
||||
{Message: m1, Score: time.Now().Unix()},
|
||||
{Message: m2, Score: time.Now().Unix()},
|
||||
@@ -2469,18 +2469,18 @@ func TestDeleteAllDeadTasks(t *testing.T) {
|
||||
},
|
||||
qname: "default",
|
||||
want: 2,
|
||||
wantDead: map[string][]*base.TaskMessage{
|
||||
wantArchived: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
"custom": {m3},
|
||||
},
|
||||
},
|
||||
{
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {},
|
||||
},
|
||||
qname: "default",
|
||||
want: 0,
|
||||
wantDead: map[string][]*base.TaskMessage{
|
||||
wantArchived: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
},
|
||||
},
|
||||
@@ -2488,19 +2488,19 @@ func TestDeleteAllDeadTasks(t *testing.T) {
|
||||
|
||||
for _, tc := range tests {
|
||||
h.FlushDB(t, r.client) // clean up db before each test case
|
||||
h.SeedAllDeadQueues(t, r.client, tc.dead)
|
||||
h.SeedAllArchivedQueues(t, r.client, tc.archived)
|
||||
|
||||
got, err := r.DeleteAllDeadTasks(tc.qname)
|
||||
got, err := r.DeleteAllArchivedTasks(tc.qname)
|
||||
if err != nil {
|
||||
t.Errorf("r.DeleteAllDeadTasks(%q) returned error: %v", tc.qname, err)
|
||||
}
|
||||
if got != tc.want {
|
||||
t.Errorf("r.DeleteAllDeadTasks(%q) = %d, nil, want %d, nil", tc.qname, got, tc.want)
|
||||
}
|
||||
for qname, want := range tc.wantDead {
|
||||
gotDead := h.GetDeadMessages(t, r.client, qname)
|
||||
for qname, want := range tc.wantArchived {
|
||||
gotDead := h.GetArchivedMessages(t, r.client, qname)
|
||||
if diff := cmp.Diff(want, gotDead, h.SortMsgOpt); diff != "" {
|
||||
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.DeadKey(qname), diff)
|
||||
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.ArchivedKey(qname), diff)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2643,7 +2643,7 @@ func TestRemoveQueue(t *testing.T) {
|
||||
inProgress map[string][]*base.TaskMessage
|
||||
scheduled map[string][]base.Z
|
||||
retry map[string][]base.Z
|
||||
dead map[string][]base.Z
|
||||
archived map[string][]base.Z
|
||||
qname string // queue to remove
|
||||
force bool
|
||||
}{
|
||||
@@ -2664,7 +2664,7 @@ func TestRemoveQueue(t *testing.T) {
|
||||
"default": {},
|
||||
"custom": {},
|
||||
},
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {},
|
||||
"custom": {},
|
||||
},
|
||||
@@ -2688,7 +2688,7 @@ func TestRemoveQueue(t *testing.T) {
|
||||
"default": {},
|
||||
"custom": {},
|
||||
},
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {},
|
||||
"custom": {},
|
||||
},
|
||||
@@ -2703,7 +2703,7 @@ func TestRemoveQueue(t *testing.T) {
|
||||
h.SeedAllActiveQueues(t, r.client, tc.inProgress)
|
||||
h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
|
||||
h.SeedAllRetryQueues(t, r.client, tc.retry)
|
||||
h.SeedAllDeadQueues(t, r.client, tc.dead)
|
||||
h.SeedAllArchivedQueues(t, r.client, tc.archived)
|
||||
|
||||
err := r.RemoveQueue(tc.qname, tc.force)
|
||||
if err != nil {
|
||||
@@ -2721,7 +2721,7 @@ func TestRemoveQueue(t *testing.T) {
|
||||
base.DeadlinesKey(tc.qname),
|
||||
base.ScheduledKey(tc.qname),
|
||||
base.RetryKey(tc.qname),
|
||||
base.DeadKey(tc.qname),
|
||||
base.ArchivedKey(tc.qname),
|
||||
}
|
||||
for _, key := range keys {
|
||||
if r.client.Exists(key).Val() != 0 {
|
||||
@@ -2745,7 +2745,7 @@ func TestRemoveQueueError(t *testing.T) {
|
||||
inProgress map[string][]*base.TaskMessage
|
||||
scheduled map[string][]base.Z
|
||||
retry map[string][]base.Z
|
||||
dead map[string][]base.Z
|
||||
archived map[string][]base.Z
|
||||
qname string // queue to remove
|
||||
force bool
|
||||
}{
|
||||
@@ -2767,7 +2767,7 @@ func TestRemoveQueueError(t *testing.T) {
|
||||
"default": {},
|
||||
"custom": {},
|
||||
},
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {},
|
||||
"custom": {},
|
||||
},
|
||||
@@ -2792,7 +2792,7 @@ func TestRemoveQueueError(t *testing.T) {
|
||||
"default": {},
|
||||
"custom": {},
|
||||
},
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {},
|
||||
"custom": {},
|
||||
},
|
||||
@@ -2817,7 +2817,7 @@ func TestRemoveQueueError(t *testing.T) {
|
||||
"default": {},
|
||||
"custom": {},
|
||||
},
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {},
|
||||
"custom": {},
|
||||
},
|
||||
@@ -2833,7 +2833,7 @@ func TestRemoveQueueError(t *testing.T) {
|
||||
h.SeedAllActiveQueues(t, r.client, tc.inProgress)
|
||||
h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
|
||||
h.SeedAllRetryQueues(t, r.client, tc.retry)
|
||||
h.SeedAllDeadQueues(t, r.client, tc.dead)
|
||||
h.SeedAllArchivedQueues(t, r.client, tc.archived)
|
||||
|
||||
got := r.RemoveQueue(tc.qname, tc.force)
|
||||
if got == nil {
|
||||
@@ -2866,10 +2866,10 @@ func TestRemoveQueueError(t *testing.T) {
|
||||
t.Errorf("%s;mismatch found in %q; (-want,+got):\n%s", tc.desc, base.RetryKey(qname), diff)
|
||||
}
|
||||
}
|
||||
for qname, want := range tc.dead {
|
||||
gotDead := h.GetDeadEntries(t, r.client, qname)
|
||||
for qname, want := range tc.archived {
|
||||
gotDead := h.GetArchivedEntries(t, r.client, qname)
|
||||
if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt); diff != "" {
|
||||
t.Errorf("%s;mismatch found in %q; (-want,+got):\n%s", tc.desc, base.DeadKey(qname), diff)
|
||||
t.Errorf("%s;mismatch found in %q; (-want,+got):\n%s", tc.desc, base.ArchivedKey(qname), diff)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -3080,9 +3080,9 @@ func TestSchedulerEnqueueEvents(t *testing.T) {
|
||||
}{
|
||||
{
|
||||
entryID: "entry123",
|
||||
events: []*base.SchedulerEnqueueEvent{
|
||||
{TaskID: "task123", EnqueuedAt: oneDayAgo},
|
||||
{TaskID: "task789", EnqueuedAt: oneHourAgo},
|
||||
events: []*base.SchedulerEnqueueEvent{
|
||||
{TaskID: "task123", EnqueuedAt: oneDayAgo},
|
||||
{TaskID: "task789", EnqueuedAt: oneHourAgo},
|
||||
{TaskID: "task456", EnqueuedAt: fiveHoursAgo},
|
||||
},
|
||||
// Recent events first
|
||||
|
@@ -381,22 +381,22 @@ func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) e
|
||||
}
|
||||
|
||||
const (
|
||||
maxDeadTasks = 10000
|
||||
deadExpirationInDays = 90
|
||||
maxArchiveSize = 10000 // maximum number of tasks in archive
|
||||
archivedExpirationInDays = 90 // number of days before an archived task gets deleted permanently
|
||||
)
|
||||
|
||||
// KEYS[1] -> asynq:{<qname>}:active
|
||||
// KEYS[2] -> asynq:{<qname>}:deadlines
|
||||
// KEYS[3] -> asynq:{<qname>}:dead
|
||||
// KEYS[3] -> asynq:{<qname>}:archived
|
||||
// KEYS[4] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
|
||||
// KEYS[5] -> asynq:{<qname>}:failed:<yyyy-mm-dd>
|
||||
// ARGV[1] -> base.TaskMessage value to remove from base.ActiveQueue queue
|
||||
// ARGV[2] -> base.TaskMessage value to add to Dead queue
|
||||
// ARGV[1] -> base.TaskMessage value to remove
|
||||
// ARGV[2] -> base.TaskMessage value to add
|
||||
// ARGV[3] -> died_at UNIX timestamp
|
||||
// ARGV[4] -> cutoff timestamp (e.g., 90 days ago)
|
||||
// ARGV[5] -> max number of tasks in dead queue (e.g., 100)
|
||||
// ARGV[5] -> max number of tasks in archive (e.g., 100)
|
||||
// ARGV[6] -> stats expiration timestamp
|
||||
var killCmd = redis.NewScript(`
|
||||
var archiveCmd = redis.NewScript(`
|
||||
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
|
||||
return redis.error_reply("NOT FOUND")
|
||||
end
|
||||
@@ -416,10 +416,9 @@ if tonumber(m) == 1 then
|
||||
end
|
||||
return redis.status_reply("OK")`)
|
||||
|
||||
// Kill sends the task to "dead" queue from active queue, assigning
|
||||
// the error message to the task.
|
||||
// It also trims the set by timestamp and set size.
|
||||
func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error {
|
||||
// Archive sends the given task to archive, attaching the error message to the task.
|
||||
// It also trims the archive by timestamp and set size.
|
||||
func (r *RDB) Archive(msg *base.TaskMessage, errMsg string) error {
|
||||
msgToRemove, err := base.EncodeMessage(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -431,13 +430,13 @@ func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error {
|
||||
return err
|
||||
}
|
||||
now := time.Now()
|
||||
limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago
|
||||
limit := now.AddDate(0, 0, -archivedExpirationInDays).Unix() // 90 days ago
|
||||
processedKey := base.ProcessedKey(msg.Queue, now)
|
||||
failedKey := base.FailedKey(msg.Queue, now)
|
||||
expireAt := now.Add(statsTTL)
|
||||
return killCmd.Run(r.client,
|
||||
[]string{base.ActiveKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.DeadKey(msg.Queue), processedKey, failedKey},
|
||||
msgToRemove, msgToAdd, now.Unix(), limit, maxDeadTasks, expireAt.Unix()).Err()
|
||||
return archiveCmd.Run(r.client,
|
||||
[]string{base.ActiveKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.ArchivedKey(msg.Queue), processedKey, failedKey},
|
||||
msgToRemove, msgToAdd, now.Unix(), limit, maxArchiveSize, expireAt.Unix()).Err()
|
||||
}
|
||||
|
||||
// CheckAndEnqueue checks for scheduled/retry tasks for the given queues
|
||||
|
@@ -1008,7 +1008,7 @@ func TestRetry(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestKill(t *testing.T) {
|
||||
func TestArchive(t *testing.T) {
|
||||
r := setup(t)
|
||||
defer r.Close()
|
||||
now := time.Now()
|
||||
@@ -1058,11 +1058,11 @@ func TestKill(t *testing.T) {
|
||||
tests := []struct {
|
||||
inProgress map[string][]*base.TaskMessage
|
||||
deadlines map[string][]base.Z
|
||||
dead map[string][]base.Z
|
||||
target *base.TaskMessage // task to kill
|
||||
archived map[string][]base.Z
|
||||
target *base.TaskMessage // task to archive
|
||||
wantActive map[string][]*base.TaskMessage
|
||||
wantDeadlines map[string][]base.Z
|
||||
wantDead map[string][]base.Z
|
||||
wantArchived map[string][]base.Z
|
||||
}{
|
||||
{
|
||||
inProgress: map[string][]*base.TaskMessage{
|
||||
@@ -1074,7 +1074,7 @@ func TestKill(t *testing.T) {
|
||||
{Message: t2, Score: t2Deadline},
|
||||
},
|
||||
},
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {
|
||||
{Message: t3, Score: now.Add(-time.Hour).Unix()},
|
||||
},
|
||||
@@ -1086,7 +1086,7 @@ func TestKill(t *testing.T) {
|
||||
wantDeadlines: map[string][]base.Z{
|
||||
"default": {{Message: t2, Score: t2Deadline}},
|
||||
},
|
||||
wantDead: map[string][]base.Z{
|
||||
wantArchived: map[string][]base.Z{
|
||||
"default": {
|
||||
{Message: h.TaskMessageWithError(*t1, errMsg), Score: now.Unix()},
|
||||
{Message: t3, Score: now.Add(-time.Hour).Unix()},
|
||||
@@ -1104,7 +1104,7 @@ func TestKill(t *testing.T) {
|
||||
{Message: t3, Score: t3Deadline},
|
||||
},
|
||||
},
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {},
|
||||
},
|
||||
target: t1,
|
||||
@@ -1117,7 +1117,7 @@ func TestKill(t *testing.T) {
|
||||
{Message: t3, Score: t3Deadline},
|
||||
},
|
||||
},
|
||||
wantDead: map[string][]base.Z{
|
||||
wantArchived: map[string][]base.Z{
|
||||
"default": {
|
||||
{Message: h.TaskMessageWithError(*t1, errMsg), Score: now.Unix()},
|
||||
},
|
||||
@@ -1136,7 +1136,7 @@ func TestKill(t *testing.T) {
|
||||
{Message: t4, Score: t4Deadline},
|
||||
},
|
||||
},
|
||||
dead: map[string][]base.Z{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {},
|
||||
"custom": {},
|
||||
},
|
||||
@@ -1149,7 +1149,7 @@ func TestKill(t *testing.T) {
|
||||
"default": {{Message: t1, Score: t1Deadline}},
|
||||
"custom": {},
|
||||
},
|
||||
wantDead: map[string][]base.Z{
|
||||
wantArchived: map[string][]base.Z{
|
||||
"default": {},
|
||||
"custom": {
|
||||
{Message: h.TaskMessageWithError(*t4, errMsg), Score: now.Unix()},
|
||||
@@ -1162,11 +1162,11 @@ func TestKill(t *testing.T) {
|
||||
h.FlushDB(t, r.client) // clean up db before each test case
|
||||
h.SeedAllActiveQueues(t, r.client, tc.inProgress)
|
||||
h.SeedAllDeadlines(t, r.client, tc.deadlines)
|
||||
h.SeedAllDeadQueues(t, r.client, tc.dead)
|
||||
h.SeedAllArchivedQueues(t, r.client, tc.archived)
|
||||
|
||||
err := r.Kill(tc.target, errMsg)
|
||||
err := r.Archive(tc.target, errMsg)
|
||||
if err != nil {
|
||||
t.Errorf("(*RDB).Kill(%v, %v) = %v, want nil", tc.target, errMsg, err)
|
||||
t.Errorf("(*RDB).Archive(%v, %v) = %v, want nil", tc.target, errMsg, err)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -1179,13 +1179,13 @@ func TestKill(t *testing.T) {
|
||||
for queue, want := range tc.wantDeadlines {
|
||||
gotDeadlines := h.GetDeadlinesEntries(t, r.client, queue)
|
||||
if diff := cmp.Diff(want, gotDeadlines, h.SortZSetEntryOpt); diff != "" {
|
||||
t.Errorf("mismatch found in %q after calling (*RDB).Kill: (-want, +got):\n%s", base.DeadlinesKey(queue), diff)
|
||||
t.Errorf("mismatch found in %q after calling (*RDB).Archive: (-want, +got):\n%s", base.DeadlinesKey(queue), diff)
|
||||
}
|
||||
}
|
||||
for queue, want := range tc.wantDead {
|
||||
gotDead := h.GetDeadEntries(t, r.client, queue)
|
||||
if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" {
|
||||
t.Errorf("mismatch found in %q after calling (*RDB).Kill: (-want, +got):\n%s", base.DeadKey(queue), diff)
|
||||
for queue, want := range tc.wantArchived {
|
||||
gotArchived := h.GetArchivedEntries(t, r.client, queue)
|
||||
if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" {
|
||||
t.Errorf("mismatch found in %q after calling (*RDB).Archive: (-want, +got):\n%s", base.ArchivedKey(queue), diff)
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -117,13 +117,13 @@ func (tb *TestBroker) Retry(msg *base.TaskMessage, processAt time.Time, errMsg s
|
||||
return tb.real.Retry(msg, processAt, errMsg)
|
||||
}
|
||||
|
||||
func (tb *TestBroker) Kill(msg *base.TaskMessage, errMsg string) error {
|
||||
func (tb *TestBroker) Archive(msg *base.TaskMessage, errMsg string) error {
|
||||
tb.mu.Lock()
|
||||
defer tb.mu.Unlock()
|
||||
if tb.sleeping {
|
||||
return errRedisDown
|
||||
}
|
||||
return tb.real.Kill(msg, errMsg)
|
||||
return tb.real.Archive(msg, errMsg)
|
||||
}
|
||||
|
||||
func (tb *TestBroker) CheckAndEnqueue(qnames ...string) error {
|
||||
|
Reference in New Issue
Block a user