mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-26 07:42:17 +08:00
fix some typos
Signed-off-by: cui fliter <imcusg@gmail.com>
This commit is contained in:
parent
783071c47f
commit
cc777ebdaa
@ -47,7 +47,7 @@ Task queues are used as a mechanism to distribute work across multiple machines.
|
|||||||
|
|
||||||
**Status**: The library is currently undergoing **heavy development** with frequent, breaking API changes.
|
**Status**: The library is currently undergoing **heavy development** with frequent, breaking API changes.
|
||||||
|
|
||||||
> ☝️ **Important Note**: Current major version is zero (`v0.x.x`) to accomodate rapid development and fast iteration while getting early feedback from users (_feedback on APIs are appreciated!_). The public API could change without a major version update before `v1.0.0` release.
|
> ☝️ **Important Note**: Current major version is zero (`v0.x.x`) to accommodate rapid development and fast iteration while getting early feedback from users (_feedback on APIs are appreciated!_). The public API could change without a major version update before `v1.0.0` release.
|
||||||
|
|
||||||
## Quickstart
|
## Quickstart
|
||||||
|
|
||||||
|
@ -22,7 +22,7 @@ import (
|
|||||||
// Test goes through a few phases.
|
// Test goes through a few phases.
|
||||||
//
|
//
|
||||||
// Phase1: Simulate Server startup; Simulate starting tasks listed in startedWorkers
|
// Phase1: Simulate Server startup; Simulate starting tasks listed in startedWorkers
|
||||||
// Phase2: Simluate finishing tasks listed in finishedTasks
|
// Phase2: Simulate finishing tasks listed in finishedTasks
|
||||||
// Phase3: Simulate Server shutdown;
|
// Phase3: Simulate Server shutdown;
|
||||||
func TestHeartbeater(t *testing.T) {
|
func TestHeartbeater(t *testing.T) {
|
||||||
r := setup(t)
|
r := setup(t)
|
||||||
@ -41,7 +41,7 @@ func TestHeartbeater(t *testing.T) {
|
|||||||
t5 := h.NewTaskMessageWithQueue("task5", nil, "custom")
|
t5 := h.NewTaskMessageWithQueue("task5", nil, "custom")
|
||||||
t6 := h.NewTaskMessageWithQueue("task6", nil, "default")
|
t6 := h.NewTaskMessageWithQueue("task6", nil, "default")
|
||||||
|
|
||||||
// Note: intentionally set to time less than now.Add(rdb.LeaseDuration) to test lease extention is working.
|
// Note: intentionally set to time less than now.Add(rdb.LeaseDuration) to test lease extension is working.
|
||||||
lease1 := h.NewLeaseWithClock(now.Add(10*time.Second), clock)
|
lease1 := h.NewLeaseWithClock(now.Add(10*time.Second), clock)
|
||||||
lease2 := h.NewLeaseWithClock(now.Add(10*time.Second), clock)
|
lease2 := h.NewLeaseWithClock(now.Add(10*time.Second), clock)
|
||||||
lease3 := h.NewLeaseWithClock(now.Add(10*time.Second), clock)
|
lease3 := h.NewLeaseWithClock(now.Add(10*time.Second), clock)
|
||||||
|
@ -67,7 +67,7 @@ func (r *RDB) runScript(ctx context.Context, op errors.Op, script *redis.Script,
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Runs the given script with keys and args and retuns the script's return value as int64.
|
// Runs the given script with keys and args and returns the script's return value as int64.
|
||||||
func (r *RDB) runScriptWithErrorCode(ctx context.Context, op errors.Op, script *redis.Script, keys []string, args ...interface{}) (int64, error) {
|
func (r *RDB) runScriptWithErrorCode(ctx context.Context, op errors.Op, script *redis.Script, keys []string, args ...interface{}) (int64, error) {
|
||||||
res, err := script.Run(ctx, r.client, keys, args...).Result()
|
res, err := script.Run(ctx, r.client, keys, args...).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -368,7 +368,7 @@ func (r *RDB) Done(ctx context.Context, msg *base.TaskMessage) error {
|
|||||||
//
|
//
|
||||||
// ARGV[1] -> task ID
|
// ARGV[1] -> task ID
|
||||||
// ARGV[2] -> stats expiration timestamp
|
// ARGV[2] -> stats expiration timestamp
|
||||||
// ARGV[3] -> task exipration time in unix time
|
// ARGV[3] -> task expiration time in unix time
|
||||||
// ARGV[4] -> task message data
|
// ARGV[4] -> task message data
|
||||||
// ARGV[5] -> max int64 value
|
// ARGV[5] -> max int64 value
|
||||||
var markAsCompleteCmd = redis.NewScript(`
|
var markAsCompleteCmd = redis.NewScript(`
|
||||||
@ -405,7 +405,7 @@ return redis.status_reply("OK")
|
|||||||
//
|
//
|
||||||
// ARGV[1] -> task ID
|
// ARGV[1] -> task ID
|
||||||
// ARGV[2] -> stats expiration timestamp
|
// ARGV[2] -> stats expiration timestamp
|
||||||
// ARGV[3] -> task exipration time in unix time
|
// ARGV[3] -> task expiration time in unix time
|
||||||
// ARGV[4] -> task message data
|
// ARGV[4] -> task message data
|
||||||
// ARGV[5] -> max int64 value
|
// ARGV[5] -> max int64 value
|
||||||
var markAsCompleteUniqueCmd = redis.NewScript(`
|
var markAsCompleteUniqueCmd = redis.NewScript(`
|
||||||
@ -1086,7 +1086,7 @@ const aggregationTimeout = 2 * time.Minute
|
|||||||
// The time for gracePeriod and maxDelay is computed relative to the time t.
|
// The time for gracePeriod and maxDelay is computed relative to the time t.
|
||||||
//
|
//
|
||||||
// Note: It assumes that this function is called at frequency less than or equal to the gracePeriod. In other words,
|
// Note: It assumes that this function is called at frequency less than or equal to the gracePeriod. In other words,
|
||||||
// the function only checks the most recently added task aganist the given gracePeriod.
|
// the function only checks the most recently added task against the given gracePeriod.
|
||||||
func (r *RDB) AggregationCheck(qname, gname string, t time.Time, gracePeriod, maxDelay time.Duration, maxSize int) (string, error) {
|
func (r *RDB) AggregationCheck(qname, gname string, t time.Time, gracePeriod, maxDelay time.Duration, maxSize int) (string, error) {
|
||||||
var op errors.Op = "RDB.AggregationCheck"
|
var op errors.Op = "RDB.AggregationCheck"
|
||||||
aggregationSetID := uuid.NewString()
|
aggregationSetID := uuid.NewString()
|
||||||
|
@ -252,7 +252,7 @@ func SeedLease(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname s
|
|||||||
seedRedisZSet(tb, r, base.LeaseKey(qname), entries, base.TaskStateActive)
|
seedRedisZSet(tb, r, base.LeaseKey(qname), entries, base.TaskStateActive)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SeedCompletedQueue initializes the completed set witht the given entries.
|
// SeedCompletedQueue initializes the completed set with the given entries.
|
||||||
func SeedCompletedQueue(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) {
|
func SeedCompletedQueue(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) {
|
||||||
tb.Helper()
|
tb.Helper()
|
||||||
r.SAdd(context.Background(), base.AllQueues, qname)
|
r.SAdd(context.Background(), base.AllQueues, qname)
|
||||||
|
@ -87,7 +87,7 @@ func (r *recoverer) recover() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *recoverer) recoverLeaseExpiredTasks() {
|
func (r *recoverer) recoverLeaseExpiredTasks() {
|
||||||
// Get all tasks which have expired 30 seconds ago or earlier to accomodate certain amount of clock skew.
|
// Get all tasks which have expired 30 seconds ago or earlier to accommodate certain amount of clock skew.
|
||||||
cutoff := time.Now().Add(-30 * time.Second)
|
cutoff := time.Now().Add(-30 * time.Second)
|
||||||
msgs, err := r.broker.ListLeaseExpired(cutoff, r.queues...)
|
msgs, err := r.broker.ListLeaseExpired(cutoff, r.queues...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -446,7 +446,7 @@ func nextTaskState(current asynq.TaskState) asynq.TaskState {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
panic("unkown task state")
|
panic("unknown task state")
|
||||||
}
|
}
|
||||||
|
|
||||||
func prevTaskState(current asynq.TaskState) asynq.TaskState {
|
func prevTaskState(current asynq.TaskState) asynq.TaskState {
|
||||||
@ -459,7 +459,7 @@ func prevTaskState(current asynq.TaskState) asynq.TaskState {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
panic("unkown task state")
|
panic("unknown task state")
|
||||||
}
|
}
|
||||||
|
|
||||||
func getTaskCount(queue *asynq.QueueInfo, taskState asynq.TaskState) int {
|
func getTaskCount(queue *asynq.QueueInfo, taskState asynq.TaskState) int {
|
||||||
|
Loading…
Reference in New Issue
Block a user