Skip to content

Latest commit

 

History

History
111 lines (79 loc) · 2.9 KB

NetworkSafeConnection.mkd

File metadata and controls

111 lines (79 loc) · 2.9 KB

NetworkSafeConnection

It provides non-interrupted execution of calls to servers, when servers does down or become unreachable over network. Call is simply blocked until server can be reached again. This class remembers what tubes it was using or watching and restores it after reconnect.

Handling of network errors

>>> import os, signal, threading, time
>>> import beanstalkc

To get things going, we'll need to have a beanstalkd running:

>>> import tempfile
>>> tmp_dir = tempfile.mkdtemp(prefix='beanstalkd_test')

>>> def launch_beanstalkd(tmp_dir):
...     pid = os.spawnlp(os.P_NOWAIT,
...                      'beanstalkd',
...                      'beanstalkd', '-l', '127.0.0.1', '-p', '14711', '-b', tmp_dir)
...     time.sleep(0.2) # Give beanstalkd some time to start up.
...     return pid

>>> def kill_beanstalkd(pid):
...     os.kill(pid, signal.SIGTERM)
...     time.sleep(0.2) # Also give beanstalkd some time to terminate.

>>> pid = launch_beanstalkd(tmp_dir)

Let's connect to this beanstalkd:

>>> b = beanstalkc.NetworkSafeConnection(port=14711)

Let's use and watch some non-default tube:

>>> b.use('test')
'test'
>>> b.using()
'test'
>>> b.watch('test')
2
>>> b.watching()
['default', 'test']

Put some data into 'test':

>>> b.put('first')
1
>>> job = b.reserve()
>>> job.body
'first'

Now, if beanstalkd goes away, the NetworkSafeConnection will be not affected:

>>> kill_beanstalkd(pid)

Instead, it will happily continue to work, once beanstalkd comes back up:

>>> pid = launch_beanstalkd(tmp_dir)

All data is there since we launch benstalkd with persistent jobs mode (-b parameter):

>>> from pprint import pprint
>>> pprint(b.stats())                           # doctest: +ELLIPSIS
{...
 'current-connections': 1,
 'current-jobs-buried': 0,
 'current-jobs-delayed': 0,
 'current-jobs-ready': 1,
 'current-jobs-reserved': 0,
 'current-jobs-urgent': 0,
 ...}

But we should be still using and watching the same tubes as before:

>>> b.using()
'test'
>>> b.watching()
['default', 'test']

You can see that the job we have reserved before is not reserved anymore, but we can delete it anyway.

>>> pprint(job.stats())                         # doctest: +ELLIPSIS
{'age': ...
 ...
 'state': 'ready',
 ...
 'tube': 'test'}

Let's try to delete the job we have reserved before server died.

>>> job.delete()
>>> job.stats()                                 # doctest: +ELLIPSIS
Traceback (most recent call last):
...
CommandFailed: ('stats-job', 'NOT_FOUND', [])

We don't have anything left in jobs tube

>>> b.reserve(timeout=0) is None
True

Clean beanstalkd and remove temp dir:

>>> kill_beanstalkd(pid)
>>> import shutil
>>> shutil.rmtree(tmp_dir)