diff --git a/CHANGELOG.md b/CHANGELOG.md index 6438706..0a81ba7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,7 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- `Scheduler` type is added to enable periodic tasks. See the godoc for its APIs and [wiki](https://github.com/hibiken/asynq/wiki/Periodic-Tasks) for the getting-started guide. + ### Changed + - interface `Option` has changed. See the godoc for the new interface. This change would have no impact as long as you are using exported functions (e.g. `MaxRetry`, `Queue`, etc) to create `Option`s. diff --git a/README.md b/README.md index e923b69..98d485a 100644 --- a/README.md +++ b/README.md @@ -42,6 +42,7 @@ A system can consist of multiple worker servers and brokers, giving way to high - Allow [timeout and deadline per task](https://github.com/hibiken/asynq/wiki/Task-Timeout-and-Cancelation) - [Flexible handler interface with support for middlewares](https://github.com/hibiken/asynq/wiki/Handler-Deep-Dive) - [Ability to pause queue](/tools/asynq/README.md#pause) to stop processing tasks from the queue +- [Periodic Tasks](https://github.com/hibiken/asynq/wiki/Periodic-Tasks) - [Support Redis Cluster](https://github.com/hibiken/asynq/wiki/Redis-Cluster) for automatic sharding and high availability - [Support Redis Sentinels](https://github.com/hibiken/asynq/wiki/Automatic-Failover) for high availability - [CLI](#command-line-tool) to inspect and remote-control queues and tasks diff --git a/example_test.go b/example_test.go index 5921ef8..1620d3f 100644 --- a/example_test.go +++ b/example_test.go @@ -9,6 +9,7 @@ import ( "log" "os" "os/signal" + "time" "github.com/hibiken/asynq" "golang.org/x/sys/unix" @@ -78,6 +79,25 @@ func ExampleServer_Quiet() { srv.Stop() } +func ExampleScheduler() { + scheduler := asynq.NewScheduler( + asynq.RedisClientOpt{Addr: ":6379"}, + &asynq.SchedulerOpts{Location: time.Local}, + ) + + if _, err := scheduler.Register("* * * * *", asynq.NewTask("task1", nil)); err != nil { + log.Fatal(err) + } + if _, err := scheduler.Register("@every 30s", asynq.NewTask("task2", nil)); err != nil { + log.Fatal(err) + } + + // Run blocks and waits for os signal to terminate the program. + if err := scheduler.Run(); err != nil { + log.Fatal(err) + } +} + func ExampleParseRedisURI() { rconn, err := asynq.ParseRedisURI("redis://localhost:6379/10") if err != nil { diff --git a/scheduler.go b/scheduler.go index 0cd5dea..8430e7c 100644 --- a/scheduler.go +++ b/scheduler.go @@ -126,7 +126,7 @@ func (j *enqueueJob) Run() { } } -// Register registers a task to be enqueued with given schedule specified by the cronspec. +// Register registers a task to be enqueued on the given schedule specified by the cronspec. // It returns an ID of the newly registered entry. func (s *Scheduler) Register(cronspec string, task *Task, opts ...Option) (entryID string, err error) { job := &enqueueJob{ diff --git a/tools/asynq/cmd/cron.go b/tools/asynq/cmd/cron.go index 9c86bc9..94c2f97 100644 --- a/tools/asynq/cmd/cron.go +++ b/tools/asynq/cmd/cron.go @@ -84,6 +84,7 @@ func prevEnqueue(prevEnqueuedAt time.Time) string { return fmt.Sprintf("%v ago", time.Since(prevEnqueuedAt).Round(time.Second)) } +// TODO: Paginate the result set. func cronHistory(cmd *cobra.Command, args []string) { r := createRDB() for i, entryID := range args {