From 40df09cb367b7eea1a51bf9a184ee9ffac211f53 Mon Sep 17 00:00:00 2001 From: FogDong Date: Thu, 7 Sep 2023 15:04:16 +0800 Subject: [PATCH] feat: add publish and getpubsub for streaming Signed-off-by: FogDong --- asynq.go | 23 +++++++++++++++++------ inspector.go | 10 +++++++++- internal/base/base.go | 2 ++ internal/rdb/rdb.go | 11 +++++++++++ internal/testbroker/testbroker.go | 11 ++++++++++- 5 files changed, 49 insertions(+), 8 deletions(-) diff --git a/asynq.go b/asynq.go index 4c65831..518bff0 100644 --- a/asynq.go +++ b/asynq.go @@ -14,8 +14,8 @@ import ( "strings" "time" - "github.com/redis/go-redis/v9" "github.com/hibiken/asynq/internal/base" + "github.com/redis/go-redis/v9" ) // Task represents a unit of work to be performed. @@ -438,10 +438,11 @@ func (opt RedisClusterClientOpt) MakeRedisClient() interface{} { // // Three URI schemes are supported, which are redis:, rediss:, redis-socket:, and redis-sentinel:. // Supported formats are: -// redis://[:password@]host[:port][/dbnumber] -// rediss://[:password@]host[:port][/dbnumber] -// redis-socket://[:password@]path[?db=dbnumber] -// redis-sentinel://[:password@]host1[:port][,host2:[:port]][,hostN:[:port]][?master=masterName] +// +// redis://[:password@]host[:port][/dbnumber] +// rediss://[:password@]host[:port][/dbnumber] +// redis-socket://[:password@]path[?db=dbnumber] +// redis-sentinel://[:password@]host1[:port][,host2:[:port]][,hostN:[:port]][?master=masterName] func ParseRedisURI(uri string) (RedisConnOpt, error) { u, err := url.Parse(uri) if err != nil { @@ -535,12 +536,22 @@ type ResultWriter struct { func (w *ResultWriter) Write(data []byte) (n int, err error) { select { case <-w.ctx.Done(): - return 0, fmt.Errorf("failed to result task result: %v", w.ctx.Err()) + return 0, fmt.Errorf("failed to write task result: %v", w.ctx.Err()) default: } return w.broker.WriteResult(w.qname, w.id, data) } +// Publish publishes the given data to the task channel. +func (w *ResultWriter) Publish(data []byte) (n int, err error) { + select { + case <-w.ctx.Done(): + return 0, fmt.Errorf("failed to publish task result: %v", w.ctx.Err()) + default: + } + return w.broker.Publish(w.qname, w.id, data) +} + // TaskID returns the ID of the task the ResultWriter is associated with. func (w *ResultWriter) TaskID() string { return w.id diff --git a/inspector.go b/inspector.go index ee99f52..cfe0f14 100644 --- a/inspector.go +++ b/inspector.go @@ -5,15 +5,16 @@ package asynq import ( + "context" "fmt" "strconv" "strings" "time" - "github.com/redis/go-redis/v9" "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/errors" "github.com/hibiken/asynq/internal/rdb" + "github.com/redis/go-redis/v9" ) // Inspector is a client interface to inspect and mutate the state of @@ -235,6 +236,13 @@ func (i *Inspector) GetTaskInfo(queue, id string) (*TaskInfo, error) { return newTaskInfo(info.Message, info.State, info.NextProcessAt, info.Result), nil } +// GetTaskPubSub returns a pubsub instance for the given task. +func (i *Inspector) GetTaskPubSub(queue, id string) *redis.PubSub { + taskKey := base.TaskKey(queue, id) + ctx := context.Background() + return i.rdb.Client().Subscribe(ctx, taskKey) +} + // ListOption specifies behavior of list operation. type ListOption interface{} diff --git a/internal/base/base.go b/internal/base/base.go index a63c548..77afdcf 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -752,4 +752,6 @@ type Broker interface { PublishCancelation(id string) error WriteResult(qname, id string, data []byte) (n int, err error) + + Publish(qname, id string, data []byte) (n int, err error) } diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index c79a9c5..e4789f5 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -1519,3 +1519,14 @@ func (r *RDB) WriteResult(qname, taskID string, data []byte) (int, error) { } return len(data), nil } + +// Publish publishes the given task message to the specified channel. +func (r *RDB) Publish(qname, taskID string, data []byte) (int, error) { + var op errors.Op = "rdb.Publish" + ctx := context.Background() + taskKey := base.TaskKey(qname, taskID) + if err := r.client.Publish(ctx, taskKey, data).Err(); err != nil { + return 0, errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "publish", Err: err}) + } + return len(data), nil +} diff --git a/internal/testbroker/testbroker.go b/internal/testbroker/testbroker.go index 368ae3e..5c6ff40 100644 --- a/internal/testbroker/testbroker.go +++ b/internal/testbroker/testbroker.go @@ -11,8 +11,8 @@ import ( "sync" "time" - "github.com/redis/go-redis/v9" "github.com/hibiken/asynq/internal/base" + "github.com/redis/go-redis/v9" ) var errRedisDown = errors.New("testutil: redis is down") @@ -217,6 +217,15 @@ func (tb *TestBroker) WriteResult(qname, id string, data []byte) (int, error) { return tb.real.WriteResult(qname, id, data) } +func (tb *TestBroker) Publish(qname, id string, data []byte) (int, error) { + tb.mu.Lock() + defer tb.mu.Unlock() + if tb.sleeping { + return 0, errRedisDown + } + return tb.real.Publish(qname, id, data) +} + func (tb *TestBroker) Ping() error { tb.mu.Lock() defer tb.mu.Unlock()