diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 8e4cdb2..82357ac 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -69,9 +69,15 @@ func (r *RDB) Enqueue(msg *base.TaskMessage) error { if err := r.client.SAdd(base.AllQueues, msg.Queue).Err(); err != nil { return err } - keys := []string{base.TaskKey(msg.Queue, msg.ID.String()), base.PendingKey(msg.Queue)} - args := []interface{}{encoded, msg.ID.String()} - return enqueueCmd.Run(r.client, keys, args...).Err() + keys := []string{ + base.TaskKey(msg.Queue, msg.ID.String()), + base.PendingKey(msg.Queue), + } + argv := []interface{}{ + encoded, + msg.ID.String(), + } + return enqueueCmd.Run(r.client, keys, argv...).Err() } // KEYS[1] -> unique key @@ -100,9 +106,17 @@ func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error { if err := r.client.SAdd(base.AllQueues, msg.Queue).Err(); err != nil { return err } - keys := []string{msg.UniqueKey, base.TaskKey(msg.Queue, msg.ID.String()), base.PendingKey(msg.Queue)} - args := []interface{}{msg.ID.String(), int(ttl.Seconds()), encoded} - res, err := enqueueUniqueCmd.Run(r.client, keys, args...).Result() + keys := []string{ + msg.UniqueKey, + base.TaskKey(msg.Queue, msg.ID.String()), + base.PendingKey(msg.Queue), + } + argv := []interface{}{ + msg.ID.String(), + int(ttl.Seconds()), + encoded, + } + res, err := enqueueUniqueCmd.Run(r.client, keys, argv...).Result() if err != nil { return err } @@ -308,9 +322,16 @@ func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error { if err := r.client.SAdd(base.AllQueues, msg.Queue).Err(); err != nil { return err } - keys := []string{base.TaskKey(msg.Queue, msg.ID.String()), base.ScheduledKey(msg.Queue)} - args := []interface{}{encoded, processAt.Unix(), msg.ID.String()} - return scheduleCmd.Run(r.client, keys, args...).Err() + keys := []string{ + base.TaskKey(msg.Queue, msg.ID.String()), + base.ScheduledKey(msg.Queue), + } + argv := []interface{}{ + encoded, + processAt.Unix(), + msg.ID.String(), + } + return scheduleCmd.Run(r.client, keys, argv...).Err() } // KEYS[1] -> unique key @@ -340,8 +361,17 @@ func (r *RDB) ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl tim if err := r.client.SAdd(base.AllQueues, msg.Queue).Err(); err != nil { return err } - keys := []string{msg.UniqueKey, base.TaskKey(msg.Queue, msg.ID.String()), base.ScheduledKey(msg.Queue)} - argv := []interface{}{msg.ID.String(), int(ttl.Seconds()), processAt.Unix(), encoded} + keys := []string{ + msg.UniqueKey, + base.TaskKey(msg.Queue, msg.ID.String()), + base.ScheduledKey(msg.Queue), + } + argv := []interface{}{ + msg.ID.String(), + int(ttl.Seconds()), + processAt.Unix(), + encoded, + } res, err := scheduleUniqueCmd.Run(r.client, keys, argv...).Result() if err != nil { return err