Add StateChanged(func(map[string]interface{}), more ...string) to client and server, to watch state updates with more customized detail, e.g. "completed:result" as default, e.g. pending:next\|task\|message\|result

This commit is contained in:
mindon
2022-10-19 11:11:05 +08:00
parent c08f142b56
commit dbe886c071
4 changed files with 138 additions and 8 deletions

View File

@@ -7,8 +7,10 @@ package rdb
import (
"context"
"encoding/json"
"fmt"
"math"
"strings"
"time"
"github.com/go-redis/redis/v8"
@@ -131,6 +133,7 @@ func (r *RDB) Enqueue(ctx context.Context, msg *base.TaskMessage) error {
if n == 0 {
return errors.E(op, errors.AlreadyExists, errors.ErrTaskIdConflict)
}
r.state(ctx, msg, "pending")
return nil
}
@@ -198,6 +201,7 @@ func (r *RDB) EnqueueUnique(ctx context.Context, msg *base.TaskMessage, ttl time
if n == 0 {
return errors.E(op, errors.AlreadyExists, errors.ErrTaskIdConflict)
}
r.state(ctx, msg, "pending")
return nil
}
@@ -464,9 +468,21 @@ func (r *RDB) MarkAsComplete(ctx context.Context, msg *base.TaskMessage) error {
// Note: We cannot pass empty unique key when running this script in redis-cluster.
if len(msg.UniqueKey) > 0 {
keys = append(keys, msg.UniqueKey)
return r.runScript(ctx, op, markAsCompleteUniqueCmd, keys, argv...)
err := r.runScript(ctx, op, markAsCompleteUniqueCmd, keys, argv...)
if err == nil {
r.state(ctx, msg, "completed")
} else {
r.state(ctx, msg, "failed")
}
return err
}
return r.runScript(ctx, op, markAsCompleteCmd, keys, argv...)
err = r.runScript(ctx, op, markAsCompleteCmd, keys, argv...)
if err == nil {
r.state(ctx, msg, "completed")
} else {
r.state(ctx, msg, "failed")
}
return err
}
// KEYS[1] -> asynq:{<qname>}:active
@@ -495,7 +511,11 @@ func (r *RDB) Requeue(ctx context.Context, msg *base.TaskMessage) error {
base.PendingKey(msg.Queue),
base.TaskKey(msg.Queue, msg.ID),
}
return r.runScript(ctx, op, requeueCmd, keys, msg.ID)
err := r.runScript(ctx, op, requeueCmd, keys, msg.ID)
if err == nil {
r.state(ctx, msg, "pending")
}
return err
}
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
@@ -550,6 +570,7 @@ func (r *RDB) AddToGroup(ctx context.Context, msg *base.TaskMessage, groupKey st
if n == 0 {
return errors.E(op, errors.AlreadyExists, errors.ErrTaskIdConflict)
}
r.state(ctx, msg, "aggregating")
return nil
}
@@ -617,6 +638,7 @@ func (r *RDB) AddToGroupUnique(ctx context.Context, msg *base.TaskMessage, group
if n == 0 {
return errors.E(op, errors.AlreadyExists, errors.ErrTaskIdConflict)
}
r.state(ctx, msg, "aggregating")
return nil
}
@@ -667,6 +689,7 @@ func (r *RDB) Schedule(ctx context.Context, msg *base.TaskMessage, processAt tim
if n == 0 {
return errors.E(op, errors.AlreadyExists, errors.ErrTaskIdConflict)
}
r.state(ctx, msg, "scheduled")
return nil
}
@@ -731,6 +754,7 @@ func (r *RDB) ScheduleUnique(ctx context.Context, msg *base.TaskMessage, process
if n == 0 {
return errors.E(op, errors.AlreadyExists, errors.ErrTaskIdConflict)
}
r.state(ctx, msg, "scheduled")
return nil
}
@@ -813,7 +837,11 @@ func (r *RDB) Retry(ctx context.Context, msg *base.TaskMessage, processAt time.T
isFailure,
int64(math.MaxInt64),
}
return r.runScript(ctx, op, retryCmd, keys, argv...)
err = r.runScript(ctx, op, retryCmd, keys, argv...)
if err == nil {
r.state(ctx, msg, "retry")
}
return err
}
const (
@@ -899,7 +927,11 @@ func (r *RDB) Archive(ctx context.Context, msg *base.TaskMessage, errMsg string)
expireAt.Unix(),
int64(math.MaxInt64),
}
return r.runScript(ctx, op, archiveCmd, keys, argv...)
err = r.runScript(ctx, op, archiveCmd, keys, argv...)
if err == nil {
r.state(ctx, msg, "archived")
}
return err
}
// ForwardIfReady checks scheduled and retry sets of the given queues
@@ -1444,6 +1476,91 @@ func (r *RDB) ClearSchedulerEntries(scheduelrID string) error {
return nil
}
func (r *RDB) state(ctx context.Context, msg *base.TaskMessage, state string) {
out := map[string]interface{}{
"queue": msg.Queue,
"id": msg.ID,
"state": state,
}
if len(msg.GroupKey) > 0 {
out["group"] = msg.GroupKey
}
if len(msg.UniqueKey) > 0 {
out["unique"] = msg.UniqueKey
}
payload, _ := json.Marshal(out)
r.client.Publish(ctx, "state-changed", payload)
}
// StateChanged watch state updates, with more customized detail
func (r *RDB) StateChanged(handler func(map[string]interface{}), more ...string) error {
ctx := context.Background()
pubsub := r.client.Subscribe(ctx, "state-changed")
details := map[string]string{
"completed": "result",
}
if len(more) > 0 {
key := more[0]
i := strings.Index(key, ":")
state := ""
if i > -1 {
state = key[:i]
key = key[i+1:]
}
if len(state) > 0 && len(key) > 0 &&
key == "task" || key == "next" ||
key == "result" || key == "message" {
details[state] = key
}
}
for m := range pubsub.Channel() {
var out map[string]interface{}
json.Unmarshal([]byte(m.Payload), &out)
s, ok := out["state"]
if !ok {
continue
}
state := s.(string)
key, ok := details[state]
if !ok {
key, ok = details["*"]
}
if !ok {
handler(out)
continue
}
res, err := r.GetTaskInfo(out["queue"].(string), out["id"].(string))
if err != nil {
out["err"] = err.Error()
handler(out)
continue
}
msg := res.Message
if state == "completed" {
out["at"] = msg.CompletedAt
}
var data interface{}
switch key {
case "next":
data = res.NextProcessAt
case "task":
data = res
case "message":
data = msg
default:
if len(res.Result) > 0 {
data = res.Result
}
}
if data != nil {
out[key] = data
}
handler(out)
}
return nil
}
// CancelationPubSub returns a pubsub for cancelation messages.
func (r *RDB) CancelationPubSub() (*redis.PubSub, error) {
var op errors.Op = "rdb.CancelationPubSub"