diff --git a/README.md b/README.md index 4da4638..3217322 100644 --- a/README.md +++ b/README.md @@ -132,7 +132,7 @@ end There are several flags which control the operation of the goworker client. -* `-queues="comma,delimited,queues"` — This is the only required flag. The recommended practice is to separate your Resque workers from your goworkers with different queues. Otherwise, Resque worker classes that have no goworker analog will cause the goworker process to fail the jobs. Because of this, there is no default queue, nor is there a way to select all queues (à la Resque's `*` queue). +* `-queues="comma,delimited,queues"` — This is the only required flag. The recommended practice is to separate your Resque workers from your goworkers with different queues. Otherwise, Resque worker classes that have no goworker analog will cause the goworker process to fail the jobs. Because of this, there is no default queue, nor is there a way to select all queues (à la Resque's `*` queue). If you have multiple queues you can assign them weights. A queue with a weight of 2 will be checked twice as often as a queue with a weight of 1: `-queues='high=2,low=1'`. * `-interval=5.0` — Specifies the wait period between polling if no job was in the queue the last time one was requested. * `-concurrency=25` — Specifies the number of concurrently executing workers. This number can be as low as 1 or rather comfortably as high as 100,000, and should be tuned to your workflow and the availability of outside resources. * `-connections=2` — Specifies the maximum number of Redis connections that goworker will consume between the poller and all workers. There is not much performance gain over two and a slight penalty when using only one. This is configurable in case you need to keep connection counts low for cloud Redis providers who limit plans on `maxclients`. diff --git a/flags.go b/flags.go index ef13c25..93ed84b 100644 --- a/flags.go +++ b/flags.go @@ -19,7 +19,12 @@ // cause the goworker process to fail the jobs. // Because of this, there is no default queue, // nor is there a way to select all queues (à la -// Resque's * queue). +// Resque's * queue). Queues are processed in +// the order they are specififed. +// If you have multiple queues you can assign +// them weights. A queue with a weight of 2 will +// be checked twice as often as a queue with a +// weight of 1: -queues='high=2,low=1'. // // -interval=5.0 // — Specifies the wait period between polling if @@ -75,6 +80,7 @@ package goworker import ( "flag" "os" + "strings" ) var ( @@ -87,6 +93,7 @@ var ( uri string namespace string exitOnComplete bool + isStrict bool ) func init() { @@ -125,5 +132,6 @@ func flags() error { if err := interval.SetFloat(intervalFloat); err != nil { return err } + isStrict = strings.IndexRune(queuesString, '=') == -1 return nil } diff --git a/goworker.go b/goworker.go index 9d44376..5406856 100644 --- a/goworker.go +++ b/goworker.go @@ -31,7 +31,7 @@ func Work() error { pool := newRedisPool(uri, connections, connections, time.Minute) defer pool.Close() - poller, err := newPoller(queues) + poller, err := newPoller(queues, isStrict) if err != nil { return err } diff --git a/poller.go b/poller.go index 061210d..a18eb8f 100644 --- a/poller.go +++ b/poller.go @@ -9,20 +9,22 @@ import ( type poller struct { process + isStrict bool } -func newPoller(queues []string) (*poller, error) { +func newPoller(queues []string, isStrict bool) (*poller, error) { process, err := newProcess("poller", queues) if err != nil { return nil, err } return &poller{ - process: *process, + process: *process, + isStrict: isStrict, }, nil } func (p *poller) getJob(conn *redisConn) (*job, error) { - for _, queue := range p.Queues { + for _, queue := range p.queues(p.isStrict) { logger.Debugf("Checking %s", queue) reply, err := conn.Do("LPOP", fmt.Sprintf("%squeue:%s", namespace, queue)) diff --git a/process.go b/process.go index b024c0e..2c593bf 100644 --- a/process.go +++ b/process.go @@ -2,6 +2,7 @@ package goworker import ( "fmt" + "math/rand" "os" "strings" "time" @@ -73,3 +74,17 @@ func (p *process) fail(conn *redisConn) error { return nil } + +func (p *process) queues(strict bool) []string { + // If the queues order is strict then just return them. + if strict { + return p.Queues + } + + // If not then we want to to shuffle the queues before returning them. + queues := make([]string, len(p.Queues)) + for i, v := range rand.Perm(len(p.Queues)) { + queues[i] = p.Queues[v] + } + return queues +} diff --git a/queues_flag.go b/queues_flag.go index bf2e878..e5afac8 100644 --- a/queues_flag.go +++ b/queues_flag.go @@ -3,24 +3,57 @@ package goworker import ( "errors" "fmt" + "strconv" "strings" ) var ( - errorEmptyQueues = errors.New("You must specify at least one queue.") + errorEmptyQueues = errors.New("You must specify at least one queue.") + errorNonNumericWeight = errors.New("The weight must be a numeric value.") ) type queuesFlag []string func (q *queuesFlag) Set(value string) error { - if value == "" { + for _, queueAndWeight := range strings.Split(value, ",") { + if queueAndWeight == "" { + continue + } + + queue, weight, err := parseQueueAndWeight(queueAndWeight) + if err != nil { + return err + } + + for i := 0; i < weight; i++ { + *q = append(*q, queue) + } + } + if len(*q) == 0 { return errorEmptyQueues } - - *q = append(*q, strings.Split(value, ",")...) return nil } func (q *queuesFlag) String() string { return fmt.Sprint(*q) } + +func parseQueueAndWeight(queueAndWeight string) (queue string, weight int, err error) { + parts := strings.SplitN(queueAndWeight, "=", 2) + queue = parts[0] + + if queue == "" { + return + } + + if len(parts) == 1 { + weight = 1 + } else { + weight, err = strconv.Atoi(parts[1]) + if err != nil { + err = errorNonNumericWeight + } + } + return +} diff --git a/queues_flag_test.go b/queues_flag_test.go index d597dcc..a8108e2 100644 --- a/queues_flag_test.go +++ b/queues_flag_test.go @@ -26,6 +26,56 @@ var queuesFlagSetTests = []struct { queuesFlag([]string{"high", "low"}), nil, }, + { + "high=2,low=1", + queuesFlag([]string{"high", "high", "low"}), + nil, + }, + { + "high=2,low", + queuesFlag([]string{"high", "high", "low"}), + nil, + }, + { + "low=1,high=2", + queuesFlag([]string{"low", "high", "high"}), + nil, + }, + { + "low=,high=2", + nil, + errors.New("The weight must be a numeric value."), + }, + { + "low=a,high=2", + nil, + errors.New("The weight must be a numeric value."), + }, + { + "low=", + nil, + errors.New("The weight must be a numeric value."), + }, + { + "low=a", + nil, + errors.New("The weight must be a numeric value."), + }, + { + "high=2,,,=1", + queuesFlag([]string{"high", "high"}), + nil, + }, + { + ",,,", + nil, + errors.New("You must specify at least one queue."), + }, + { + "=1", + nil, + errors.New("You must specify at least one queue."), + }, } func TestQueuesFlagSet(t *testing.T) { @@ -35,8 +85,10 @@ func TestQueuesFlagSet(t *testing.T) { if fmt.Sprint(actual) != fmt.Sprint(tt.expected) { t.Errorf("QueuesFlag: set to %s expected %v, actual %v", tt.v, tt.expected, actual) } - if tt.err != nil && err.Error() != tt.err.Error() { - t.Errorf("QueuesFlag: set to %s expected err %v, actual err %v", tt.v, tt.expected, actual) + if (err != nil && tt.err == nil) || + (err == nil && tt.err != nil) || + (err != nil && tt.err != nil && err.Error() != tt.err.Error()) { + t.Errorf("QueuesFlag: set to %s expected err %v, actual err %v", tt.v, tt.err, err) } } }