mirror of
https://github.com/hibiken/asynq.git
synced 2025-09-16 11:32:26 +08:00
Unexport redis key name constants from rdb package
This commit is contained in:
@@ -14,13 +14,13 @@ import (
|
||||
|
||||
// Redis keys
|
||||
const (
|
||||
allQueues = "asynq:queues" // SET
|
||||
queuePrefix = "asynq:queues:" // LIST - asynq:queues:<qname>
|
||||
DefaultQueue = queuePrefix + "default" // LIST
|
||||
Scheduled = "asynq:scheduled" // ZSET
|
||||
Retry = "asynq:retry" // ZSET
|
||||
Dead = "asynq:dead" // ZSET
|
||||
InProgress = "asynq:in_progress" // SET
|
||||
allQueues = "asynq:queues" // SET
|
||||
queuePrefix = "asynq:queues:" // LIST - asynq:queues:<qname>
|
||||
defaultQ = queuePrefix + "default" // LIST
|
||||
scheduledQ = "asynq:scheduled" // ZSET
|
||||
retryQ = "asynq:retry" // ZSET
|
||||
deadQ = "asynq:dead" // ZSET
|
||||
inProgressQ = "asynq:in_progress" // LIST
|
||||
)
|
||||
|
||||
// ErrDequeueTimeout indicates that the blocking dequeue operation timed out.
|
||||
@@ -143,19 +143,19 @@ func (r *RDB) Enqueue(msg *TaskMessage) error {
|
||||
// once a task is available, it adds the task to "in progress" list
|
||||
// and returns the task.
|
||||
func (r *RDB) Dequeue(timeout time.Duration) (*TaskMessage, error) {
|
||||
data, err := r.client.BRPopLPush(DefaultQueue, InProgress, timeout).Result()
|
||||
data, err := r.client.BRPopLPush(defaultQ, inProgressQ, timeout).Result()
|
||||
if err == redis.Nil {
|
||||
return nil, ErrDequeueTimeout
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("command `BRPOPLPUSH %q %q %v` failed: %v", DefaultQueue, InProgress, timeout, err)
|
||||
return nil, fmt.Errorf("command `BRPOPLPUSH %q %q %v` failed: %v", defaultQ, inProgressQ, timeout, err)
|
||||
}
|
||||
var msg TaskMessage
|
||||
err = json.Unmarshal([]byte(data), &msg)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not unmarshal %v to json: %v", data, err)
|
||||
}
|
||||
fmt.Printf("[DEBUG] perform task %+v from %s\n", msg, DefaultQueue)
|
||||
fmt.Printf("[DEBUG] perform task %+v from %s\n", msg, defaultQ)
|
||||
return &msg, nil
|
||||
}
|
||||
|
||||
@@ -166,21 +166,21 @@ func (r *RDB) Done(msg *TaskMessage) error {
|
||||
return fmt.Errorf("could not marshal %+v to json: %v", msg, err)
|
||||
}
|
||||
// NOTE: count ZERO means "remove all elements equal to val"
|
||||
err = r.client.LRem(InProgress, 0, string(bytes)).Err()
|
||||
err = r.client.LRem(inProgressQ, 0, string(bytes)).Err()
|
||||
if err != nil {
|
||||
return fmt.Errorf("command `LREM %s 0 %s` failed: %v", InProgress, string(bytes), err)
|
||||
return fmt.Errorf("command `LREM %s 0 %s` failed: %v", inProgressQ, string(bytes), err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Schedule adds the task to the backlog queue to be processed in the future.
|
||||
func (r *RDB) Schedule(msg *TaskMessage, processAt time.Time) error {
|
||||
return r.schedule(Scheduled, processAt, msg)
|
||||
return r.schedule(scheduledQ, processAt, msg)
|
||||
}
|
||||
|
||||
// RetryLater adds the task to the backlog queue to be retried in the future.
|
||||
func (r *RDB) RetryLater(msg *TaskMessage, processAt time.Time) error {
|
||||
return r.schedule(Retry, processAt, msg)
|
||||
return r.schedule(retryQ, processAt, msg)
|
||||
}
|
||||
|
||||
// schedule adds the task to the zset to be processd at the specified time.
|
||||
@@ -208,10 +208,10 @@ func (r *RDB) Kill(msg *TaskMessage) error {
|
||||
}
|
||||
now := time.Now()
|
||||
pipe := r.client.Pipeline()
|
||||
pipe.ZAdd(Dead, &redis.Z{Member: string(bytes), Score: float64(now.Unix())})
|
||||
pipe.ZAdd(deadQ, &redis.Z{Member: string(bytes), Score: float64(now.Unix())})
|
||||
limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago
|
||||
pipe.ZRemRangeByScore(Dead, "-inf", strconv.Itoa(int(limit)))
|
||||
pipe.ZRemRangeByRank(Dead, 0, -maxDeadTask) // trim the set to 100
|
||||
pipe.ZRemRangeByScore(deadQ, "-inf", strconv.Itoa(int(limit)))
|
||||
pipe.ZRemRangeByRank(deadQ, 0, -maxDeadTask) // trim the set to 100
|
||||
_, err = pipe.Exec()
|
||||
return err
|
||||
}
|
||||
@@ -225,14 +225,14 @@ func (r *RDB) RestoreUnfinished() error {
|
||||
end
|
||||
return len
|
||||
`)
|
||||
_, err := script.Run(r.client, []string{InProgress, DefaultQueue}).Result()
|
||||
_, err := script.Run(r.client, []string{inProgressQ, defaultQ}).Result()
|
||||
return err
|
||||
}
|
||||
|
||||
// CheckAndEnqueue checks for all scheduled tasks and enqueues any tasks that
|
||||
// have to be processed.
|
||||
func (r *RDB) CheckAndEnqueue() error {
|
||||
delayed := []string{Scheduled, Retry}
|
||||
delayed := []string{scheduledQ, retryQ}
|
||||
for _, zset := range delayed {
|
||||
if err := r.forward(zset); err != nil {
|
||||
return err
|
||||
@@ -254,7 +254,7 @@ func (r *RDB) forward(from string) error {
|
||||
return msgs
|
||||
`)
|
||||
now := float64(time.Now().Unix())
|
||||
res, err := script.Run(r.client, []string{from, allQueues, DefaultQueue}, now).Result()
|
||||
res, err := script.Run(r.client, []string{from, allQueues, defaultQ}, now).Result()
|
||||
fmt.Printf("[DEBUG] got %d tasks from %q\n", len(res.([]interface{})), from)
|
||||
return err
|
||||
}
|
||||
@@ -262,11 +262,11 @@ func (r *RDB) forward(from string) error {
|
||||
// CurrentStats returns a current state of the queues.
|
||||
func (r *RDB) CurrentStats() (*Stats, error) {
|
||||
pipe := r.client.Pipeline()
|
||||
qlen := pipe.LLen(DefaultQueue)
|
||||
plen := pipe.LLen(InProgress)
|
||||
slen := pipe.ZCard(Scheduled)
|
||||
rlen := pipe.ZCard(Retry)
|
||||
dlen := pipe.ZCard(Dead)
|
||||
qlen := pipe.LLen(defaultQ)
|
||||
plen := pipe.LLen(inProgressQ)
|
||||
slen := pipe.ZCard(scheduledQ)
|
||||
rlen := pipe.ZCard(retryQ)
|
||||
dlen := pipe.ZCard(deadQ)
|
||||
_, err := pipe.Exec()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -283,7 +283,7 @@ func (r *RDB) CurrentStats() (*Stats, error) {
|
||||
|
||||
// ListEnqueued returns all enqueued tasks that are ready to be processed.
|
||||
func (r *RDB) ListEnqueued() ([]*EnqueuedTask, error) {
|
||||
data, err := r.client.LRange(DefaultQueue, 0, -1).Result()
|
||||
data, err := r.client.LRange(defaultQ, 0, -1).Result()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -305,7 +305,7 @@ func (r *RDB) ListEnqueued() ([]*EnqueuedTask, error) {
|
||||
|
||||
// ListInProgress returns all tasks that are currently being processed.
|
||||
func (r *RDB) ListInProgress() ([]*InProgressTask, error) {
|
||||
data, err := r.client.LRange(DefaultQueue, 0, -1).Result()
|
||||
data, err := r.client.LRange(defaultQ, 0, -1).Result()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -328,7 +328,7 @@ func (r *RDB) ListInProgress() ([]*InProgressTask, error) {
|
||||
// ListScheduled returns all tasks that are scheduled to be processed
|
||||
// in the future.
|
||||
func (r *RDB) ListScheduled() ([]*ScheduledTask, error) {
|
||||
data, err := r.client.ZRangeWithScores(Scheduled, 0, -1).Result()
|
||||
data, err := r.client.ZRangeWithScores(scheduledQ, 0, -1).Result()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -357,7 +357,7 @@ func (r *RDB) ListScheduled() ([]*ScheduledTask, error) {
|
||||
// ListRetry returns all tasks that have failed before and willl be retried
|
||||
// in the future.
|
||||
func (r *RDB) ListRetry() ([]*RetryTask, error) {
|
||||
data, err := r.client.ZRangeWithScores(Retry, 0, -1).Result()
|
||||
data, err := r.client.ZRangeWithScores(retryQ, 0, -1).Result()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -388,7 +388,7 @@ func (r *RDB) ListRetry() ([]*RetryTask, error) {
|
||||
|
||||
// ListDead returns all tasks that have exhausted its retry limit.
|
||||
func (r *RDB) ListDead() ([]*DeadTask, error) {
|
||||
data, err := r.client.ZRangeWithScores(Dead, 0, -1).Result()
|
||||
data, err := r.client.ZRangeWithScores(deadQ, 0, -1).Result()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
Reference in New Issue
Block a user