2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-08-19 15:08:55 +08:00

Upgrade go-redis/redis to version 8

This commit is contained in:
Jason White
2021-09-02 08:56:02 -04:00
committed by GitHub
parent 05534c6f24
commit b3ef9e91a9
24 changed files with 331 additions and 304 deletions

View File

@@ -5,11 +5,12 @@
package rdb
import (
"context"
"fmt"
"strings"
"time"
"github.com/go-redis/redis/v7"
"github.com/go-redis/redis/v8"
"github.com/google/uuid"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/errors"
@@ -18,7 +19,7 @@ import (
// AllQueues returns a list of all queue names.
func (r *RDB) AllQueues() ([]string, error) {
return r.client.SMembers(base.AllQueues).Result()
return r.client.SMembers(context.Background(), base.AllQueues).Result()
}
// Stats represents a state of queues at a certain time.
@@ -103,7 +104,7 @@ return res`)
// CurrentStats returns a current state of the queues.
func (r *RDB) CurrentStats(qname string) (*Stats, error) {
var op errors.Op = "rdb.CurrentStats"
exists, err := r.client.SIsMember(base.AllQueues, qname).Result()
exists, err := r.client.SIsMember(context.Background(), base.AllQueues, qname).Result()
if err != nil {
return nil, errors.E(op, errors.Unknown, err)
}
@@ -111,7 +112,7 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
}
now := time.Now()
res, err := currentStatsCmd.Run(r.client, []string{
res, err := currentStatsCmd.Run(context.Background(), r.client, []string{
base.PendingKey(qname),
base.ActiveKey(qname),
base.ScheduledKey(qname),
@@ -242,7 +243,7 @@ func (r *RDB) memoryUsage(qname string) (int64, error) {
base.TaskKeyPrefix(qname),
sampleSize,
}
res, err := memoryUsageCmd.Run(r.client, keys, argv...).Result()
res, err := memoryUsageCmd.Run(context.Background(), r.client, keys, argv...).Result()
if err != nil {
return 0, errors.E(op, errors.Unknown, fmt.Sprintf("redis eval error: %v", err))
}
@@ -270,7 +271,7 @@ func (r *RDB) HistoricalStats(qname string, n int) ([]*DailyStats, error) {
if n < 1 {
return nil, errors.E(op, errors.FailedPrecondition, "the number of days must be positive")
}
exists, err := r.client.SIsMember(base.AllQueues, qname).Result()
exists, err := r.client.SIsMember(context.Background(), base.AllQueues, qname).Result()
if err != nil {
return nil, errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sismember", Err: err})
}
@@ -287,7 +288,7 @@ func (r *RDB) HistoricalStats(qname string, n int) ([]*DailyStats, error) {
keys = append(keys, base.ProcessedKey(qname, ts))
keys = append(keys, base.FailedKey(qname, ts))
}
res, err := historicalStatsCmd.Run(r.client, keys).Result()
res, err := historicalStatsCmd.Run(context.Background(), r.client, keys).Result()
if err != nil {
return nil, errors.E(op, errors.Unknown, fmt.Sprintf("redis eval error: %v", err))
}
@@ -309,7 +310,7 @@ func (r *RDB) HistoricalStats(qname string, n int) ([]*DailyStats, error) {
// RedisInfo returns a map of redis info.
func (r *RDB) RedisInfo() (map[string]string, error) {
res, err := r.client.Info().Result()
res, err := r.client.Info(context.Background()).Result()
if err != nil {
return nil, err
}
@@ -318,7 +319,7 @@ func (r *RDB) RedisInfo() (map[string]string, error) {
// RedisClusterInfo returns a map of redis cluster info.
func (r *RDB) RedisClusterInfo() (map[string]string, error) {
res, err := r.client.ClusterInfo().Result()
res, err := r.client.ClusterInfo(context.Background()).Result()
if err != nil {
return nil, err
}
@@ -347,7 +348,7 @@ func reverse(x []string) {
// checkQueueExists verifies whether the queue exists.
// It returns QueueNotFoundError if queue doesn't exist.
func (r *RDB) checkQueueExists(qname string) error {
exists, err := r.client.SIsMember(base.AllQueues, qname).Result()
exists, err := r.client.SIsMember(context.Background(), base.AllQueues, qname).Result()
if err != nil {
return errors.E(errors.Unknown, &errors.RedisCommandError{Command: "sismember", Err: err})
}
@@ -396,7 +397,7 @@ func (r *RDB) GetTaskInfo(qname string, id uuid.UUID) (*base.TaskInfo, error) {
time.Now().Unix(),
base.QueueKeyPrefix(qname),
}
res, err := getTaskInfoCmd.Run(r.client, keys, argv...).Result()
res, err := getTaskInfoCmd.Run(context.Background(), r.client, keys, argv...).Result()
if err != nil {
if err.Error() == "NOT FOUND" {
return nil, errors.E(op, errors.NotFound, &errors.TaskNotFoundError{Queue: qname, ID: id.String()})
@@ -462,7 +463,7 @@ func (p Pagination) stop() int64 {
// ListPending returns pending tasks that are ready to be processed.
func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskMessage, error) {
var op errors.Op = "rdb.ListPending"
if !r.client.SIsMember(base.AllQueues, qname).Val() {
if !r.client.SIsMember(context.Background(), base.AllQueues, qname).Val() {
return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
}
res, err := r.listMessages(base.PendingKey(qname), qname, pgn)
@@ -475,7 +476,7 @@ func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskMessage, er
// ListActive returns all tasks that are currently being processed for the given queue.
func (r *RDB) ListActive(qname string, pgn Pagination) ([]*base.TaskMessage, error) {
var op errors.Op = "rdb.ListActive"
if !r.client.SIsMember(base.AllQueues, qname).Val() {
if !r.client.SIsMember(context.Background(), base.AllQueues, qname).Val() {
return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
}
res, err := r.listMessages(base.ActiveKey(qname), qname, pgn)
@@ -505,7 +506,7 @@ func (r *RDB) listMessages(key, qname string, pgn Pagination) ([]*base.TaskMessa
// correct range and reverse the list to get the tasks with pagination.
stop := -pgn.start() - 1
start := -pgn.stop() - 1
res, err := listMessagesCmd.Run(r.client,
res, err := listMessagesCmd.Run(context.Background(), r.client,
[]string{key}, start, stop, base.TaskKeyPrefix(qname)).Result()
if err != nil {
return nil, errors.E(errors.Unknown, err)
@@ -531,7 +532,7 @@ func (r *RDB) listMessages(key, qname string, pgn Pagination) ([]*base.TaskMessa
// to be processed in the future.
func (r *RDB) ListScheduled(qname string, pgn Pagination) ([]base.Z, error) {
var op errors.Op = "rdb.ListScheduled"
if !r.client.SIsMember(base.AllQueues, qname).Val() {
if !r.client.SIsMember(context.Background(), base.AllQueues, qname).Val() {
return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
}
res, err := r.listZSetEntries(base.ScheduledKey(qname), qname, pgn)
@@ -545,7 +546,7 @@ func (r *RDB) ListScheduled(qname string, pgn Pagination) ([]base.Z, error) {
// and willl be retried in the future.
func (r *RDB) ListRetry(qname string, pgn Pagination) ([]base.Z, error) {
var op errors.Op = "rdb.ListRetry"
if !r.client.SIsMember(base.AllQueues, qname).Val() {
if !r.client.SIsMember(context.Background(), base.AllQueues, qname).Val() {
return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
}
res, err := r.listZSetEntries(base.RetryKey(qname), qname, pgn)
@@ -558,7 +559,7 @@ func (r *RDB) ListRetry(qname string, pgn Pagination) ([]base.Z, error) {
// ListArchived returns all tasks from the given queue that have exhausted its retry limit.
func (r *RDB) ListArchived(qname string, pgn Pagination) ([]base.Z, error) {
var op errors.Op = "rdb.ListArchived"
if !r.client.SIsMember(base.AllQueues, qname).Val() {
if !r.client.SIsMember(context.Background(), base.AllQueues, qname).Val() {
return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
}
zs, err := r.listZSetEntries(base.ArchivedKey(qname), qname, pgn)
@@ -589,7 +590,7 @@ return res
// listZSetEntries returns a list of message and score pairs in Redis sorted-set
// with the given key.
func (r *RDB) listZSetEntries(key, qname string, pgn Pagination) ([]base.Z, error) {
res, err := listZSetEntriesCmd.Run(r.client, []string{key},
res, err := listZSetEntriesCmd.Run(context.Background(), r.client, []string{key},
pgn.start(), pgn.stop(), base.TaskKeyPrefix(qname)).Result()
if err != nil {
return nil, errors.E(errors.Unknown, err)
@@ -716,7 +717,7 @@ func (r *RDB) RunTask(qname string, id uuid.UUID) error {
id.String(),
base.QueueKeyPrefix(qname),
}
res, err := runTaskCmd.Run(r.client, keys, argv...).Result()
res, err := runTaskCmd.Run(context.Background(), r.client, keys, argv...).Result()
if err != nil {
return errors.E(op, errors.Unknown, err)
}
@@ -769,7 +770,7 @@ func (r *RDB) runAll(zset, qname string) (int64, error) {
argv := []interface{}{
base.TaskKeyPrefix(qname),
}
res, err := runAllCmd.Run(r.client, keys, argv...).Result()
res, err := runAllCmd.Run(context.Background(), r.client, keys, argv...).Result()
if err != nil {
return 0, err
}
@@ -857,7 +858,7 @@ func (r *RDB) ArchiveAllPendingTasks(qname string) (int64, error) {
maxArchiveSize,
base.TaskKeyPrefix(qname),
}
res, err := archiveAllPendingCmd.Run(r.client, keys, argv...).Result()
res, err := archiveAllPendingCmd.Run(context.Background(), r.client, keys, argv...).Result()
if err != nil {
return 0, errors.E(op, errors.Internal, err)
}
@@ -938,7 +939,7 @@ func (r *RDB) ArchiveTask(qname string, id uuid.UUID) error {
maxArchiveSize,
base.QueueKeyPrefix(qname),
}
res, err := archiveTaskCmd.Run(r.client, keys, argv...).Result()
res, err := archiveTaskCmd.Run(context.Background(), r.client, keys, argv...).Result()
if err != nil {
return errors.E(op, errors.Unknown, err)
}
@@ -1003,7 +1004,7 @@ func (r *RDB) archiveAll(src, dst, qname string) (int64, error) {
base.TaskKeyPrefix(qname),
qname,
}
res, err := archiveAllCmd.Run(r.client, keys, argv...).Result()
res, err := archiveAllCmd.Run(context.Background(), r.client, keys, argv...).Result()
if err != nil {
return 0, err
}
@@ -1070,7 +1071,7 @@ func (r *RDB) DeleteTask(qname string, id uuid.UUID) error {
id.String(),
base.QueueKeyPrefix(qname),
}
res, err := deleteTaskCmd.Run(r.client, keys, argv...).Result()
res, err := deleteTaskCmd.Run(context.Background(), r.client, keys, argv...).Result()
if err != nil {
return errors.E(op, errors.Unknown, err)
}
@@ -1162,7 +1163,7 @@ func (r *RDB) deleteAll(key, qname string) (int64, error) {
base.TaskKeyPrefix(qname),
qname,
}
res, err := deleteAllCmd.Run(r.client, []string{key}, argv...).Result()
res, err := deleteAllCmd.Run(context.Background(), r.client, []string{key}, argv...).Result()
if err != nil {
return 0, err
}
@@ -1203,7 +1204,7 @@ func (r *RDB) DeleteAllPendingTasks(qname string) (int64, error) {
argv := []interface{}{
base.TaskKeyPrefix(qname),
}
res, err := deleteAllPendingCmd.Run(r.client, keys, argv...).Result()
res, err := deleteAllPendingCmd.Run(context.Background(), r.client, keys, argv...).Result()
if err != nil {
return 0, errors.E(op, errors.Unknown, err)
}
@@ -1334,7 +1335,7 @@ return 1`)
// the queue is empty.
func (r *RDB) RemoveQueue(qname string, force bool) error {
var op errors.Op = "rdb.RemoveQueue"
exists, err := r.client.SIsMember(base.AllQueues, qname).Result()
exists, err := r.client.SIsMember(context.Background(), base.AllQueues, qname).Result()
if err != nil {
return err
}
@@ -1355,7 +1356,7 @@ func (r *RDB) RemoveQueue(qname string, force bool) error {
base.ArchivedKey(qname),
base.DeadlinesKey(qname),
}
res, err := script.Run(r.client, keys, base.TaskKeyPrefix(qname)).Result()
res, err := script.Run(context.Background(), r.client, keys, base.TaskKeyPrefix(qname)).Result()
if err != nil {
return errors.E(op, errors.Unknown, err)
}
@@ -1365,7 +1366,7 @@ func (r *RDB) RemoveQueue(qname string, force bool) error {
}
switch n {
case 1:
if err := r.client.SRem(base.AllQueues, qname).Err(); err != nil {
if err := r.client.SRem(context.Background(), base.AllQueues, qname).Err(); err != nil {
return errors.E(op, errors.Unknown, err)
}
return nil
@@ -1388,7 +1389,7 @@ return keys`)
// ListServers returns the list of server info.
func (r *RDB) ListServers() ([]*base.ServerInfo, error) {
now := time.Now()
res, err := listServerKeysCmd.Run(r.client, []string{base.AllServers}, now.Unix()).Result()
res, err := listServerKeysCmd.Run(context.Background(), r.client, []string{base.AllServers}, now.Unix()).Result()
if err != nil {
return nil, err
}
@@ -1398,7 +1399,7 @@ func (r *RDB) ListServers() ([]*base.ServerInfo, error) {
}
var servers []*base.ServerInfo
for _, key := range keys {
data, err := r.client.Get(key).Result()
data, err := r.client.Get(context.Background(), key).Result()
if err != nil {
continue // skip bad data
}
@@ -1422,7 +1423,7 @@ return keys`)
func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error) {
var op errors.Op = "rdb.ListWorkers"
now := time.Now()
res, err := listWorkersCmd.Run(r.client, []string{base.AllWorkers}, now.Unix()).Result()
res, err := listWorkersCmd.Run(context.Background(), r.client, []string{base.AllWorkers}, now.Unix()).Result()
if err != nil {
return nil, errors.E(op, errors.Unknown, err)
}
@@ -1432,7 +1433,7 @@ func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error) {
}
var workers []*base.WorkerInfo
for _, key := range keys {
data, err := r.client.HVals(key).Result()
data, err := r.client.HVals(context.Background(), key).Result()
if err != nil {
continue // skip bad data
}
@@ -1457,7 +1458,7 @@ return keys`)
// ListSchedulerEntries returns the list of scheduler entries.
func (r *RDB) ListSchedulerEntries() ([]*base.SchedulerEntry, error) {
now := time.Now()
res, err := listSchedulerKeysCmd.Run(r.client, []string{base.AllSchedulers}, now.Unix()).Result()
res, err := listSchedulerKeysCmd.Run(context.Background(), r.client, []string{base.AllSchedulers}, now.Unix()).Result()
if err != nil {
return nil, err
}
@@ -1467,7 +1468,7 @@ func (r *RDB) ListSchedulerEntries() ([]*base.SchedulerEntry, error) {
}
var entries []*base.SchedulerEntry
for _, key := range keys {
data, err := r.client.LRange(key, 0, -1).Result()
data, err := r.client.LRange(context.Background(), key, 0, -1).Result()
if err != nil {
continue // skip bad data
}
@@ -1485,7 +1486,7 @@ func (r *RDB) ListSchedulerEntries() ([]*base.SchedulerEntry, error) {
// ListSchedulerEnqueueEvents returns the list of scheduler enqueue events.
func (r *RDB) ListSchedulerEnqueueEvents(entryID string, pgn Pagination) ([]*base.SchedulerEnqueueEvent, error) {
key := base.SchedulerHistoryKey(entryID)
zs, err := r.client.ZRevRangeWithScores(key, pgn.start(), pgn.stop()).Result()
zs, err := r.client.ZRevRangeWithScores(context.Background(), key, pgn.start(), pgn.stop()).Result()
if err != nil {
return nil, err
}
@@ -1507,7 +1508,7 @@ func (r *RDB) ListSchedulerEnqueueEvents(entryID string, pgn Pagination) ([]*bas
// Pause pauses processing of tasks from the given queue.
func (r *RDB) Pause(qname string) error {
key := base.PausedKey(qname)
ok, err := r.client.SetNX(key, time.Now().Unix(), 0).Result()
ok, err := r.client.SetNX(context.Background(), key, time.Now().Unix(), 0).Result()
if err != nil {
return err
}
@@ -1520,7 +1521,7 @@ func (r *RDB) Pause(qname string) error {
// Unpause resumes processing of tasks from the given queue.
func (r *RDB) Unpause(qname string) error {
key := base.PausedKey(qname)
deleted, err := r.client.Del(key).Result()
deleted, err := r.client.Del(context.Background(), key).Result()
if err != nil {
return err
}
@@ -1533,7 +1534,7 @@ func (r *RDB) Unpause(qname string) error {
// ClusterKeySlot returns an integer identifying the hash slot the given queue hashes to.
func (r *RDB) ClusterKeySlot(qname string) (int64, error) {
key := base.PendingKey(qname)
return r.client.ClusterKeySlot(key).Result()
return r.client.ClusterKeySlot(context.Background(), key).Result()
}
// ClusterNodes returns a list of nodes the given queue belongs to.
@@ -1542,7 +1543,7 @@ func (r *RDB) ClusterNodes(qname string) ([]redis.ClusterNode, error) {
if err != nil {
return nil, err
}
clusterSlots, err := r.client.ClusterSlots().Result()
clusterSlots, err := r.client.ClusterSlots(context.Background()).Result()
if err != nil {
return nil, err
}

View File

@@ -5,6 +5,7 @@
package rdb
import (
"context"
"encoding/json"
"fmt"
"testing"
@@ -34,7 +35,7 @@ func TestAllQueues(t *testing.T) {
for _, tc := range tests {
h.FlushDB(t, r.client)
for _, qname := range tc.queues {
if err := r.client.SAdd(base.AllQueues, qname).Err(); err != nil {
if err := r.client.SAdd(context.Background(), base.AllQueues, qname).Err(); err != nil {
t.Fatalf("could not initialize all queue set: %v", err)
}
}
@@ -198,11 +199,11 @@ func TestCurrentStats(t *testing.T) {
h.SeedAllArchivedQueues(t, r.client, tc.archived)
for qname, n := range tc.processed {
processedKey := base.ProcessedKey(qname, now)
r.client.Set(processedKey, n, 0)
r.client.Set(context.Background(), processedKey, n, 0)
}
for qname, n := range tc.failed {
failedKey := base.FailedKey(qname, now)
r.client.Set(failedKey, n, 0)
r.client.Set(context.Background(), failedKey, n, 0)
}
got, err := r.CurrentStats(tc.qname)
@@ -247,14 +248,14 @@ func TestHistoricalStats(t *testing.T) {
for _, tc := range tests {
h.FlushDB(t, r.client)
r.client.SAdd(base.AllQueues, tc.qname)
r.client.SAdd(context.Background(), base.AllQueues, tc.qname)
// populate last n days data
for i := 0; i < tc.n; i++ {
ts := now.Add(-time.Duration(i) * 24 * time.Hour)
processedKey := base.ProcessedKey(tc.qname, ts)
failedKey := base.FailedKey(tc.qname, ts)
r.client.Set(processedKey, (i+1)*1000, 0)
r.client.Set(failedKey, (i+1)*10, 0)
r.client.Set(context.Background(), processedKey, (i+1)*1000, 0)
r.client.Set(context.Background(), failedKey, (i+1)*10, 0)
}
got, err := r.HistoricalStats(tc.qname, tc.n)
@@ -3168,7 +3169,7 @@ func TestDeleteTaskWithUniqueLock(t *testing.T) {
}
}
if r.client.Exists(tc.uniqueKey).Val() != 0 {
if r.client.Exists(context.Background(), tc.uniqueKey).Val() != 0 {
t.Errorf("Uniqueness lock %q still exists", tc.uniqueKey)
}
}
@@ -3401,7 +3402,7 @@ func TestDeleteAllArchivedTasksWithUniqueKey(t *testing.T) {
}
for _, uniqueKey := range tc.uniqueKeys {
if r.client.Exists(uniqueKey).Val() != 0 {
if r.client.Exists(context.Background(), uniqueKey).Val() != 0 {
t.Errorf("Uniqueness lock %q still exists", uniqueKey)
}
}
@@ -3702,7 +3703,7 @@ func TestRemoveQueue(t *testing.T) {
tc.qname, tc.force, err)
continue
}
if r.client.SIsMember(base.AllQueues, tc.qname).Val() {
if r.client.SIsMember(context.Background(), base.AllQueues, tc.qname).Val() {
t.Errorf("%q is a member of %q", tc.qname, base.AllQueues)
}
@@ -3715,12 +3716,12 @@ func TestRemoveQueue(t *testing.T) {
base.ArchivedKey(tc.qname),
}
for _, key := range keys {
if r.client.Exists(key).Val() != 0 {
if r.client.Exists(context.Background(), key).Val() != 0 {
t.Errorf("key %q still exists", key)
}
}
if n := len(r.client.Keys(base.TaskKeyPrefix(tc.qname) + "*").Val()); n != 0 {
if n := len(r.client.Keys(context.Background(), base.TaskKeyPrefix(tc.qname) + "*").Val()); n != 0 {
t.Errorf("%d keys still exists for tasks", n)
}
}
@@ -4137,7 +4138,7 @@ func TestRecordSchedulerEnqueueEventTrimsDataSet(t *testing.T) {
}
// Make sure the set is full.
if n := r.client.ZCard(key).Val(); n != maxEvents {
if n := r.client.ZCard(context.Background(), key).Val(); n != maxEvents {
t.Fatalf("unexpected number of events; got %d, want %d", n, maxEvents)
}
@@ -4149,7 +4150,7 @@ func TestRecordSchedulerEnqueueEventTrimsDataSet(t *testing.T) {
if err := r.RecordSchedulerEnqueueEvent(entryID, &event); err != nil {
t.Fatalf("RecordSchedulerEnqueueEvent failed: %v", err)
}
if n := r.client.ZCard(key).Val(); n != maxEvents {
if n := r.client.ZCard(context.Background(), key).Val(); n != maxEvents {
t.Fatalf("unexpected number of events; got %d, want %d", n, maxEvents)
}
events, err := r.ListSchedulerEnqueueEvents(entryID, Pagination{Size: maxEvents})
@@ -4182,7 +4183,7 @@ func TestPause(t *testing.T) {
t.Errorf("Pause(%q) returned error: %v", tc.qname, err)
}
key := base.PausedKey(tc.qname)
if r.client.Exists(key).Val() == 0 {
if r.client.Exists(context.Background(), key).Val() == 0 {
t.Errorf("key %q does not exist", key)
}
}
@@ -4237,7 +4238,7 @@ func TestUnpause(t *testing.T) {
t.Errorf("Unpause(%q) returned error: %v", tc.qname, err)
}
key := base.PausedKey(tc.qname)
if r.client.Exists(key).Val() == 1 {
if r.client.Exists(context.Background(), key).Val() == 1 {
t.Errorf("key %q exists", key)
}
}

View File

@@ -6,10 +6,11 @@
package rdb
import (
"context"
"fmt"
"time"
"github.com/go-redis/redis/v7"
"github.com/go-redis/redis/v8"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/errors"
"github.com/spf13/cast"
@@ -39,11 +40,11 @@ func (r *RDB) Client() redis.UniversalClient {
// Ping checks the connection with redis server.
func (r *RDB) Ping() error {
return r.client.Ping().Err()
return r.client.Ping(context.Background()).Err()
}
func (r *RDB) runScript(op errors.Op, script *redis.Script, keys []string, args ...interface{}) error {
if err := script.Run(r.client, keys, args...).Err(); err != nil {
if err := script.Run(context.Background(), r.client, keys, args...).Err(); err != nil {
return errors.E(op, errors.Internal, fmt.Sprintf("redis eval error: %v", err))
}
return nil
@@ -79,7 +80,7 @@ func (r *RDB) Enqueue(msg *base.TaskMessage) error {
if err != nil {
return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err))
}
if err := r.client.SAdd(base.AllQueues, msg.Queue).Err(); err != nil {
if err := r.client.SAdd(context.Background(), base.AllQueues, msg.Queue).Err(); err != nil {
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
}
keys := []string{
@@ -133,7 +134,7 @@ func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error {
if err != nil {
return errors.E(op, errors.Internal, "cannot encode task message: %v", err)
}
if err := r.client.SAdd(base.AllQueues, msg.Queue).Err(); err != nil {
if err := r.client.SAdd(context.Background(), base.AllQueues, msg.Queue).Err(); err != nil {
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
}
keys := []string{
@@ -148,7 +149,7 @@ func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error {
msg.Timeout,
msg.Deadline,
}
res, err := enqueueUniqueCmd.Run(r.client, keys, argv...).Result()
res, err := enqueueUniqueCmd.Run(context.Background(), r.client, keys, argv...).Result()
if err != nil {
return errors.E(op, errors.Unknown, fmt.Sprintf("redis eval error: %v", err))
}
@@ -223,7 +224,7 @@ func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, deadline time.Ti
time.Now().Unix(),
base.TaskKeyPrefix(qname),
}
res, err := dequeueCmd.Run(r.client, keys, argv...).Result()
res, err := dequeueCmd.Run(context.Background(), r.client, keys, argv...).Result()
if err == redis.Nil {
continue
} else if err != nil {
@@ -378,7 +379,7 @@ func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error {
if err != nil {
return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err))
}
if err := r.client.SAdd(base.AllQueues, msg.Queue).Err(); err != nil {
if err := r.client.SAdd(context.Background(), base.AllQueues, msg.Queue).Err(); err != nil {
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
}
keys := []string{
@@ -427,7 +428,7 @@ func (r *RDB) ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl tim
if err != nil {
return errors.E(op, errors.Internal, fmt.Sprintf("cannot encode task message: %v", err))
}
if err := r.client.SAdd(base.AllQueues, msg.Queue).Err(); err != nil {
if err := r.client.SAdd(context.Background(), base.AllQueues, msg.Queue).Err(); err != nil {
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
}
keys := []string{
@@ -443,7 +444,7 @@ func (r *RDB) ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl tim
msg.Timeout,
msg.Deadline,
}
res, err := scheduleUniqueCmd.Run(r.client, keys, argv...).Result()
res, err := scheduleUniqueCmd.Run(context.Background(), r.client, keys, argv...).Result()
if err != nil {
return errors.E(op, errors.Unknown, fmt.Sprintf("redis eval error: %v", err))
}
@@ -625,7 +626,7 @@ return table.getn(ids)`)
// from the src zset to the dst list. It returns the number of tasks moved.
func (r *RDB) forward(src, dst, taskKeyPrefix string) (int, error) {
now := float64(time.Now().Unix())
res, err := forwardCmd.Run(r.client, []string{src, dst}, now, taskKeyPrefix).Result()
res, err := forwardCmd.Run(context.Background(), r.client, []string{src, dst}, now, taskKeyPrefix).Result()
if err != nil {
return 0, errors.E(errors.Internal, fmt.Sprintf("redis eval error: %v", err))
}
@@ -672,7 +673,7 @@ func (r *RDB) ListDeadlineExceeded(deadline time.Time, qnames ...string) ([]*bas
var op errors.Op = "rdb.ListDeadlineExceeded"
var msgs []*base.TaskMessage
for _, qname := range qnames {
res, err := listDeadlineExceededCmd.Run(r.client,
res, err := listDeadlineExceededCmd.Run(context.Background(), r.client,
[]string{base.DeadlinesKey(qname)},
deadline.Unix(), base.TaskKeyPrefix(qname)).Result()
if err != nil {
@@ -727,10 +728,10 @@ func (r *RDB) WriteServerState(info *base.ServerInfo, workers []*base.WorkerInfo
}
skey := base.ServerInfoKey(info.Host, info.PID, info.ServerID)
wkey := base.WorkersKey(info.Host, info.PID, info.ServerID)
if err := r.client.ZAdd(base.AllServers, &redis.Z{Score: float64(exp.Unix()), Member: skey}).Err(); err != nil {
if err := r.client.ZAdd(context.Background(), base.AllServers, &redis.Z{Score: float64(exp.Unix()), Member: skey}).Err(); err != nil {
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
}
if err := r.client.ZAdd(base.AllWorkers, &redis.Z{Score: float64(exp.Unix()), Member: wkey}).Err(); err != nil {
if err := r.client.ZAdd(context.Background(), base.AllWorkers, &redis.Z{Score: float64(exp.Unix()), Member: wkey}).Err(); err != nil {
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "zadd", Err: err})
}
return r.runScript(op, writeServerStateCmd, []string{skey, wkey}, args...)
@@ -748,10 +749,10 @@ func (r *RDB) ClearServerState(host string, pid int, serverID string) error {
var op errors.Op = "rdb.ClearServerState"
skey := base.ServerInfoKey(host, pid, serverID)
wkey := base.WorkersKey(host, pid, serverID)
if err := r.client.ZRem(base.AllServers, skey).Err(); err != nil {
if err := r.client.ZRem(context.Background(), base.AllServers, skey).Err(); err != nil {
return errors.E(op, errors.Internal, &errors.RedisCommandError{Command: "zrem", Err: err})
}
if err := r.client.ZRem(base.AllWorkers, wkey).Err(); err != nil {
if err := r.client.ZRem(context.Background(), base.AllWorkers, wkey).Err(); err != nil {
return errors.E(op, errors.Internal, &errors.RedisCommandError{Command: "zrem", Err: err})
}
return r.runScript(op, clearServerStateCmd, []string{skey, wkey})
@@ -781,7 +782,7 @@ func (r *RDB) WriteSchedulerEntries(schedulerID string, entries []*base.Schedule
}
exp := time.Now().Add(ttl).UTC()
key := base.SchedulerEntriesKey(schedulerID)
err := r.client.ZAdd(base.AllSchedulers, &redis.Z{Score: float64(exp.Unix()), Member: key}).Err()
err := r.client.ZAdd(context.Background(), base.AllSchedulers, &redis.Z{Score: float64(exp.Unix()), Member: key}).Err()
if err != nil {
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "zadd", Err: err})
}
@@ -792,10 +793,10 @@ func (r *RDB) WriteSchedulerEntries(schedulerID string, entries []*base.Schedule
func (r *RDB) ClearSchedulerEntries(scheduelrID string) error {
var op errors.Op = "rdb.ClearSchedulerEntries"
key := base.SchedulerEntriesKey(scheduelrID)
if err := r.client.ZRem(base.AllSchedulers, key).Err(); err != nil {
if err := r.client.ZRem(context.Background(), base.AllSchedulers, key).Err(); err != nil {
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "zrem", Err: err})
}
if err := r.client.Del(key).Err(); err != nil {
if err := r.client.Del(context.Background(), key).Err(); err != nil {
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "del", Err: err})
}
return nil
@@ -804,8 +805,8 @@ func (r *RDB) ClearSchedulerEntries(scheduelrID string) error {
// CancelationPubSub returns a pubsub for cancelation messages.
func (r *RDB) CancelationPubSub() (*redis.PubSub, error) {
var op errors.Op = "rdb.CancelationPubSub"
pubsub := r.client.Subscribe(base.CancelChannel)
_, err := pubsub.Receive()
pubsub := r.client.Subscribe(context.Background(), base.CancelChannel)
_, err := pubsub.Receive(context.Background())
if err != nil {
return nil, errors.E(op, errors.Unknown, fmt.Sprintf("redis pubsub receive error: %v", err))
}
@@ -816,7 +817,7 @@ func (r *RDB) CancelationPubSub() (*redis.PubSub, error) {
// The message is the ID for the task to be canceled.
func (r *RDB) PublishCancelation(id string) error {
var op errors.Op = "rdb.PublishCancelation"
if err := r.client.Publish(base.CancelChannel, id).Err(); err != nil {
if err := r.client.Publish(context.Background(), base.CancelChannel, id).Err(); err != nil {
return errors.E(op, errors.Unknown, fmt.Sprintf("redis pubsub publish error: %v", err))
}
return nil
@@ -856,7 +857,7 @@ func (r *RDB) RecordSchedulerEnqueueEvent(entryID string, event *base.SchedulerE
func (r *RDB) ClearSchedulerHistory(entryID string) error {
var op errors.Op = "rdb.ClearSchedulerHistory"
key := base.SchedulerHistoryKey(entryID)
if err := r.client.Del(key).Err(); err != nil {
if err := r.client.Del(context.Background(), key).Err(); err != nil {
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "del", Err: err})
}
return nil

View File

@@ -5,6 +5,7 @@
package rdb
import (
"context"
"encoding/json"
"flag"
"strconv"
@@ -13,7 +14,7 @@ import (
"testing"
"time"
"github.com/go-redis/redis/v7"
"github.com/go-redis/redis/v8"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/google/uuid"
@@ -85,7 +86,7 @@ func TestEnqueue(t *testing.T) {
// Check Pending list has task ID.
pendingKey := base.PendingKey(tc.msg.Queue)
pendingIDs := r.client.LRange(pendingKey, 0, -1).Val()
pendingIDs := r.client.LRange(context.Background(), pendingKey, 0, -1).Val()
if n := len(pendingIDs); n != 1 {
t.Errorf("Redis LIST %q contains %d IDs, want 1", pendingKey, n)
continue
@@ -97,26 +98,26 @@ func TestEnqueue(t *testing.T) {
// Check the value under the task key.
taskKey := base.TaskKey(tc.msg.Queue, tc.msg.ID.String())
encoded := r.client.HGet(taskKey, "msg").Val() // "msg" field
encoded := r.client.HGet(context.Background(), taskKey, "msg").Val() // "msg" field
decoded := h.MustUnmarshal(t, encoded)
if diff := cmp.Diff(tc.msg, decoded); diff != "" {
t.Errorf("persisted message was %v, want %v; (-want, +got)\n%s", decoded, tc.msg, diff)
}
state := r.client.HGet(taskKey, "state").Val() // "state" field
state := r.client.HGet(context.Background(), taskKey, "state").Val() // "state" field
if state != "pending" {
t.Errorf("state field under task-key is set to %q, want %q", state, "pending")
}
timeout := r.client.HGet(taskKey, "timeout").Val() // "timeout" field
timeout := r.client.HGet(context.Background(), taskKey, "timeout").Val() // "timeout" field
if want := strconv.Itoa(int(tc.msg.Timeout)); timeout != want {
t.Errorf("timeout field under task-key is set to %v, want %v", timeout, want)
}
deadline := r.client.HGet(taskKey, "deadline").Val() // "deadline" field
deadline := r.client.HGet(context.Background(), taskKey, "deadline").Val() // "deadline" field
if want := strconv.Itoa(int(tc.msg.Deadline)); deadline != want {
t.Errorf("deadline field under task-key is set to %v, want %v", deadline, want)
}
// Check queue is in the AllQueues set.
if !r.client.SIsMember(base.AllQueues, tc.msg.Queue).Val() {
if !r.client.SIsMember(context.Background(), base.AllQueues, tc.msg.Queue).Val() {
t.Errorf("%q is not a member of SET %q", tc.msg.Queue, base.AllQueues)
}
}
@@ -158,13 +159,13 @@ func TestEnqueueUnique(t *testing.T) {
if diff := cmp.Diff(tc.msg, gotPending[0]); diff != "" {
t.Errorf("persisted data differed from the original input (-want, +got)\n%s", diff)
}
if !r.client.SIsMember(base.AllQueues, tc.msg.Queue).Val() {
if !r.client.SIsMember(context.Background(), base.AllQueues, tc.msg.Queue).Val() {
t.Errorf("%q is not a member of SET %q", tc.msg.Queue, base.AllQueues)
}
// Check Pending list has task ID.
pendingKey := base.PendingKey(tc.msg.Queue)
pendingIDs := r.client.LRange(pendingKey, 0, -1).Val()
pendingIDs := r.client.LRange(context.Background(), pendingKey, 0, -1).Val()
if len(pendingIDs) != 1 {
t.Errorf("Redis LIST %q contains %d IDs, want 1", pendingKey, len(pendingIDs))
continue
@@ -176,30 +177,30 @@ func TestEnqueueUnique(t *testing.T) {
// Check the value under the task key.
taskKey := base.TaskKey(tc.msg.Queue, tc.msg.ID.String())
encoded := r.client.HGet(taskKey, "msg").Val() // "msg" field
encoded := r.client.HGet(context.Background(), taskKey, "msg").Val() // "msg" field
decoded := h.MustUnmarshal(t, encoded)
if diff := cmp.Diff(tc.msg, decoded); diff != "" {
t.Errorf("persisted message was %v, want %v; (-want, +got)\n%s", decoded, tc.msg, diff)
}
state := r.client.HGet(taskKey, "state").Val() // "state" field
state := r.client.HGet(context.Background(), taskKey, "state").Val() // "state" field
if state != "pending" {
t.Errorf("state field under task-key is set to %q, want %q", state, "pending")
}
timeout := r.client.HGet(taskKey, "timeout").Val() // "timeout" field
timeout := r.client.HGet(context.Background(), taskKey, "timeout").Val() // "timeout" field
if want := strconv.Itoa(int(tc.msg.Timeout)); timeout != want {
t.Errorf("timeout field under task-key is set to %v, want %v", timeout, want)
}
deadline := r.client.HGet(taskKey, "deadline").Val() // "deadline" field
deadline := r.client.HGet(context.Background(), taskKey, "deadline").Val() // "deadline" field
if want := strconv.Itoa(int(tc.msg.Deadline)); deadline != want {
t.Errorf("deadline field under task-key is set to %v, want %v", deadline, want)
}
uniqueKey := r.client.HGet(taskKey, "unique_key").Val() // "unique_key" field
uniqueKey := r.client.HGet(context.Background(), taskKey, "unique_key").Val() // "unique_key" field
if uniqueKey != tc.msg.UniqueKey {
t.Errorf("uniqueue_key field under task key is set to %q, want %q", uniqueKey, tc.msg.UniqueKey)
}
// Check queue is in the AllQueues set.
if !r.client.SIsMember(base.AllQueues, tc.msg.Queue).Val() {
if !r.client.SIsMember(context.Background(), base.AllQueues, tc.msg.Queue).Val() {
t.Errorf("%q is not a member of SET %q", tc.msg.Queue, base.AllQueues)
}
@@ -209,7 +210,7 @@ func TestEnqueueUnique(t *testing.T) {
t.Errorf("Second message: (*RDB).EnqueueUnique(msg, ttl) = %v, want %v", got, errors.ErrDuplicateTask)
continue
}
gotTTL := r.client.TTL(tc.msg.UniqueKey).Val()
gotTTL := r.client.TTL(context.Background(), tc.msg.UniqueKey).Val()
if !cmp.Equal(tc.ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 2)) {
t.Errorf("TTL %q = %v, want %v", tc.msg.UniqueKey, gotTTL, tc.ttl)
continue
@@ -681,7 +682,7 @@ func TestDone(t *testing.T) {
for _, msg := range msgs {
// Set uniqueness lock if unique key is present.
if len(msg.UniqueKey) > 0 {
err := r.client.SetNX(msg.UniqueKey, msg.ID.String(), time.Minute).Err()
err := r.client.SetNX(context.Background(), msg.UniqueKey, msg.ID.String(), time.Minute).Err()
if err != nil {
t.Fatal(err)
}
@@ -711,17 +712,17 @@ func TestDone(t *testing.T) {
}
processedKey := base.ProcessedKey(tc.target.Queue, time.Now())
gotProcessed := r.client.Get(processedKey).Val()
gotProcessed := r.client.Get(context.Background(), processedKey).Val()
if gotProcessed != "1" {
t.Errorf("%s; GET %q = %q, want 1", tc.desc, processedKey, gotProcessed)
}
gotTTL := r.client.TTL(processedKey).Val()
gotTTL := r.client.TTL(context.Background(), processedKey).Val()
if gotTTL > statsTTL {
t.Errorf("%s; TTL %q = %v, want less than or equal to %v", tc.desc, processedKey, gotTTL, statsTTL)
}
if len(tc.target.UniqueKey) > 0 && r.client.Exists(tc.target.UniqueKey).Val() != 0 {
if len(tc.target.UniqueKey) > 0 && r.client.Exists(context.Background(), tc.target.UniqueKey).Val() != 0 {
t.Errorf("%s; Uniqueness lock %q still exists", tc.desc, tc.target.UniqueKey)
}
}
@@ -899,7 +900,7 @@ func TestSchedule(t *testing.T) {
// Check Scheduled zset has task ID.
scheduledKey := base.ScheduledKey(tc.msg.Queue)
zs := r.client.ZRangeWithScores(scheduledKey, 0, -1).Val()
zs := r.client.ZRangeWithScores(context.Background(), scheduledKey, 0, -1).Val()
if n := len(zs); n != 1 {
t.Errorf("Redis ZSET %q contains %d elements, want 1",
scheduledKey, n)
@@ -918,28 +919,28 @@ func TestSchedule(t *testing.T) {
// Check the values under the task key.
taskKey := base.TaskKey(tc.msg.Queue, tc.msg.ID.String())
encoded := r.client.HGet(taskKey, "msg").Val() // "msg" field
encoded := r.client.HGet(context.Background(), taskKey, "msg").Val() // "msg" field
decoded := h.MustUnmarshal(t, encoded)
if diff := cmp.Diff(tc.msg, decoded); diff != "" {
t.Errorf("persisted message was %v, want %v; (-want, +got)\n%s",
decoded, tc.msg, diff)
}
state := r.client.HGet(taskKey, "state").Val() // "state" field
state := r.client.HGet(context.Background(), taskKey, "state").Val() // "state" field
if want := "scheduled"; state != want {
t.Errorf("state field under task-key is set to %q, want %q",
state, want)
}
timeout := r.client.HGet(taskKey, "timeout").Val() // "timeout" field
timeout := r.client.HGet(context.Background(), taskKey, "timeout").Val() // "timeout" field
if want := strconv.Itoa(int(tc.msg.Timeout)); timeout != want {
t.Errorf("timeout field under task-key is set to %v, want %v", timeout, want)
}
deadline := r.client.HGet(taskKey, "deadline").Val() // "deadline" field
deadline := r.client.HGet(context.Background(), taskKey, "deadline").Val() // "deadline" field
if want := strconv.Itoa(int(tc.msg.Deadline)); deadline != want {
t.Errorf("deadline field under task-ke is set to %v, want %v", deadline, want)
}
// Check queue is in the AllQueues set.
if !r.client.SIsMember(base.AllQueues, tc.msg.Queue).Val() {
if !r.client.SIsMember(context.Background(), base.AllQueues, tc.msg.Queue).Val() {
t.Errorf("%q is not a member of SET %q", tc.msg.Queue, base.AllQueues)
}
}
@@ -976,7 +977,7 @@ func TestScheduleUnique(t *testing.T) {
// Check Scheduled zset has task ID.
scheduledKey := base.ScheduledKey(tc.msg.Queue)
zs := r.client.ZRangeWithScores(scheduledKey, 0, -1).Val()
zs := r.client.ZRangeWithScores(context.Background(), scheduledKey, 0, -1).Val()
if n := len(zs); n != 1 {
t.Errorf("Redis ZSET %q contains %d elements, want 1",
scheduledKey, n)
@@ -995,32 +996,32 @@ func TestScheduleUnique(t *testing.T) {
// Check the values under the task key.
taskKey := base.TaskKey(tc.msg.Queue, tc.msg.ID.String())
encoded := r.client.HGet(taskKey, "msg").Val() // "msg" field
encoded := r.client.HGet(context.Background(), taskKey, "msg").Val() // "msg" field
decoded := h.MustUnmarshal(t, encoded)
if diff := cmp.Diff(tc.msg, decoded); diff != "" {
t.Errorf("persisted message was %v, want %v; (-want, +got)\n%s",
decoded, tc.msg, diff)
}
state := r.client.HGet(taskKey, "state").Val() // "state" field
state := r.client.HGet(context.Background(), taskKey, "state").Val() // "state" field
if want := "scheduled"; state != want {
t.Errorf("state field under task-key is set to %q, want %q",
state, want)
}
timeout := r.client.HGet(taskKey, "timeout").Val() // "timeout" field
timeout := r.client.HGet(context.Background(), taskKey, "timeout").Val() // "timeout" field
if want := strconv.Itoa(int(tc.msg.Timeout)); timeout != want {
t.Errorf("timeout field under task-key is set to %v, want %v", timeout, want)
}
deadline := r.client.HGet(taskKey, "deadline").Val() // "deadline" field
deadline := r.client.HGet(context.Background(), taskKey, "deadline").Val() // "deadline" field
if want := strconv.Itoa(int(tc.msg.Deadline)); deadline != want {
t.Errorf("deadline field under task-key is set to %v, want %v", deadline, want)
}
uniqueKey := r.client.HGet(taskKey, "unique_key").Val() // "unique_key" field
uniqueKey := r.client.HGet(context.Background(), taskKey, "unique_key").Val() // "unique_key" field
if uniqueKey != tc.msg.UniqueKey {
t.Errorf("uniqueue_key field under task key is set to %q, want %q", uniqueKey, tc.msg.UniqueKey)
}
// Check queue is in the AllQueues set.
if !r.client.SIsMember(base.AllQueues, tc.msg.Queue).Val() {
if !r.client.SIsMember(context.Background(), base.AllQueues, tc.msg.Queue).Val() {
t.Errorf("%q is not a member of SET %q", tc.msg.Queue, base.AllQueues)
}
@@ -1031,7 +1032,7 @@ func TestScheduleUnique(t *testing.T) {
continue
}
gotTTL := r.client.TTL(tc.msg.UniqueKey).Val()
gotTTL := r.client.TTL(context.Background(), tc.msg.UniqueKey).Val()
if !cmp.Equal(tc.ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) {
t.Errorf("TTL %q = %v, want %v", tc.msg.UniqueKey, gotTTL, tc.ttl)
continue
@@ -1189,21 +1190,21 @@ func TestRetry(t *testing.T) {
}
processedKey := base.ProcessedKey(tc.msg.Queue, time.Now())
gotProcessed := r.client.Get(processedKey).Val()
gotProcessed := r.client.Get(context.Background(), processedKey).Val()
if gotProcessed != "1" {
t.Errorf("GET %q = %q, want 1", processedKey, gotProcessed)
}
gotTTL := r.client.TTL(processedKey).Val()
gotTTL := r.client.TTL(context.Background(), processedKey).Val()
if gotTTL > statsTTL {
t.Errorf("TTL %q = %v, want less than or equal to %v", processedKey, gotTTL, statsTTL)
}
failedKey := base.FailedKey(tc.msg.Queue, time.Now())
gotFailed := r.client.Get(failedKey).Val()
gotFailed := r.client.Get(context.Background(), failedKey).Val()
if gotFailed != "1" {
t.Errorf("GET %q = %q, want 1", failedKey, gotFailed)
}
gotTTL = r.client.TTL(failedKey).Val()
gotTTL = r.client.TTL(context.Background(), failedKey).Val()
if gotTTL > statsTTL {
t.Errorf("TTL %q = %v, want less than or equal to %v", failedKey, gotTTL, statsTTL)
}
@@ -1363,14 +1364,14 @@ func TestRetryWithNonFailureError(t *testing.T) {
// If isFailure is set to false, no stats should be recorded to avoid skewing the error rate.
processedKey := base.ProcessedKey(tc.msg.Queue, time.Now())
gotProcessed := r.client.Get(processedKey).Val()
gotProcessed := r.client.Get(context.Background(), processedKey).Val()
if gotProcessed != "" {
t.Errorf("GET %q = %q, want empty", processedKey, gotProcessed)
}
// If isFailure is set to false, no stats should be recorded to avoid skewing the error rate.
failedKey := base.FailedKey(tc.msg.Queue, time.Now())
gotFailed := r.client.Get(failedKey).Val()
gotFailed := r.client.Get(context.Background(), failedKey).Val()
if gotFailed != "" {
t.Errorf("GET %q = %q, want empty", failedKey, gotFailed)
}
@@ -1566,21 +1567,21 @@ func TestArchive(t *testing.T) {
}
processedKey := base.ProcessedKey(tc.target.Queue, time.Now())
gotProcessed := r.client.Get(processedKey).Val()
gotProcessed := r.client.Get(context.Background(), processedKey).Val()
if gotProcessed != "1" {
t.Errorf("GET %q = %q, want 1", processedKey, gotProcessed)
}
gotTTL := r.client.TTL(processedKey).Val()
gotTTL := r.client.TTL(context.Background(), processedKey).Val()
if gotTTL > statsTTL {
t.Errorf("TTL %q = %v, want less than or equal to %v", processedKey, gotTTL, statsTTL)
}
failedKey := base.FailedKey(tc.target.Queue, time.Now())
gotFailed := r.client.Get(failedKey).Val()
gotFailed := r.client.Get(context.Background(), failedKey).Val()
if gotFailed != "1" {
t.Errorf("GET %q = %q, want 1", failedKey, gotFailed)
}
gotTTL = r.client.TTL(processedKey).Val()
gotTTL = r.client.TTL(context.Background(), processedKey).Val()
if gotTTL > statsTTL {
t.Errorf("TTL %q = %v, want less than or equal to %v", failedKey, gotTTL, statsTTL)
}
@@ -1850,7 +1851,7 @@ func TestWriteServerState(t *testing.T) {
// Check ServerInfo was written correctly.
skey := base.ServerInfoKey(host, pid, serverID)
data := r.client.Get(skey).Val()
data := r.client.Get(context.Background(), skey).Val()
got, err := base.DecodeServerInfo([]byte(data))
if err != nil {
t.Fatalf("could not decode server info: %v", err)
@@ -1860,12 +1861,12 @@ func TestWriteServerState(t *testing.T) {
got, info, diff)
}
// Check ServerInfo TTL was set correctly.
gotTTL := r.client.TTL(skey).Val()
gotTTL := r.client.TTL(context.Background(), skey).Val()
if !cmp.Equal(ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) {
t.Errorf("TTL of %q was %v, want %v", skey, gotTTL, ttl)
}
// Check ServerInfo key was added to the set all server keys correctly.
gotServerKeys := r.client.ZRange(base.AllServers, 0, -1).Val()
gotServerKeys := r.client.ZRange(context.Background(), base.AllServers, 0, -1).Val()
wantServerKeys := []string{skey}
if diff := cmp.Diff(wantServerKeys, gotServerKeys); diff != "" {
t.Errorf("%q contained %v, want %v", base.AllServers, gotServerKeys, wantServerKeys)
@@ -1873,12 +1874,12 @@ func TestWriteServerState(t *testing.T) {
// Check WorkersInfo was written correctly.
wkey := base.WorkersKey(host, pid, serverID)
workerExist := r.client.Exists(wkey).Val()
workerExist := r.client.Exists(context.Background(), wkey).Val()
if workerExist != 0 {
t.Errorf("%q key exists", wkey)
}
// Check WorkersInfo key was added to the set correctly.
gotWorkerKeys := r.client.ZRange(base.AllWorkers, 0, -1).Val()
gotWorkerKeys := r.client.ZRange(context.Background(), base.AllWorkers, 0, -1).Val()
wantWorkerKeys := []string{wkey}
if diff := cmp.Diff(wantWorkerKeys, gotWorkerKeys); diff != "" {
t.Errorf("%q contained %v, want %v", base.AllWorkers, gotWorkerKeys, wantWorkerKeys)
@@ -1940,7 +1941,7 @@ func TestWriteServerStateWithWorkers(t *testing.T) {
// Check ServerInfo was written correctly.
skey := base.ServerInfoKey(host, pid, serverID)
data := r.client.Get(skey).Val()
data := r.client.Get(context.Background(), skey).Val()
got, err := base.DecodeServerInfo([]byte(data))
if err != nil {
t.Fatalf("could not decode server info: %v", err)
@@ -1950,12 +1951,12 @@ func TestWriteServerStateWithWorkers(t *testing.T) {
got, serverInfo, diff)
}
// Check ServerInfo TTL was set correctly.
gotTTL := r.client.TTL(skey).Val()
gotTTL := r.client.TTL(context.Background(), skey).Val()
if !cmp.Equal(ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) {
t.Errorf("TTL of %q was %v, want %v", skey, gotTTL, ttl)
}
// Check ServerInfo key was added to the set correctly.
gotServerKeys := r.client.ZRange(base.AllServers, 0, -1).Val()
gotServerKeys := r.client.ZRange(context.Background(), base.AllServers, 0, -1).Val()
wantServerKeys := []string{skey}
if diff := cmp.Diff(wantServerKeys, gotServerKeys); diff != "" {
t.Errorf("%q contained %v, want %v", base.AllServers, gotServerKeys, wantServerKeys)
@@ -1963,7 +1964,7 @@ func TestWriteServerStateWithWorkers(t *testing.T) {
// Check WorkersInfo was written correctly.
wkey := base.WorkersKey(host, pid, serverID)
wdata := r.client.HGetAll(wkey).Val()
wdata := r.client.HGetAll(context.Background(), wkey).Val()
if len(wdata) != 2 {
t.Fatalf("HGETALL %q returned a hash of size %d, want 2", wkey, len(wdata))
}
@@ -1981,12 +1982,12 @@ func TestWriteServerStateWithWorkers(t *testing.T) {
}
// Check WorkersInfo TTL was set correctly.
gotTTL = r.client.TTL(wkey).Val()
gotTTL = r.client.TTL(context.Background(), wkey).Val()
if !cmp.Equal(ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) {
t.Errorf("TTL of %q was %v, want %v", wkey, gotTTL, ttl)
}
// Check WorkersInfo key was added to the set correctly.
gotWorkerKeys := r.client.ZRange(base.AllWorkers, 0, -1).Val()
gotWorkerKeys := r.client.ZRange(context.Background(), base.AllWorkers, 0, -1).Val()
wantWorkerKeys := []string{wkey}
if diff := cmp.Diff(wantWorkerKeys, gotWorkerKeys); diff != "" {
t.Errorf("%q contained %v, want %v", base.AllWorkers, gotWorkerKeys, wantWorkerKeys)
@@ -2076,18 +2077,18 @@ func TestClearServerState(t *testing.T) {
otherSKey := base.ServerInfoKey(otherHost, otherPID, otherServerID)
otherWKey := base.WorkersKey(otherHost, otherPID, otherServerID)
// Check all keys are cleared.
if r.client.Exists(skey).Val() != 0 {
if r.client.Exists(context.Background(), skey).Val() != 0 {
t.Errorf("Redis key %q exists", skey)
}
if r.client.Exists(wkey).Val() != 0 {
if r.client.Exists(context.Background(), wkey).Val() != 0 {
t.Errorf("Redis key %q exists", wkey)
}
gotServerKeys := r.client.ZRange(base.AllServers, 0, -1).Val()
gotServerKeys := r.client.ZRange(context.Background(), base.AllServers, 0, -1).Val()
wantServerKeys := []string{otherSKey}
if diff := cmp.Diff(wantServerKeys, gotServerKeys); diff != "" {
t.Errorf("%q contained %v, want %v", base.AllServers, gotServerKeys, wantServerKeys)
}
gotWorkerKeys := r.client.ZRange(base.AllWorkers, 0, -1).Val()
gotWorkerKeys := r.client.ZRange(context.Background(), base.AllWorkers, 0, -1).Val()
wantWorkerKeys := []string{otherWKey}
if diff := cmp.Diff(wantWorkerKeys, gotWorkerKeys); diff != "" {
t.Errorf("%q contained %v, want %v", base.AllWorkers, gotWorkerKeys, wantWorkerKeys)