""" Orchestrate ETL: NetCDF path → normalized frames → Arrow blocks + metadata. Subsamples to one frame per hour when time axis is available (e.g. 5-min → hourly). """ from __future__ import annotations from pathlib import Path import numpy as np from .nc_reader import load_nc from .to_blocks import write_blocks_arrow def _forward_fill_beached(frames: np.ndarray) -> None: """Fem el beached persistent per partícula: al NC no és acumulatiu, i si llegim només cada hora es podrien perdre les lectures intermitges on una partícula era beached. Un cop beached=1, el mantenim 1 per a tots els passos següents (in-place).""" num_steps, num_particles = frames.shape[0], frames.shape[1] b = frames[:, :, 4] for p in range(num_particles): seen = False for s in range(num_steps): if b[s, p] == 1: seen = True elif seen: b[s, p] = 1 frames[:, :, 4] = b def _subsample_to_hourly( frames: np.ndarray, num_steps: int, num_particles: int, release_steps: list[int], time_ref_sec: np.ndarray, time_start_iso: str, time_end_iso: str, ) -> tuple[np.ndarray, int, list[int]]: """Keep one frame per hour (hora punta): step closest to each full hour. Returns (frames_hourly, new_num_steps, new_release_steps).""" start_sec = np.datetime64(time_start_iso).astype("datetime64[s]").astype(np.float64) end_sec = np.datetime64(time_end_iso).astype("datetime64[s]").astype(np.float64) num_hours = max(1, int(round((end_sec - start_sec) / 3600))) hourly_indices: list[int] = [] for h in range(num_hours): hour_center = start_sec + h * 3600 # full hour (e.g. 07:00:00) t_lo = hour_center t_hi = start_sec + (h + 1) * 3600 # Step in this hour whose time is closest to the full hour best_s: int | None = None best_dt = float("inf") for s in range(num_steps): if np.isnan(time_ref_sec[s]): continue if t_lo <= time_ref_sec[s] < t_hi: dt = abs(time_ref_sec[s] - hour_center) if dt < best_dt: best_dt = dt best_s = s if best_s is not None: hourly_indices.append(best_s) if not hourly_indices: return frames, num_steps, release_steps idx = np.array(hourly_indices, dtype=np.intp) frames_hourly = frames[idx] new_num_steps = len(hourly_indices) # Map release_steps: for each particle, first hourly index >= its original release step new_release_steps: list[int] = [] for p in range(num_particles): r = release_steps[p] i = 0 while i < new_num_steps and idx[i] < r: i += 1 new_release_steps.append(min(i, new_num_steps - 1)) return frames_hourly, new_num_steps, new_release_steps def run_etl( nc_path: str | Path, out_dir: str | Path, sim_id: str = "default", write_json: bool = False, ) -> dict: """ Run full ETL: read .nc, optionally subsample to hourly, write blocks and metadata. Returns metadata dict. """ nc_path = Path(nc_path) out_dir = Path(out_dir) sim_dir = out_dir / "simulations" / sim_id ( frames, num_particles, num_steps, release_steps, seed_names, origins, time_start_iso, time_end_iso, time_ref_sec, ) = load_nc(str(nc_path)) # Beached: once a particle is beached it stays beached (so hourly sample doesn't miss it) _forward_fill_beached(frames) # Subsample to one frame per hour when we have a time reference if ( time_ref_sec is not None and time_start_iso is not None and time_end_iso is not None and np.any(np.isfinite(time_ref_sec)) ): frames, num_steps, release_steps = _subsample_to_hourly( frames, num_steps, num_particles, release_steps, time_ref_sec, time_start_iso, time_end_iso, ) metadata = write_blocks_arrow( sim_dir, frames, num_particles, num_steps, release_steps, seed_names, origins, write_json=write_json, time_start_iso=time_start_iso, time_end_iso=time_end_iso, ) return metadata