mirror of
https://github.com/hibiken/asynq.git
synced 2024-11-10 11:31:58 +08:00
Remove base.DeadlinesKey
This commit is contained in:
parent
59927509d8
commit
d4006894ad
@ -231,13 +231,6 @@ func SeedArchivedQueue(tb testing.TB, r redis.UniversalClient, entries []base.Z,
|
|||||||
seedRedisZSet(tb, r, base.ArchivedKey(qname), entries, base.TaskStateArchived)
|
seedRedisZSet(tb, r, base.ArchivedKey(qname), entries, base.TaskStateArchived)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SeedDeadlines initializes the deadlines set with the given entries.
|
|
||||||
func SeedDeadlines(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) {
|
|
||||||
tb.Helper()
|
|
||||||
r.SAdd(context.Background(), base.AllQueues, qname)
|
|
||||||
seedRedisZSet(tb, r, base.DeadlinesKey(qname), entries, base.TaskStateActive)
|
|
||||||
}
|
|
||||||
|
|
||||||
// SeedLease initializes the lease set with the given entries.
|
// SeedLease initializes the lease set with the given entries.
|
||||||
func SeedLease(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) {
|
func SeedLease(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) {
|
||||||
tb.Helper()
|
tb.Helper()
|
||||||
@ -294,14 +287,6 @@ func SeedAllArchivedQueues(tb testing.TB, r redis.UniversalClient, archived map[
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// SeedAllDeadlines initializes all of the deadlines with the given entries.
|
|
||||||
func SeedAllDeadlines(tb testing.TB, r redis.UniversalClient, deadlines map[string][]base.Z) {
|
|
||||||
tb.Helper()
|
|
||||||
for q, entries := range deadlines {
|
|
||||||
SeedDeadlines(tb, r, entries, q)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// SeedAllLease initializes all of the lease sets with the given entries.
|
// SeedAllLease initializes all of the lease sets with the given entries.
|
||||||
func SeedAllLease(tb testing.TB, r redis.UniversalClient, lease map[string][]base.Z) {
|
func SeedAllLease(tb testing.TB, r redis.UniversalClient, lease map[string][]base.Z) {
|
||||||
tb.Helper()
|
tb.Helper()
|
||||||
@ -330,8 +315,6 @@ func seedRedisList(tb testing.TB, c redis.UniversalClient, key string,
|
|||||||
data := map[string]interface{}{
|
data := map[string]interface{}{
|
||||||
"msg": encoded,
|
"msg": encoded,
|
||||||
"state": state.String(),
|
"state": state.String(),
|
||||||
"timeout": msg.Timeout,
|
|
||||||
"deadline": msg.Deadline,
|
|
||||||
"unique_key": msg.UniqueKey,
|
"unique_key": msg.UniqueKey,
|
||||||
}
|
}
|
||||||
if err := c.HSet(context.Background(), key, data).Err(); err != nil {
|
if err := c.HSet(context.Background(), key, data).Err(); err != nil {
|
||||||
@ -360,8 +343,6 @@ func seedRedisZSet(tb testing.TB, c redis.UniversalClient, key string,
|
|||||||
data := map[string]interface{}{
|
data := map[string]interface{}{
|
||||||
"msg": encoded,
|
"msg": encoded,
|
||||||
"state": state.String(),
|
"state": state.String(),
|
||||||
"timeout": msg.Timeout,
|
|
||||||
"deadline": msg.Deadline,
|
|
||||||
"unique_key": msg.UniqueKey,
|
"unique_key": msg.UniqueKey,
|
||||||
}
|
}
|
||||||
if err := c.HSet(context.Background(), key, data).Err(); err != nil {
|
if err := c.HSet(context.Background(), key, data).Err(); err != nil {
|
||||||
@ -439,13 +420,6 @@ func GetArchivedEntries(tb testing.TB, r redis.UniversalClient, qname string) []
|
|||||||
return getMessagesFromZSetWithScores(tb, r, qname, base.ArchivedKey, base.TaskStateArchived)
|
return getMessagesFromZSetWithScores(tb, r, qname, base.ArchivedKey, base.TaskStateArchived)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetDeadlinesEntries returns all task messages and its score in the deadlines set for the given queue.
|
|
||||||
// It also asserts the state field of the task.
|
|
||||||
func GetDeadlinesEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z {
|
|
||||||
tb.Helper()
|
|
||||||
return getMessagesFromZSetWithScores(tb, r, qname, base.DeadlinesKey, base.TaskStateActive)
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetLeaseEntries returns all task IDs and its score in the lease set for the given queue.
|
// GetLeaseEntries returns all task IDs and its score in the lease set for the given queue.
|
||||||
// It also asserts the state field of the task.
|
// It also asserts the state field of the task.
|
||||||
func GetLeaseEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z {
|
func GetLeaseEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z {
|
||||||
|
@ -137,11 +137,6 @@ func ArchivedKey(qname string) string {
|
|||||||
return fmt.Sprintf("%sarchived", QueueKeyPrefix(qname))
|
return fmt.Sprintf("%sarchived", QueueKeyPrefix(qname))
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeadlinesKey returns a redis key for the deadlines.
|
|
||||||
func DeadlinesKey(qname string) string {
|
|
||||||
return fmt.Sprintf("%sdeadlines", QueueKeyPrefix(qname))
|
|
||||||
}
|
|
||||||
|
|
||||||
// LeaseKey returns a redis key for the lease.
|
// LeaseKey returns a redis key for the lease.
|
||||||
func LeaseKey(qname string) string {
|
func LeaseKey(qname string) string {
|
||||||
return fmt.Sprintf("%slease", QueueKeyPrefix(qname))
|
return fmt.Sprintf("%slease", QueueKeyPrefix(qname))
|
||||||
|
@ -72,23 +72,6 @@ func TestActiveKey(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDeadlinesKey(t *testing.T) {
|
|
||||||
tests := []struct {
|
|
||||||
qname string
|
|
||||||
want string
|
|
||||||
}{
|
|
||||||
{"default", "asynq:{default}:deadlines"},
|
|
||||||
{"custom", "asynq:{custom}:deadlines"},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, tc := range tests {
|
|
||||||
got := DeadlinesKey(tc.qname)
|
|
||||||
if got != tc.want {
|
|
||||||
t.Errorf("DeadlinesKey(%q) = %q, want %q", tc.qname, got, tc.want)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestLeaseKey(t *testing.T) {
|
func TestLeaseKey(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
qname string
|
qname string
|
||||||
|
@ -163,7 +163,7 @@ func BenchmarkDone(b *testing.B) {
|
|||||||
b.StopTimer()
|
b.StopTimer()
|
||||||
asynqtest.FlushDB(b, r.client)
|
asynqtest.FlushDB(b, r.client)
|
||||||
asynqtest.SeedActiveQueue(b, r.client, msgs, base.DefaultQueueName)
|
asynqtest.SeedActiveQueue(b, r.client, msgs, base.DefaultQueueName)
|
||||||
asynqtest.SeedDeadlines(b, r.client, zs, base.DefaultQueueName)
|
asynqtest.SeedLease(b, r.client, zs, base.DefaultQueueName)
|
||||||
b.StartTimer()
|
b.StartTimer()
|
||||||
|
|
||||||
if err := r.Done(ctx, msgs[0]); err != nil {
|
if err := r.Done(ctx, msgs[0]); err != nil {
|
||||||
@ -190,7 +190,7 @@ func BenchmarkRetry(b *testing.B) {
|
|||||||
b.StopTimer()
|
b.StopTimer()
|
||||||
asynqtest.FlushDB(b, r.client)
|
asynqtest.FlushDB(b, r.client)
|
||||||
asynqtest.SeedActiveQueue(b, r.client, msgs, base.DefaultQueueName)
|
asynqtest.SeedActiveQueue(b, r.client, msgs, base.DefaultQueueName)
|
||||||
asynqtest.SeedDeadlines(b, r.client, zs, base.DefaultQueueName)
|
asynqtest.SeedLease(b, r.client, zs, base.DefaultQueueName)
|
||||||
b.StartTimer()
|
b.StartTimer()
|
||||||
|
|
||||||
if err := r.Retry(ctx, msgs[0], time.Now().Add(1*time.Minute), "error", true /*isFailure*/); err != nil {
|
if err := r.Retry(ctx, msgs[0], time.Now().Add(1*time.Minute), "error", true /*isFailure*/); err != nil {
|
||||||
@ -217,7 +217,7 @@ func BenchmarkArchive(b *testing.B) {
|
|||||||
b.StopTimer()
|
b.StopTimer()
|
||||||
asynqtest.FlushDB(b, r.client)
|
asynqtest.FlushDB(b, r.client)
|
||||||
asynqtest.SeedActiveQueue(b, r.client, msgs, base.DefaultQueueName)
|
asynqtest.SeedActiveQueue(b, r.client, msgs, base.DefaultQueueName)
|
||||||
asynqtest.SeedDeadlines(b, r.client, zs, base.DefaultQueueName)
|
asynqtest.SeedLease(b, r.client, zs, base.DefaultQueueName)
|
||||||
b.StartTimer()
|
b.StartTimer()
|
||||||
|
|
||||||
if err := r.Archive(ctx, msgs[0], "error"); err != nil {
|
if err := r.Archive(ctx, msgs[0], "error"); err != nil {
|
||||||
@ -244,7 +244,7 @@ func BenchmarkRequeue(b *testing.B) {
|
|||||||
b.StopTimer()
|
b.StopTimer()
|
||||||
asynqtest.FlushDB(b, r.client)
|
asynqtest.FlushDB(b, r.client)
|
||||||
asynqtest.SeedActiveQueue(b, r.client, msgs, base.DefaultQueueName)
|
asynqtest.SeedActiveQueue(b, r.client, msgs, base.DefaultQueueName)
|
||||||
asynqtest.SeedDeadlines(b, r.client, zs, base.DefaultQueueName)
|
asynqtest.SeedLease(b, r.client, zs, base.DefaultQueueName)
|
||||||
b.StartTimer()
|
b.StartTimer()
|
||||||
|
|
||||||
if err := r.Requeue(ctx, msgs[0]); err != nil {
|
if err := r.Requeue(ctx, msgs[0]); err != nil {
|
||||||
|
@ -1387,7 +1387,7 @@ func (r *RDB) DeleteAllPendingTasks(qname string) (int64, error) {
|
|||||||
// KEYS[3] -> asynq:{<qname>}:scheduled
|
// KEYS[3] -> asynq:{<qname>}:scheduled
|
||||||
// KEYS[4] -> asynq:{<qname>}:retry
|
// KEYS[4] -> asynq:{<qname>}:retry
|
||||||
// KEYS[5] -> asynq:{<qname>}:archived
|
// KEYS[5] -> asynq:{<qname>}:archived
|
||||||
// KEYS[6] -> asynq:{<qname>}:deadlines
|
// KEYS[6] -> asynq:{<qname>}:lease
|
||||||
// --
|
// --
|
||||||
// ARGV[1] -> task key prefix
|
// ARGV[1] -> task key prefix
|
||||||
//
|
//
|
||||||
@ -1447,7 +1447,7 @@ return 1`)
|
|||||||
// KEYS[3] -> asynq:{<qname>}:scheduled
|
// KEYS[3] -> asynq:{<qname>}:scheduled
|
||||||
// KEYS[4] -> asynq:{<qname>}:retry
|
// KEYS[4] -> asynq:{<qname>}:retry
|
||||||
// KEYS[5] -> asynq:{<qname>}:archived
|
// KEYS[5] -> asynq:{<qname>}:archived
|
||||||
// KEYS[6] -> asynq:{<qname>}:deadlines
|
// KEYS[6] -> asynq:{<qname>}:lease
|
||||||
// --
|
// --
|
||||||
// ARGV[1] -> task key prefix
|
// ARGV[1] -> task key prefix
|
||||||
//
|
//
|
||||||
@ -1516,7 +1516,7 @@ func (r *RDB) RemoveQueue(qname string, force bool) error {
|
|||||||
base.ScheduledKey(qname),
|
base.ScheduledKey(qname),
|
||||||
base.RetryKey(qname),
|
base.RetryKey(qname),
|
||||||
base.ArchivedKey(qname),
|
base.ArchivedKey(qname),
|
||||||
base.DeadlinesKey(qname),
|
base.LeaseKey(qname),
|
||||||
}
|
}
|
||||||
res, err := script.Run(context.Background(), r.client, keys, base.TaskKeyPrefix(qname)).Result()
|
res, err := script.Run(context.Background(), r.client, keys, base.TaskKeyPrefix(qname)).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -4036,7 +4036,7 @@ func TestRemoveQueue(t *testing.T) {
|
|||||||
keys := []string{
|
keys := []string{
|
||||||
base.PendingKey(tc.qname),
|
base.PendingKey(tc.qname),
|
||||||
base.ActiveKey(tc.qname),
|
base.ActiveKey(tc.qname),
|
||||||
base.DeadlinesKey(tc.qname),
|
base.LeaseKey(tc.qname),
|
||||||
base.ScheduledKey(tc.qname),
|
base.ScheduledKey(tc.qname),
|
||||||
base.RetryKey(tc.qname),
|
base.RetryKey(tc.qname),
|
||||||
base.ArchivedKey(tc.qname),
|
base.ArchivedKey(tc.qname),
|
||||||
|
Loading…
Reference in New Issue
Block a user