Skip to content

Commit

Permalink
Decode tile off the main thread (#40)
Browse files Browse the repository at this point in the history
* Decode tile off the main thread

* remove builder

* pubcrate visibility
  • Loading branch information
kylebarron authored Feb 28, 2025
1 parent 937ec7b commit ed67b77
Show file tree
Hide file tree
Showing 15 changed files with 178 additions and 98 deletions.
2 changes: 2 additions & 0 deletions python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ pyo3 = { version = "0.23.0", features = ["macros"] }
pyo3-async-runtimes = "0.23"
pyo3-bytes = "0.1.2"
pyo3-object_store = { git = "https://github.com/developmentseed/obstore", rev = "28ba07a621c1c104f084fb47ae7f8d08b1eae3ea" }
rayon = "1.10.0"
tokio-rayon = "2.1.0"
thiserror = "1"

# We opt-in to using rustls as the TLS provider for reqwest, which is the HTTP
Expand Down
4 changes: 4 additions & 0 deletions python/python/async_tiff/_async_tiff.pyi
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
from ._decoder import Decoder as Decoder
from ._decoder import DecoderRegistry as DecoderRegistry
from ._geo import GeoKeyDirectory as GeoKeyDirectory
from ._ifd import ImageFileDirectory as ImageFileDirectory
from ._thread_pool import ThreadPool as ThreadPool
from ._tiff import TIFF as TIFF
from ._tile import Tile as Tile
5 changes: 3 additions & 2 deletions python/python/async_tiff/_decoder.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ class Decoder(Protocol):
def __call__(buffer: Buffer) -> Buffer: ...

class DecoderRegistry:
def __init__(self) -> None: ...
def add(self, compression: CompressionMethod | int, decoder: Decoder) -> None: ...
def __init__(
self, decoders: dict[CompressionMethod | int, Decoder] | None = None
) -> None: ...
2 changes: 2 additions & 0 deletions python/python/async_tiff/_thread_pool.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
class ThreadPool:
def __init__(self, num_threads: int) -> None: ...
12 changes: 12 additions & 0 deletions python/python/async_tiff/_tile.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from collections.abc import Buffer

from ._decoder import DecoderRegistry
from ._thread_pool import ThreadPool

class Tile:
async def decode(
self,
*,
decoder_registry: DecoderRegistry | None = None,
pool: ThreadPool | None = None,
) -> Buffer: ...
41 changes: 31 additions & 10 deletions python/src/decoder.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,55 @@
use std::collections::HashMap;
use std::sync::Arc;

use async_tiff::decoder::{Decoder, DecoderRegistry};
use async_tiff::error::AiocogeoError;
use async_tiff::tiff::tags::PhotometricInterpretation;
use bytes::Bytes;
use pyo3::exceptions::PyTypeError;
use pyo3::intern;
use pyo3::prelude::*;
use pyo3::sync::GILOnceCell;
use pyo3::types::{PyDict, PyTuple};
use pyo3_bytes::PyBytes;

use crate::enums::PyCompressionMethod;

#[pyclass(name = "DecoderRegistry")]
pub(crate) struct PyDecoderRegistry(DecoderRegistry);
static DEFAULT_DECODER_REGISTRY: GILOnceCell<Arc<DecoderRegistry>> = GILOnceCell::new();

pub fn get_default_decoder_registry(py: Python<'_>) -> Arc<DecoderRegistry> {
let registry =
DEFAULT_DECODER_REGISTRY.get_or_init(py, || Arc::new(DecoderRegistry::default()));
registry.clone()
}

#[pyclass(name = "DecoderRegistry", frozen)]
#[derive(Debug, Default)]
pub(crate) struct PyDecoderRegistry(Arc<DecoderRegistry>);

#[pymethods]
impl PyDecoderRegistry {
#[new]
fn new() -> Self {
Self(DecoderRegistry::default())
#[pyo3(signature = (decoders = None))]
pub(crate) fn new(decoders: Option<HashMap<PyCompressionMethod, PyDecoder>>) -> Self {
let mut decoder_registry = DecoderRegistry::default();
if let Some(decoders) = decoders {
for (compression, decoder) in decoders.into_iter() {
decoder_registry
.as_mut()
.insert(compression.into(), Box::new(decoder));
}
}
Self(Arc::new(decoder_registry))
}

fn add(&mut self, compression: PyCompressionMethod, decoder: PyDecoder) {
self.0
.as_mut()
.insert(compression.into(), Box::new(decoder));
}
impl PyDecoderRegistry {
pub(crate) fn inner(&self) -> &Arc<DecoderRegistry> {
&self.0
}
}

#[derive(Debug)]
struct PyDecoder(PyObject);
pub(crate) struct PyDecoder(PyObject);

impl PyDecoder {
fn call(&self, py: Python, buffer: Bytes) -> PyResult<PyBytes> {
Expand Down
1 change: 1 addition & 0 deletions python/src/enums.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use pyo3::intern;
use pyo3::prelude::*;
use pyo3::types::{PyString, PyTuple};

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) struct PyCompressionMethod(CompressionMethod);

impl From<CompressionMethod> for PyCompressionMethod {
Expand Down
4 changes: 4 additions & 0 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@ mod decoder;
mod enums;
mod geo;
mod ifd;
mod thread_pool;
mod tiff;
mod tile;

use pyo3::prelude::*;

use crate::decoder::PyDecoderRegistry;
use crate::geo::PyGeoKeyDirectory;
use crate::ifd::PyImageFileDirectory;
use crate::thread_pool::PyThreadPool;
use crate::tiff::PyTIFF;

const VERSION: &str = env!("CARGO_PKG_VERSION");
Expand Down Expand Up @@ -48,6 +51,7 @@ fn _async_tiff(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
m.add_class::<PyDecoderRegistry>()?;
m.add_class::<PyGeoKeyDirectory>()?;
m.add_class::<PyImageFileDirectory>()?;
m.add_class::<PyThreadPool>()?;
m.add_class::<PyTIFF>()?;

pyo3_object_store::register_store_module(py, m, "async_tiff")?;
Expand Down
48 changes: 48 additions & 0 deletions python/src/thread_pool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use std::sync::Arc;

use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;

use pyo3::sync::GILOnceCell;
use rayon::{ThreadPool, ThreadPoolBuilder};

static DEFAULT_POOL: GILOnceCell<Arc<ThreadPool>> = GILOnceCell::new();

pub fn get_default_pool(py: Python<'_>) -> PyResult<Arc<ThreadPool>> {
let runtime = DEFAULT_POOL.get_or_try_init(py, || {
let pool = ThreadPoolBuilder::new().build().map_err(|err| {
PyValueError::new_err(format!("Could not create rayon threadpool. {}", err))
})?;
Ok::<_, PyErr>(Arc::new(pool))
})?;
Ok(runtime.clone())
}

#[pyclass(name = "ThreadPool", frozen, module = "async_tiff")]
pub(crate) struct PyThreadPool(Arc<ThreadPool>);

#[pymethods]
impl PyThreadPool {
#[new]
fn new(num_threads: usize) -> PyResult<Self> {
let pool = ThreadPoolBuilder::new()
.num_threads(num_threads)
.build()
.map_err(|err| {
PyValueError::new_err(format!("Could not create rayon threadpool. {}", err))
})?;
Ok(Self(Arc::new(pool)))
}
}

impl PyThreadPool {
pub(crate) fn inner(&self) -> &Arc<ThreadPool> {
&self.0
}
}

impl AsRef<ThreadPool> for PyThreadPool {
fn as_ref(&self) -> &ThreadPool {
&self.0
}
}
40 changes: 40 additions & 0 deletions python/src/tile.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use async_tiff::Tile;
use pyo3::prelude::*;
use pyo3_async_runtimes::tokio::future_into_py;
use pyo3_bytes::PyBytes;
use tokio_rayon::AsyncThreadPool;

use crate::decoder::get_default_decoder_registry;
use crate::thread_pool::{get_default_pool, PyThreadPool};
use crate::PyDecoderRegistry;

#[pyclass(name = "Tile")]
pub(crate) struct PyTile(Option<Tile>);

#[pymethods]
impl PyTile {
#[pyo3(signature = (*, decoder_registry=None, pool=None))]
fn decode_async(
&mut self,
py: Python,
decoder_registry: Option<&PyDecoderRegistry>,
pool: Option<&PyThreadPool>,
) -> PyResult<PyObject> {
let decoder_registry = decoder_registry
.map(|r| r.inner().clone())
.unwrap_or_else(|| get_default_decoder_registry(py));
let pool = pool
.map(|p| Ok(p.inner().clone()))
.unwrap_or_else(|| get_default_pool(py))?;
let tile = self.0.take().unwrap();

let result = future_into_py(py, async move {
let decoded_bytes = pool
.spawn_async(move || tile.decode(&decoder_registry))
.await
.unwrap();
Ok(PyBytes::new(decoded_bytes))
})?;
Ok(result.unbind())
}
}
22 changes: 1 addition & 21 deletions src/cog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ pub struct COGReader {
#[allow(dead_code)]
cursor: AsyncCursor,
ifds: ImageFileDirectories,
#[allow(dead_code)]
bigtiff: bool,
}

impl COGReader {
Expand Down Expand Up @@ -46,30 +44,12 @@ impl COGReader {

let ifds = ImageFileDirectories::open(&mut cursor, first_ifd_location, bigtiff).await?;

Ok(Self {
cursor,
ifds,
bigtiff,
})
Ok(Self { cursor, ifds })
}

pub fn ifds(&self) -> &ImageFileDirectories {
&self.ifds
}

/// Return the EPSG code representing the crs of the image
pub fn epsg(&self) -> Option<u16> {
let ifd = &self.ifds.as_ref()[0];
ifd.geo_key_directory
.as_ref()
.and_then(|gkd| gkd.epsg_code())
}

/// Return the bounds of the image in native crs
pub fn native_bounds(&self) -> Option<(f64, f64, f64, f64)> {
let ifd = &self.ifds.as_ref()[0];
ifd.native_bounds()
}
}

#[cfg(test)]
Expand Down
20 changes: 20 additions & 0 deletions src/geo/affine.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::ImageFileDirectory;

/// Affine transformation values.
#[derive(Debug)]
pub struct AffineTransform(f64, f64, f64, f64, f64, f64);
Expand Down Expand Up @@ -30,4 +32,22 @@ impl AffineTransform {
pub fn f(&self) -> f64 {
self.5
}

/// Construct a new Affine Transform from the IFD
pub fn from_ifd(ifd: &ImageFileDirectory) -> Option<Self> {
if let (Some(model_pixel_scale), Some(model_tiepoint)) =
(&ifd.model_pixel_scale, &ifd.model_tiepoint)
{
Some(Self::new(
model_pixel_scale[0],
0.0,
model_tiepoint[3],
0.0,
-model_pixel_scale[1],
model_tiepoint[4],
))
} else {
None
}
}
}
Loading

0 comments on commit ed67b77

Please sign in to comment.