diff --git a/CHANGELOG.md b/CHANGELOG.md index 1650fc0..3cec214 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - `CancelActiveTask` method is added to `Inspector`. +- `ListSchedulerEnqueueEvents` method is added to `Inspector`. - `SchedulerEntries` method is added to `Inspector`. - `DeleteQueue` method is added to `Inspector`. diff --git a/inspector.go b/inspector.go index 8ad8ee5..5f9b8e4 100644 --- a/inspector.go +++ b/inspector.go @@ -701,3 +701,29 @@ func (i *Inspector) SchedulerEntries() ([]*SchedulerEntry, error) { } return entries, nil } + +// SchedulerEnqueueEvent holds information about an enqueue event by a scheduler. +type SchedulerEnqueueEvent struct { + // ID of the task that was enqueued. + TaskID string + + // Time the task was enqueued. + EnqueuedAt time.Time +} + +// ListSchedulerEnqueueEvents retrieves a list of enqueue events from the specified scheduler entry. +// +// By default, it retrieves the first 30 tasks. +func (i *Inspector) ListSchedulerEnqueueEvents(entryID string, opts ...ListOption) ([]*SchedulerEnqueueEvent, error) { + opt := composeListOptions(opts...) + pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} + data, err := i.rdb.ListSchedulerEnqueueEvents(entryID, pgn) + if err != nil { + return nil, err + } + var events []*SchedulerEnqueueEvent + for _, e := range data { + events = append(events, &SchedulerEnqueueEvent{TaskID: e.TaskID, EnqueuedAt: e.EnqueuedAt}) + } + return events, nil +} diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index db62d44..d5933cb 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -853,9 +853,9 @@ func (r *RDB) ListSchedulerEntries() ([]*base.SchedulerEntry, error) { } // ListSchedulerEnqueueEvents returns the list of scheduler enqueue events. -func (r *RDB) ListSchedulerEnqueueEvents(entryID string) ([]*base.SchedulerEnqueueEvent, error) { +func (r *RDB) ListSchedulerEnqueueEvents(entryID string, pgn Pagination) ([]*base.SchedulerEnqueueEvent, error) { key := base.SchedulerHistoryKey(entryID) - zs, err := r.client.ZRangeWithScores(key, 0, -1).Result() + zs, err := r.client.ZRangeWithScores(key, pgn.start(), pgn.stop()).Result() if err != nil { return nil, err }