diff --git a/client.go b/client.go index 7948036..a62aa6c 100644 --- a/client.go +++ b/client.go @@ -150,9 +150,9 @@ func (t deadlineOption) Value() interface{} { return time.Time(t) } // TTL duration must be greater than or equal to 1 second. // // Uniqueness of a task is based on the following properties: -// - Task Type -// - Task Payload -// - Queue Name +// - Task Type +// - Task Payload +// - Queue Name func Unique(ttl time.Duration) Option { return uniqueOption(ttl) } @@ -426,3 +426,8 @@ func (c *Client) addToGroup(ctx context.Context, msg *base.TaskMessage, group st } return c.broker.AddToGroup(ctx, msg, group) } + +// StateChanged watchs state updates, with more customized detail +func (c *Client) StateChanged(handler func(map[string]interface{}), more ...string) error { + return c.broker.StateChanged(handler, more...) +} diff --git a/internal/base/base.go b/internal/base/base.go index ec342f8..3dcce40 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -752,4 +752,7 @@ type Broker interface { PublishCancelation(id string) error WriteResult(qname, id string, data []byte) (n int, err error) + + // StateChanged watch state updates, with more customized detail + StateChanged(handler func(map[string]interface{}), more ...string) error } diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 7b25f15..33389d4 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -7,8 +7,10 @@ package rdb import ( "context" + "encoding/json" "fmt" "math" + "strings" "time" "github.com/go-redis/redis/v8" @@ -131,6 +133,7 @@ func (r *RDB) Enqueue(ctx context.Context, msg *base.TaskMessage) error { if n == 0 { return errors.E(op, errors.AlreadyExists, errors.ErrTaskIdConflict) } + r.state(ctx, msg, "pending") return nil } @@ -198,6 +201,7 @@ func (r *RDB) EnqueueUnique(ctx context.Context, msg *base.TaskMessage, ttl time if n == 0 { return errors.E(op, errors.AlreadyExists, errors.ErrTaskIdConflict) } + r.state(ctx, msg, "pending") return nil } @@ -464,9 +468,21 @@ func (r *RDB) MarkAsComplete(ctx context.Context, msg *base.TaskMessage) error { // Note: We cannot pass empty unique key when running this script in redis-cluster. if len(msg.UniqueKey) > 0 { keys = append(keys, msg.UniqueKey) - return r.runScript(ctx, op, markAsCompleteUniqueCmd, keys, argv...) + err := r.runScript(ctx, op, markAsCompleteUniqueCmd, keys, argv...) + if err == nil { + r.state(ctx, msg, "completed") + } else { + r.state(ctx, msg, "failed") + } + return err } - return r.runScript(ctx, op, markAsCompleteCmd, keys, argv...) + err = r.runScript(ctx, op, markAsCompleteCmd, keys, argv...) + if err == nil { + r.state(ctx, msg, "completed") + } else { + r.state(ctx, msg, "failed") + } + return err } // KEYS[1] -> asynq:{}:active @@ -495,7 +511,11 @@ func (r *RDB) Requeue(ctx context.Context, msg *base.TaskMessage) error { base.PendingKey(msg.Queue), base.TaskKey(msg.Queue, msg.ID), } - return r.runScript(ctx, op, requeueCmd, keys, msg.ID) + err := r.runScript(ctx, op, requeueCmd, keys, msg.ID) + if err == nil { + r.state(ctx, msg, "pending") + } + return err } // KEYS[1] -> asynq:{}:t: @@ -550,6 +570,7 @@ func (r *RDB) AddToGroup(ctx context.Context, msg *base.TaskMessage, groupKey st if n == 0 { return errors.E(op, errors.AlreadyExists, errors.ErrTaskIdConflict) } + r.state(ctx, msg, "aggregating") return nil } @@ -617,6 +638,7 @@ func (r *RDB) AddToGroupUnique(ctx context.Context, msg *base.TaskMessage, group if n == 0 { return errors.E(op, errors.AlreadyExists, errors.ErrTaskIdConflict) } + r.state(ctx, msg, "aggregating") return nil } @@ -667,6 +689,7 @@ func (r *RDB) Schedule(ctx context.Context, msg *base.TaskMessage, processAt tim if n == 0 { return errors.E(op, errors.AlreadyExists, errors.ErrTaskIdConflict) } + r.state(ctx, msg, "scheduled") return nil } @@ -731,6 +754,7 @@ func (r *RDB) ScheduleUnique(ctx context.Context, msg *base.TaskMessage, process if n == 0 { return errors.E(op, errors.AlreadyExists, errors.ErrTaskIdConflict) } + r.state(ctx, msg, "scheduled") return nil } @@ -813,7 +837,11 @@ func (r *RDB) Retry(ctx context.Context, msg *base.TaskMessage, processAt time.T isFailure, int64(math.MaxInt64), } - return r.runScript(ctx, op, retryCmd, keys, argv...) + err = r.runScript(ctx, op, retryCmd, keys, argv...) + if err == nil { + r.state(ctx, msg, "retry") + } + return err } const ( @@ -899,7 +927,11 @@ func (r *RDB) Archive(ctx context.Context, msg *base.TaskMessage, errMsg string) expireAt.Unix(), int64(math.MaxInt64), } - return r.runScript(ctx, op, archiveCmd, keys, argv...) + err = r.runScript(ctx, op, archiveCmd, keys, argv...) + if err == nil { + r.state(ctx, msg, "archived") + } + return err } // ForwardIfReady checks scheduled and retry sets of the given queues @@ -1444,6 +1476,91 @@ func (r *RDB) ClearSchedulerEntries(scheduelrID string) error { return nil } +func (r *RDB) state(ctx context.Context, msg *base.TaskMessage, state string) { + out := map[string]interface{}{ + "queue": msg.Queue, + "id": msg.ID, + "state": state, + } + if len(msg.GroupKey) > 0 { + out["group"] = msg.GroupKey + } + if len(msg.UniqueKey) > 0 { + out["unique"] = msg.UniqueKey + } + payload, _ := json.Marshal(out) + r.client.Publish(ctx, "state-changed", payload) +} + +// StateChanged watch state updates, with more customized detail +func (r *RDB) StateChanged(handler func(map[string]interface{}), more ...string) error { + ctx := context.Background() + pubsub := r.client.Subscribe(ctx, "state-changed") + details := map[string]string{ + "completed": "result", + } + if len(more) > 0 { + key := more[0] + i := strings.Index(key, ":") + state := "" + if i > -1 { + state = key[:i] + key = key[i+1:] + } + if len(state) > 0 && len(key) > 0 && + key == "task" || key == "next" || + key == "result" || key == "message" { + details[state] = key + } + } + + for m := range pubsub.Channel() { + var out map[string]interface{} + json.Unmarshal([]byte(m.Payload), &out) + s, ok := out["state"] + if !ok { + continue + } + state := s.(string) + key, ok := details[state] + if !ok { + key, ok = details["*"] + } + if !ok { + handler(out) + continue + } + res, err := r.GetTaskInfo(out["queue"].(string), out["id"].(string)) + if err != nil { + out["err"] = err.Error() + handler(out) + continue + } + msg := res.Message + if state == "completed" { + out["at"] = msg.CompletedAt + } + var data interface{} + switch key { + case "next": + data = res.NextProcessAt + case "task": + data = res + case "message": + data = msg + default: + if len(res.Result) > 0 { + data = res.Result + } + } + if data != nil { + out[key] = data + } + handler(out) + } + return nil +} + // CancelationPubSub returns a pubsub for cancelation messages. func (r *RDB) CancelationPubSub() (*redis.PubSub, error) { var op errors.Op = "rdb.CancelationPubSub" diff --git a/server.go b/server.go index 4bf04e0..6e4f875 100644 --- a/server.go +++ b/server.go @@ -697,3 +697,8 @@ func (srv *Server) Stop() { srv.processor.stop() srv.logger.Info("Processor stopped") } + +// StateChanged watch state updates, with more customized detail +func (srv *Server) StateChanged(handler func(map[string]interface{}), more ...string) error { + return srv.broker.StateChanged(handler, more...) +}