
Async client support #26

Closed
lmmx opened this issue Jul 30, 2021 · 12 comments
Labels
enhancement New feature or request


@lmmx
Owner

lmmx commented Jul 30, 2021

This library doesn't support the use of async clients:

import httpx
from range_streams import RangeStream, _EXAMPLE_URL

c = httpx.AsyncClient()
s = RangeStream(url=_EXAMPLE_URL, client=c)

Traceback (most recent call last):
  File "/home/louis/dev/testing/rangestreams/async/async_rng.py", line 5, in <module>
    s = RangeStream(url=_EXAMPLE_URL, client=c)
  File "/home/louis/dev/range-streams/src/range_streams/stream.py", line 127, in __init__
    self.add(byte_range=byte_range)
  File "/home/louis/dev/range-streams/src/range_streams/stream.py", line 515, in add
    self.send_head_request()
  File "/home/louis/dev/range-streams/src/range_streams/stream.py", line 439, in send_head_request
    resp.raise_for_status()
AttributeError: 'coroutine' object has no attribute 'raise_for_status'
>>> 
sys:1: RuntimeWarning: coroutine 'AsyncClient.send' was never awaited
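The traceback comes from calling a coroutine function without awaiting it: AsyncClient.send is an async method, so calling it returns a coroutine object rather than a response, and attribute access such as raise_for_status fails. The sketch below reproduces this with stand-in classes (FakeAsyncClient, FakeResponse and both send_head_request_* helpers are hypothetical, not httpx or range_streams) and shows one way a sync code path could detect and reject an async client.

```python
import asyncio
import inspect


class FakeResponse:
    """Hypothetical stand-in for an HTTP response."""

    status_code = 200

    def raise_for_status(self) -> None:
        if self.status_code >= 400:
            raise RuntimeError(f"HTTP {self.status_code}")


class FakeAsyncClient:
    """Hypothetical stand-in for an async HTTP client (not httpx)."""

    async def send(self, request: str) -> FakeResponse:
        await asyncio.sleep(0)  # pretend to wait on network I/O
        return FakeResponse()


def send_head_request_sync(client, request: str) -> FakeResponse:
    """Sync code path: refuse clients whose send() returns a coroutine."""
    result = client.send(request)
    if inspect.iscoroutine(result):
        result.close()  # close the unawaited coroutine so no RuntimeWarning is emitted
        raise TypeError("client.send returned a coroutine; use an async code path")
    return result


async def send_head_request_async(client, request: str) -> FakeResponse:
    resp = await client.send(request)  # awaiting yields the actual response object
    resp.raise_for_status()
    return resp
```

With an async client, only the awaited path works: asyncio.run(send_head_request_async(FakeAsyncClient(), "HEAD /")) returns a response, while the sync path raises TypeError.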
lmmx added the enhancement (New feature or request) label Jul 30, 2021
@lmmx
Owner Author

lmmx commented Jul 30, 2021

To resolve this initialisation bug, either:

  • Rewrite the head request routine as async-friendly
  • Add a new attribute sync_client which defaults to the client if isinstance(client, httpx.Client), else creates an httpx.Client

I like the 2nd option.


  • Implemented

@lmmx
Owner Author

lmmx commented Jul 30, 2021

import httpx
from range_streams import RangeStream, _EXAMPLE_URL

c = httpx.AsyncClient()
s = RangeStream(url=_EXAMPLE_URL, client=c)
s.add((1,3))

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/louis/dev/range-streams/src/range_streams/stream.py", line 550, in add
    req = self.send_request(byte_range)
  File "/home/louis/dev/range-streams/src/range_streams/stream.py", line 454, in send_request
    return RangeRequest(
  File "/home/louis/dev/range-streams/src/range_streams/request.py", line 33, in __init__
    self.setup_stream()
  File "/home/louis/dev/range-streams/src/range_streams/request.py", line 50, in setup_stream
    self.raise_for_non_partial_content()
  File "/home/louis/dev/range-streams/src/range_streams/request.py", line 57, in raise_for_non_partial_content
    if self.response.status_code != 206:
AttributeError: 'coroutine' object has no attribute 'status_code'

lmmx added a commit that referenced this issue Jul 30, 2021
@lmmx
Owner Author

lmmx commented Jul 30, 2021

Made an attempt and undid it, leaving only the initial outline. It still needs to be clarified where I would want to await, and whether the class itself will need to be rewritten to handle an async client.

@lmmx
Owner Author

lmmx commented Aug 2, 2021

This should be made much easier by the introduction of a single stream 'mode' (see #29), as there'll be only one call to await (?)

@lmmx
Owner Author

lmmx commented Aug 10, 2021

I think it may be desirable to leave the async requesting to the [package-external] fetch function, and just allow the creation (or modification) of a PngStream from the successfully obtained async response (in fact, are responses async?)

It's important to get the async abstraction at the right level, which is in the loop creating RangeStream objects (not within the creation of a single RangeStream). The point is that creating multiple monostreams in series will be slower than creating them concurrently with async requests, since the event loop is free to send the next request while awaiting the response to one already sent.
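To illustrate why the async boundary belongs in the loop that creates streams, here is a minimal sketch comparing serial and concurrent creation. It assumes each stream creation is dominated by waiting on one request; fake_fetch is a hypothetical stand-in (simulated with asyncio.sleep) for building a RangeStream from a URL, not part of range_streams.

```python
import asyncio
import time
from typing import List


async def fake_fetch(url: str, latency: float = 0.1) -> str:
    """Hypothetical stand-in for sending a request and awaiting its response."""
    await asyncio.sleep(latency)  # simulated network wait; yields to the event loop
    return url


async def fetch_serially(urls: List[str]) -> List[str]:
    # Each request is only sent after the previous response has arrived
    return [await fake_fetch(u) for u in urls]


async def fetch_concurrently(urls: List[str]) -> List[str]:
    # All requests are in flight at once; gather preserves the input order
    return list(await asyncio.gather(*(fake_fetch(u) for u in urls)))


def timed(coro):
    """Run a coroutine to completion and return (result, elapsed seconds)."""
    start = time.perf_counter()
    result = asyncio.run(coro)
    return result, time.perf_counter() - start
```

With five URLs and 0.1 s of simulated latency each, the serial version takes roughly 0.5 s and the concurrent one roughly 0.1 s, while both return results in input order.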

@lmmx
Owner Author

lmmx commented Aug 10, 2021

A force_async flag is a good choice, and retaining the single class (as for HTTPX's Response class) is too.

All httpx.Response objects have both iter_bytes and aiter_bytes methods:

>>> import httpx
>>> from range_streams import _EXAMPLE_URL
>>> import asyncio
>>> c = httpx.AsyncClient()
>>> req = c.build_request("GET", _EXAMPLE_URL)
>>> req_send_coro = c.send(request=req, stream=True)
>>> resp = asyncio.run(req_send_coro)
>>> stream = resp.aiter_bytes()
>>> async def gimme(stream):
...     async for b in stream:
...             return b
...
>>> respval = asyncio.run(gimme(stream))
>>> respval
<Response [200 OK]>
>>> bytesval = asyncio.run(gimme(stream))
>>> bytesval
b'P\x00\x01\x02\x03\x04\x05\x06\x07\x08K'

Here the async function gimme returns the first chunk, which for the example file at range_streams._EXAMPLE_URL is the entirety of the file's bytes.
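The session above calls asyncio.run() several times, and each call starts a fresh event loop; with a real async client it is generally safer to keep the request and the iteration over the body inside one coroutine. The sketch below shows that single-loop pattern with a stdlib stand-in: fake_aiter_bytes is hypothetical and only mimics the shape of Response.aiter_bytes(), and first_chunk plays the role of gimme. The bytes are the ones printed in the session.

```python
import asyncio
from typing import AsyncIterator, Tuple

EXAMPLE_BYTES = b"P\x00\x01\x02\x03\x04\x05\x06\x07\x08K"  # bytes shown in the session above


async def fake_aiter_bytes(data: bytes, chunk_size: int = 4096) -> AsyncIterator[bytes]:
    """Hypothetical stand-in for an async byte iterator such as Response.aiter_bytes()."""
    for i in range(0, len(data), chunk_size):
        await asyncio.sleep(0)  # yield control as if waiting on the network
        yield data[i : i + chunk_size]


async def first_chunk(stream: AsyncIterator[bytes]) -> bytes:
    # Like gimme: return as soon as the first chunk arrives
    async for chunk in stream:
        return chunk
    return b""  # the stream produced no chunks


async def main() -> Tuple[bytes, bytes]:
    # Everything runs inside ONE event loop instead of repeated asyncio.run() calls
    whole = await first_chunk(fake_aiter_bytes(EXAMPLE_BYTES))
    small = await first_chunk(fake_aiter_bytes(EXAMPLE_BYTES, chunk_size=4))
    return whole, small
```

With the default chunk size the first chunk is the whole 11-byte file, matching the session; with a 4-byte chunk size only the first four bytes come back.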

Incorrect use (a sync iterator with an async generator, or an async iterator with a sync generator) throws an error, e.g.:

>>> sync_c = httpx.Client()
>>> sync_req = sync_c.build_request("GET", _EXAMPLE_URL)
>>> sync_resp = sync_c.send(sync_req, stream=True) # not a coroutine
>>> asyncio.run(sync_resp) # using as if it were a coroutine
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/louis/miniconda3/envs/rangestreams/lib/python3.9/asyncio/runners.py", line 37, in run
    raise ValueError("a coroutine was expected, got {!r}".format(main))
ValueError: a coroutine was expected, got <Response [200 OK]>
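One way to sidestep these mismatches is to check which protocol an object implements (__aiter__ for async iteration, __iter__ for sync iteration) before consuming it. collect_bytes below is a hypothetical helper for illustration, not part of httpx or range_streams; it accepts either kind of iterable of byte chunks.

```python
import asyncio
from typing import AsyncIterable, AsyncIterator, Iterable, Union


async def collect_bytes(stream: Union[Iterable[bytes], AsyncIterable[bytes]]) -> bytes:
    """Join byte chunks from either a sync or an async iterable."""
    if hasattr(stream, "__aiter__"):
        chunks = [chunk async for chunk in stream]  # async protocol
    else:
        chunks = list(stream)  # plain sync protocol
    return b"".join(chunks)


async def agen() -> AsyncIterator[bytes]:
    """Small async generator used to demonstrate both paths."""
    yield b"ab"
    yield b"cd"
```

Both asyncio.run(collect_bytes(iter([b"ab", b"cd"]))) and asyncio.run(collect_bytes(agen())) return b"abcd", whereas a plain for loop over agen() raises TypeError because async generators are not sync iterables.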

Note: python -m asyncio starts a REPL in which you can use await at the top level, outside an async function definition (rather than wrapping each call in asyncio.run).

@lmmx
Owner Author

lmmx commented Aug 10, 2021

The following methods will need to involve an async check:

  • __init__ (check but can't do anything async)
  • set_client
  • register_range
  • handle_overlap
  • seek -> aseek
  • read -> aread
  • send_request
  • get_monostream
  • send_head_request (but see note below on sync_client)
  • add -> add_async
  • close -> aclose
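The method list above pairs sync methods with async counterparts (read/aread, close/aclose) behind a check on the stream's mode. A minimal sketch of that pattern, assuming a simple is_async flag (in practice it might be derived from the client type); DualModeStream and its helpers are hypothetical and read from an in-memory buffer rather than a network response.

```python
import asyncio
import io


class DualModeStream:
    """Hypothetical sketch of a stream offering sync and async method pairs."""

    def __init__(self, data: bytes, is_async: bool = False):
        self.is_async = is_async  # assumption: could be set from isinstance(client, ...)
        self._buf = io.BytesIO(data)
        self.closed = False

    def _check_sync(self, name: str) -> None:
        if self.is_async:
            raise RuntimeError(f"{name}() is sync-only; use a{name}() on an async stream")

    def _check_async(self, name: str) -> None:
        if not self.is_async:
            raise RuntimeError(f"{name}() requires an async stream")

    def read(self, size: int = -1) -> bytes:
        self._check_sync("read")
        return self._buf.read(size)

    async def aread(self, size: int = -1) -> bytes:
        self._check_async("aread")
        await asyncio.sleep(0)  # placeholder for awaiting network data
        return self._buf.read(size)

    def close(self) -> None:
        self._check_sync("close")
        self.closed = True

    async def aclose(self) -> None:
        self._check_async("aclose")
        await asyncio.sleep(0)  # placeholder for awaiting the client's own aclose()
        self.closed = True
```

Calling a method from the wrong half raises immediately with a hint about the counterpart, instead of failing later with a coroutine-related AttributeError like the one in the original report.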

Additionally, the sync_client property currently creates a new synchronous Client every time it's accessed if the stream's client is async. Instead it should set an internal _sync_client, to be used only through this property. (The same doesn't need to be done for async clients. You could also pass one through to be shared across a session, but oof, that seems messy.) Down the line the sync client should be taken out, and whatever it was doing should be handled via the async client.
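A minimal sketch of the caching described above: the property returns the user's client if it is already synchronous, and otherwise creates one sync client on first access and reuses it. SyncClient and AsyncClient here are hypothetical placeholder classes standing in for httpx.Client and httpx.AsyncClient, and Stream is not the library's RangeStream.

```python
from typing import Optional, Union


class SyncClient:
    """Hypothetical stand-in for a synchronous HTTP client (e.g. httpx.Client)."""


class AsyncClient:
    """Hypothetical stand-in for an asynchronous HTTP client (e.g. httpx.AsyncClient)."""


class Stream:
    def __init__(self, client: Union[SyncClient, AsyncClient]):
        self.client = client
        self._sync_client: Optional[SyncClient] = None  # created lazily, only if needed

    @property
    def sync_client(self) -> SyncClient:
        # A sync client supplied by the user is used directly
        if isinstance(self.client, SyncClient):
            return self.client
        # Otherwise create a single sync client on first access and reuse it afterwards
        if self._sync_client is None:
            self._sync_client = SyncClient()
        return self._sync_client
```

Repeated accesses now return the same object, so at most one extra client exists per stream; with real httpx clients that internal client would also need closing when the stream is closed.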

@lmmx
Owner Author

lmmx commented Aug 11, 2021

add_async is now an async alternative to add: f4419e7

async def add_async(
    self,
    byte_range: Range | tuple[int, int] = Range("[0, 0)"),
    activate: bool = True,
    name: str = "",
) -> None:
    byte_range = validate_range(byte_range=byte_range, allow_empty=True)
    if not self.single_request:
        raise NotImplementedError(
            "Async RangeStreams are only available in single request mode (for now)"
        )
    # Do not request an empty range if total length already checked (at init)
    if not self._length_checked and byte_range.isempty():
        await self.get_async_monostream()
    elif not byte_range.isempty():
        self.add_window(byte_range=byte_range, activate=activate, name=name)

def add_window(
    self,
    byte_range: Range | tuple[int, int] = Range("[0, 0)"),
    activate: bool = True,
    name: str = "",
) -> None:
    """
    Register a window onto the original range in the ``_ranges`` RangeDict rather
    than add a new range entry to the dict (which would A) clash with the single
    entire range and B) require another request).

    Args:
      byte_range : (:class:`~ranges.Range` | ``tuple[int,int]``) The range
                   of positions on the file to be read from the request on
                   the stream.
      activate   : (:class:`bool`) Whether to make this newly added
                   :class:`~ranges.Range` the active range on the stream upon
                   creating it.
      name       : (:class:`str`) A name (default: ``''``) to give to the range.
    """
    req = self.simulate_request(byte_range=byte_range)
    resp = RangeResponse(stream=self, range_request=req, range_name=name)
    self.register_range(
        rng=byte_range,
        value=resp,
        activate=activate,
        use_windows=True,
    )

I didn't like any of the options given here for making the __init__ method of a class async, so I'm happy for calling add_async after initialisation to be the user's responsibility, documented as such. (More docs are probably needed.)
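The resulting usage pattern is two-step: construct the object synchronously, then await the setup method. A minimal sketch under that assumption; AsyncSetupStream, fetch_total_length and open_stream are hypothetical names, and the fixed length of 11 is a placeholder rather than a real response header.

```python
import asyncio


async def fetch_total_length(url: str) -> int:
    """Hypothetical stand-in for awaiting a request that reports the file length."""
    await asyncio.sleep(0)
    return 11  # placeholder value for the sketch


class AsyncSetupStream:
    """Hypothetical sketch: __init__ stays synchronous, async setup is a separate step."""

    def __init__(self, url: str):
        self.url = url
        self.total_length = None  # unknown until add_async() has been awaited

    async def add_async(self) -> None:
        # The awaited request that __init__ cannot perform
        self.total_length = await fetch_total_length(self.url)


async def open_stream(url: str) -> AsyncSetupStream:
    stream = AsyncSetupStream(url)  # constructing the object does no I/O
    await stream.add_async()  # the caller finishes setup explicitly
    return stream
```

Until add_async() has been awaited, the object is only partially initialised (total_length is None), which is exactly the caveat the docs would need to spell out.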

@lmmx
Owner Author

lmmx commented Aug 11, 2021

Still to do:

  • Bring the AsyncFetcher and SignalHaltError from this gist into the library as a helpful wrapper(?) The whole point of this library is essentially so the user doesn't need to consider the lower level primitives, right?
    • To allow specification of a custom callback, separate the process_stream function into a method that can be called (like super().__init__() effectively, but for the callback rather than a class) to update the progress bar (which happens regardless of what the outcome in the stream is).
    • It could actually be a classmethod of RangeStream (and hence available to any inheriting codec classes), but rather than a constructor it could loop over an input URL list (very practical!)
  • tests (can reuse the example made while developing it in the aforementioned gist)
  • docs (especially the aforementioned need to call add_async after initialisation, and the single request mode limit), probably in the module docstring and package docstring
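A rough sketch of the classmethod idea from the list above: loop over a URL list concurrently, call an overridable per-stream hook, and advance progress regardless of what that hook does. The AsyncFetcher and SignalHaltError from the gist are not reproduced here; BaseFetcher, fetch_one, process, fetch_many and on_progress are all hypothetical, and fetch_one simulates the request with asyncio.sleep.

```python
import asyncio
from typing import Callable, List, Optional


class BaseFetcher:
    """Hypothetical sketch of a classmethod that fetches a list of URLs concurrently."""

    def __init__(self, url: str, payload: bytes):
        self.url = url
        self.payload = payload

    @classmethod
    async def fetch_one(cls, url: str) -> "BaseFetcher":
        await asyncio.sleep(0.01)  # stand-in for the async range request
        return cls(url, url.encode())  # placeholder payload

    def process(self) -> None:
        """Per-stream hook for subclasses to override (e.g. parsing a codec)."""

    @classmethod
    async def fetch_many(
        cls,
        urls: List[str],
        on_progress: Optional[Callable[[int, int], None]] = None,
        limit: int = 5,
    ) -> List["BaseFetcher"]:
        sem = asyncio.Semaphore(limit)  # cap the number of requests in flight
        done = 0

        async def worker(url: str) -> "BaseFetcher":
            nonlocal done
            async with sem:
                item = await cls.fetch_one(url)
            try:
                item.process()  # custom per-stream behaviour
            finally:
                done += 1  # progress advances whatever process() did
                if on_progress is not None:
                    on_progress(done, len(urls))
            return item

        return list(await asyncio.gather(*(worker(u) for u in urls)))
```

Because it is a classmethod using cls, a subclass such as a codec stream gets instances of its own type back, and its process() override runs on each one.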

@lmmx
Owner Author

lmmx commented Aug 11, 2021

Implemented: 68604a7

@lmmx
Owner Author

lmmx commented Aug 12, 2021

I wasn't using the if whence == SEEK_END block in this:

def seek(self, position, whence=SEEK_SET):
    """
    File-like seeking within the range request stream.
    """
    self.buf_keep()
    if whence == SEEK_END:
        self._load_all()
    if self.is_windowed:
        position = position + self.window_offset
    self._bytes.seek(position, whence)
    self.store_tell()

in the RangeResponse.seek method, but rather than remove it, I'm going to prevent its use on an async stream to keep the seek method synchronous (otherwise the need to await would propagate to prepare_reading_window in _prepare_to_read).
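A minimal sketch of that guard: seek() stays a plain method and simply refuses SEEK_END when the stream is async, since seeking from the end would first require loading the whole body. GuardedResponse is hypothetical and wraps an in-memory io.BytesIO; the windowing and buffer bookkeeping from the real method are left out.

```python
import io
from io import SEEK_END, SEEK_SET


class GuardedResponse:
    """Hypothetical sketch of a response whose seek() stays synchronous."""

    def __init__(self, data: bytes, is_async: bool = False):
        self._bytes = io.BytesIO(data)
        self.is_async = is_async

    def seek(self, position: int, whence: int = SEEK_SET) -> int:
        if whence == SEEK_END and self.is_async:
            # Seeking from the end would need the full body loaded, which on an
            # async stream means awaiting; refuse rather than make seek() a coroutine
            raise ValueError("SEEK_END is not supported on async streams")
        return self._bytes.seek(position, whence)  # returns the new absolute position
```

Sync streams keep full file-like seeking, while async streams still support SEEK_SET (and SEEK_CUR) without any await.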

@lmmx
Owner Author

lmmx commented Aug 12, 2021

Closing now that #36 is complete; the remaining work is tracked in #37.

lmmx closed this as completed Aug 12, 2021