2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-19 23:30:20 +08:00

Merge 40df09cb367b7eea1a51bf9a184ee9ffac211f53 into c327bc40a28e4db45195cfe082d88faa808ce87d

This commit is contained in:
Tianxin Dong 2025-04-03 20:24:47 +08:00 committed by GitHub
commit 5bb1739d51
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 47 additions and 6 deletions

View File

@ -14,8 +14,8 @@ import (
"strings" "strings"
"time" "time"
"github.com/redis/go-redis/v9"
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
"github.com/redis/go-redis/v9"
) )
// Task represents a unit of work to be performed. // Task represents a unit of work to be performed.
@ -442,6 +442,7 @@ func (opt RedisClusterClientOpt) MakeRedisClient() interface{} {
// //
// Three URI schemes are supported, which are redis:, rediss:, redis-socket:, and redis-sentinel:. // Three URI schemes are supported, which are redis:, rediss:, redis-socket:, and redis-sentinel:.
// Supported formats are: // Supported formats are:
//
// redis://[:password@]host[:port][/dbnumber] // redis://[:password@]host[:port][/dbnumber]
// rediss://[:password@]host[:port][/dbnumber] // rediss://[:password@]host[:port][/dbnumber]
// redis-socket://[:password@]path[?db=dbnumber] // redis-socket://[:password@]path[?db=dbnumber]
@ -539,12 +540,22 @@ type ResultWriter struct {
func (w *ResultWriter) Write(data []byte) (n int, err error) { func (w *ResultWriter) Write(data []byte) (n int, err error) {
select { select {
case <-w.ctx.Done(): case <-w.ctx.Done():
return 0, fmt.Errorf("failed to result task result: %v", w.ctx.Err()) return 0, fmt.Errorf("failed to write task result: %v", w.ctx.Err())
default: default:
} }
return w.broker.WriteResult(w.qname, w.id, data) return w.broker.WriteResult(w.qname, w.id, data)
} }
// Publish publishes the given data to the task channel.
func (w *ResultWriter) Publish(data []byte) (n int, err error) {
select {
case <-w.ctx.Done():
return 0, fmt.Errorf("failed to publish task result: %v", w.ctx.Err())
default:
}
return w.broker.Publish(w.qname, w.id, data)
}
// TaskID returns the ID of the task the ResultWriter is associated with. // TaskID returns the ID of the task the ResultWriter is associated with.
func (w *ResultWriter) TaskID() string { func (w *ResultWriter) TaskID() string {
return w.id return w.id

View File

@ -5,6 +5,7 @@
package asynq package asynq
import ( import (
"context"
"fmt" "fmt"
"strconv" "strconv"
"strings" "strings"
@ -250,6 +251,13 @@ func (i *Inspector) GetTaskInfo(queue, id string) (*TaskInfo, error) {
return newTaskInfo(info.Message, info.State, info.NextProcessAt, info.Result), nil return newTaskInfo(info.Message, info.State, info.NextProcessAt, info.Result), nil
} }
// GetTaskPubSub returns a pubsub instance for the given task.
func (i *Inspector) GetTaskPubSub(queue, id string) *redis.PubSub {
taskKey := base.TaskKey(queue, id)
ctx := context.Background()
return i.rdb.Client().Subscribe(ctx, taskKey)
}
// ListOption specifies behavior of list operation. // ListOption specifies behavior of list operation.
type ListOption interface{} type ListOption interface{}

View File

@ -722,4 +722,6 @@ type Broker interface {
PublishCancelation(id string) error PublishCancelation(id string) error
WriteResult(qname, id string, data []byte) (n int, err error) WriteResult(qname, id string, data []byte) (n int, err error)
Publish(qname, id string, data []byte) (n int, err error)
} }

View File

@ -1556,3 +1556,14 @@ func (r *RDB) WriteResult(qname, taskID string, data []byte) (int, error) {
} }
return len(data), nil return len(data), nil
} }
// Publish publishes the given task message to the specified channel.
func (r *RDB) Publish(qname, taskID string, data []byte) (int, error) {
var op errors.Op = "rdb.Publish"
ctx := context.Background()
taskKey := base.TaskKey(qname, taskID)
if err := r.client.Publish(ctx, taskKey, data).Err(); err != nil {
return 0, errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "publish", Err: err})
}
return len(data), nil
}

View File

@ -217,6 +217,15 @@ func (tb *TestBroker) WriteResult(qname, id string, data []byte) (int, error) {
return tb.real.WriteResult(qname, id, data) return tb.real.WriteResult(qname, id, data)
} }
func (tb *TestBroker) Publish(qname, id string, data []byte) (int, error) {
tb.mu.Lock()
defer tb.mu.Unlock()
if tb.sleeping {
return 0, errRedisDown
}
return tb.real.Publish(qname, id, data)
}
func (tb *TestBroker) Ping() error { func (tb *TestBroker) Ping() error {
tb.mu.Lock() tb.mu.Lock()
defer tb.mu.Unlock() defer tb.mu.Unlock()