Skip to content

Commit

Permalink
Merge pull request #5 from ghais:weights
Browse files Browse the repository at this point in the history
Ability to assign weights to queues.
  • Loading branch information
benmanns committed Sep 25, 2013
2 parents cbcee26 + 244ad15 commit fbf97bb
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 12 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ end

There are several flags which control the operation of the goworker client.

* `-queues="comma,delimited,queues"` — This is the only required flag. The recommended practice is to separate your Resque workers from your goworkers with different queues. Otherwise, Resque worker classes that have no goworker analog will cause the goworker process to fail the jobs. Because of this, there is no default queue, nor is there a way to select all queues (à la Resque's `*` queue).
* `-queues="comma,delimited,queues"` — This is the only required flag. The recommended practice is to separate your Resque workers from your goworkers with different queues. Otherwise, Resque worker classes that have no goworker analog will cause the goworker process to fail the jobs. Because of this, there is no default queue, nor is there a way to select all queues (à la Resque's `*` queue). If you have multiple queues you can assign them weights. A queue with a weight of 2 will be checked twice as often as a queue with a weight of 1: `-queues='high=2,low=1'`.
* `-interval=5.0` — Specifies the wait period between polling if no job was in the queue the last time one was requested.
* `-concurrency=25` — Specifies the number of concurrently executing workers. This number can be as low as 1 or rather comfortably as high as 100,000, and should be tuned to your workflow and the availability of outside resources.
* `-connections=2` — Specifies the maximum number of Redis connections that goworker will consume between the poller and all workers. There is not much performance gain over two and a slight penalty when using only one. This is configurable in case you need to keep connection counts low for cloud Redis providers who limit plans on `maxclients`.
Expand Down
10 changes: 9 additions & 1 deletion flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@
// cause the goworker process to fail the jobs.
// Because of this, there is no default queue,
// nor is there a way to select all queues (à la
// Resque's * queue).
// Resque's * queue). Queues are processed in
// the order they are specififed.
// If you have multiple queues you can assign
// them weights. A queue with a weight of 2 will
// be checked twice as often as a queue with a
// weight of 1: -queues='high=2,low=1'.
//
// -interval=5.0
// — Specifies the wait period between polling if
Expand Down Expand Up @@ -75,6 +80,7 @@ package goworker
import (
"flag"
"os"
"strings"
)

var (
Expand All @@ -87,6 +93,7 @@ var (
uri string
namespace string
exitOnComplete bool
isStrict bool
)

func init() {
Expand Down Expand Up @@ -125,5 +132,6 @@ func flags() error {
if err := interval.SetFloat(intervalFloat); err != nil {
return err
}
isStrict = strings.IndexRune(queuesString, '=') == -1
return nil
}
2 changes: 1 addition & 1 deletion goworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func Work() error {
pool := newRedisPool(uri, connections, connections, time.Minute)
defer pool.Close()

poller, err := newPoller(queues)
poller, err := newPoller(queues, isStrict)
if err != nil {
return err
}
Expand Down
8 changes: 5 additions & 3 deletions poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,22 @@ import (

type poller struct {
process
isStrict bool
}

func newPoller(queues []string) (*poller, error) {
func newPoller(queues []string, isStrict bool) (*poller, error) {
process, err := newProcess("poller", queues)
if err != nil {
return nil, err
}
return &poller{
process: *process,
process: *process,
isStrict: isStrict,
}, nil
}

func (p *poller) getJob(conn *redisConn) (*job, error) {
for _, queue := range p.Queues {
for _, queue := range p.queues(p.isStrict) {
logger.Debugf("Checking %s", queue)

reply, err := conn.Do("LPOP", fmt.Sprintf("%squeue:%s", namespace, queue))
Expand Down
15 changes: 15 additions & 0 deletions process.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package goworker

import (
"fmt"
"math/rand"
"os"
"strings"
"time"
Expand Down Expand Up @@ -73,3 +74,17 @@ func (p *process) fail(conn *redisConn) error {

return nil
}

func (p *process) queues(strict bool) []string {
// If the queues order is strict then just return them.
if strict {
return p.Queues
}

// If not then we want to to shuffle the queues before returning them.
queues := make([]string, len(p.Queues))
for i, v := range rand.Perm(len(p.Queues)) {
queues[i] = p.Queues[v]
}
return queues
}
41 changes: 37 additions & 4 deletions queues_flag.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,57 @@ package goworker
import (
"errors"
"fmt"
"strconv"
"strings"
)

var (
errorEmptyQueues = errors.New("You must specify at least one queue.")
errorEmptyQueues = errors.New("You must specify at least one queue.")
errorNonNumericWeight = errors.New("The weight must be a numeric value.")
)

type queuesFlag []string

func (q *queuesFlag) Set(value string) error {
if value == "" {
for _, queueAndWeight := range strings.Split(value, ",") {
if queueAndWeight == "" {
continue
}

queue, weight, err := parseQueueAndWeight(queueAndWeight)
if err != nil {
return err
}

for i := 0; i < weight; i++ {
*q = append(*q, queue)
}
}
if len(*q) == 0 {
return errorEmptyQueues
}

*q = append(*q, strings.Split(value, ",")...)
return nil
}

func (q *queuesFlag) String() string {
return fmt.Sprint(*q)
}

func parseQueueAndWeight(queueAndWeight string) (queue string, weight int, err error) {
parts := strings.SplitN(queueAndWeight, "=", 2)
queue = parts[0]

if queue == "" {
return
}

if len(parts) == 1 {
weight = 1
} else {
weight, err = strconv.Atoi(parts[1])
if err != nil {
err = errorNonNumericWeight
}
}
return
}
56 changes: 54 additions & 2 deletions queues_flag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,56 @@ var queuesFlagSetTests = []struct {
queuesFlag([]string{"high", "low"}),
nil,
},
{
"high=2,low=1",
queuesFlag([]string{"high", "high", "low"}),
nil,
},
{
"high=2,low",
queuesFlag([]string{"high", "high", "low"}),
nil,
},
{
"low=1,high=2",
queuesFlag([]string{"low", "high", "high"}),
nil,
},
{
"low=,high=2",
nil,
errors.New("The weight must be a numeric value."),
},
{
"low=a,high=2",
nil,
errors.New("The weight must be a numeric value."),
},
{
"low=",
nil,
errors.New("The weight must be a numeric value."),
},
{
"low=a",
nil,
errors.New("The weight must be a numeric value."),
},
{
"high=2,,,=1",
queuesFlag([]string{"high", "high"}),
nil,
},
{
",,,",
nil,
errors.New("You must specify at least one queue."),
},
{
"=1",
nil,
errors.New("You must specify at least one queue."),
},
}

func TestQueuesFlagSet(t *testing.T) {
Expand All @@ -35,8 +85,10 @@ func TestQueuesFlagSet(t *testing.T) {
if fmt.Sprint(actual) != fmt.Sprint(tt.expected) {
t.Errorf("QueuesFlag: set to %s expected %v, actual %v", tt.v, tt.expected, actual)
}
if tt.err != nil && err.Error() != tt.err.Error() {
t.Errorf("QueuesFlag: set to %s expected err %v, actual err %v", tt.v, tt.expected, actual)
if (err != nil && tt.err == nil) ||
(err == nil && tt.err != nil) ||
(err != nil && tt.err != nil && err.Error() != tt.err.Error()) {
t.Errorf("QueuesFlag: set to %s expected err %v, actual err %v", tt.v, tt.err, err)
}
}
}
Expand Down

0 comments on commit fbf97bb

Please sign in to comment.