Parallelize GRIB2 downloads using a dedicated rayon thread pool

Downloads clat/clon concurrently via the download pool's join() and all
forecast steps in parallel via into_par_iter() inside the pool's
install(), then renders frames sequentially. A dedicated thread pool
(at most 16 threads) is used for the I/O-bound downloads so that they
stay separate from the global rayon pool used for CPU-bound rendering.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Schuwi
2026-03-06 22:27:39 +01:00
parent ee9b039d56
commit 0c73e04959

View File

@@ -6,6 +6,7 @@ mod render;
use anyhow::{bail, Context, Result}; use anyhow::{bail, Context, Result};
use chrono::{DateTime, Datelike, Timelike, Utc}; use chrono::{DateTime, Datelike, Timelike, Utc};
use clap::Parser; use clap::Parser;
use rayon::prelude::*;
use std::path::PathBuf; use std::path::PathBuf;
const BASE_URL: &str = "https://opendata.dwd.de/weather/nwp/icon-d2/grib/"; const BASE_URL: &str = "https://opendata.dwd.de/weather/nwp/icon-d2/grib/";
@@ -77,26 +78,22 @@ fn main() -> Result<()> {
dt.hour() dt.hour()
); );
// Download & decode coordinate grids // Build a dedicated thread pool for I/O-bound downloads, separate from
// the global rayon pool used for CPU-bound rendering.
let num_download_threads = (args.prediction_hours as usize + 2).min(16);
let download_pool = rayon::ThreadPoolBuilder::new()
.num_threads(num_download_threads)
.build()
.context("Failed to build download thread pool")?;
// Download coordinate grids (clat / clon) concurrently.
eprintln!("Downloading coordinate grids (clat / clon)..."); eprintln!("Downloading coordinate grids (clat / clon)...");
let lats = download_and_decode( let (lats, lons) = download_pool.join(
&run_url, || download_and_decode(&run_url, &date_str, "clat", "time-invariant", None, &cache_dir).context("clat"),
&date_str, || download_and_decode(&run_url, &date_str, "clon", "time-invariant", None, &cache_dir).context("clon"),
"clat", );
"time-invariant", let lats = lats?;
None, let lons = lons?;
&cache_dir,
)
.context("clat")?;
let lons = download_and_decode(
&run_url,
&date_str,
"clon",
"time-invariant",
None,
&cache_dir,
)
.context("clon")?;
if lats.len() != lons.len() { if lats.len() != lons.len() {
bail!( bail!(
@@ -107,11 +104,12 @@ fn main() -> Result<()> {
} }
eprintln!(" Grid has {} points.", lats.len()); eprintln!(" Grid has {} points.", lats.len());
// Download, decode, and render each forecast hour // Download all forecast steps in parallel, then render sequentially.
eprintln!("Rendering {} frame(s)...", args.prediction_hours); eprintln!("Downloading {} forecast frame(s) in parallel...", args.prediction_hours);
let mut output_paths = Vec::new(); let mut cloud_data: Vec<(u32, Vec<f32>)> = download_pool.install(|| {
(0..args.prediction_hours)
for step in 0..args.prediction_hours { .into_par_iter()
.map(|step| {
let cloud = download_and_decode( let cloud = download_and_decode(
&run_url, &run_url,
&date_str, &date_str,
@@ -121,7 +119,16 @@ fn main() -> Result<()> {
&cache_dir, &cache_dir,
) )
.with_context(|| format!("clct step {}", step))?; .with_context(|| format!("clct step {}", step))?;
Ok::<_, anyhow::Error>((step, cloud))
})
.collect::<Result<Vec<_>>>()
})?;
cloud_data.sort_unstable_by_key(|(step, _)| *step);
eprintln!("Rendering {} frame(s)...", args.prediction_hours);
let mut output_paths = Vec::new();
for (step, cloud) in cloud_data {
if cloud.len() != lats.len() { if cloud.len() != lats.len() {
bail!( bail!(
"clct step {} has {} values but coordinate grids have {}", "clct step {} has {} values but coordinate grids have {}",