-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathqueues.py
409 lines (311 loc) · 13 KB
/
queues.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
'''Queues'''
# Queues are peculiar data structures because, like sets, their functionality
# can be handled entirely using lists. However, while lists are extremely
# versatile, they are not always the most efficient data structure for container
# operations. If a program is using a small dataset (hundreds or thousands of
# elements), then lists will probably be ok. However, if you need to scale your
# data into the millions, you may need a more efficient container. Python
# provides three main queue data structures in its queue module (Queue,
# LifoQueue, and PriorityQueue). All utilize the same API, but differ in both
# behavior and structure. A fourth, SimpleQueue, is covered further below.
# FIFO queues (first in, first out)
# -----------------------------------------------------------------------------
# These are typically used as a sort of communication medium when one or more
# objects is producing data and one or more other objects is consuming the
# data in some way, probably at a different rate. Think of a messaging
# application that is receiving messages from the network, but can only
# display one message at a time to the user. The other messages can be
# buffered in a queue in the order they are received.
# The Queue class is a good choice when you don't need to access any data
# inside the data structure except the next object to be consumed. They have a
# very simple API. The primary methods are put() and get(). Both accept
# optional arguments to govern what happens if the operation can't successfully
# complete because the queue is either empty (can't get) or full (can't put).
# The default behavior is to block or idly wait until the Queue object has
# data or room available. You can have it raise exceptions instead by passing
# the block=False parameter or have it wait a defined amount of time before
# raising an exception by passing a timeout parameter.
# The Queue class also has methods to check whether the Queue is full() or
# empty() and a few more methods to deal with concurrent access.
from queue import Queue

# Demonstrate a bounded FIFO queue: items come back out in exactly the
# order they were put in.
q = Queue(maxsize=5)
for word in ('one', 'two', 'three', 'four', 'five'):
    q.put(word)
# q.put('six', timeout=5) # raise Full, queue.Full
print('full: ', q.full())
# full: True
for _ in range(4):
    q.get()
last = q.get()
# q.get(block=False) # raise Empty, queue.Empty
print('last: ', last)
# last: five
print('empty: ', q.empty())
# empty: True
# LIFO queues (last in, first out)
# -----------------------------------------------------------------------------
# More commonly called stacks (picture a stack of papers where only the
# top-most sheet is reachable). Traditional stack operations are named push
# and pop, but since python reuses the FIFO queue API, we still call put()
# and get() -- both simply operate on the top of the stack.
from queue import LifoQueue

stack = LifoQueue(maxsize=5)
for word in ('one', 'two', 'three', 'four', 'five'):
    stack.put(word)
# stack.put('six', timeout=5) # raise Full, queue.Full
print('full: ', stack.full())
# full: True
for _ in range(4):
    stack.get()
last = stack.get()
# stack.get(block=False) # raise Empty, queue.Empty
print('last: ', last)
# last: one
print('empty: ', stack.empty())
# empty: True
# SimpleQueue
# -----------------------------------------------------------------------------
# New to python 3.7 are SimpleQueues which are unbounded FIFO queues.
# Simple queues lack advanced functionality such as task tracking.
# https://docs.python.org/3/library/queue.html#simplequeue-objects
# deques (double-ended queue)
# -----------------------------------------------------------------------------
# Deques (from collections.deque) are more advanced, extended versions of a
# queue that support adding and removing items from both ends.
from collections import deque

# A deque offers O(1) appends and pops at *both* ends, plus indexing into
# the middle. Walk through the main operations on a small example.
d = deque('abcdefg')
print(d)
# deque(['a', 'b', 'c', 'd', 'e', 'f', 'g'])
print(f'length:  {len(d)}')
# length:  7
print(f'left end:  {d[0]}')
# left end:  a
print(f'right end:  {d[-1]}')
# right end:  g
print(f'pop left:  {d.popleft()}')
# pop left:  a
print(f'pop right:  {d.pop()}')
# pop right:  g
print(f'pop left:  {d.popleft()}')
# pop left:  b
print(f'pop right:  {d.pop()}')
# pop right:  f
# remove() deletes by value (first occurrence) and returns None
print(f"removed:  {d.remove('c')}")
# removed:  None
d.append('right')
d.appendleft('left')
print(f'appended:  {d}')
# appended:  deque(['left', 'd', 'e', 'right'])
# extendleft() pushes items one at a time, so their order is reversed
d.extend('456')
d.extendleft('321')
print(f'extended : {d}')
# extended : deque(['1', '2', '3', 'left', 'd', 'e', 'right', '4', '5', '6'])
def palindrome(word):
    """Return True if word reads the same forwards and backwards.

    Pops one character from each end of a deque and compares them,
    which is O(1) per pop from either end.
    """
    chars = deque(word)
    result = True
    while len(chars) > 1:
        if chars.popleft() != chars.pop():
            result = False
            break
    return result


palindrome('racecar')   # returns True
palindrome('radar')     # returns True
palindrome('maam')      # returns True
palindrome('what')      # returns False
# The example above is just an example. If you wanted to actually check for
# palindromes you could also do:
def palindrome_better(word):
    """Return True if word equals its own reverse."""
    backwards = ''.join(reversed(word))
    return word == backwards
# Queues and threads
# -----------------------------------------------------------------------------
# Deque append/pop operations are thread-safe, so, for example, a single
# deque can be consumed from both ends at the same time by separate
# threads. woah.
import threading
import time

candle = deque(range(5))

def burn(end, source):
    """Consume items from one end of a deque until it is exhausted.

    end -- label for this worker, used only in the printed output.
    source -- a zero-argument callable that pops one item, e.g.
              candle.popleft or candle.pop; raises IndexError when empty.
    """
    while True:
        try:
            # Renamed from 'next' -- that shadowed the builtin next().
            item = source()
        except IndexError:
            # Deque is empty: this worker is done.
            break
        else:
            print(f'{end:>6}: {item}')
            time.sleep(0.1)
    print(f'{end:>6}: done.')

left = threading.Thread(target=burn, args=('Left', candle.popleft))
right = threading.Thread(target=burn, args=('Right', candle.pop))
left.start()
right.start()
left.join()
right.join()
# Example output (exact interleaving may vary between runs):
# Left: 0
# Right: 4
# Left: 1
# Right: 3
# Left: 2
# Right: done.
# Left: done.
# Priority Queues
# -----------------------------------------------------------------------------
# A priority queue uses the same put()/get() API as the other queues, but
# instead of returning items in arrival order, get() returns the most
# "important" item: by convention, the one that sorts lowest with the
# less-than operator.
# A common pattern is to store (priority, data) tuples, so the first tuple
# element decides the order. Another option is to implement __lt__ on your
# own objects. Equal priorities are allowed, but there is no guarantee
# which of the tied items comes out first.
from queue import PriorityQueue

heap = PriorityQueue()
tasks = [
    (2, 'medium-level task'),
    (1, 'important task 1'),
    (3, 'low-level task'),
    (1, 'important task 2'),
]
for task in tasks:
    heap.put(task)
while not heap.empty():
    print(heap.get())
# (1, 'important task 1')
# (1, 'important task 2')
# (2, 'medium-level task')
# (3, 'low-level task')
print(heap.qsize())
# 0
# Task Queues
# -----------------------------------------------------------------------------
# Task queues provide a convenient solution for an application to request the
# execution of a task by a worker process. Worker processes run independently
# of the application and can even be located on a different system. The
# communication between the application and the workers is done through a
# message queue. The application submits a job, and then monitors its progress
# by interacting with the queue.
# The most popular task queue for Python is Celery. This is a fairly
# sophisticated package that has many options and supports several message
# queues.
# http://www.celeryproject.org/
# https://blog.miguelgrinberg.com/post/using-celery-with-flask
# Another popular Python task queue is Redis Queue or just RQ, which sacrifices
# some flexibility, such as only supporting a Redis message queue, but in
# exchange it is much simpler to set up than Celery.
# http://python-rq.org/
# RQ (task queues)
# ----------------------------------------------------------------------------
# The communication between an application and RQ workers is going to be
# carried out in a Redis message queue, so you need to have a Redis server
# running. See also: noSQL_datastores.py, networks.py, concurrency.py
# $ pip install rq
# $ redis-server
# Example task (demo/tasks.py):
import time

def example(seconds):
    """Toy worker task: count up to seconds, sleeping one second per step."""
    print('Starting task...')
    i = 0
    while i < seconds:
        print(i)
        time.sleep(1)
        i += 1
    print('Task completed.')
# Run an RQ worker:
# $ rq worker test
# The worker process is now connected to Redis, and watching for any jobs that
# may be assigned to it on a queue named 'test'. In cases where you
# want multiple workers to have more throughput, all you need to do is run
# more instances of rq worker, all connected to the same queue. Then when a
# job shows up in the queue, any of the available worker processes will pick
# it up. In a production environment you will probably want to have at least
# as many workers as available CPUs.
# Execute the tasks:
# In another terminal window, start a python shell session.
# >>> from redis import Redis
# >>> import rq
# >>> q = rq.Queue('test', connection=Redis.from_url('redis://localhost:6379'))
# OR
# >>> q = rq.Queue('test', connection=Redis())
# >>> job = q.enqueue('app.tasks.example', 23)
# >>> job.get_id()
# '37469594-8f5d-41f1-91e5-ec996df18243'
# >>> job.is_finished
# True
# Back in the first terminal window, where the worker is listening, you should
# see it run example function above and then wait.
# The Queue class from RQ represents the task queue as seen from the
# application side. The arguments it takes are the queue name, and a Redis
# connection object, which in this case we initialize with a default URL.
# If you have your Redis server running on a different host or port number,
# you'll need to use a different URL.
# The enqueue() method on the queue is used to add a job to the queue. The
# first argument is the name of the task you want to execute, given directly
# as a function object, or as an import string. I find the string option much
# more convenient, as that makes it unnecessary to import the function on the
# application's side. Any remaining arguments given to enqueue() are going to
# be passed to the function running in the worker.
# The job.get_id() method can be used to obtain the unique identifier assigned
# to the task. The job.is_finished expression will report False until it's
# done, then True. Test it!
# Once the function completes, the worker goes back to waiting for new jobs,
# so you can repeat the enqueue() call with different arguments if you want to
# experiment more. The data that is stored in the queue regarding a task will
# stay there for some time (500 seconds by default), but eventually will be
# removed. This is important, the task queue does not preserve a history of
# executed jobs.
# Normally, for a long running task, you will want some sort of progress
# information to be made available to the application, which in turn can show
# it to the user. RQ supports this by using the meta attribute of the job
# object. Here's the example tasks expanded out to include this:
# Example task (demos/tasks_with_info.py):
import time
from rq import get_current_job

def example(seconds):
    """Toy RQ task that reports its completion percentage via job.meta.

    get_current_job() returns the worker-side job object; each loop
    iteration writes a 'progress' percentage into job.meta and calls
    save_meta() so the data is written to Redis, where the submitting
    application can read it back.
    """
    job = get_current_job()
    print('Starting task...')
    for step in range(seconds):
        job.meta['progress'] = 100.0 * step / seconds
        job.save_meta()
        print(step)
        time.sleep(1)
    # Mark the task as fully complete before the final message.
    job.meta['progress'] = 100
    job.save_meta()
    print('Task completed.')
# This new version of example() uses RQ's get_current_job() function to get a
# job instance, which is similar to the one returned to the application when
# it submits the task. The meta attribute of the job object is a dictionary
# where the task can write any custom data that it wants to communicate to the
# application. In this example, We're writing a 'progress' item that represents
# the percentage of completion of the task. Each time the progress is updated
# we call job.save_meta() to instruct RQ to write the data to Redis, where the
# application can find it.
# We can run this new task and monitor its progress like this:
# >>> job = q.enqueue('demos.tasks_with_info.example', 23)
# >>> job.meta
# {}
# >>> job.refresh()
# >>> job.meta
# {'progress': 69.56521739130434}
# >>> job.refresh()
# >>> job.meta
# {'progress': 100}
# >>> job.is_finished
# True
# The refresh() method needs to be invoked for the contents to be updated
# from Redis.
# Redis cleanup
# -----------------------------------------------------------------------------
# To kill your worker: ctrl + c
# If, at any time, the worker receives SIGINT (via Ctrl+C) or SIGTERM (via kill),
# the worker waits until the currently running task is finished, stops the work
# loop, and gracefully registers its own death.
# To shutdown the redis server: redis-cli shutdown
# Note this command must be run not in the same terminal window as the Redis
# server. Also, this is a mac command. If on linux, you would do:
# /etc/init.d/redis-server restart
# /etc/init.d/redis-server stop
# /etc/init.d/redis-server start