From 6d65ebfb35fd8e6878ab9d885aadce837262fca6 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Thu, 14 Nov 2019 21:07:19 -0800 Subject: [PATCH] Initial commit --- .gitignore | 15 +++++++++++++ asynq.go | 66 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ go.mod | 9 ++++++++ go.sum | 20 +++++++++++++++++ 4 files changed, 110 insertions(+) create mode 100644 .gitignore create mode 100644 asynq.go create mode 100644 go.mod create mode 100644 go.sum diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..20ec1f7 --- /dev/null +++ b/.gitignore @@ -0,0 +1,15 @@ +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Ignore examples for now +/examples \ No newline at end of file diff --git a/asynq.go b/asynq.go new file mode 100644 index 0000000..2689063 --- /dev/null +++ b/asynq.go @@ -0,0 +1,66 @@ +package asynq + +import ( + "encoding/json" + "time" + + "github.com/go-redis/redis/v7" + "github.com/google/uuid" +) + +// Redis keys +const ( + queuePrefix = "asynq:queues:" + scheduled = "asynq:scheduled" +) + +// Client is an interface for scheduling tasks. +type Client struct { + rdb *redis.Client +} + +// Task represents a task to be performed. +type Task struct { + Handler string + Args []interface{} +} + +type delayedTask struct { + ID string + Queue string + Task *Task +} + +// RedisOpt specifies redis options. +type RedisOpt struct { + Addr string + Password string +} + +// NewClient creates and returns a new client. +func NewClient(opt *RedisOpt) *Client { + rdb := redis.NewClient(&redis.Options{Addr: opt.Addr, Password: opt.Password}) + return &Client{rdb: rdb} +} + +// Enqueue pushes a given task to the specified queue. +func (c *Client) Enqueue(queue string, task *Task, delay time.Duration) error { + if delay == 0 { + bytes, err := json.Marshal(task) + if err != nil { + return err + } + return c.rdb.RPush(queuePrefix+queue, string(bytes)).Err() + } + executeAt := time.Now().Add(delay) + dt := &delayedTask{ + ID: uuid.New().String(), + Queue: queue, + Task: task, + } + bytes, err := json.Marshal(dt) + if err != nil { + return err + } + return c.rdb.ZAdd(scheduled, &redis.Z{Member: string(bytes), Score: float64(executeAt.Unix())}).Err() +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..12e3d14 --- /dev/null +++ b/go.mod @@ -0,0 +1,9 @@ +module github.com/hibiken/asynq + +go 1.13 + +require ( + github.com/go-redis/redis v6.15.6+incompatible + github.com/go-redis/redis/v7 v7.0.0-beta.4 + github.com/google/uuid v1.1.1 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..a2a817c --- /dev/null +++ b/go.sum @@ -0,0 +1,20 @@ +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/go-redis/redis v6.15.6+incompatible h1:H9evprGPLI8+ci7fxQx6WNZHJSb7be8FqJQRhdQZ5Sg= +github.com/go-redis/redis v6.15.6+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= +github.com/go-redis/redis/v7 v7.0.0-beta.4 h1:p6z7Pde69EGRWvlC++y8aFcaWegyrKHzOBGo0zUACTQ= +github.com/go-redis/redis/v7 v7.0.0-beta.4/go.mod h1:xhhSbUMTsleRPur+Vgx9sUHtyN33bdjxY+9/0n9Ig8s= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= +github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=