Skip to content

Commit

Permalink
Exit on critical errors to allow process to restart.
Browse files Browse the repository at this point in the history
  • Loading branch information
benmanns committed Mar 16, 2015
1 parent 0ea6ffb commit a61600b
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 3 deletions.
9 changes: 8 additions & 1 deletion poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ func (p *poller) poll(interval time.Duration, quit <-chan bool) <-chan *job {
conn, err := GetConn()
if err != nil {
logger.Criticalf("Error on getting connection in poller %s", p)
close(jobs)
return jobs
} else {
p.open(conn)
p.start(conn)
Expand All @@ -70,6 +72,7 @@ func (p *poller) poll(interval time.Duration, quit <-chan bool) <-chan *job {
conn, err := GetConn()
if err != nil {
logger.Criticalf("Error on getting connection in poller %s", p)
return
} else {
p.finish(conn)
p.close(conn)
Expand All @@ -85,11 +88,14 @@ func (p *poller) poll(interval time.Duration, quit <-chan bool) <-chan *job {
conn, err := GetConn()
if err != nil {
logger.Criticalf("Error on getting connection in poller %s", p)
return
}

job, err := p.getJob(conn)
if err != nil {
logger.Errorf("Error on %v getting job from %v: %v", p, p.Queues, err)
logger.Criticalf("Error on %v getting job from %v: %v", p, p.Queues, err)
PutConn(conn)
return
}
if job != nil {
conn.Send("INCR", fmt.Sprintf("%sstat:processed:%v", namespace, p))
Expand All @@ -106,6 +112,7 @@ func (p *poller) poll(interval time.Duration, quit <-chan bool) <-chan *job {
conn, err := GetConn()
if err != nil {
logger.Criticalf("Error on getting connection in poller %s", p)
return
}

conn.Send("LPUSH", fmt.Sprintf("%squeue:%s", namespace, job.Queue), buf)
Expand Down
9 changes: 7 additions & 2 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func (w *worker) work(jobs <-chan *job, monitor *sync.WaitGroup) {
conn, err := GetConn()
if err != nil {
logger.Criticalf("Error on getting connection in worker %v", w)
return
} else {
w.open(conn)
PutConn(conn)
Expand All @@ -91,15 +92,16 @@ func (w *worker) work(jobs <-chan *job, monitor *sync.WaitGroup) {

go func() {
defer func() {
defer monitor.Done()

conn, err := GetConn()
if err != nil {
logger.Criticalf("Error on getting connection in worker %v", w)
return
} else {
w.close(conn)
PutConn(conn)
}

monitor.Done()
}()
for job := range jobs {
if workerFunc, ok := workers[job.Payload.Class]; ok {
Expand All @@ -113,6 +115,7 @@ func (w *worker) work(jobs <-chan *job, monitor *sync.WaitGroup) {
conn, err := GetConn()
if err != nil {
logger.Criticalf("Error on getting connection in worker %v", w)
return
} else {
w.finish(conn, job, errors.New(errorLog))
PutConn(conn)
Expand All @@ -128,6 +131,7 @@ func (w *worker) run(job *job, workerFunc workerFunc) {
conn, errCon := GetConn()
if errCon != nil {
logger.Criticalf("Error on getting connection in worker %v", w)
return
} else {
w.finish(conn, job, err)
PutConn(conn)
Expand All @@ -142,6 +146,7 @@ func (w *worker) run(job *job, workerFunc workerFunc) {
conn, err := GetConn()
if err != nil {
logger.Criticalf("Error on getting connection in worker %v", w)
return
} else {
w.start(conn, job)
PutConn(conn)
Expand Down

0 comments on commit a61600b

Please sign in to comment.