2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-05 05:52:00 +08:00

Create base internal package

This commit is contained in:
Ken Hibino
2019-12-22 07:15:45 -08:00
parent 5de314400d
commit 3fd248615b
12 changed files with 390 additions and 387 deletions

View File

@@ -6,6 +6,7 @@ import (
"time"
"github.com/go-redis/redis/v7"
"github.com/hibiken/asynq/internal/base"
"github.com/rs/xid"
)
@@ -73,11 +74,11 @@ type DeadTask struct {
// CurrentStats returns a current state of the queues.
func (r *RDB) CurrentStats() (*Stats, error) {
pipe := r.client.Pipeline()
qlen := pipe.LLen(defaultQ)
plen := pipe.LLen(inProgressQ)
slen := pipe.ZCard(scheduledQ)
rlen := pipe.ZCard(retryQ)
dlen := pipe.ZCard(deadQ)
qlen := pipe.LLen(base.DefaultQueue)
plen := pipe.LLen(base.InProgressQueue)
slen := pipe.ZCard(base.ScheduledQueue)
rlen := pipe.ZCard(base.RetryQueue)
dlen := pipe.ZCard(base.DeadQueue)
_, err := pipe.Exec()
if err != nil {
return nil, err
@@ -94,13 +95,13 @@ func (r *RDB) CurrentStats() (*Stats, error) {
// ListEnqueued returns all enqueued tasks that are ready to be processed.
func (r *RDB) ListEnqueued() ([]*EnqueuedTask, error) {
data, err := r.client.LRange(defaultQ, 0, -1).Result()
data, err := r.client.LRange(base.DefaultQueue, 0, -1).Result()
if err != nil {
return nil, err
}
var tasks []*EnqueuedTask
for _, s := range data {
var msg TaskMessage
var msg base.TaskMessage
err := json.Unmarshal([]byte(s), &msg)
if err != nil {
// continue // bad data, ignore and continue
@@ -117,13 +118,13 @@ func (r *RDB) ListEnqueued() ([]*EnqueuedTask, error) {
// ListInProgress returns all tasks that are currently being processed.
func (r *RDB) ListInProgress() ([]*InProgressTask, error) {
data, err := r.client.LRange(inProgressQ, 0, -1).Result()
data, err := r.client.LRange(base.InProgressQueue, 0, -1).Result()
if err != nil {
return nil, err
}
var tasks []*InProgressTask
for _, s := range data {
var msg TaskMessage
var msg base.TaskMessage
err := json.Unmarshal([]byte(s), &msg)
if err != nil {
continue // bad data, ignore and continue
@@ -140,7 +141,7 @@ func (r *RDB) ListInProgress() ([]*InProgressTask, error) {
// ListScheduled returns all tasks that are scheduled to be processed
// in the future.
func (r *RDB) ListScheduled() ([]*ScheduledTask, error) {
data, err := r.client.ZRangeWithScores(scheduledQ, 0, -1).Result()
data, err := r.client.ZRangeWithScores(base.ScheduledQueue, 0, -1).Result()
if err != nil {
return nil, err
}
@@ -150,7 +151,7 @@ func (r *RDB) ListScheduled() ([]*ScheduledTask, error) {
if !ok {
continue // bad data, ignore and continue
}
var msg TaskMessage
var msg base.TaskMessage
err := json.Unmarshal([]byte(s), &msg)
if err != nil {
continue // bad data, ignore and continue
@@ -170,7 +171,7 @@ func (r *RDB) ListScheduled() ([]*ScheduledTask, error) {
// ListRetry returns all tasks that have failed before and willl be retried
// in the future.
func (r *RDB) ListRetry() ([]*RetryTask, error) {
data, err := r.client.ZRangeWithScores(retryQ, 0, -1).Result()
data, err := r.client.ZRangeWithScores(base.RetryQueue, 0, -1).Result()
if err != nil {
return nil, err
}
@@ -180,7 +181,7 @@ func (r *RDB) ListRetry() ([]*RetryTask, error) {
if !ok {
continue // bad data, ignore and continue
}
var msg TaskMessage
var msg base.TaskMessage
err := json.Unmarshal([]byte(s), &msg)
if err != nil {
continue // bad data, ignore and continue
@@ -202,7 +203,7 @@ func (r *RDB) ListRetry() ([]*RetryTask, error) {
// ListDead returns all tasks that have exhausted its retry limit.
func (r *RDB) ListDead() ([]*DeadTask, error) {
data, err := r.client.ZRangeWithScores(deadQ, 0, -1).Result()
data, err := r.client.ZRangeWithScores(base.DeadQueue, 0, -1).Result()
if err != nil {
return nil, err
}
@@ -212,7 +213,7 @@ func (r *RDB) ListDead() ([]*DeadTask, error) {
if !ok {
continue // bad data, ignore and continue
}
var msg TaskMessage
var msg base.TaskMessage
err := json.Unmarshal([]byte(s), &msg)
if err != nil {
continue // bad data, ignore and continue
@@ -234,7 +235,7 @@ func (r *RDB) ListDead() ([]*DeadTask, error) {
// and enqueues it for processing. If a task that matches the id and score
// does not exist, it returns ErrTaskNotFound.
func (r *RDB) EnqueueDeadTask(id xid.ID, score int64) error {
n, err := r.removeAndEnqueue(deadQ, id.String(), float64(score))
n, err := r.removeAndEnqueue(base.DeadQueue, id.String(), float64(score))
if err != nil {
return err
}
@@ -248,7 +249,7 @@ func (r *RDB) EnqueueDeadTask(id xid.ID, score int64) error {
// and enqueues it for processing. If a task that matches the id and score
// does not exist, it returns ErrTaskNotFound.
func (r *RDB) EnqueueRetryTask(id xid.ID, score int64) error {
n, err := r.removeAndEnqueue(retryQ, id.String(), float64(score))
n, err := r.removeAndEnqueue(base.RetryQueue, id.String(), float64(score))
if err != nil {
return err
}
@@ -262,7 +263,7 @@ func (r *RDB) EnqueueRetryTask(id xid.ID, score int64) error {
// and enqueues it for processing. If a task that matches the id and score does not
// exist, it returns ErrTaskNotFound.
func (r *RDB) EnqueueScheduledTask(id xid.ID, score int64) error {
n, err := r.removeAndEnqueue(scheduledQ, id.String(), float64(score))
n, err := r.removeAndEnqueue(base.ScheduledQueue, id.String(), float64(score))
if err != nil {
return err
}
@@ -275,19 +276,19 @@ func (r *RDB) EnqueueScheduledTask(id xid.ID, score int64) error {
// EnqueueAllScheduledTasks enqueues all tasks from scheduled queue
// and returns the number of tasks enqueued.
func (r *RDB) EnqueueAllScheduledTasks() (int64, error) {
return r.removeAndEnqueueAll(scheduledQ)
return r.removeAndEnqueueAll(base.ScheduledQueue)
}
// EnqueueAllRetryTasks enqueues all tasks from retry queue
// and returns the number of tasks enqueued.
func (r *RDB) EnqueueAllRetryTasks() (int64, error) {
return r.removeAndEnqueueAll(retryQ)
return r.removeAndEnqueueAll(base.RetryQueue)
}
// EnqueueAllDeadTasks enqueues all tasks from dead queue
// and returns the number of tasks enqueued.
func (r *RDB) EnqueueAllDeadTasks() (int64, error) {
return r.removeAndEnqueueAll(deadQ)
return r.removeAndEnqueueAll(base.DeadQueue)
}
func (r *RDB) removeAndEnqueue(zset, id string, score float64) (int64, error) {
@@ -303,7 +304,7 @@ func (r *RDB) removeAndEnqueue(zset, id string, score float64) (int64, error) {
end
return 0
`)
res, err := script.Run(r.client, []string{zset, defaultQ}, score, id).Result()
res, err := script.Run(r.client, []string{zset, base.DefaultQueue}, score, id).Result()
if err != nil {
return 0, err
}
@@ -323,7 +324,7 @@ func (r *RDB) removeAndEnqueueAll(zset string) (int64, error) {
end
return table.getn(msgs)
`)
res, err := script.Run(r.client, []string{zset, defaultQ}).Result()
res, err := script.Run(r.client, []string{zset, base.DefaultQueue}).Result()
if err != nil {
return 0, err
}
@@ -338,21 +339,21 @@ func (r *RDB) removeAndEnqueueAll(zset string) (int64, error) {
// and deletes it. If a task that matches the id and score does not exist,
// it returns ErrTaskNotFound.
func (r *RDB) DeleteDeadTask(id xid.ID, score int64) error {
return r.deleteTask(deadQ, id.String(), float64(score))
return r.deleteTask(base.DeadQueue, id.String(), float64(score))
}
// DeleteRetryTask finds a task that matches the given id and score from retry queue
// and deletes it. If a task that matches the id and score does not exist,
// it returns ErrTaskNotFound.
func (r *RDB) DeleteRetryTask(id xid.ID, score int64) error {
return r.deleteTask(retryQ, id.String(), float64(score))
return r.deleteTask(base.RetryQueue, id.String(), float64(score))
}
// DeleteScheduledTask finds a task that matches the given id and score from
// scheduled queue and deletes it. If a task that matches the id and score
//does not exist, it returns ErrTaskNotFound.
func (r *RDB) DeleteScheduledTask(id xid.ID, score int64) error {
return r.deleteTask(scheduledQ, id.String(), float64(score))
return r.deleteTask(base.ScheduledQueue, id.String(), float64(score))
}
func (r *RDB) deleteTask(zset, id string, score float64) error {
@@ -383,15 +384,15 @@ func (r *RDB) deleteTask(zset, id string, score float64) error {
// DeleteAllDeadTasks deletes all tasks from the dead queue.
func (r *RDB) DeleteAllDeadTasks() error {
return r.client.Del(deadQ).Err()
return r.client.Del(base.DeadQueue).Err()
}
// DeleteAllRetryTasks deletes all tasks from the dead queue.
func (r *RDB) DeleteAllRetryTasks() error {
return r.client.Del(retryQ).Err()
return r.client.Del(base.RetryQueue).Err()
}
// DeleteAllScheduledTasks deletes all tasks from the dead queue.
func (r *RDB) DeleteAllScheduledTasks() error {
return r.client.Del(scheduledQ).Err()
return r.client.Del(base.ScheduledQueue).Err()
}