2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-20 09:16:12 +08:00

Compare commits

..

12 Commits

Author SHA1 Message Date
Ken Hibino
32d3f329b9 v0.17.1 2021-04-04 12:51:00 -07:00
Ken Hibino
544c301a8b Fix bug in RDB.memoryUsage 2021-04-04 12:49:19 -07:00
Ken Hibino
8b997d2fab v0.17.0 2021-03-24 16:51:59 -07:00
Ken Hibino
901105a8d7 Add dial, read, write timeout options to RedisConnOpt 2021-03-24 16:49:04 -07:00
Ken Hibino
aaa3f1d4fd v0.16.1 2021-03-20 06:27:03 -07:00
disc
4722ca2d3d Replaced blocking KEYS XXX:* command to non-blocking SCAN XXX:*
More details: https://redis.io/commands/KEYS
2021-03-20 06:24:08 -07:00
Ken Hibino
6a9d9fd717 v0.16.0 2021-03-10 20:39:46 -08:00
Ken Hibino
de28c1ea19 Add Unregister method to Scheduler 2021-03-10 20:38:44 -08:00
Ken Hibino
f618f5b1f5 Add benchmark tests for rdb package 2021-03-07 16:27:14 -08:00
Ken Hibino
aa936466b3 Minor fix 2021-03-07 16:27:14 -08:00
Ken Hibino
5d1ec70544 Run CI build with go1.16 2021-02-25 09:31:17 -08:00
Ken Hibino
d1d3be9b00 Add Web UI section in README 2021-02-01 17:01:04 -08:00
14 changed files with 470 additions and 22 deletions

View File

@@ -20,7 +20,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.15.x
go-version: 1.16.x
- name: Benchmark
run: go test -run=^$ -bench=. -count=5 -timeout=60m ./... | tee -a new.txt
- name: Upload Benchmark

View File

@@ -7,7 +7,7 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest]
go-version: [1.13.x, 1.14.x, 1.15.x]
go-version: [1.13.x, 1.14.x, 1.15.x, 1.16.x]
runs-on: ${{ matrix.os }}
services:
redis:

View File

@@ -7,6 +7,30 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
## [0.17.1] - 2021-04-04
### Fixed
- Fix bug in internal `RDB.memoryUsage` method.
## [0.17.0] - 2021-03-24
### Added
- `DialTimeout`, `ReadTimeout`, and `WriteTimeout` options are added to `RedisConnOpt`.
## [0.16.1] - 2021-03-20
### Fixed
- Replace `KEYS` command with `SCAN` as recommended by [redis doc](https://redis.io/commands/KEYS).
## [0.16.0] - 2021-03-10
### Added
- `Unregister` method is added to `Scheduler` to remove a registered entry.
## [0.15.0] - 2021-01-31
**IMPORTATNT**: All `Inspector` related code are moved to subpackage "github.com/hibiken/asynq/inspeq"
@@ -15,7 +39,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `Inspector` related code are moved to subpackage "github.com/hibken/asynq/inspeq".
- `RedisConnOpt` interface has changed slightly. If you have been passing `RedisClientOpt`, `RedisFailoverClientOpt`, or `RedisClusterClientOpt` as a pointer,
update your code to pass as a value.
update your code to pass as a value.
- `ErrorMsg` field in `RetryTask` and `ArchivedTask` was renamed to `LastError`.
### Added

View File

@@ -44,6 +44,7 @@ A system can consist of multiple worker servers and brokers, giving way to high
- [Periodic Tasks](https://github.com/hibiken/asynq/wiki/Periodic-Tasks)
- [Support Redis Cluster](https://github.com/hibiken/asynq/wiki/Redis-Cluster) for automatic sharding and high availability
- [Support Redis Sentinels](https://github.com/hibiken/asynq/wiki/Automatic-Failover) for high availability
- [Web UI](#web-ui) to inspect and remote-control queues and tasks
- [CLI](#command-line-tool) to inspect and remote-control queues and tasks
## Quickstart
@@ -251,6 +252,19 @@ For a more detailed walk-through of the library, see our [Getting Started Guide]
To Learn more about `asynq` features and APIs, see our [Wiki](https://github.com/hibiken/asynq/wiki) and [godoc](https://godoc.org/github.com/hibiken/asynq).
## Web UI
[Asynqmon](https://github.com/hibiken/asynqmon) is a web based tool for monitoring and administrating Asynq queues and tasks.
Please see the tool's [README](https://github.com/hibiken/asynqmon) for details.
Here's a few screenshots of the web UI.
**Queues view**
![Web UI QueuesView](/docs/assets/asynqmon-queues-view.png)
**Tasks view**
![Web UI TasksView](/docs/assets/asynqmon-task-view.png)
## Command Line Tool
Asynq ships with a command line tool to inspect the state of queues and tasks.

View File

@@ -10,6 +10,7 @@ import (
"net/url"
"strconv"
"strings"
"time"
"github.com/go-redis/redis/v7"
)
@@ -68,6 +69,26 @@ type RedisClientOpt struct {
// See: https://redis.io/commands/select.
DB int
// Dial timeout for establishing new connections.
// Default is 5 seconds.
DialTimeout time.Duration
// Timeout for socket reads.
// If timeout is reached, read commands will fail with a timeout error
// instead of blocking.
//
// Use value -1 for no timeout and 0 for default.
// Default is 3 seconds.
ReadTimeout time.Duration
// Timeout for socket writes.
// If timeout is reached, write commands will fail with a timeout error
// instead of blocking.
//
// Use value -1 for no timeout and 0 for default.
// Default is ReadTimout.
WriteTimeout time.Duration
// Maximum number of socket connections.
// Default is 10 connections per every CPU as reported by runtime.NumCPU.
PoolSize int
@@ -79,13 +100,16 @@ type RedisClientOpt struct {
func (opt RedisClientOpt) MakeRedisClient() interface{} {
return redis.NewClient(&redis.Options{
Network: opt.Network,
Addr: opt.Addr,
Username: opt.Username,
Password: opt.Password,
DB: opt.DB,
PoolSize: opt.PoolSize,
TLSConfig: opt.TLSConfig,
Network: opt.Network,
Addr: opt.Addr,
Username: opt.Username,
Password: opt.Password,
DB: opt.DB,
DialTimeout: opt.DialTimeout,
ReadTimeout: opt.ReadTimeout,
WriteTimeout: opt.WriteTimeout,
PoolSize: opt.PoolSize,
TLSConfig: opt.TLSConfig,
})
}
@@ -116,6 +140,26 @@ type RedisFailoverClientOpt struct {
// See: https://redis.io/commands/select.
DB int
// Dial timeout for establishing new connections.
// Default is 5 seconds.
DialTimeout time.Duration
// Timeout for socket reads.
// If timeout is reached, read commands will fail with a timeout error
// instead of blocking.
//
// Use value -1 for no timeout and 0 for default.
// Default is 3 seconds.
ReadTimeout time.Duration
// Timeout for socket writes.
// If timeout is reached, write commands will fail with a timeout error
// instead of blocking.
//
// Use value -1 for no timeout and 0 for default.
// Default is ReadTimeout
WriteTimeout time.Duration
// Maximum number of socket connections.
// Default is 10 connections per every CPU as reported by runtime.NumCPU.
PoolSize int
@@ -133,12 +177,15 @@ func (opt RedisFailoverClientOpt) MakeRedisClient() interface{} {
Username: opt.Username,
Password: opt.Password,
DB: opt.DB,
DialTimeout: opt.DialTimeout,
ReadTimeout: opt.ReadTimeout,
WriteTimeout: opt.WriteTimeout,
PoolSize: opt.PoolSize,
TLSConfig: opt.TLSConfig,
})
}
// RedisFailoverClientOpt is used to creates a redis client that connects to
// RedisClusterClientOpt is used to creates a redis client that connects to
// redis cluster.
type RedisClusterClientOpt struct {
// A seed list of host:port addresses of cluster nodes.
@@ -157,6 +204,26 @@ type RedisClusterClientOpt struct {
// See: https://redis.io/commands/auth.
Password string
// Dial timeout for establishing new connections.
// Default is 5 seconds.
DialTimeout time.Duration
// Timeout for socket reads.
// If timeout is reached, read commands will fail with a timeout error
// instead of blocking.
//
// Use value -1 for no timeout and 0 for default.
// Default is 3 seconds.
ReadTimeout time.Duration
// Timeout for socket writes.
// If timeout is reached, write commands will fail with a timeout error
// instead of blocking.
//
// Use value -1 for no timeout and 0 for default.
// Default is ReadTimeout.
WriteTimeout time.Duration
// TLS Config used to connect to a server.
// TLS will be negotiated only if this field is set.
TLSConfig *tls.Config
@@ -168,6 +235,9 @@ func (opt RedisClusterClientOpt) MakeRedisClient() interface{} {
MaxRedirects: opt.MaxRedirects,
Username: opt.Username,
Password: opt.Password,
DialTimeout: opt.DialTimeout,
ReadTimeout: opt.ReadTimeout,
WriteTimeout: opt.WriteTimeout,
TLSConfig: opt.TLSConfig,
})
}

Binary file not shown.

After

Width:  |  Height:  |  Size: 279 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 347 KiB

View File

@@ -19,7 +19,7 @@ import (
)
// Version of asynq library and CLI.
const Version = "0.15.0"
const Version = "0.17.1"
// DefaultQueueName is the queue name used if none are specified by user.
const DefaultQueueName = "default"

View File

@@ -0,0 +1,266 @@
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
package rdb
import (
"fmt"
"testing"
"time"
"github.com/hibiken/asynq/internal/asynqtest"
"github.com/hibiken/asynq/internal/base"
)
func BenchmarkEnqueue(b *testing.B) {
r := setup(b)
msg := asynqtest.NewTaskMessage("task1", nil)
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
asynqtest.FlushDB(b, r.client)
b.StartTimer()
if err := r.Enqueue(msg); err != nil {
b.Fatalf("Enqueue failed: %v", err)
}
}
}
func BenchmarkEnqueueUnique(b *testing.B) {
r := setup(b)
msg := &base.TaskMessage{
Type: "task1",
Payload: nil,
Queue: base.DefaultQueueName,
UniqueKey: base.UniqueKey("default", "task1", nil),
}
uniqueTTL := 5 * time.Minute
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
asynqtest.FlushDB(b, r.client)
b.StartTimer()
if err := r.EnqueueUnique(msg, uniqueTTL); err != nil {
b.Fatalf("EnqueueUnique failed: %v", err)
}
}
}
func BenchmarkSchedule(b *testing.B) {
r := setup(b)
msg := asynqtest.NewTaskMessage("task1", nil)
processAt := time.Now().Add(3 * time.Minute)
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
asynqtest.FlushDB(b, r.client)
b.StartTimer()
if err := r.Schedule(msg, processAt); err != nil {
b.Fatalf("Schedule failed: %v", err)
}
}
}
func BenchmarkScheduleUnique(b *testing.B) {
r := setup(b)
msg := &base.TaskMessage{
Type: "task1",
Payload: nil,
Queue: base.DefaultQueueName,
UniqueKey: base.UniqueKey("default", "task1", nil),
}
processAt := time.Now().Add(3 * time.Minute)
uniqueTTL := 5 * time.Minute
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
asynqtest.FlushDB(b, r.client)
b.StartTimer()
if err := r.ScheduleUnique(msg, processAt, uniqueTTL); err != nil {
b.Fatalf("EnqueueUnique failed: %v", err)
}
}
}
func BenchmarkDequeueSingleQueue(b *testing.B) {
r := setup(b)
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
asynqtest.FlushDB(b, r.client)
for i := 0; i < 10; i++ {
m := asynqtest.NewTaskMessageWithQueue(
fmt.Sprintf("task%d", i), nil, base.DefaultQueueName)
if err := r.Enqueue(m); err != nil {
b.Fatalf("Enqueue failed: %v", err)
}
}
b.StartTimer()
if _, _, err := r.Dequeue(base.DefaultQueueName); err != nil {
b.Fatalf("Dequeue failed: %v", err)
}
}
}
func BenchmarkDequeueMultipleQueues(b *testing.B) {
qnames := []string{"critical", "default", "low"}
r := setup(b)
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
asynqtest.FlushDB(b, r.client)
for i := 0; i < 10; i++ {
for _, qname := range qnames {
m := asynqtest.NewTaskMessageWithQueue(
fmt.Sprintf("%s_task%d", qname, i), nil, qname)
if err := r.Enqueue(m); err != nil {
b.Fatalf("Enqueue failed: %v", err)
}
}
}
b.StartTimer()
if _, _, err := r.Dequeue(qnames...); err != nil {
b.Fatalf("Dequeue failed: %v", err)
}
}
}
func BenchmarkDone(b *testing.B) {
r := setup(b)
m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessage("task2", nil)
m3 := asynqtest.NewTaskMessage("task3", nil)
msgs := []*base.TaskMessage{m1, m2, m3}
zs := []base.Z{
{Message: m1, Score: time.Now().Add(10 * time.Second).Unix()},
{Message: m2, Score: time.Now().Add(20 * time.Second).Unix()},
{Message: m3, Score: time.Now().Add(30 * time.Second).Unix()},
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
asynqtest.FlushDB(b, r.client)
asynqtest.SeedActiveQueue(b, r.client, msgs, base.DefaultQueueName)
asynqtest.SeedDeadlines(b, r.client, zs, base.DefaultQueueName)
b.StartTimer()
if err := r.Done(msgs[0]); err != nil {
b.Fatalf("Done failed: %v", err)
}
}
}
func BenchmarkRetry(b *testing.B) {
r := setup(b)
m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessage("task2", nil)
m3 := asynqtest.NewTaskMessage("task3", nil)
msgs := []*base.TaskMessage{m1, m2, m3}
zs := []base.Z{
{Message: m1, Score: time.Now().Add(10 * time.Second).Unix()},
{Message: m2, Score: time.Now().Add(20 * time.Second).Unix()},
{Message: m3, Score: time.Now().Add(30 * time.Second).Unix()},
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
asynqtest.FlushDB(b, r.client)
asynqtest.SeedActiveQueue(b, r.client, msgs, base.DefaultQueueName)
asynqtest.SeedDeadlines(b, r.client, zs, base.DefaultQueueName)
b.StartTimer()
if err := r.Retry(msgs[0], time.Now().Add(1*time.Minute), "error"); err != nil {
b.Fatalf("Retry failed: %v", err)
}
}
}
func BenchmarkArchive(b *testing.B) {
r := setup(b)
m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessage("task2", nil)
m3 := asynqtest.NewTaskMessage("task3", nil)
msgs := []*base.TaskMessage{m1, m2, m3}
zs := []base.Z{
{Message: m1, Score: time.Now().Add(10 * time.Second).Unix()},
{Message: m2, Score: time.Now().Add(20 * time.Second).Unix()},
{Message: m3, Score: time.Now().Add(30 * time.Second).Unix()},
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
asynqtest.FlushDB(b, r.client)
asynqtest.SeedActiveQueue(b, r.client, msgs, base.DefaultQueueName)
asynqtest.SeedDeadlines(b, r.client, zs, base.DefaultQueueName)
b.StartTimer()
if err := r.Archive(msgs[0], "error"); err != nil {
b.Fatalf("Archive failed: %v", err)
}
}
}
func BenchmarkRequeue(b *testing.B) {
r := setup(b)
m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessage("task2", nil)
m3 := asynqtest.NewTaskMessage("task3", nil)
msgs := []*base.TaskMessage{m1, m2, m3}
zs := []base.Z{
{Message: m1, Score: time.Now().Add(10 * time.Second).Unix()},
{Message: m2, Score: time.Now().Add(20 * time.Second).Unix()},
{Message: m3, Score: time.Now().Add(30 * time.Second).Unix()},
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
asynqtest.FlushDB(b, r.client)
asynqtest.SeedActiveQueue(b, r.client, msgs, base.DefaultQueueName)
asynqtest.SeedDeadlines(b, r.client, zs, base.DefaultQueueName)
b.StartTimer()
if err := r.Requeue(msgs[0]); err != nil {
b.Fatalf("Requeue failed: %v", err)
}
}
}
func BenchmarkCheckAndEnqueue(b *testing.B) {
r := setup(b)
now := time.Now()
var zs []base.Z
for i := -100; i < 100; i++ {
msg := asynqtest.NewTaskMessage(fmt.Sprintf("task%d", i), nil)
score := now.Add(time.Duration(i) * time.Second).Unix()
zs = append(zs, base.Z{Message: msg, Score: score})
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
asynqtest.FlushDB(b, r.client)
asynqtest.SeedScheduledQueue(b, r.client, zs, base.DefaultQueueName)
b.StartTimer()
if err := r.CheckAndEnqueue(base.DefaultQueueName); err != nil {
b.Fatalf("CheckAndEnqueue failed: %v", err)
}
}
}

View File

@@ -172,9 +172,21 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
}
func (r *RDB) memoryUsage(qname string) (int64, error) {
keys, err := r.client.Keys(fmt.Sprintf("asynq:{%s}*", qname)).Result()
if err != nil {
return 0, err
var (
keys []string
data []string
cursor uint64
err error
)
for {
data, cursor, err = r.client.Scan(cursor, fmt.Sprintf("asynq:{%s}*", qname), 100).Result()
if err != nil {
return 0, err
}
keys = append(keys, data...)
if cursor == 0 {
break
}
}
var usg int64
for _, k := range keys {

View File

@@ -37,12 +37,12 @@ func init() {
flag.StringVar(&redisClusterAddrs, "redis_cluster_addrs", "localhost:7000,localhost:7001,localhost:7002", "comma separated list of redis server addresses")
}
func setup(t *testing.T) (r *RDB) {
t.Helper()
func setup(tb testing.TB) (r *RDB) {
tb.Helper()
if useRedisCluster {
addrs := strings.Split(redisClusterAddrs, ",")
if len(addrs) == 0 {
t.Fatal("No redis cluster addresses provided. Please set addresses using --redis_cluster_addrs flag.")
tb.Fatal("No redis cluster addresses provided. Please set addresses using --redis_cluster_addrs flag.")
}
r = NewRDB(redis.NewClusterClient(&redis.ClusterOptions{
Addrs: addrs,
@@ -54,7 +54,7 @@ func setup(t *testing.T) (r *RDB) {
}))
}
// Start each test with a clean slate.
h.FlushDB(t, r.client)
h.FlushDB(tb, r.client)
return r
}

View File

@@ -30,6 +30,10 @@ type Scheduler struct {
done chan struct{}
wg sync.WaitGroup
errHandler func(task *Task, opts []Option, err error)
// idmap maps Scheduler's entry ID to cron.EntryID
// to avoid using cron.EntryID as the public API of
// the Scheduler.
idmap map[string]cron.EntryID
}
// NewScheduler returns a new Scheduler instance given the redis connection option.
@@ -65,6 +69,7 @@ func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
location: loc,
done: make(chan struct{}),
errHandler: opts.EnqueueErrorHandler,
idmap: make(map[string]cron.EntryID),
}
}
@@ -145,12 +150,25 @@ func (s *Scheduler) Register(cronspec string, task *Task, opts ...Option) (entry
logger: s.logger,
errHandler: s.errHandler,
}
if _, err = s.cron.AddJob(cronspec, job); err != nil {
cronID, err := s.cron.AddJob(cronspec, job)
if err != nil {
return "", err
}
s.idmap[job.id.String()] = cronID
return job.id.String(), nil
}
// Unregister removes a registered entry by entry ID.
// Unregister returns a non-nil error if no entries were found for the given entryID.
func (s *Scheduler) Unregister(entryID string) error {
cronID, ok := s.idmap[entryID]
if !ok {
return fmt.Errorf("asynq: no scheduler entry found")
}
s.cron.Remove(cronID)
return nil
}
// Run starts the scheduler until an os signal to exit the program is received.
// It returns an error if scheduler is already running or has been stopped.
func (s *Scheduler) Run() error {

View File

@@ -14,7 +14,7 @@ import (
"github.com/hibiken/asynq/internal/base"
)
func TestScheduler(t *testing.T) {
func TestSchedulerRegister(t *testing.T) {
tests := []struct {
cronspec string
task *Task
@@ -116,3 +116,47 @@ func TestSchedulerWhenRedisDown(t *testing.T) {
}
mu.Unlock()
}
func TestSchedulerUnregister(t *testing.T) {
tests := []struct {
cronspec string
task *Task
opts []Option
wait time.Duration
queue string
}{
{
cronspec: "@every 3s",
task: NewTask("task1", nil),
opts: []Option{MaxRetry(10)},
wait: 10 * time.Second,
queue: "default",
},
}
r := setup(t)
for _, tc := range tests {
scheduler := NewScheduler(getRedisConnOpt(t), nil)
entryID, err := scheduler.Register(tc.cronspec, tc.task, tc.opts...)
if err != nil {
t.Fatal(err)
}
if err := scheduler.Unregister(entryID); err != nil {
t.Fatal(err)
}
if err := scheduler.Start(); err != nil {
t.Fatal(err)
}
time.Sleep(tc.wait)
if err := scheduler.Stop(); err != nil {
t.Fatal(err)
}
got := asynqtest.GetPendingMessages(t, r, tc.queue)
if len(got) != 0 {
t.Errorf("%d tasks were enqueued, want zero", len(got))
}
}
}

View File

@@ -70,7 +70,7 @@ func TestServerRun(t *testing.T) {
go func() {
select {
case <-time.After(10 * time.Second):
t.Fatal("server did not stop after receiving TERM signal")
panic("server did not stop after receiving TERM signal")
case <-done:
}
}()