Skip to content

terraflow.core

The core modules define the configuration schema and the end-to-end pipeline.

Quick Start

from terraflow.pipeline import run_pipeline
from terraflow.config import load_config

# Load and validate configuration
config = load_config("config.yml")

# Run the complete pipeline
results_df = run_pipeline("config.yml")

Module Organization

  • config: YAML schema validation and Pydantic models
  • pipeline: End-to-end orchestration and artifact generation
  • model: Suitability scoring algorithms and label assignment

API Reference

config

ClimateConfig

Bases: BaseModel

Climate data configuration for spatial matching and interpolation.

Supports two strategies for aligning climate observations to raster cells:

  • 'spatial': Interpolate climate values using geographic coordinates (lat/lon).
  • 'index': Match cells to climate records by row index or explicit cell ID.

When strategy is 'spatial', three interpolation methods are available:

  • 'linear': Delaunay-triangulation linear interpolation (scipy.griddata, default).
  • 'kriging': Ordinary Kriging — geostatistically optimal with per-cell uncertainty (requires pykrige; ≥5 stations recommended).
  • 'idw': Inverse Distance Weighting — fast, no uncertainty estimate.

Attributes:

Name Type Description
strategy Literal['spatial', 'index']

Matching strategy: 'spatial' (interpolation) or 'index' (direct matching).

interpolation_method Literal['linear', 'kriging', 'idw']

Interpolation algorithm when strategy='spatial'. Choices: 'linear' (default), 'kriging', 'idw'.

cell_id_column str | None

Column name in climate CSV for cell ID (used with 'index' strategy). If None with 'index' strategy, uses row order matching.

fallback_to_mean bool

If True, use global mean climate for cells outside interpolation range or when climate data is sparse (default True).

validate_config()

Validate consistency of climate configuration.

Source code in terraflow/config.py
def validate_config(self) -> None:
    """Validate consistency of climate configuration.

    Currently a no-op placeholder: ``interpolation_method`` is only
    meaningful when ``strategy == 'spatial'``, but no cross-field
    constraint is enforced yet.
    """
    # interpolation_method only applies to spatial strategy — no checks yet.
    return None

validate_interpolation_method(v) classmethod

Ensure interpolation_method is valid.

Source code in terraflow/config.py
@field_validator("interpolation_method")
@classmethod
def validate_interpolation_method(cls, v: str) -> str:
    """Reject any interpolation method outside the supported set."""
    allowed = ("linear", "kriging", "idw")
    if v in allowed:
        return v
    raise ValueError(
        f"interpolation_method must be 'linear', 'kriging', or 'idw', got '{v}'"
    )

validate_strategy(v) classmethod

Ensure strategy is valid.

Source code in terraflow/config.py
@field_validator("strategy")
@classmethod
def validate_strategy(cls, v: str) -> str:
    """Reject any matching strategy outside the supported set."""
    if v in ("spatial", "index"):
        return v
    raise ValueError(f"strategy must be 'spatial' or 'index', got '{v}'")

ModelParams

Bases: BaseModel

Parameters for normalization and weighting in the suitability model.

The weights w_v, w_t, w_r should sum to approximately 1.0 for proper normalization, though this is not strictly enforced.

Attributes:

Name Type Description
v_min, v_max

Min/max vegetation index values for normalization.

t_min, t_max

Min/max temperature values for normalization (in °C).

r_min, r_max

Min/max rainfall values for normalization (in mm).

w_v, w_t, w_r

Weights for vegetation, temperature, and rainfall (should sum to 1.0).

uncertainty_samples int

Number of Monte Carlo draws per cell for uncertainty propagation. 0 (default) disables Monte Carlo. Requires interpolation_method='kriging' to produce per-cell standard deviations. Adds score_ci_low and score_ci_high columns to features.parquet and an uncertainty block to report.json.

validate_max_values(v, info) classmethod

Ensure max values are reasonable numbers.

Source code in terraflow/config.py
@field_validator("v_max", "t_max", "r_max")
@classmethod
def validate_max_values(cls, v: float, info) -> float:
    """Reject non-numeric max values (ints and floats pass through)."""
    if isinstance(v, (int, float)):
        return v
    raise ValueError(f"Max value must be numeric, got {type(v)}")

validate_min_values(v, info) classmethod

Ensure min values are reasonable numbers.

Source code in terraflow/config.py
@field_validator("v_min", "t_min", "r_min")
@classmethod
def validate_min_values(cls, v: float, info) -> float:
    """Reject non-numeric min values (ints and floats pass through)."""
    if isinstance(v, (int, float)):
        return v
    raise ValueError(f"Min value must be numeric, got {type(v)}")

validate_ranges()

Validate that min < max for all ranges. Call after initialization.

Source code in terraflow/config.py
def validate_ranges(self) -> None:
    """Validate that min < max for all ranges. Call after initialization.

    Raises
    ------
    ValueError:
        If any min/max pair is inverted or equal, if any weight is
        negative, or if the weights do not sum to ~1.0 (tolerance 0.01).
    """
    # Each variable's normalization window must be non-degenerate.
    for prefix in ("v", "t", "r"):
        lo = getattr(self, f"{prefix}_min")
        hi = getattr(self, f"{prefix}_max")
        if lo >= hi:
            raise ValueError(
                f"{prefix}_min ({lo}) must be less than {prefix}_max ({hi})"
            )

    # Weights must be non-negative.
    if min(self.w_v, self.w_t, self.w_r) < 0:
        raise ValueError("Weights must be non-negative")

    # Weights must sum to approximately 1.0 (small tolerance allowed).
    weight_sum = self.w_v + self.w_t + self.w_r
    if abs(weight_sum - 1.0) > 0.01:
        raise ValueError(
            f"Weights must sum to approximately 1.0, got {weight_sum:.3f} "
            f"(w_v={self.w_v}, w_t={self.w_t}, w_r={self.w_r})"
        )

validate_uncertainty_samples(v) classmethod

Ensure uncertainty_samples is non-negative.

Source code in terraflow/config.py
@field_validator("uncertainty_samples")
@classmethod
def validate_uncertainty_samples(cls, v: int) -> int:
    """Reject negative Monte Carlo sample counts (0 disables MC)."""
    if v >= 0:
        return v
    raise ValueError(f"uncertainty_samples must be >= 0, got {v}")

PipelineConfig

Bases: BaseModel

Top-level pipeline configuration.

Attributes:

Name Type Description
raster_path Path

Path to the input raster file (GeoTIFF).

climate_csv Path

Path to the climate data CSV file.

output_dir Path

Directory for output results.

roi ROI

Region of interest specification.

model_params ModelParams

Suitability model parameters.

climate ClimateConfig

Climate data configuration (strategy, fallback behavior, etc.).

max_cells int

Maximum number of cells to sample from the ROI (default 500).

validate_all()

Validate all constraints. Call after model initialization.

Source code in terraflow/config.py
def validate_all(self) -> None:
    """Validate all constraints. Call after model initialization.

    Delegates to each sub-config's post-init validator in order:
    ROI bounds, model parameter ranges, then climate configuration.
    """
    checks = (
        self.roi.validate_bounds,
        self.model_params.validate_ranges,
        self.climate.validate_config,
    )
    for check in checks:
        check()

validate_max_cells(v) classmethod

Ensure max_cells is a positive integer.

Source code in terraflow/config.py
@field_validator("max_cells")
@classmethod
def validate_max_cells(cls, v: int) -> int:
    """Reject non-positive cell sampling limits."""
    if v > 0:
        return v
    raise ValueError(f"max_cells must be positive, got {v}")

ROI

Bases: BaseModel

Region of interest. For now we support only a bounding box.

Attributes:

Name Type Description
type Literal['bbox']

Type of ROI (currently only 'bbox' is supported).

xmin, ymin, xmax, ymax

Bounding box coordinates expressed in roi_crs (default WGS 84 degrees).

roi_crs str

EPSG code or WKT string for the CRS of the bounding box coordinates. Defaults to "EPSG:4326" (latitude/longitude degrees). The pipeline automatically reprojects to the raster's native CRS before clipping.

validate_bounds()

Validate that bounds are sensible. Call after initialization.

Source code in terraflow/config.py
def validate_bounds(self) -> None:
    """Validate that bounds are sensible. Call after initialization.

    Raises
    ------
    ValueError:
        If xmin >= xmax or ymin >= ymax.
    """
    pairs = (
        (self.xmin, self.xmax, "xmin", "xmax"),
        (self.ymin, self.ymax, "ymin", "ymax"),
    )
    for lo, hi, lo_name, hi_name in pairs:
        if lo >= hi:
            raise ValueError(f"{lo_name} ({lo}) must be less than {hi_name} ({hi})")

SensitivityConfig

Bases: BaseModel

Configuration for sensitivity analysis (per D-04).

ValidationConfig

Bases: BaseModel

Configuration for model validation (Phase 3).

WeightBounds

Bases: BaseModel

Bounds for a single weight parameter in sensitivity analysis.

build_config(data)

Validate a raw config dict into a PipelineConfig.

Source code in terraflow/config.py
def build_config(data: dict) -> PipelineConfig:
    """Validate a raw config dict into a PipelineConfig.

    Both Pydantic schema validation and the post-init cross-field checks
    are wrapped so callers see a single ``ValueError`` with a uniform
    "Configuration validation failed" prefix.
    """
    try:
        config = PipelineConfig.model_validate(data)
        config.validate_all()
    except ValueError as e:
        raise ValueError(f"Configuration validation failed: {e}") from e
    return config

load_config(path)

Load YAML config from disk and validate with Pydantic.

Parameters:

Name Type Description Default
path str | Path

Path to the YAML configuration file.

required

Returns:

Name Type Description
PipelineConfig PipelineConfig

Validated configuration object.

Raises:

Type Description
FileNotFoundError:

If the config file does not exist.

ValueError:

If the YAML is malformed (the underlying yaml.YAMLError is wrapped by load_config_dict), if the file is empty, or if the configuration is invalid.

Source code in terraflow/config.py
def load_config(path: str | Path) -> PipelineConfig:
    """Load YAML config from disk and validate with Pydantic.

    Parameters
    ----------
    path:
        Path to the YAML configuration file.

    Returns
    -------
    PipelineConfig:
        Validated configuration object.

    Raises
    ------
    FileNotFoundError:
        If the config file does not exist.
    ValueError:
        If the YAML is malformed (the underlying ``yaml.YAMLError`` is
        wrapped by ``load_config_dict``), if the file is empty, or if the
        configuration is invalid.
    """
    # Parse first, then validate; both helpers raise ValueError on failure.
    data = load_config_dict(path)
    return build_config(data)

load_config_dict(path)

Load YAML config from disk into a raw dict.

Parameters:

Name Type Description Default
path str | Path

Path to the YAML configuration file.

required

Returns:

Name Type Description
dict dict

Parsed configuration as a Python dict.

Raises:

Type Description
FileNotFoundError:

If the config file does not exist.

ValueError:

If the YAML is malformed (the underlying yaml.YAMLError is wrapped and re-raised) or if the file is empty.

Source code in terraflow/config.py
def load_config_dict(path: str | Path) -> dict:
    """Load YAML config from disk into a raw dict.

    Parameters
    ----------
    path:
        Path to the YAML configuration file.

    Returns
    -------
    dict:
        Parsed configuration as a Python dict.

    Raises
    ------
    FileNotFoundError:
        If the config file does not exist.
    ValueError:
        If the YAML is malformed (the underlying ``yaml.YAMLError`` is
        wrapped and re-raised) or if the file is empty.
    """
    path = Path(path)
    if not path.exists():
        raise FileNotFoundError(f"Configuration file not found: {path}")

    try:
        with path.open("r", encoding="utf-8") as f:
            # safe_load refuses arbitrary object construction — keep it.
            data = yaml.safe_load(f)
    except yaml.YAMLError as e:
        # Normalize parser failures to ValueError so callers handle one type.
        raise ValueError(f"Failed to parse YAML config: {e}") from e

    if data is None:
        raise ValueError("Configuration file is empty")

    # NOTE(review): a YAML scalar/list also passes here — presumably the
    # Pydantic validation downstream rejects non-mapping data; confirm.
    return data

pipeline

Pipeline orchestration for TerraFlow.

All outputs are written atomically to::

output_dir/runs/<run_fingerprint>/

Every run produces exactly three artifacts:

  • features.parquet — per-cell suitability features (tidy/wide schema v1)
  • manifest.json — config snapshot, run identity, input provenance
  • report.json — QA summaries, coverage metrics, step timings

A backward-compatible results.csv is also written to the same run directory so that callers relying on the previous output path can migrate at their own pace.

resolve_run_dir(config_path)

Return the run directory for a given config without running the pipeline.

Uses the same fingerprint computation as run_pipeline() so that terraflow validate always targets the correct run, not the most recently modified one.

Parameters:

Name Type Description Default
config_path Path | str

Path to a TerraFlow YAML config file.

required

Returns:

Name Type Description
Path Path

output_dir/runs/<fingerprint> — may not exist if the pipeline has not been run yet.

Source code in terraflow/pipeline.py
def resolve_run_dir(config_path: Path | str) -> Path:
    """Return the run directory for a given config without running the pipeline.

    Uses the same fingerprint computation as ``run_pipeline()`` so that
    ``terraflow validate`` always targets the correct run, not the most
    recently modified one.

    Parameters
    ----------
    config_path:
        Path to a TerraFlow YAML config file.

    Returns
    -------
    Path:
        ``output_dir/runs/<fingerprint>`` — may not exist if the pipeline
        has not been run yet.
    """
    config_path = Path(config_path)
    config_dict = load_config_dict(config_path)
    config_dir = config_path.resolve().parent

    # Anchor relative input/output paths at the config file's directory,
    # mirroring run_pipeline() so both compute identical fingerprints.
    for key in ("raster_path", "climate_csv", "output_dir"):
        raw = config_dict.get(key)
        if raw is None:
            continue
        candidate = Path(str(raw))
        if not candidate.is_absolute():
            config_dict[key] = str((config_dir / candidate).resolve())

    cfg: PipelineConfig = build_config(config_dict)
    roi_hash = _resolve_roi_hash(config_dict, config_dir)
    input_fps = [
        fingerprint_file(str(p))
        for p in _collect_input_paths(config_dict, config_dir)
    ]
    run_fingerprint = compute_run_fingerprint(config_dict, roi_hash, input_fps)

    return Path(cfg.output_dir) / "runs" / run_fingerprint

run_pipeline(config_path)

Run the end-to-end pipeline and return a DataFrame of results.

Uses spatially-aware climate data matching to apply per-cell climate values based on the configured strategy (spatial interpolation or index-based matching).

Parameters:

Name Type Description Default
config_path str | Path

Path to YAML configuration file.

required

Returns:

Type Description
pd.DataFrame:

Results table with columns: run_id, cell_id, lat, lon, v_index, mean_temp, total_rain, score, label. df.attrs["run_fingerprint"] is set so callers can locate the run directory.

Raises:

Type Description
FileNotFoundError:

If config file, raster file, or climate CSV does not exist.

ValueError:

If configuration is invalid or no valid raster cells found in ROI.

Notes

All artifacts are written atomically under::

output_dir/runs/<run_fingerprint>/

If that directory already contains all three required artifacts (features.parquet, manifest.json, report.json) the pipeline detects the identical run and returns early (no-op rerun).

Source code in terraflow/pipeline.py
(Source lines 348–758 of terraflow/pipeline.py)
def run_pipeline(config_path: str | Path) -> pd.DataFrame:
    """Run the end-to-end pipeline and return a DataFrame of results.

    Uses spatially-aware climate data matching to apply per-cell climate values
    based on the configured strategy (spatial interpolation or index-based
    matching).

    Parameters
    ----------
    config_path:
        Path to YAML configuration file.

    Returns
    -------
    pd.DataFrame:
        Results table with columns: run_id, cell_id, lat, lon, v_index,
        mean_temp, total_rain, score, label.  ``df.attrs["run_fingerprint"]``
        is set so callers can locate the run directory.

    Raises
    ------
    FileNotFoundError:
        If config file, raster file, or climate CSV does not exist.
    ValueError:
        If configuration is invalid or no valid raster cells found in ROI.

    Notes
    -----
    All artifacts are written atomically under::

        output_dir/runs/<run_fingerprint>/

    If that directory already contains all three required artifacts
    (``features.parquet``, ``manifest.json``, ``report.json``) the pipeline
    detects the identical run and returns early (no-op rerun).
    """
    # --- Config loading & fingerprinting ------------------------------------
    _t_total_start = time.perf_counter()
    config_path = Path(config_path)
    config_dict = load_config_dict(config_path)
    config_dir = config_path.resolve().parent

    # Resolve relative input/output paths against the config file's directory.
    for _key in ("raster_path", "climate_csv", "output_dir"):
        if _key in config_dict and config_dict[_key] is not None:
            _p = Path(str(config_dict[_key]))
            if not _p.is_absolute():
                config_dict[_key] = str((config_dir / _p).resolve())

    cfg: PipelineConfig = build_config(config_dict)
    logger.info(f"Loaded config from {config_path}")

    # The run fingerprint combines the canonicalized config, the ROI hash,
    # and the content fingerprints of every input file.
    config_bytes_hash = hashlib.sha256(canonicalize_config(config_dict)).hexdigest()
    roi_hash = _resolve_roi_hash(config_dict, config_dir)
    input_paths = _collect_input_paths(config_dict, config_dir)
    input_fps = [fingerprint_file(str(path)) for path in input_paths]
    run_fingerprint = compute_run_fingerprint(config_dict, roi_hash, input_fps)
    logger.info(
        f"Computed run fingerprint {run_fingerprint} (config={config_bytes_hash}, inputs={len(input_fps)})"
    )

    output_dir = ensure_dir(cfg.output_dir)
    run_dir = ensure_dir(output_dir / "runs" / run_fingerprint)

    # No-op rerun detection: an identical fingerprint with all three
    # artifacts present means this exact run already completed.
    _required_artifacts = [
        run_dir / "features.parquet",
        run_dir / "manifest.json",
        run_dir / "report.json",
    ]
    if all(p.exists() for p in _required_artifacts):
        logger.info(f"Detected identical run {run_fingerprint} — all artifacts present, returning cached result.")
        df = pd.read_parquet(run_dir / "features.parquet")
        df.attrs["run_fingerprint"] = run_fingerprint
        df.attrs["run_dir"] = str(run_dir)
        return df

    # --- Input loading ------------------------------------------------------
    _t_catalog_start = time.perf_counter()
    catalog = build_data_catalog(cfg.raster_path, cfg.climate_csv)
    _t_catalog = time.perf_counter() - _t_catalog_start

    _t_load_start = time.perf_counter()
    raster = load_raster(cfg.raster_path)
    raster_crs = raster.crs
    try:
        climate_df = load_climate_csv(cfg.climate_csv)
    except Exception:
        # Don't leak the open raster dataset if climate loading fails.
        raster.close()
        raise
    _t_load = time.perf_counter() - _t_load_start

    _crs_str = f"EPSG:{raster_crs.to_epsg()}" if raster_crs is not None and raster_crs.to_epsg() else (str(raster_crs) if raster_crs else "None")
    logger.info(f"Loaded raster: {cfg.raster_path} (CRS: {_crs_str})")
    logger.info(f"Loaded climate data: {cfg.climate_csv}")

    # Climate coordinates are treated as WGS84 (EPSG:4326); fail fast if the
    # raster has no CRS or cannot be transformed to it.
    _climate_crs = CRS.from_epsg(4326)
    _climate_crs_str = "EPSG:4326"
    if raster_crs is None:
        raise CRSMismatchError(
            f"Raster '{cfg.raster_path}' has no CRS (raster_crs=None). "
            f"Expected a raster reprojectable to climate CRS '{_climate_crs_str}'."
        )
    try:
        Transformer.from_crs(raster_crs, _climate_crs, always_xy=True)
    except _PyProjCRSError as exc:
        raise CRSMismatchError(
            f"Raster CRS '{raster_crs.to_string()}' is incompatible with "
            f"climate CRS '{_climate_crs_str}': {exc}"
        ) from exc

    # --- ROI clipping & cell sampling ---------------------------------------
    _t_clip_start = time.perf_counter()
    clipped_data, clipped_transform = clip_raster_to_roi(
        raster,
        cfg.roi.model_dump(),
        roi_crs=cfg.roi.roi_crs,
    )
    _t_clip = time.perf_counter() - _t_clip_start
    logger.info("Clipped raster to ROI")

    rows: int
    cols: int
    rows, cols = clipped_data.shape
    n_total_cells = rows * cols

    # Enumerate unmasked (valid) cells in the clipped raster.
    valid_indices: List[tuple[int, int]] = [
        (r, c)
        for r in range(rows)
        for c in range(cols)
        if not np.ma.is_masked(clipped_data[r, c])
    ]
    n_valid_cells = len(valid_indices)
    n_nodata_cells = n_total_cells - n_valid_cells

    if not valid_indices:
        raise ValueError("No valid raster cells found in the specified ROI")

    interpolator = ClimateInterpolator(
        climate_df=climate_df,
        strategy=cfg.climate.strategy,
        interpolation_method=cfg.climate.interpolation_method,
        cell_id_column=cfg.climate.cell_id_column,
        fallback_to_mean=cfg.climate.fallback_to_mean,
    )
    logger.info(
        f"Initialized climate interpolator with strategy='{cfg.climate.strategy}', method='{interpolator.interpolation_method}'"
    )

    # Derive a deterministic seed from the run fingerprint so that cell
    # selection is bit-reproducible across independent executions with
    # identical inputs — closing the reproducibility gap from v0.2.1.
    _fp_seed = int.from_bytes(
        hashlib.sha256(run_fingerprint.encode()).digest()[:8], "little"
    )
    rng = np.random.default_rng(_fp_seed)
    max_cells = min(cfg.max_cells, n_valid_cells)
    _sample_idx = rng.choice(n_valid_cells, size=max_cells, replace=False)
    sampled_indices = [valid_indices[i] for i in _sample_idx]
    logger.info(f"Sampled {max_cells} cells from {n_valid_cells} valid cells in ROI")

    # Pre-compute cell centre coordinates in native raster CRS.
    _native_xs: List[float] = []
    _native_ys: List[float] = []
    for row, col in sampled_indices:
        x, y = xy(clipped_transform, row, col, offset="center")
        _native_xs.append(float(x))
        _native_ys.append(float(y))

    # Reproject to WGS84 (EPSG:4326).
    _wgs84 = CRS.from_epsg(4326)
    if raster_crs != _wgs84:
        _coord_tf = Transformer.from_crs(raster_crs, _wgs84, always_xy=True)
        _lons, _lats = _coord_tf.transform(_native_xs, _native_ys)
        cell_lons: List[float] = list(_lons)
        cell_lats: List[float] = list(_lats)
    else:
        cell_lons = _native_xs
        cell_lats = _native_ys

    # --- Climate interpolation & scoring ------------------------------------
    _t_interp_start = time.perf_counter()
    cell_climate_df = interpolator.interpolate(np.array(cell_lats), np.array(cell_lons))
    _t_interp = time.perf_counter() - _t_interp_start
    logger.info(f"Interpolated climate for {len(sampled_indices)} cells using strategy='{cfg.climate.strategy}'")

    _t_score_start = time.perf_counter()
    records: List[Dict[str, Any]] = []

    # Detect kriging std columns (present when interpolation_method="kriging").
    _krig_std_cols = [c for c in cell_climate_df.columns if c.endswith("_krig_std")]

    for cell_id, (row, col) in enumerate(sampled_indices):
        v_index = float(clipped_data[row, col])
        lat = cell_lats[cell_id]
        lon = cell_lons[cell_id]
        mean_temp = float(cell_climate_df.iloc[cell_id]["mean_temp"])
        total_rain = float(cell_climate_df.iloc[cell_id]["total_rain"])

        score = suitability_score(
            v_index=v_index,
            mean_temp=mean_temp,
            total_rain=total_rain,
            params=cfg.model_params,
        )
        label = suitability_label(score)

        record: Dict[str, Any] = {
            "run_id": run_fingerprint,
            "cell_id": cell_id,
            "lat": lat,
            "lon": lon,
            "v_index": v_index,
            "mean_temp": mean_temp,
            "total_rain": total_rain,
            "score": score,
            "label": label,
        }
        # Append per-cell kriging std columns when kriging was used.
        for ksc in _krig_std_cols:
            record[ksc] = float(cell_climate_df.iloc[cell_id][ksc])

        records.append(record)

    df = pd.DataFrame.from_records(records)

    # --- Monte Carlo uncertainty propagation ---------------------------------
    # Requires: uncertainty_samples > 0 AND kriging std columns present.
    # Perturbs mean_temp and total_rain independently using their per-cell
    # kriging standard deviations, then scores each draw to derive 5th/95th
    # percentile confidence intervals on the suitability score.
    _mc_ci_cols: List[str] = []
    _n_mc = cfg.model_params.uncertainty_samples
    if _n_mc > 0:
        if _krig_std_cols:
            _mc_ci_cols = ["score_ci_low", "score_ci_high"]
            _n_cells = len(df)
            _v_arr = df["v_index"].to_numpy()
            _temp_arr = df["mean_temp"].to_numpy()
            _rain_arr = df["total_rain"].to_numpy()
            # Clip std to ≥ 0 (kriging variance is non-negative but may have
            # tiny floating-point negatives at station locations).
            _temp_std = np.maximum(df["mean_temp_krig_std"].to_numpy(), 0.0)
            _rain_std = np.maximum(df["total_rain_krig_std"].to_numpy(), 0.0)

            # Shape: (n_cells, n_mc) — rng continues deterministic stream.
            _temp_mc = rng.normal(
                _temp_arr[:, None], _temp_std[:, None], (_n_cells, _n_mc)
            )
            _rain_mc = rng.normal(
                _rain_arr[:, None], _rain_std[:, None], (_n_cells, _n_mc)
            )
            # v_index has no per-cell uncertainty estimate; broadcast constant.
            _v_mc = np.broadcast_to(_v_arr[:, None], (_n_cells, _n_mc))

            _scores_mc = suitability_score_array(
                _v_mc, _temp_mc, _rain_mc, cfg.model_params
            )  # (n_cells, n_mc), values in [0, 1]

            df["score_ci_low"] = np.percentile(_scores_mc, 5, axis=1)
            df["score_ci_high"] = np.percentile(_scores_mc, 95, axis=1)
            logger.info(f"Monte Carlo uncertainty propagation complete: {_n_mc} samples per cell")
        else:
            logger.warning(
                f"uncertainty_samples={_n_mc} but no kriging std columns available; "
                "skipping Monte Carlo. Set interpolation_method='kriging' to enable."
            )

    # Enforce stable base column order, then append kriging std and CI columns.
    df = df[FEATURES_COLUMNS_ORDERED + _krig_std_cols + _mc_ci_cols]
    _t_score = time.perf_counter() - _t_score_start

    raster.close()
    logger.info("Closed raster dataset")

    # --- Artifact writing (features, results.csv, manifest) -----------------
    _t_write_start = time.perf_counter()

    _atomic_write_parquet(
        run_dir / "features.parquet",
        df,
        schema_meta={"run_fingerprint": run_fingerprint},
    )

    # Backward-compatible CSV alongside the parquet (see module notes).
    _atomic_write_text(run_dir / "results.csv", df.to_csv(index=False))

    git_sha = _get_git_sha()
    input_fp_records = [
        {
            "path": fp["path"],
            "sha256": fp["sha256"],
            "size_bytes": fp["size_bytes"],
            # mtime kept for human-readable provenance; NOT part of fingerprint
            "mtime": fp.get("mtime"),
        }
        for fp in input_fps
    ]
    manifest: Dict[str, Any] = {
        "schema_version": MANIFEST_SCHEMA_VERSION,
        "run_fingerprint": run_fingerprint,
        "code_version": _get_package_version(),
        "git_sha": git_sha,
        "created_at_utc": datetime.now(timezone.utc).isoformat(),
        "config": config_dict,
        "input_fingerprints": input_fp_records,
        "catalog": catalog.to_provenance(),
        "output_files": [
            "features.parquet",
            "results.csv",
            "manifest.json",
            "report.json",
        ],
    }
    _atomic_write_text(
        run_dir / "manifest.json",
        json.dumps(manifest, indent=2, default=str),
    )

    # --- QA report -----------------------------------------------------------
    raster_data_for_report = clipped_data.compressed().astype("float64")
    climate_var_cols = [c for c in climate_df.columns if c not in ("lat", "lon")]
    climate_var_stats: Dict[str, Any] = {}
    for var in climate_var_cols:
        col = climate_df[var].dropna()
        climate_var_stats[var] = {
            "mean": float(col.mean()) if len(col) else None,
            "min": float(col.min()) if len(col) else None,
            "max": float(col.max()) if len(col) else None,
            "n_nodata": int(climate_df[var].isna().sum()),
        }

    scores_arr = df["score"].to_numpy()
    _t_total = time.perf_counter() - _t_total_start
    _t_write = time.perf_counter() - _t_write_start

    report: Dict[str, Any] = {
        "schema_version": REPORT_SCHEMA_VERSION,
        "run_fingerprint": run_fingerprint,
        "coverage": {
            "n_raster_cells_total": n_total_cells,
            "n_roi_valid_cells": n_valid_cells,
            "n_roi_nodata_cells": n_nodata_cells,
            "roi_coverage_fraction": (
                round(n_valid_cells / n_total_cells, 6) if n_total_cells else 0.0
            ),
            "roi_nodata_fraction": (
                round(n_nodata_cells / n_total_cells, 6) if n_total_cells else 0.0
            ),
        },
        "raster_stats": {
            "count": int(raster_data_for_report.size),
            "mean": float(raster_data_for_report.mean()) if raster_data_for_report.size else None,
            "std": float(raster_data_for_report.std(ddof=1)) if raster_data_for_report.size > 1 else 0.0,
            "min": float(raster_data_for_report.min()) if raster_data_for_report.size else None,
            "max": float(raster_data_for_report.max()) if raster_data_for_report.size else None,
        },
        "climate_stats": {
            "n_rows": len(climate_df),
            "variables": climate_var_stats,
        },
        "n_cells_sampled": len(records),
        "score_stats": {
            "mean": float(scores_arr.mean()) if len(scores_arr) else None,
            "std": float(scores_arr.std(ddof=1)) if len(scores_arr) > 1 else 0.0,
            "min": float(scores_arr.min()) if len(scores_arr) else None,
            "max": float(scores_arr.max()) if len(scores_arr) else None,
        },
        "timings_sec": {
            "build_catalog": round(_t_catalog, 4),
            "load_inputs": round(_t_load, 4),
            "clip_roi": round(_t_clip, 4),
            "interpolate_climate": round(_t_interp, 4),
            "score_cells": round(_t_score, 4),
            "write_outputs": round(_t_write, 4),
            "total": round(_t_total, 4),
        },
    }
    # Include LOOCV cross-validation metrics when kriging was used.
    if interpolator.cv_metrics:
        # VALD-03: expose per-variable LOOCV RMSE under 'kriging_loocv'
        report["kriging_loocv"] = {
            var: round(stats["rmse"], 6)
            for var, stats in interpolator.cv_metrics.get("per_variable", {}).items()
            if stats.get("rmse") is not None
        }
        report["interpolation_cv"] = interpolator.cv_metrics  # retain for compat

    # Include variogram diagnostics when kriging was used.
    if interpolator.variogram_params:
        report["kriging_diagnostics"] = interpolator.variogram_params

    # Include Monte Carlo uncertainty summary when CI columns were produced.
    if _mc_ci_cols:
        _ci_low = df["score_ci_low"].to_numpy()
        _ci_high = df["score_ci_high"].to_numpy()
        report["uncertainty"] = {
            "method": "monte_carlo",
            "n_samples": _n_mc,
            "perturbed_variables": ["mean_temp", "total_rain"],
            "ci_low_pct": 5,
            "ci_high_pct": 95,
            "score_ci_low_mean": float(_ci_low.mean()),
            "score_ci_high_mean": float(_ci_high.mean()),
            "mean_ci_width": float((_ci_high - _ci_low).mean()),
        }

    _atomic_write_text(
        run_dir / "report.json",
        json.dumps(report, indent=2, default=str),
    )

    logger.info(
        f"Artifacts written to {run_dir} (fingerprint={run_fingerprint}, cells={len(records)}, total={_t_total:.2f}s)"
    )

    df.attrs["run_fingerprint"] = run_fingerprint
    df.attrs["run_dir"] = str(run_dir)
    return df

model

suitability_label(score)

Bucket suitability score into qualitative labels.

Parameters:

Name Type Description Default
score float

Suitability score in [0, 1] range.

required

Returns:

Name Type Description
str str

One of 'low', 'medium', or 'high' based on score thresholds.

Notes
  • 'low': score < 0.33
  • 'medium': 0.33 <= score < 0.66
  • 'high': score >= 0.66
Source code in terraflow/model.py
def suitability_label(score: float) -> str:
    """
    Map a numeric suitability score onto a qualitative category.

    Parameters
    ----------
    score:
        Suitability score in [0, 1] range.

    Returns
    -------
    str:
        One of 'low', 'medium', or 'high' based on score thresholds.

    Notes
    -----
    - 'low': score < 0.33
    - 'medium': 0.33 <= score < 0.66
    - 'high': score >= 0.66
    """
    # Walk the ordered threshold table; the first bucket whose upper
    # bound exceeds the score wins.  Anything at or above 0.66 is 'high'.
    for upper_bound, label in ((0.33, "low"), (0.66, "medium")):
        if score < upper_bound:
            return label
    return "high"

suitability_score(v_index, mean_temp, total_rain, params)

Compute a simple suitability score in [0, 1].

Combines normalized vegetation index, temperature, and rainfall using weighted linear combination. All inputs are normalized to [0, 1] range based on the parameter min/max bounds, then combined using the weights.

Parameters:

Name Type Description Default
v_index float

Vegetation index value (typically 0-1, but can be outside range).

required
mean_temp float

Mean temperature in degrees Celsius.

required
total_rain float

Total rainfall in millimeters.

required
params ModelParams

Model parameters containing min/max bounds and weights.

required

Returns:

Name Type Description
float float

Suitability score in [0, 1] range.

Notes
  • Out-of-range inputs are clipped to [0, 1] during normalization
  • Weights should sum to 1.0 for proper combination
  • Final result is clipped to [0, 1] range
Source code in terraflow/model.py
def suitability_score(
    v_index: float,
    mean_temp: float,
    total_rain: float,
    params: ModelParams,
) -> float:
    """
    Compute a simple suitability score in [0, 1].

    Each input is first rescaled to [0, 1] against the bounds supplied in
    *params*, then the three normalized values are blended with the
    configured weights into a single scalar score.

    Parameters
    ----------
    v_index:
        Vegetation index value (typically 0-1, but can be outside range).
    mean_temp:
        Mean temperature in degrees Celsius.
    total_rain:
        Total rainfall in millimeters.
    params:
        Model parameters containing min/max bounds and weights.

    Returns
    -------
    float:
        Suitability score in [0, 1] range.

    Notes
    -----
    - Out-of-range inputs are clipped to [0, 1] during normalization
    - Weights should sum to 1.0 for proper combination
    - Final result is clipped to [0, 1] range
    """
    # Pair each weight with its normalized input, then blend linearly.
    weighted_terms = zip(
        (params.w_v, params.w_t, params.w_r),
        (
            normalize(v_index, params.v_min, params.v_max),
            normalize(mean_temp, params.t_min, params.t_max),
            normalize(total_rain, params.r_min, params.r_max),
        ),
    )
    blended = sum(weight * value for weight, value in weighted_terms)
    # Guard against weights that do not sum to exactly 1.0.
    return max(0.0, min(1.0, blended))

suitability_score_array(v_index, mean_temp, total_rain, params)

Vectorized suitability scoring over numpy arrays.

Equivalent to calling :func:suitability_score element-wise but operates on arbitrary-shape numpy arrays. Used internally for Monte Carlo uncertainty propagation.

Parameters:

Name Type Description Default
v_index ndarray

Vegetation index values (any shape).

required
mean_temp ndarray

Mean temperature values in °C (same shape as v_index).

required
total_rain ndarray

Total rainfall values in mm (same shape as v_index).

required
params ModelParams

Model parameters containing min/max bounds and weights.

required

Returns:

Type Description
np.ndarray:

Suitability scores clipped to [0, 1], same shape as inputs.

Source code in terraflow/model.py
def suitability_score_array(
    v_index: "np.ndarray",
    mean_temp: "np.ndarray",
    total_rain: "np.ndarray",
    params: ModelParams,
) -> "np.ndarray":
    """Vectorized suitability scoring over numpy arrays.

    Equivalent to calling :func:`suitability_score` element-wise but operates
    on arbitrary-shape numpy arrays.  Used internally for Monte Carlo
    uncertainty propagation.

    Parameters
    ----------
    v_index:
        Vegetation index values (any shape).
    mean_temp:
        Mean temperature values in °C (same shape as *v_index*).
    total_rain:
        Total rainfall values in mm (same shape as *v_index*).
    params:
        Model parameters containing min/max bounds and weights.

    Returns
    -------
    np.ndarray:
        Suitability scores clipped to [0, 1], same shape as inputs.
    """

    def _norm(x: "np.ndarray", lo: float, hi: float) -> "np.ndarray":
        if hi == lo:
            return np.zeros_like(x, dtype=float)
        return np.clip((x - lo) / (hi - lo), 0.0, 1.0)

    score = (
        params.w_v * _norm(v_index, params.v_min, params.v_max)
        + params.w_t * _norm(mean_temp, params.t_min, params.t_max)
        + params.w_r * _norm(total_rain, params.r_min, params.r_max)
    )
    return np.clip(score, 0.0, 1.0)