diff --git a/asynq.go b/asynq.go index 1ffcc41..a27c14d 100644 --- a/asynq.go +++ b/asynq.go @@ -8,21 +8,9 @@ TODOs: - [P2] Web UI */ -import ( - "sync" - "time" - - "github.com/go-redis/redis/v7" -) - // Max retry count by default const defaultMaxRetry = 25 -// Client is an interface for scheduling tasks. -type Client struct { - rdb *rdb -} - // Task represents a task to be performed. type Task struct { // Type indicates the kind of the task to be performed. @@ -59,83 +47,3 @@ type RedisOpt struct { Addr string Password string } - -// NewClient creates and returns a new client. -func NewClient(opt *RedisOpt) *Client { - client := redis.NewClient(&redis.Options{Addr: opt.Addr, Password: opt.Password}) - return &Client{rdb: newRDB(client)} -} - -// Process enqueues the task to be performed at a given time. -func (c *Client) Process(task *Task, executeAt time.Time) error { - msg := &taskMessage{ - Type: task.Type, - Payload: task.Payload, - Queue: "default", - Retry: defaultMaxRetry, - } - return c.enqueue(msg, executeAt) -} - -// enqueue pushes a given task to the specified queue. -func (c *Client) enqueue(msg *taskMessage, executeAt time.Time) error { - if time.Now().After(executeAt) { - return c.rdb.push(msg) - } - return c.rdb.zadd(scheduled, float64(executeAt.Unix()), msg) -} - -//-------------------- Launcher -------------------- - -// Launcher starts the manager and poller. -type Launcher struct { - // running indicates whether manager and poller are both running. - running bool - mu sync.Mutex - - poller *poller - - manager *manager -} - -// NewLauncher creates and returns a new Launcher. -func NewLauncher(poolSize int, opt *RedisOpt) *Launcher { - client := redis.NewClient(&redis.Options{Addr: opt.Addr, Password: opt.Password}) - rdb := newRDB(client) - poller := newPoller(rdb, 5*time.Second, []string{scheduled, retry}) - manager := newManager(rdb, poolSize, nil) - return &Launcher{ - poller: poller, - manager: manager, - } -} - -// TaskHandler handles a given task and report any error. -type TaskHandler func(*Task) error - -// Start starts the manager and poller. -func (l *Launcher) Start(handler TaskHandler) { - l.mu.Lock() - defer l.mu.Unlock() - if l.running { - return - } - l.running = true - l.manager.handler = handler - - l.poller.start() - l.manager.start() -} - -// Stop stops both manager and poller. -func (l *Launcher) Stop() { - l.mu.Lock() - defer l.mu.Unlock() - if !l.running { - return - } - l.running = false - - l.poller.terminate() - l.manager.terminate() -} diff --git a/client.go b/client.go new file mode 100644 index 0000000..b0eec16 --- /dev/null +++ b/client.go @@ -0,0 +1,37 @@ +package asynq + +import ( + "time" + + "github.com/go-redis/redis/v7" +) + +// Client is an interface for scheduling tasks. +type Client struct { + rdb *rdb +} + +// NewClient creates and returns a new client. +func NewClient(opt *RedisOpt) *Client { + client := redis.NewClient(&redis.Options{Addr: opt.Addr, Password: opt.Password}) + return &Client{rdb: newRDB(client)} +} + +// Process enqueues the task to be performed at a given time. +func (c *Client) Process(task *Task, executeAt time.Time) error { + msg := &taskMessage{ + Type: task.Type, + Payload: task.Payload, + Queue: "default", + Retry: defaultMaxRetry, + } + return c.enqueue(msg, executeAt) +} + +// enqueue pushes a given task to the specified queue. +func (c *Client) enqueue(msg *taskMessage, executeAt time.Time) error { + if time.Now().After(executeAt) { + return c.rdb.push(msg) + } + return c.rdb.zadd(scheduled, float64(executeAt.Unix()), msg) +} diff --git a/launcher.go b/launcher.go new file mode 100644 index 0000000..8d23dbf --- /dev/null +++ b/launcher.go @@ -0,0 +1,61 @@ +package asynq + +import ( + "sync" + "time" + + "github.com/go-redis/redis/v7" +) + +// Launcher starts the manager and poller. +type Launcher struct { + // running indicates whether manager and poller are both running. + running bool + mu sync.Mutex + + poller *poller + + manager *manager +} + +// NewLauncher creates and returns a new Launcher. +func NewLauncher(poolSize int, opt *RedisOpt) *Launcher { + client := redis.NewClient(&redis.Options{Addr: opt.Addr, Password: opt.Password}) + rdb := newRDB(client) + poller := newPoller(rdb, 5*time.Second, []string{scheduled, retry}) + manager := newManager(rdb, poolSize, nil) + return &Launcher{ + poller: poller, + manager: manager, + } +} + +// TaskHandler handles a given task and report any error. +type TaskHandler func(*Task) error + +// Start starts the manager and poller. +func (l *Launcher) Start(handler TaskHandler) { + l.mu.Lock() + defer l.mu.Unlock() + if l.running { + return + } + l.running = true + l.manager.handler = handler + + l.poller.start() + l.manager.start() +} + +// Stop stops both manager and poller. +func (l *Launcher) Stop() { + l.mu.Lock() + defer l.mu.Unlock() + if !l.running { + return + } + l.running = false + + l.poller.terminate() + l.manager.terminate() +}