diff --git a/Cargo.lock b/Cargo.lock index 7096b2e7..435591e0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -19,33 +19,33 @@ checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" [[package]] name = "aho-corasick" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2969dcb958b36655471fc61f7e416fa76033bdd4bfed0678d8fee1e2d07a1f0" +checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916" dependencies = [ "memchr", ] [[package]] name = "anyhow" -version = "1.0.79" +version = "1.0.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "080e9890a082662b09c1ad45f567faeeb47f22b5fb23895fbe1e651e718e25ca" +checksum = "0952808a6c2afd1aa8947271f3a60f1a6763c7b912d210184c5149b5cf147247" dependencies = [ "backtrace", ] [[package]] name = "autocfg" -version = "1.1.0" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +checksum = "f1fdabc7756949593fe60f30ec81974b613357de856987752631dea1e3394c80" [[package]] name = "backtrace" -version = "0.3.69" +version = "0.3.71" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2089b7e3f35b9dd2d0ed921ead4f6d318c27680d4a5bd167b3ee120edb105837" +checksum = "26b05800d2e817c8b3b4b54abd461726265fa9789ae34330622f2db9ee696f9d" dependencies = [ "addr2line", "cc", @@ -58,9 +58,9 @@ dependencies = [ [[package]] name = "bytemuck" -version = "1.14.0" +version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "374d28ec25809ee0e23827c2ab573d729e293f281dfe393500e7ad618baa61c6" +checksum = "5d6d68c57235a3a081186990eca2867354726650f42f7516ca50c28d6281fd15" [[package]] name = "byteorder" @@ -70,12 +70,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "cc" -version = "1.0.83" +version = "1.0.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0" -dependencies = [ - "libc", -] +checksum = "8cd6604a82acf3039f1144f54b8eb34e91ffba622051189e71b781822d5ee1f5" [[package]] name = "cfg-if" @@ -105,13 +102,38 @@ dependencies = [ [[package]] name = "crc32fast" -version = "1.3.2" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" +checksum = "b3855a8a784b474f333699ef2bbca9db2c4a1f6d9088a90a2d25b1eb53111eaa" dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-deque" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" + [[package]] name = "default-boxed" version = "0.2.0" @@ -141,6 +163,12 @@ dependencies = [ "powerfmt", ] +[[package]] +name = "either" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11157ac094ffbdde99aa67b23417ebdd801842852b500e395a45a9c0aac03e4a" + [[package]] name = "flate2" version = "1.0.28" @@ -207,7 +235,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.47", + "syn 2.0.58", ] [[package]] @@ -224,9 +252,9 @@ checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" [[package]] name = "futures-timer" -version = "3.0.2" +version = "3.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" +checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" [[package]] name = "futures-util" @@ -248,9 +276,9 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.2.11" +version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe9006bed769170c11f845cf00c7c1e9092aeb3f268e007c3e760ac68008070f" +checksum = "a06fddc2749e0528d2813f95e050e87e52c8cbbae56223b9babf73b3e53b0cc6" dependencies = [ "cfg-if", "libc", @@ -271,9 +299,9 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "itoa" -version = "1.0.10" +version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c" +checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" [[package]] name = "lazy_static" @@ -293,6 +321,7 @@ dependencies = [ "flate2", "log", "rand", + "rayon", "rstest", "simple_logger", "wide", @@ -300,36 +329,42 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.151" +version = "0.2.153" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "302d7ab3130588088d277783b1e2d2e10c9e9e4a16dd9050e6ec93fb3e7048f4" +checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" [[package]] name = "log" -version = "0.4.20" +version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" [[package]] name = "memchr" -version = "2.7.1" +version = "2.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149" +checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d" [[package]] name = "miniz_oxide" -version = "0.7.1" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7" +checksum = "9d811f3e15f28568be3407c8e7fdb6514c1cda3cb30683f15b6a1a1dc4ea14a7" dependencies = [ "adler", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num_threads" -version = "0.1.6" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2819ce041d2ee131036f4fc9d6ae7ae125a3a40e97ba64d04fe799ad9dabbb44" +checksum = "5c7398b9c8b70908f6371f47ed36737907c87c52af34c268fed0bf0ceb92ead9" dependencies = [ "libc", ] @@ -345,9 +380,9 @@ dependencies = [ [[package]] name = "pin-project-lite" -version = "0.2.13" +version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" +checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02" [[package]] name = "pin-utils" @@ -369,9 +404,9 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "proc-macro2" -version = "1.0.75" +version = "1.0.79" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "907a61bd0f64c2f29cd1cf1dc34d05176426a3f504a78010f08416ddb7b13708" +checksum = "e835ff2298f5721608eb1a980ecaee1aef2c132bf95ecc026a11b7bf3c01c02e" dependencies = [ "unicode-ident", ] @@ -415,11 +450,31 @@ dependencies = [ "getrandom", ] +[[package]] +name = "rayon" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "regex" -version = "1.10.2" +version = "1.10.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343" +checksum = "c117dbdfde9c8308975b6a18d71f3f385c89461f7b3fb054288ecf2a2058ba4c" dependencies = [ "aho-corasick", "memchr", @@ -429,9 +484,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.3" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f" +checksum = "86b83b8b9847f9bf95ef68afb0b8e6cdb80f498442f5179a29fad448fcc1eaea" dependencies = [ "aho-corasick", "memchr", @@ -440,9 +495,9 @@ dependencies = [ [[package]] name = "regex-syntax" -version = "0.8.2" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" +checksum = "adad44e29e4c806119491a7f06f03de4d1af22c3a680dd47f1e6e179439d1f56" [[package]] name = "relative-path" @@ -475,7 +530,7 @@ dependencies = [ "regex", "relative-path", "rustc_version", - "syn 2.0.47", + "syn 2.0.58", "unicode-ident", ] @@ -505,28 +560,28 @@ dependencies = [ [[package]] name = "semver" -version = "1.0.21" +version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b97ed7a9823b74f99c7742f5336af7be5ecd3eeafcb1507d1fa93347b1d589b0" +checksum = "92d43fe69e652f3df9bdc2b85b2854a0825b86e4fb76bc44d945137d053639ca" [[package]] name = "serde" -version = "1.0.194" +version = "1.0.197" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b114498256798c94a0689e1a15fec6005dee8ac1f41de56404b67afc2a4b773" +checksum = "3fb1c873e1b9b056a4dc4c0c198b24c3ffa059243875552b2bd0933b1aee4ce2" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.194" +version = "1.0.197" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3385e45322e8f9931410f01b3031ec534c3947d0e94c18049af4d9f9907d4e0" +checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.47", + "syn 2.0.58", ] [[package]] @@ -563,9 +618,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.47" +version = "2.0.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1726efe18f42ae774cc644f330953a5e7b3c3003d3edcecf18850fe9d4dd9afb" +checksum = "44cfb93f38070beee36b3fef7d4f5a16f27751d94b187b666a5cc5e9b0d30687" dependencies = [ "proc-macro2", "quote", @@ -574,13 +629,14 @@ dependencies = [ [[package]] name = "time" -version = "0.3.31" +version = "0.3.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f657ba42c3f86e7680e53c8cd3af8abbe56b5491790b46e22e19c0d57463583e" +checksum = "c8248b6521bb14bc45b4067159b9b6ad792e2d6d754d6c41fb50e29fefe38749" dependencies = [ "deranged", "itoa", "libc", + "num-conv", "num_threads", "powerfmt", "serde", @@ -596,10 +652,11 @@ checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" [[package]] name = "time-macros" -version = "0.2.16" +version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26197e33420244aeb70c3e8c78376ca46571bc4e701e4791c2cd9f57dcb3a43f" +checksum = "7ba3a3ef41e6672a2f0f001392bb5dcd3ff0a9992d618ca761a11c3121547774" dependencies = [ + "num-conv", "time-core", ] @@ -617,9 +674,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wide" -version = "0.7.13" +version = "0.7.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c68938b57b33da363195412cfc5fc37c9ed49aa9cfe2156fde64b8d2c9498242" +checksum = "89beec544f246e679fc25490e3f8e08003bc4bf612068f325120dad4cea02c1c" dependencies = [ "bytemuck", "safe_arch", diff --git a/Cargo.toml b/Cargo.toml index 9c5b85e2..e9527e8d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,7 @@ wide = "0.7" log = "0.4" simple_logger ="4.0" cpu-time = "1.0" +rayon = "1.10" [dev-dependencies] rstest = "0.18" diff --git a/src/structs/lepton_format.rs b/src/structs/lepton_format.rs index 2c2c694a..3e528f15 100644 --- a/src/structs/lepton_format.rs +++ b/src/structs/lepton_format.rs @@ -12,8 +12,6 @@ use std::io::{Cursor, ErrorKind, Read, Seek, SeekFrom, Write}; use std::mem::swap; use std::sync::mpsc::{channel, Sender}; use std::sync::mpsc::{Receiver, SendError}; -use std::thread; -use std::thread::ScopedJoinHandle; use std::time::Instant; use anyhow::{Context, Result}; @@ -311,7 +309,7 @@ fn run_lepton_decoder_threads( lh: &LeptonHeader, reader: &mut R, last_data_position: u64, - max_threads_to_use: usize, + _max_threads_to_use: usize, features: &EnabledFeatures, process: fn( thread_handoff: &ThreadHandoff, @@ -334,115 +332,35 @@ fn run_lepton_decoder_threads( qt.push(qtables); } - let r = thread::scope(|s| -> Result<(Metrics, Vec

)> { - let mut running_threads: Vec>> = Vec::new(); + let mut thread_results = Vec::>>::new(); + for _i in 0..lh.thread_handoff.len() { + thread_results.push(None); + } + + // track if we got an error while trying to send to a thread + let mut error_sending: Option> = None; + + rayon::in_place_scope(|s| -> Result<()> { let mut channel_to_sender = Vec::new(); let pts_ref = &pts; let q_ref = &qt[..]; - // don't use more threads than we need - let m = cmp::min(max_threads_to_use, lh.thread_handoff.len()); - - info!( - "decoding {0} multipexed streams with {1} threads", - lh.thread_handoff.len(), - m - ); - - // ratio of threads to work items - let ratio = lh.thread_handoff.len() as f32 / m as f32; - - for t in 0..m { - let start = (t as f32 * ratio) as usize; - let end = ((t + 1) as f32 * ratio) as usize; - - let iter_per_thread = end - start; - - let mut rx_channels = Vec::new(); - for _k in 0..iter_per_thread { - let (tx, rx) = channel(); - channel_to_sender.push(tx); - rx_channels.push(Some(rx)); - } - - running_threads.push(s.spawn(move || -> Result<(P, Metrics)> { - let cpu_time = ThreadTime::now(); - - // determine how much we are going to write in total to presize the buffer - let mut decoded_size = 0; - for thread_id in start..end { - decoded_size += lh.thread_handoff[thread_id].segment_size as usize; - } - - // create a combined handoff that merges all the sections we have read so we process them in one go - let combined_thread_handoff = ThreadHandoff { - luma_y_start: lh.thread_handoff[start].luma_y_start, - luma_y_end: lh.thread_handoff[end - 1].luma_y_end, - segment_offset_in_file: lh.thread_handoff[start].segment_offset_in_file, - segment_size: decoded_size as i32, - overhang_byte: lh.thread_handoff[start].overhang_byte, - num_overhang_bits: lh.thread_handoff[start].num_overhang_bits, - last_dc: lh.thread_handoff[start].last_dc.clone(), - }; - - let mut image_data = Vec::new(); - for i in 0..lh.jpeg_header.cmpc { - image_data.push(BlockBasedImage::new( - &lh.jpeg_header, - i, - combined_thread_handoff.luma_y_start, - if t == m - 1 { - // if this is the last thead, then the image should extend all the way to the bottom - lh.jpeg_header.cmp_info[0].bcv - } else { - combined_thread_handoff.luma_y_end - }, - )); - } - - let mut metrics = Metrics::default(); - - // now run the range of thread handoffs in the file that this thread is supposed to handle - for thread_id in start..end { - // get the appropriate receiver so we can read out data from it - let rx = rx_channels[thread_id - start].take().context(here!())?; - let mut reader = MessageReceiver { - thread_id: thread_id as u8, - current_buffer: Cursor::new(Vec::new()), - receiver: rx, - end_of_file: false, - }; - - metrics.merge_from( - lepton_decode_row_range( - pts_ref, - q_ref, - &lh.truncate_components, - &mut image_data, - &mut reader, - lh.thread_handoff[thread_id].luma_y_start, - lh.thread_handoff[thread_id].luma_y_end, - thread_id == lh.thread_handoff.len() - 1, - true, - features, - ) - .context(here!())?, - ); - } - - let process_result = process(&combined_thread_handoff, image_data, lh)?; + info!("decoding {0} multiplexed streams", lh.thread_handoff.len()); - metrics.record_cpu_worker_time(cpu_time.elapsed()); + // create a channel for each stream and spawn a work item to read from it + // the return value from each work item is stored in thread_results, which + // is collected at the end + for (t, result) in thread_results.iter_mut().enumerate() { + let (tx, rx) = channel(); + channel_to_sender.push(tx); - Ok((process_result, metrics)) - })); + s.spawn(move |_| { + *result = Some(decoder_thread(lh, t, rx, pts_ref, q_ref, features, process)); + }); } - // track if we got an error while trying to send to a thread - let mut error_sending: Option> = None; - - // now that the threads are waiting for inptut, read the stream and send all the buffers to their respective readers + // now that the channels are waiting for input, read the stream and send all the buffers to their respective readers while reader.stream_position().context(here!())? < last_data_position - 4 { let thread_marker = reader.read_u8().context(here!())?; let thread_id = (thread_marker & 0xf) as u8; @@ -501,34 +419,102 @@ fn run_lepton_decoder_threads( let _ = c.send(Message::Eof); } - let mut metrics = Metrics::default(); - - let mut result = Vec::new(); - for i in running_threads.drain(..) { - let (result_to_process, source_metrics) = i.join().unwrap().context(here!())?; + Ok(()) + })?; - metrics.merge_from(source_metrics); + let mut metrics = Metrics::default(); - result.push(result_to_process); + let mut result = Vec::new(); + let mut thread_not_run = false; + for i in thread_results.drain(..) { + match i { + None => thread_not_run = true, + Some(Err(e)) => { + return Err(e).context(here!()); + } + Some(Ok((m, r))) => { + metrics.merge_from(m); + result.push(r); + } } + } - // if there was an error during send, it should have resulted in an error from one of the threads above and - // we wouldn't get here, but as an extra precaution, we check here to make sure we didn't miss anything - if let Some(e) = error_sending { - return Err(e).context(here!()); - } + if thread_not_run { + return err_exit_code(ExitCode::GeneralFailure, "thread did not run").context(here!()); + } - info!( - "worker threads {0}ms of CPU time in {1}ms of wall time", - metrics.get_cpu_time_worker_time().as_millis(), - wall_time.elapsed().as_millis() - ); + // if there was an error during send, it should have resulted in an error from one of the threads above and + // we wouldn't get here, but as an extra precaution, we check here to make sure we didn't miss anything + if let Some(e) = error_sending { + return Err(e).context(here!()); + } - return Ok((metrics, result)); - }) - .context(here!())?; + info!( + "worker threads {0}ms of CPU time in {1}ms of wall time", + metrics.get_cpu_time_worker_time().as_millis(), + wall_time.elapsed().as_millis() + ); - Ok(r) + Ok((metrics, result)) +} + +fn decoder_thread

( + lh: &LeptonHeader, + thread_id: usize, + rx: Receiver, + pts_ref: &ProbabilityTablesSet, + q_ref: &[QuantizationTables], + features: &EnabledFeatures, + process: fn(&ThreadHandoff, Vec, &LeptonHeader) -> Result, +) -> Result<(Metrics, P), anyhow::Error> { + let cpu_time = ThreadTime::now(); + + let mut image_data = Vec::new(); + for i in 0..lh.jpeg_header.cmpc { + image_data.push(BlockBasedImage::new( + &lh.jpeg_header, + i, + lh.thread_handoff[thread_id].luma_y_start, + if thread_id == lh.thread_handoff.len() - 1 { + // if this is the last thread, then the image should extend all the way to the bottom + lh.jpeg_header.cmp_info[0].bcv + } else { + lh.thread_handoff[thread_id].luma_y_end + }, + )); + } + + let mut metrics = Metrics::default(); + + // get the appropriate receiver so we can read out data from it + let mut reader = MessageReceiver { + thread_id: thread_id as u8, + current_buffer: Cursor::new(Vec::new()), + receiver: rx, + end_of_file: false, + }; + + metrics.merge_from( + lepton_decode_row_range( + pts_ref, + q_ref, + &lh.truncate_components, + &mut image_data, + &mut reader, + lh.thread_handoff[thread_id].luma_y_start, + lh.thread_handoff[thread_id].luma_y_end, + thread_id == lh.thread_handoff.len() - 1, + true, + features, + ) + .context(here!())?, + ); + + let process_result = process(&lh.thread_handoff[thread_id], image_data, lh)?; + + metrics.record_cpu_worker_time(cpu_time.elapsed()); + + Ok((metrics, process_result)) } /// runs the encoding threads and returns the total amount of CPU time consumed (including worker threads) @@ -569,49 +555,29 @@ fn run_lepton_encoder_threads( let mut sizes = Vec::::new(); sizes.resize(thread_handoffs.len(), 0); - let mut merged_metrics = Metrics::default(); + let mut thread_results = Vec::>>::new(); + for _i in 0..thread_handoffs.len() { + thread_results.push(None); + } - thread::scope(|s| -> Result<()> { + rayon::in_place_scope(|s| -> Result<()> { let (tx, rx) = channel(); - let mut running_threads = Vec::new(); - - for i in 0..thread_handoffs.len() { + for (thread_id, result) in thread_results.iter_mut().enumerate() { let cloned_sender = tx.clone(); - running_threads.push(s.spawn(move || -> Result { - let cpu_time = ThreadTime::now(); - - let thread_id = i; - let mut thread_writer = MessageSender { - thread_id: thread_id as u8, - sender: cloned_sender, - buffer: Vec::with_capacity(WRITE_BUFFER_SIZE), - }; - - let mut range_metrics = lepton_encode_row_range( + s.spawn(move |_| { + *result = Some(encode_thread_action( + thread_id, + cloned_sender, pts_ref, q_ref, image_data, - &mut thread_writer, - thread_id as i32, colldata, - thread_handoffs[thread_id].luma_y_start, - thread_handoffs[thread_id].luma_y_end, - thread_id == thread_handoffs.len() - 1, - true, + thread_handoffs, features, - ) - .context(here!())?; - - thread_writer.flush().context(here!())?; - - thread_writer.sender.send(Message::Eof).context(here!())?; - - range_metrics.record_cpu_worker_time(cpu_time.elapsed()); - - Ok(range_metrics) - })); + )); + }); } // drop the sender so that the channel breaks when all the threads exit @@ -636,29 +602,32 @@ fn run_lepton_encoder_threads( sizes[thread_id as usize] += b.len() as u64; } - Err(x) => { - // get the actual error that cause the channel to - // prematurely close - for result in running_threads.drain(..) { - let r = result.join().unwrap(); - if let Err(e) = r { - return Err(e.context(here!())); - } - } - - return Err(x); + Err(_) => { + break; } } } - for result in running_threads.drain(..) { - merged_metrics.merge_from(result.join().unwrap().unwrap()); - } - return Ok(()); }) .context(here!())?; + let mut thread_not_run = false; + let mut merged_metrics = Metrics::default(); + + for result in thread_results.drain(..) { + match result { + None => thread_not_run = true, + Some(Ok(metrics)) => merged_metrics.merge_from(metrics), + // if there was an error processing anything, return it + Some(Err(e)) => return Err(e), + } + } + + if thread_not_run { + return err_exit_code(ExitCode::GeneralFailure, "thread did not run"); + } + info!( "scan portion of JPEG uncompressed size = {0}", sizes.iter().sum::() @@ -673,6 +642,48 @@ fn run_lepton_encoder_threads( Ok(merged_metrics) } +fn encode_thread_action( + thread_id: usize, + cloned_sender: Sender, + pts_ref: &ProbabilityTablesSet, + q_ref: &[QuantizationTables], + image_data: &[BlockBasedImage], + colldata: &TruncateComponents, + thread_handoffs: &[ThreadHandoff], + features: &EnabledFeatures, +) -> std::prelude::v1::Result { + let cpu_time = ThreadTime::now(); + + let mut thread_writer = MessageSender { + thread_id: thread_id as u8, + sender: cloned_sender, + buffer: Vec::with_capacity(WRITE_BUFFER_SIZE), + }; + + let mut range_metrics = lepton_encode_row_range( + pts_ref, + q_ref, + image_data, + &mut thread_writer, + thread_id as i32, + colldata, + thread_handoffs[thread_id].luma_y_start, + thread_handoffs[thread_id].luma_y_end, + thread_id == thread_handoffs.len() - 1, + true, + features, + ) + .context(here!())?; + + thread_writer.flush().context(here!())?; + + thread_writer.sender.send(Message::Eof).context(here!())?; + + range_metrics.record_cpu_worker_time(cpu_time.elapsed()); + + Ok(range_metrics) +} + #[derive(Debug)] pub struct LeptonHeader { /// raw jpeg header to be written back to the file when it is recreated