diff --git a/asynq.go b/asynq.go index dded627..b77dce8 100644 --- a/asynq.go +++ b/asynq.go @@ -14,8 +14,8 @@ import ( "strings" "time" - "github.com/redis/go-redis/v9" "github.com/hibiken/asynq/internal/base" + "github.com/redis/go-redis/v9" ) // Task represents a unit of work to be performed. @@ -442,10 +442,11 @@ func (opt RedisClusterClientOpt) MakeRedisClient() interface{} { // // Three URI schemes are supported, which are redis:, rediss:, redis-socket:, and redis-sentinel:. // Supported formats are: -// redis://[:password@]host[:port][/dbnumber] -// rediss://[:password@]host[:port][/dbnumber] -// redis-socket://[:password@]path[?db=dbnumber] -// redis-sentinel://[:password@]host1[:port][,host2:[:port]][,hostN:[:port]][?master=masterName] +// +// redis://[:password@]host[:port][/dbnumber] +// rediss://[:password@]host[:port][/dbnumber] +// redis-socket://[:password@]path[?db=dbnumber] +// redis-sentinel://[:password@]host1[:port][,host2:[:port]][,hostN:[:port]][?master=masterName] func ParseRedisURI(uri string) (RedisConnOpt, error) { u, err := url.Parse(uri) if err != nil { @@ -539,12 +540,22 @@ type ResultWriter struct { func (w *ResultWriter) Write(data []byte) (n int, err error) { select { case <-w.ctx.Done(): - return 0, fmt.Errorf("failed to result task result: %v", w.ctx.Err()) + return 0, fmt.Errorf("failed to write task result: %v", w.ctx.Err()) default: } return w.broker.WriteResult(w.qname, w.id, data) } +// Publish publishes the given data to the task channel. +func (w *ResultWriter) Publish(data []byte) (n int, err error) { + select { + case <-w.ctx.Done(): + return 0, fmt.Errorf("failed to publish task result: %v", w.ctx.Err()) + default: + } + return w.broker.Publish(w.qname, w.id, data) +} + // TaskID returns the ID of the task the ResultWriter is associated with. func (w *ResultWriter) TaskID() string { return w.id diff --git a/inspector.go b/inspector.go index e361d22..1d38341 100644 --- a/inspector.go +++ b/inspector.go @@ -5,6 +5,7 @@ package asynq import ( + "context" "fmt" "strconv" "strings" @@ -250,6 +251,13 @@ func (i *Inspector) GetTaskInfo(queue, id string) (*TaskInfo, error) { return newTaskInfo(info.Message, info.State, info.NextProcessAt, info.Result), nil } +// GetTaskPubSub returns a pubsub instance for the given task. +func (i *Inspector) GetTaskPubSub(queue, id string) *redis.PubSub { + taskKey := base.TaskKey(queue, id) + ctx := context.Background() + return i.rdb.Client().Subscribe(ctx, taskKey) +} + // ListOption specifies behavior of list operation. type ListOption interface{} diff --git a/internal/base/base.go b/internal/base/base.go index 90e635e..0dc646c 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -722,4 +722,6 @@ type Broker interface { PublishCancelation(id string) error WriteResult(qname, id string, data []byte) (n int, err error) + + Publish(qname, id string, data []byte) (n int, err error) } diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 22df506..f12aa17 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -1556,3 +1556,14 @@ func (r *RDB) WriteResult(qname, taskID string, data []byte) (int, error) { } return len(data), nil } + +// Publish publishes the given task message to the specified channel. +func (r *RDB) Publish(qname, taskID string, data []byte) (int, error) { + var op errors.Op = "rdb.Publish" + ctx := context.Background() + taskKey := base.TaskKey(qname, taskID) + if err := r.client.Publish(ctx, taskKey, data).Err(); err != nil { + return 0, errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "publish", Err: err}) + } + return len(data), nil +} diff --git a/internal/testbroker/testbroker.go b/internal/testbroker/testbroker.go index ffab6fe..63c7631 100644 --- a/internal/testbroker/testbroker.go +++ b/internal/testbroker/testbroker.go @@ -217,6 +217,15 @@ func (tb *TestBroker) WriteResult(qname, id string, data []byte) (int, error) { return tb.real.WriteResult(qname, id, data) } +func (tb *TestBroker) Publish(qname, id string, data []byte) (int, error) { + tb.mu.Lock() + defer tb.mu.Unlock() + if tb.sleeping { + return 0, errRedisDown + } + return tb.real.Publish(qname, id, data) +} + func (tb *TestBroker) Ping() error { tb.mu.Lock() defer tb.mu.Unlock()