diff --git a/client.go b/client.go index 248b452..71bc154 100644 --- a/client.go +++ b/client.go @@ -24,7 +24,7 @@ import ( // // Clients are safe for concurrent use by multiple goroutines. type Client struct { - rdb *rdb.RDB + broker base.Broker } // NewClient returns a new Client instance given a redis connection option. @@ -33,7 +33,7 @@ func NewClient(r RedisConnOpt) *Client { if !ok { panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r)) } - return &Client{rdb: rdb.NewRDB(c)} + return &Client{broker: rdb.NewRDB(c)} } type OptionType int @@ -309,7 +309,7 @@ var ( // Close closes the connection with redis. func (c *Client) Close() error { - return c.rdb.Close() + return c.broker.Close() } // Enqueue enqueues the given task to a queue. @@ -406,22 +406,22 @@ func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option) func (c *Client) enqueue(ctx context.Context, msg *base.TaskMessage, uniqueTTL time.Duration) error { if uniqueTTL > 0 { - return c.rdb.EnqueueUnique(ctx, msg, uniqueTTL) + return c.broker.EnqueueUnique(ctx, msg, uniqueTTL) } - return c.rdb.Enqueue(ctx, msg) + return c.broker.Enqueue(ctx, msg) } func (c *Client) schedule(ctx context.Context, msg *base.TaskMessage, t time.Time, uniqueTTL time.Duration) error { if uniqueTTL > 0 { ttl := t.Add(uniqueTTL).Sub(time.Now()) - return c.rdb.ScheduleUnique(ctx, msg, t, ttl) + return c.broker.ScheduleUnique(ctx, msg, t, ttl) } - return c.rdb.Schedule(ctx, msg, t) + return c.broker.Schedule(ctx, msg, t) } func (c *Client) addToGroup(ctx context.Context, msg *base.TaskMessage, groupKey string, uniqueTTL time.Duration) error { if uniqueTTL > 0 { - return c.rdb.AddToGroupUnique(ctx, msg, groupKey, uniqueTTL) + return c.broker.AddToGroupUnique(ctx, msg, groupKey, uniqueTTL) } - return c.rdb.AddToGroup(ctx, msg, groupKey) + return c.broker.AddToGroup(ctx, msg, groupKey) }