diff --git a/inspector.go b/inspector.go index 59dd6b9..ae44dda 100644 --- a/inspector.go +++ b/inspector.go @@ -948,7 +948,7 @@ func (i *Inspector) CancelProcessing(id string) error { // guarantee that the task with the given id will be canceled. The return // value only indicates whether the cancelation signal has been sent. func (i *Inspector) CancelProcessingContext(ctx context.Context, id string) error { - return i.rdb.PublishCancelationContext(ctx, id) + return i.rdb.PublishCancelation(ctx, id) } // PauseQueue pauses task processing on the specified queue. diff --git a/internal/base/base.go b/internal/base/base.go index 300fbf2..83d14e2 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -749,7 +749,7 @@ type Broker interface { // Cancelation related methods CancelationPubSub() (*redis.PubSub, error) // TODO: Need to decouple from redis to support other brokers - PublishCancelation(id string) error + PublishCancelation(ctx context.Context, id string) error WriteResult(qname, id string, data []byte) (n int, err error) } diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 0a02e20..67c8e75 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -1459,13 +1459,7 @@ func (r *RDB) CancelationPubSub() (*redis.PubSub, error) { // PublishCancelation publish cancelation message to all subscribers. // The message is the ID for the task to be canceled. -func (r *RDB) PublishCancelation(id string) error { - return r.PublishCancelationContext(context.Background(), id) -} - -// PublishCancelationContext publish cancelation message to all subscribers. -// The message is the ID for the task to be canceled. -func (r *RDB) PublishCancelationContext(ctx context.Context, id string) error { +func (r *RDB) PublishCancelation(ctx context.Context, id string) error { var op errors.Op = "rdb.PublishCancelation" if err := r.client.Publish(ctx, base.CancelChannel, id).Err(); err != nil { return errors.E(op, errors.Unknown, fmt.Sprintf("redis pubsub publish error: %v", err)) diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 81ffb4b..7035ec2 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -3053,7 +3053,7 @@ func TestCancelationPubSub(t *testing.T) { publish := []string{"one", "two", "three"} for _, msg := range publish { - r.PublishCancelation(msg) + r.PublishCancelation(context.Background(), msg) } // allow for message to reach subscribers. diff --git a/internal/testbroker/testbroker.go b/internal/testbroker/testbroker.go index 7fd2f78..fdcfa4e 100644 --- a/internal/testbroker/testbroker.go +++ b/internal/testbroker/testbroker.go @@ -199,13 +199,13 @@ func (tb *TestBroker) CancelationPubSub() (*redis.PubSub, error) { return tb.real.CancelationPubSub() } -func (tb *TestBroker) PublishCancelation(id string) error { +func (tb *TestBroker) PublishCancelation(ctx context.Context, id string) error { tb.mu.Lock() defer tb.mu.Unlock() if tb.sleeping { return errRedisDown } - return tb.real.PublishCancelation(id) + return tb.real.PublishCancelation(ctx, id) } func (tb *TestBroker) WriteResult(qname, id string, data []byte) (int, error) { diff --git a/subscriber_test.go b/subscriber_test.go index ec4e65b..5336b24 100644 --- a/subscriber_test.go +++ b/subscriber_test.go @@ -5,6 +5,7 @@ package asynq import ( + "context" "sync" "testing" "time" @@ -51,7 +52,7 @@ func TestSubscriber(t *testing.T) { // wait for subscriber to establish connection to pubsub channel time.Sleep(time.Second) - if err := rdbClient.PublishCancelation(tc.publishID); err != nil { + if err := rdbClient.PublishCancelation(context.Background(), tc.publishID); err != nil { t.Fatalf("could not publish cancelation message: %v", err) } @@ -110,7 +111,7 @@ func TestSubscriberWithRedisDown(t *testing.T) { called = true }) - if err := r.PublishCancelation(id); err != nil { + if err := r.PublishCancelation(context.Background(), id); err != nil { t.Fatalf("could not publish cancelation message: %v", err) }