mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-25 23:32:17 +08:00
feat: add publish and getpubsub for streaming
Signed-off-by: FogDong <fog@bentoml.com>
This commit is contained in:
parent
6b98c0bbae
commit
40df09cb36
15
asynq.go
15
asynq.go
@ -14,8 +14,8 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/redis/go-redis/v9"
|
|
||||||
"github.com/hibiken/asynq/internal/base"
|
"github.com/hibiken/asynq/internal/base"
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Task represents a unit of work to be performed.
|
// Task represents a unit of work to be performed.
|
||||||
@ -438,6 +438,7 @@ func (opt RedisClusterClientOpt) MakeRedisClient() interface{} {
|
|||||||
//
|
//
|
||||||
// Three URI schemes are supported, which are redis:, rediss:, redis-socket:, and redis-sentinel:.
|
// Three URI schemes are supported, which are redis:, rediss:, redis-socket:, and redis-sentinel:.
|
||||||
// Supported formats are:
|
// Supported formats are:
|
||||||
|
//
|
||||||
// redis://[:password@]host[:port][/dbnumber]
|
// redis://[:password@]host[:port][/dbnumber]
|
||||||
// rediss://[:password@]host[:port][/dbnumber]
|
// rediss://[:password@]host[:port][/dbnumber]
|
||||||
// redis-socket://[:password@]path[?db=dbnumber]
|
// redis-socket://[:password@]path[?db=dbnumber]
|
||||||
@ -535,12 +536,22 @@ type ResultWriter struct {
|
|||||||
func (w *ResultWriter) Write(data []byte) (n int, err error) {
|
func (w *ResultWriter) Write(data []byte) (n int, err error) {
|
||||||
select {
|
select {
|
||||||
case <-w.ctx.Done():
|
case <-w.ctx.Done():
|
||||||
return 0, fmt.Errorf("failed to result task result: %v", w.ctx.Err())
|
return 0, fmt.Errorf("failed to write task result: %v", w.ctx.Err())
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
return w.broker.WriteResult(w.qname, w.id, data)
|
return w.broker.WriteResult(w.qname, w.id, data)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Publish publishes the given data to the task channel.
|
||||||
|
func (w *ResultWriter) Publish(data []byte) (n int, err error) {
|
||||||
|
select {
|
||||||
|
case <-w.ctx.Done():
|
||||||
|
return 0, fmt.Errorf("failed to publish task result: %v", w.ctx.Err())
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
return w.broker.Publish(w.qname, w.id, data)
|
||||||
|
}
|
||||||
|
|
||||||
// TaskID returns the ID of the task the ResultWriter is associated with.
|
// TaskID returns the ID of the task the ResultWriter is associated with.
|
||||||
func (w *ResultWriter) TaskID() string {
|
func (w *ResultWriter) TaskID() string {
|
||||||
return w.id
|
return w.id
|
||||||
|
10
inspector.go
10
inspector.go
@ -5,15 +5,16 @@
|
|||||||
package asynq
|
package asynq
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/redis/go-redis/v9"
|
|
||||||
"github.com/hibiken/asynq/internal/base"
|
"github.com/hibiken/asynq/internal/base"
|
||||||
"github.com/hibiken/asynq/internal/errors"
|
"github.com/hibiken/asynq/internal/errors"
|
||||||
"github.com/hibiken/asynq/internal/rdb"
|
"github.com/hibiken/asynq/internal/rdb"
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Inspector is a client interface to inspect and mutate the state of
|
// Inspector is a client interface to inspect and mutate the state of
|
||||||
@ -235,6 +236,13 @@ func (i *Inspector) GetTaskInfo(queue, id string) (*TaskInfo, error) {
|
|||||||
return newTaskInfo(info.Message, info.State, info.NextProcessAt, info.Result), nil
|
return newTaskInfo(info.Message, info.State, info.NextProcessAt, info.Result), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetTaskPubSub returns a pubsub instance for the given task.
|
||||||
|
func (i *Inspector) GetTaskPubSub(queue, id string) *redis.PubSub {
|
||||||
|
taskKey := base.TaskKey(queue, id)
|
||||||
|
ctx := context.Background()
|
||||||
|
return i.rdb.Client().Subscribe(ctx, taskKey)
|
||||||
|
}
|
||||||
|
|
||||||
// ListOption specifies behavior of list operation.
|
// ListOption specifies behavior of list operation.
|
||||||
type ListOption interface{}
|
type ListOption interface{}
|
||||||
|
|
||||||
|
@ -752,4 +752,6 @@ type Broker interface {
|
|||||||
PublishCancelation(id string) error
|
PublishCancelation(id string) error
|
||||||
|
|
||||||
WriteResult(qname, id string, data []byte) (n int, err error)
|
WriteResult(qname, id string, data []byte) (n int, err error)
|
||||||
|
|
||||||
|
Publish(qname, id string, data []byte) (n int, err error)
|
||||||
}
|
}
|
||||||
|
@ -1519,3 +1519,14 @@ func (r *RDB) WriteResult(qname, taskID string, data []byte) (int, error) {
|
|||||||
}
|
}
|
||||||
return len(data), nil
|
return len(data), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Publish publishes the given task message to the specified channel.
|
||||||
|
func (r *RDB) Publish(qname, taskID string, data []byte) (int, error) {
|
||||||
|
var op errors.Op = "rdb.Publish"
|
||||||
|
ctx := context.Background()
|
||||||
|
taskKey := base.TaskKey(qname, taskID)
|
||||||
|
if err := r.client.Publish(ctx, taskKey, data).Err(); err != nil {
|
||||||
|
return 0, errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "publish", Err: err})
|
||||||
|
}
|
||||||
|
return len(data), nil
|
||||||
|
}
|
||||||
|
@ -11,8 +11,8 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/redis/go-redis/v9"
|
|
||||||
"github.com/hibiken/asynq/internal/base"
|
"github.com/hibiken/asynq/internal/base"
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
)
|
)
|
||||||
|
|
||||||
var errRedisDown = errors.New("testutil: redis is down")
|
var errRedisDown = errors.New("testutil: redis is down")
|
||||||
@ -217,6 +217,15 @@ func (tb *TestBroker) WriteResult(qname, id string, data []byte) (int, error) {
|
|||||||
return tb.real.WriteResult(qname, id, data)
|
return tb.real.WriteResult(qname, id, data)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (tb *TestBroker) Publish(qname, id string, data []byte) (int, error) {
|
||||||
|
tb.mu.Lock()
|
||||||
|
defer tb.mu.Unlock()
|
||||||
|
if tb.sleeping {
|
||||||
|
return 0, errRedisDown
|
||||||
|
}
|
||||||
|
return tb.real.Publish(qname, id, data)
|
||||||
|
}
|
||||||
|
|
||||||
func (tb *TestBroker) Ping() error {
|
func (tb *TestBroker) Ping() error {
|
||||||
tb.mu.Lock()
|
tb.mu.Lock()
|
||||||
defer tb.mu.Unlock()
|
defer tb.mu.Unlock()
|
||||||
|
Loading…
Reference in New Issue
Block a user