2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-22 16:50:18 +08:00

Update package doc and readme

This commit is contained in:
Ken Hibino 2021-03-18 21:43:10 -07:00
parent b3b50d26a2
commit 4c53446c10
2 changed files with 61 additions and 37 deletions

View File

@ -12,8 +12,8 @@ Asynq is a Go library for queueing tasks and processing them asynchronously with
Highlevel overview of how Asynq works: Highlevel overview of how Asynq works:
- Client puts task on a queue - Client puts tasks on a queue
- Server pulls task off queues and starts a worker goroutine for each task - Server pulls tasks off queues and starts a worker goroutine for each task
- Tasks are processed concurrently by multiple workers - Tasks are processed concurrently by multiple workers
Task queues are used as a mechanism to distribute work across multiple machines. Task queues are used as a mechanism to distribute work across multiple machines.
@ -72,19 +72,36 @@ const (
TypeImageResize = "image:resize" TypeImageResize = "image:resize"
) )
type EmailDeliveryPayload struct {
UserID int
TemplateID string
}
type ImageResizePayload struct {
SourceURL string
}
//---------------------------------------------- //----------------------------------------------
// Write a function NewXXXTask to create a task. // Write a function NewXXXTask to create a task.
// A task consists of a type and a payload. // A task consists of a type and a payload.
//---------------------------------------------- //----------------------------------------------
func NewEmailDeliveryTask(userID int, tmplID string) *asynq.Task { func NewEmailDeliveryTask(userID int, tmplID string) (*asynq.Task, error) {
payload := map[string]interface{}{"user_id": userID, "template_id": tmplID} payload := EmailDeliveryPayload{UserID: userID, TemplateID: templID}
return asynq.NewTask(TypeEmailDelivery, payload) bytes, err := json.Marshal(payload)
if err != nil {
return nil, err
}
return asynq.NewTask(TypeEmailDelivery, bytes), nil
} }
func NewImageResizeTask(src string) *asynq.Task { func NewImageResizeTask(src string) (*asynq.Task, error) {
payload := map[string]interface{}{"src": src} payload := ImageResizePayload{SourceURL: src}
return asynq.NewTask(TypeImageResize, payload) bytes, err := json.Marshal(payload)
if err != nil {
return nil, err
}
return asynq.NewTask(TypeImageResize, bytes), nil
} }
//--------------------------------------------------------------- //---------------------------------------------------------------
@ -96,15 +113,11 @@ func NewImageResizeTask(src string) *asynq.Task {
//--------------------------------------------------------------- //---------------------------------------------------------------
func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error { func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error {
userID, err := t.Payload.GetInt("user_id") var p EmailDeliveryPayload
if err != nil { if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
} }
tmplID, err := t.Payload.GetString("template_id") log.Printf("Sending Email to User: user_id = %d, template_id = %s\n", p.UserID, p.TemplateID)
if err != nil {
return err
}
fmt.Printf("Send Email to User: user_id = %d, template_id = %s\n", userID, tmplID)
// Email delivery code ... // Email delivery code ...
return nil return nil
} }
@ -115,11 +128,11 @@ type ImageProcessor struct {
} }
func (p *ImageProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { func (p *ImageProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
src, err := t.Payload.GetString("src") var p ImageResizePayload
if err != nil { if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
} }
fmt.Printf("Resize image: src = %s\n", src) log.Printf("Resizing image: src = %s\n", p.SourceURL)
// Image resizing code ... // Image resizing code ...
return nil return nil
} }
@ -155,10 +168,13 @@ func main() {
// Use (*Client).Enqueue method. // Use (*Client).Enqueue method.
// ------------------------------------------------------ // ------------------------------------------------------
t := tasks.NewEmailDeliveryTask(42, "some:template:id") t, err := tasks.NewEmailDeliveryTask(42, "some:template:id")
if err != nil {
log.Fatalf("could not create task: %v", err)
}
res, err := c.Enqueue(t) res, err := c.Enqueue(t)
if err != nil { if err != nil {
log.Fatal("could not enqueue task: %v", err) log.Fatalf("could not enqueue task: %v", err)
} }
fmt.Printf("Enqueued Result: %+v\n", res) fmt.Printf("Enqueued Result: %+v\n", res)
@ -168,10 +184,9 @@ func main() {
// Use ProcessIn or ProcessAt option. // Use ProcessIn or ProcessAt option.
// ------------------------------------------------------------ // ------------------------------------------------------------
t = tasks.NewEmailDeliveryTask(42, "other:template:id")
res, err = c.Enqueue(t, asynq.ProcessIn(24*time.Hour)) res, err = c.Enqueue(t, asynq.ProcessIn(24*time.Hour))
if err != nil { if err != nil {
log.Fatal("could not schedule task: %v", err) log.Fatalf("could not schedule task: %v", err)
} }
fmt.Printf("Enqueued Result: %+v\n", res) fmt.Printf("Enqueued Result: %+v\n", res)
@ -183,19 +198,21 @@ func main() {
c.SetDefaultOptions(tasks.TypeImageResize, asynq.MaxRetry(10), asynq.Timeout(3*time.Minute)) c.SetDefaultOptions(tasks.TypeImageResize, asynq.MaxRetry(10), asynq.Timeout(3*time.Minute))
t = tasks.NewImageResizeTask("some/blobstore/path") t, err = tasks.NewImageResizeTask("https://example.com/myassets/image.jpg")
if err != nil {
log.Fatalf("could not create task: %v", err)
}
res, err = c.Enqueue(t) res, err = c.Enqueue(t)
if err != nil { if err != nil {
log.Fatal("could not enqueue task: %v", err) log.Fatalf("could not enqueue task: %v", err)
} }
fmt.Printf("Enqueued Result: %+v\n", res) fmt.Printf("Enqueued Result: %+v\n", res)
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Example 4: Pass options to tune task processing behavior at enqueue time. // Example 4: Pass options to tune task processing behavior at enqueue time.
// Options passed at enqueue time override default ones, if any. // Options passed at enqueue time override default ones.
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
t = tasks.NewImageResizeTask("some/blobstore/path")
res, err = c.Enqueue(t, asynq.Queue("critical"), asynq.Timeout(30*time.Second)) res, err = c.Enqueue(t, asynq.Queue("critical"), asynq.Timeout(30*time.Second))
if err != nil { if err != nil {
log.Fatal("could not enqueue task: %v", err) log.Fatal("could not enqueue task: %v", err)

25
doc.go
View File

@ -11,7 +11,7 @@ specify the connection using one of RedisConnOpt types.
redisConnOpt = asynq.RedisClientOpt{ redisConnOpt = asynq.RedisClientOpt{
Addr: "127.0.0.1:6379", Addr: "127.0.0.1:6379",
Password: "xxxxx", Password: "xxxxx",
DB: 3, DB: 2,
} }
The Client is used to enqueue a task. The Client is used to enqueue a task.
@ -20,12 +20,16 @@ The Client is used to enqueue a task.
client := asynq.NewClient(redisConnOpt) client := asynq.NewClient(redisConnOpt)
// Task is created with two parameters: its type and payload. // Task is created with two parameters: its type and payload.
t := asynq.NewTask( // Payload data is simply an array of bytes. It can be encoded in JSON, Protocol Buffer, Gob, etc.
"send_email", b, err := json.Marshal(ExamplePayload{UserID: 42})
map[string]interface{}{"user_id": 42}) if err != nil {
log.Fatal(err)
}
task := asynq.NewTask("example", b)
// Enqueue the task to be processed immediately. // Enqueue the task to be processed immediately.
res, err := client.Enqueue(t) res, err := client.Enqueue(task)
// Schedule the task to be processed after one minute. // Schedule the task to be processed after one minute.
res, err = client.Enqueue(t, asynq.ProcessIn(1*time.Minute)) res, err = client.Enqueue(t, asynq.ProcessIn(1*time.Minute))
@ -52,10 +56,13 @@ Example of a type that implements the Handler interface.
func (h *TaskHandler) ProcessTask(ctx context.Context, task *asynq.Task) error { func (h *TaskHandler) ProcessTask(ctx context.Context, task *asynq.Task) error {
switch task.Type { switch task.Type {
case "send_email": case "example":
id, err := task.Payload.GetInt("user_id") var data ExamplePayload
// send email if err := json.Unmarshal(task.Payload(), &data); err != nil {
//... return err
}
// perform task with the data
default: default:
return fmt.Errorf("unexpected task type %q", task.Type) return fmt.Errorf("unexpected task type %q", task.Type)
} }