diff --git a/src/main.rs b/src/main.rs index 0ec9d1c..aee3083 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,6 +6,7 @@ mod render; use anyhow::{bail, Context, Result}; use chrono::{DateTime, Datelike, Timelike, Utc}; use clap::Parser; +use rayon::prelude::*; use std::path::PathBuf; const BASE_URL: &str = "https://opendata.dwd.de/weather/nwp/icon-d2/grib/"; @@ -77,26 +78,22 @@ fn main() -> Result<()> { dt.hour() ); - // Download & decode coordinate grids + // Build a dedicated thread pool for I/O-bound downloads, separate from + // the global rayon pool used for CPU-bound rendering. + let num_download_threads = (args.prediction_hours as usize + 2).min(16); + let download_pool = rayon::ThreadPoolBuilder::new() + .num_threads(num_download_threads) + .build() + .context("Failed to build download thread pool")?; + + // Download coordinate grids (clat / clon) concurrently. eprintln!("Downloading coordinate grids (clat / clon)..."); - let lats = download_and_decode( - &run_url, - &date_str, - "clat", - "time-invariant", - None, - &cache_dir, - ) - .context("clat")?; - let lons = download_and_decode( - &run_url, - &date_str, - "clon", - "time-invariant", - None, - &cache_dir, - ) - .context("clon")?; + let (lats, lons) = download_pool.join( + || download_and_decode(&run_url, &date_str, "clat", "time-invariant", None, &cache_dir).context("clat"), + || download_and_decode(&run_url, &date_str, "clon", "time-invariant", None, &cache_dir).context("clon"), + ); + let lats = lats?; + let lons = lons?; if lats.len() != lons.len() { bail!( @@ -107,21 +104,31 @@ fn main() -> Result<()> { } eprintln!(" Grid has {} points.", lats.len()); - // Download, decode, and render each forecast hour + // Download all forecast steps in parallel, then render sequentially. + eprintln!("Downloading {} forecast frame(s) in parallel...", args.prediction_hours); + let mut cloud_data: Vec<(u32, Vec)> = download_pool.install(|| { + (0..args.prediction_hours) + .into_par_iter() + .map(|step| { + let cloud = download_and_decode( + &run_url, + &date_str, + "clct", + "single-level", + Some(step), + &cache_dir, + ) + .with_context(|| format!("clct step {}", step))?; + Ok::<_, anyhow::Error>((step, cloud)) + }) + .collect::>>() + })?; + cloud_data.sort_unstable_by_key(|(step, _)| *step); + eprintln!("Rendering {} frame(s)...", args.prediction_hours); let mut output_paths = Vec::new(); - for step in 0..args.prediction_hours { - let cloud = download_and_decode( - &run_url, - &date_str, - "clct", - "single-level", - Some(step), - &cache_dir, - ) - .with_context(|| format!("clct step {}", step))?; - + for (step, cloud) in cloud_data { if cloud.len() != lats.len() { bail!( "clct step {} has {} values but coordinate grids have {}",