2026-05-04 09:44:56 +02:00

134 lines
4.2 KiB
Python

"""
Orchestrate ETL: NetCDF path → normalized frames → Arrow blocks + metadata.
Subsamples to one frame per hour when time axis is available (e.g. 5-min → hourly).
"""
from __future__ import annotations
from pathlib import Path
import numpy as np
from .nc_reader import load_nc
from .to_blocks import write_blocks_arrow
def _forward_fill_beached(frames: np.ndarray) -> None:
"""Fem el beached persistent per partícula: al NC no és acumulatiu, i si llegim només cada hora
es podrien perdre les lectures intermitges on una partícula era beached. Un cop beached=1,
el mantenim 1 per a tots els passos següents (in-place)."""
num_steps, num_particles = frames.shape[0], frames.shape[1]
b = frames[:, :, 4]
for p in range(num_particles):
seen = False
for s in range(num_steps):
if b[s, p] == 1:
seen = True
elif seen:
b[s, p] = 1
frames[:, :, 4] = b
def _subsample_to_hourly(
frames: np.ndarray,
num_steps: int,
num_particles: int,
release_steps: list[int],
time_ref_sec: np.ndarray,
time_start_iso: str,
time_end_iso: str,
) -> tuple[np.ndarray, int, list[int]]:
"""Keep one frame per hour (hora punta): step closest to each full hour. Returns (frames_hourly, new_num_steps, new_release_steps)."""
start_sec = np.datetime64(time_start_iso).astype("datetime64[s]").astype(np.float64)
end_sec = np.datetime64(time_end_iso).astype("datetime64[s]").astype(np.float64)
num_hours = max(1, int(round((end_sec - start_sec) / 3600)))
hourly_indices: list[int] = []
for h in range(num_hours):
hour_center = start_sec + h * 3600 # full hour (e.g. 07:00:00)
t_lo = hour_center
t_hi = start_sec + (h + 1) * 3600
# Step in this hour whose time is closest to the full hour
best_s: int | None = None
best_dt = float("inf")
for s in range(num_steps):
if np.isnan(time_ref_sec[s]):
continue
if t_lo <= time_ref_sec[s] < t_hi:
dt = abs(time_ref_sec[s] - hour_center)
if dt < best_dt:
best_dt = dt
best_s = s
if best_s is not None:
hourly_indices.append(best_s)
if not hourly_indices:
return frames, num_steps, release_steps
idx = np.array(hourly_indices, dtype=np.intp)
frames_hourly = frames[idx]
new_num_steps = len(hourly_indices)
# Map release_steps: for each particle, first hourly index >= its original release step
new_release_steps: list[int] = []
for p in range(num_particles):
r = release_steps[p]
i = 0
while i < new_num_steps and idx[i] < r:
i += 1
new_release_steps.append(min(i, new_num_steps - 1))
return frames_hourly, new_num_steps, new_release_steps
def run_etl(
nc_path: str | Path,
out_dir: str | Path,
sim_id: str = "default",
write_json: bool = False,
) -> dict:
"""
Run full ETL: read .nc, optionally subsample to hourly, write blocks and metadata.
Returns metadata dict.
"""
nc_path = Path(nc_path)
out_dir = Path(out_dir)
sim_dir = out_dir / "simulations" / sim_id
(
frames,
num_particles,
num_steps,
release_steps,
seed_names,
origins,
time_start_iso,
time_end_iso,
time_ref_sec,
) = load_nc(str(nc_path))
# Beached: once a particle is beached it stays beached (so hourly sample doesn't miss it)
_forward_fill_beached(frames)
# Subsample to one frame per hour when we have a time reference
if (
time_ref_sec is not None
and time_start_iso is not None
and time_end_iso is not None
and np.any(np.isfinite(time_ref_sec))
):
frames, num_steps, release_steps = _subsample_to_hourly(
frames, num_steps, num_particles, release_steps,
time_ref_sec, time_start_iso, time_end_iso,
)
metadata = write_blocks_arrow(
sim_dir,
frames,
num_particles,
num_steps,
release_steps,
seed_names,
origins,
write_json=write_json,
time_start_iso=time_start_iso,
time_end_iso=time_end_iso,
)
return metadata