mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-26 07:42:17 +08:00
Add ListScheduelerEnqueueEvents to Inspector
This commit is contained in:
parent
c06e9de97d
commit
f4dd8fe962
@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||||||
### Added
|
### Added
|
||||||
|
|
||||||
- `CancelActiveTask` method is added to `Inspector`.
|
- `CancelActiveTask` method is added to `Inspector`.
|
||||||
|
- `ListSchedulerEnqueueEvents` method is added to `Inspector`.
|
||||||
- `SchedulerEntries` method is added to `Inspector`.
|
- `SchedulerEntries` method is added to `Inspector`.
|
||||||
- `DeleteQueue` method is added to `Inspector`.
|
- `DeleteQueue` method is added to `Inspector`.
|
||||||
|
|
||||||
|
26
inspector.go
26
inspector.go
@ -701,3 +701,29 @@ func (i *Inspector) SchedulerEntries() ([]*SchedulerEntry, error) {
|
|||||||
}
|
}
|
||||||
return entries, nil
|
return entries, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SchedulerEnqueueEvent holds information about an enqueue event by a scheduler.
|
||||||
|
type SchedulerEnqueueEvent struct {
|
||||||
|
// ID of the task that was enqueued.
|
||||||
|
TaskID string
|
||||||
|
|
||||||
|
// Time the task was enqueued.
|
||||||
|
EnqueuedAt time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListSchedulerEnqueueEvents retrieves a list of enqueue events from the specified scheduler entry.
|
||||||
|
//
|
||||||
|
// By default, it retrieves the first 30 tasks.
|
||||||
|
func (i *Inspector) ListSchedulerEnqueueEvents(entryID string, opts ...ListOption) ([]*SchedulerEnqueueEvent, error) {
|
||||||
|
opt := composeListOptions(opts...)
|
||||||
|
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
|
||||||
|
data, err := i.rdb.ListSchedulerEnqueueEvents(entryID, pgn)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
var events []*SchedulerEnqueueEvent
|
||||||
|
for _, e := range data {
|
||||||
|
events = append(events, &SchedulerEnqueueEvent{TaskID: e.TaskID, EnqueuedAt: e.EnqueuedAt})
|
||||||
|
}
|
||||||
|
return events, nil
|
||||||
|
}
|
||||||
|
@ -853,9 +853,9 @@ func (r *RDB) ListSchedulerEntries() ([]*base.SchedulerEntry, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ListSchedulerEnqueueEvents returns the list of scheduler enqueue events.
|
// ListSchedulerEnqueueEvents returns the list of scheduler enqueue events.
|
||||||
func (r *RDB) ListSchedulerEnqueueEvents(entryID string) ([]*base.SchedulerEnqueueEvent, error) {
|
func (r *RDB) ListSchedulerEnqueueEvents(entryID string, pgn Pagination) ([]*base.SchedulerEnqueueEvent, error) {
|
||||||
key := base.SchedulerHistoryKey(entryID)
|
key := base.SchedulerHistoryKey(entryID)
|
||||||
zs, err := r.client.ZRangeWithScores(key, 0, -1).Result()
|
zs, err := r.client.ZRangeWithScores(key, pgn.start(), pgn.stop()).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user