
terraflow.core

The core modules define the configuration schema and the end-to-end pipeline.

Quick Start

from terraflow.config import load_config
from terraflow.pipeline import run_pipeline

# Load and eagerly validate the configuration (raises ValueError on a bad config)
config = load_config("config.yml")

# Run the complete pipeline; artifacts land in output_dir/runs/<run_fingerprint>/
results_df = run_pipeline("config.yml")
print(results_df.attrs["run_fingerprint"])  # identifies the run directory

Module Organization

  • config: YAML schema validation and Pydantic models
  • pipeline: End-to-end orchestration and artifact generation
  • model: Suitability scoring algorithms and label assignment

API Reference

config

ClimateConfig

Bases: BaseModel

Climate data configuration for spatial matching and interpolation.

Supports two strategies for aligning climate observations to raster cells:

  • 'spatial': Interpolate climate values using geographic coordinates (lat/lon).
  • 'index': Match cells to climate records by row index or explicit cell ID.

When strategy is 'spatial', three interpolation methods are available:

  • 'linear': Delaunay-triangulation linear interpolation (scipy.griddata, default).
  • 'kriging': Ordinary Kriging; geostatistically optimal, with per-cell uncertainty (requires pykrige; ≥5 stations recommended).
  • 'idw': Inverse Distance Weighting; fast, but no uncertainty estimate.
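A minimal construction sketch (assuming the fields not shown keep their documented defaults):

from terraflow.config import ClimateConfig

climate = ClimateConfig(strategy="spatial", interpolation_method="kriging")
climate.validate_config()  # raises ValueError on inconsistent settings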

Attributes:

  • strategy (Literal['spatial', 'index']): Matching strategy: 'spatial' (interpolation) or 'index' (direct matching).
  • interpolation_method (Literal['linear', 'kriging', 'idw']): Interpolation algorithm when strategy='spatial'. Choices: 'linear' (default), 'kriging', 'idw'.
  • cell_id_column (str | None): Column name in the climate CSV holding the cell ID (used with the 'index' strategy). If None with the 'index' strategy, cells are matched by row order.
  • fallback_to_mean (bool): If True, use the global mean climate for cells outside the interpolation range or when climate data is sparse (default True).

validate_config()

Validate consistency of climate configuration.

Source code in terraflow/config.py
def validate_config(self) -> None:
    """Validate consistency of climate configuration."""
    if self.strategy != "spatial" and self.variogram_mode != "standard":
        raise ValueError(
            "variogram_mode only applies when climate.strategy='spatial'"
        )
    if self.interpolation_method != "kriging" and self.variogram_mode != "standard":
        raise ValueError(
            "variogram_mode only applies when interpolation_method='kriging'"
        )

validate_interpolation_method(v) classmethod

Ensure interpolation_method is valid.

Source code in terraflow/config.py
@field_validator("interpolation_method")
@classmethod
def validate_interpolation_method(cls, v: str) -> str:
    """Ensure interpolation_method is valid."""
    if v not in ("linear", "kriging", "idw"):
        raise ValueError(
            f"interpolation_method must be 'linear', 'kriging', or 'idw', got '{v}'"
        )
    return v

validate_strategy(v) classmethod

Ensure strategy is valid.

Source code in terraflow/config.py
@field_validator("strategy")
@classmethod
def validate_strategy(cls, v: str) -> str:
    """Ensure strategy is valid."""
    if v not in ("spatial", "index"):
        raise ValueError(f"strategy must be 'spatial' or 'index', got '{v}'")
    return v

ExportConfig

Bases: BaseModel

Configuration for H3 export (Phase 4).

ModelParams

Bases: BaseModel

Parameters for normalization and weighting in the suitability model.

The weights w_v, w_t, and w_r must sum to approximately 1.0 for proper normalization; validate_ranges() enforces this within a 0.01 tolerance.

Attributes:

  • v_min, v_max: Min/max vegetation index values for normalization.
  • t_min, t_max: Min/max temperature values for normalization (in °C).
  • r_min, r_max: Min/max rainfall values for normalization (in mm).
  • w_v, w_t, w_r: Weights for vegetation, temperature, and rainfall (should sum to 1.0).
  • uncertainty_samples (int): Number of Monte Carlo draws per cell for uncertainty propagation. 0 (default) disables Monte Carlo. Requires interpolation_method='kriging' to produce per-cell standard deviations. Adds score_ci_low and score_ci_high columns to features.parquet and an uncertainty block to report.json.
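A minimal construction sketch (field names per the attributes above; the values are hypothetical):

from terraflow.config import ModelParams

params = ModelParams(
    v_min=0.0, v_max=1.0,         # vegetation index bounds
    t_min=0.0, t_max=40.0,        # temperature bounds (°C)
    r_min=0.0, r_max=1000.0,      # rainfall bounds (mm)
    w_v=0.5, w_t=0.25, w_r=0.25,  # weights sum to 1.0
)
params.validate_ranges()  # raises ValueError on bad ranges or weights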

validate_max_values(v, info) classmethod

Ensure max values are reasonable numbers.

Source code in terraflow/config.py
@field_validator("v_max", "t_max", "r_max")
@classmethod
def validate_max_values(cls, v: float, info) -> float:
    """Ensure max values are reasonable numbers."""
    if not isinstance(v, (int, float)):
        raise ValueError(f"Max value must be numeric, got {type(v)}")
    return v

validate_min_values(v, info) classmethod

Ensure min values are reasonable numbers.

Source code in terraflow/config.py
@field_validator("v_min", "t_min", "r_min")
@classmethod
def validate_min_values(cls, v: float, info) -> float:
    """Ensure min values are reasonable numbers."""
    if not isinstance(v, (int, float)):
        raise ValueError(f"Min value must be numeric, got {type(v)}")
    return v

validate_ranges()

Validate that min < max for all ranges. Call after initialization.

Source code in terraflow/config.py
def validate_ranges(self) -> None:
    """Validate that min < max for all ranges. Call after initialization."""
    if self.v_min >= self.v_max:
        raise ValueError(
            f"v_min ({self.v_min}) must be less than v_max ({self.v_max})"
        )
    if self.t_min >= self.t_max:
        raise ValueError(
            f"t_min ({self.t_min}) must be less than t_max ({self.t_max})"
        )
    if self.r_min >= self.r_max:
        raise ValueError(
            f"r_min ({self.r_min}) must be less than r_max ({self.r_max})"
        )

    # Check weights are non-negative
    if self.w_v < 0 or self.w_t < 0 or self.w_r < 0:
        raise ValueError("Weights must be non-negative")

    # Check weights sum to approximately 1.0 (allow small tolerance)
    weight_sum = self.w_v + self.w_t + self.w_r
    if abs(weight_sum - 1.0) > 0.01:
        raise ValueError(
            f"Weights must sum to approximately 1.0, got {weight_sum:.3f} "
            f"(w_v={self.w_v}, w_t={self.w_t}, w_r={self.w_r})"
        )

validate_uncertainty_samples(v) classmethod

Ensure uncertainty_samples is non-negative.

Source code in terraflow/config.py
@field_validator("uncertainty_samples")
@classmethod
def validate_uncertainty_samples(cls, v: int) -> int:
    """Ensure uncertainty_samples is non-negative."""
    if v < 0:
        raise ValueError(f"uncertainty_samples must be >= 0, got {v}")
    return v

PipelineConfig

Bases: BaseModel

Top-level pipeline configuration.

Attributes:

  • raster_path (Path): Path to the input raster file (GeoTIFF).
  • climate_csv (Path): Path to the climate data CSV file.
  • output_dir (Path): Directory for output results.
  • roi (ROI): Region of interest specification.
  • model_params (ModelParams): Suitability model parameters.
  • climate (ClimateConfig): Climate data configuration (strategy, fallback behavior, etc.).
  • raster_band (int): 1-based rasterio band index; the band's existence in the file is verified at pipeline runtime (see validate_raster_band below).
  • max_cells (int): Maximum number of cells to sample from the ROI (default 500).

validate_all()

Validate all constraints. Call after model initialization.

Source code in terraflow/config.py
def validate_all(self) -> None:
    """Validate all constraints. Call after model initialization."""
    self.roi.validate_bounds()
    self.model_params.validate_ranges()
    self.climate.validate_config()

validate_max_cells(v) classmethod

Ensure max_cells is a positive integer.

Source code in terraflow/config.py
@field_validator("max_cells")
@classmethod
def validate_max_cells(cls, v: int) -> int:
    """Ensure max_cells is a positive integer."""
    if v <= 0:
        raise ValueError(f"max_cells must be positive, got {v}")
    return v

validate_raster_band(v) classmethod

Ensure raster_band is a positive 1-based rasterio band index.

Existence of the band in the actual file is verified at pipeline runtime, because we cannot know the band count at config-load time without opening the raster (#42).

Source code in terraflow/config.py
@field_validator("raster_band")
@classmethod
def validate_raster_band(cls, v: int) -> int:
    """Ensure raster_band is a positive 1-based rasterio band index.

    Existence of the band in the actual file is verified at pipeline
    runtime, because we cannot know the band count at config-load time
    without opening the raster (#42).
    """
    if v < 1:
        raise ValueError(
            f"raster_band is a 1-based rasterio index; must be >= 1, got {v}"
        )
    return v

ROI

Bases: BaseModel

Region of interest. For now we support only a bounding box.

Attributes:

  • type (Literal['bbox']): Type of ROI (currently only 'bbox' is supported).
  • xmin, ymin, xmax, ymax: Bounding box coordinates expressed in roi_crs (default WGS 84 degrees).
  • roi_crs (str): EPSG code or WKT string for the CRS of the bounding box coordinates. Defaults to "EPSG:4326" (latitude/longitude degrees). The pipeline automatically reprojects to the raster's native CRS before clipping.

validate_bounds()

Validate that bounds are sensible. Call after initialization.

Source code in terraflow/config.py
def validate_bounds(self) -> None:
    """Validate that bounds are sensible. Call after initialization."""
    if self.xmin >= self.xmax:
        raise ValueError(f"xmin ({self.xmin}) must be less than xmax ({self.xmax})")
    if self.ymin >= self.ymax:
        raise ValueError(f"ymin ({self.ymin}) must be less than ymax ({self.ymax})")

SensitivityConfig

Bases: BaseModel

Configuration for sensitivity analysis (per D-04).

ValidationConfig

Bases: BaseModel

Configuration for model validation (Phase 3).

WeightBounds

Bases: BaseModel

Bounds for a single weight parameter in sensitivity analysis.

build_config(data)

Validate a raw config dict into a PipelineConfig.

Source code in terraflow/config.py
def build_config(data: dict) -> PipelineConfig:
    """Validate a raw config dict into a PipelineConfig."""
    try:
        cfg = PipelineConfig.model_validate(data)
        cfg.validate_all()
        return cfg
    except ValueError as e:
        raise ValueError(f"Configuration validation failed: {e}") from e

load_config(path)

Load YAML config from disk and validate with Pydantic.

Parameters:

  • path (str | Path, required): Path to the YAML configuration file.

Returns:

  • PipelineConfig: Validated configuration object.

Raises:

  • FileNotFoundError: If the config file does not exist.
  • ValueError: If the YAML is malformed, the file is empty, or the configuration is invalid.

Source code in terraflow/config.py
def load_config(path: str | Path) -> PipelineConfig:
    """Load YAML config from disk and validate with Pydantic.

    Parameters
    ----------
    path:
        Path to the YAML configuration file.

    Returns
    -------
    PipelineConfig:
        Validated configuration object.

    Raises
    ------
    FileNotFoundError:
        If the config file does not exist.
    ValueError:
        If the YAML is malformed, the file is empty, or the configuration
        is invalid.
    """
    data = load_config_dict(path)
    return build_config(data)

load_config_dict(path)

Load YAML config from disk into a raw dict.

Parameters:

  • path (str | Path, required): Path to the YAML configuration file.

Returns:

  • dict: Parsed configuration as a Python dict.

Raises:

  • FileNotFoundError: If the config file does not exist.
  • ValueError: If the YAML is malformed or the file is empty.

Source code in terraflow/config.py
def load_config_dict(path: str | Path) -> dict:
    """Load YAML config from disk into a raw dict.

    Parameters
    ----------
    path:
        Path to the YAML configuration file.

    Returns
    -------
    dict:
        Parsed configuration as a Python dict.

    Raises
    ------
    FileNotFoundError:
        If the config file does not exist.
    ValueError:
        If the YAML is malformed or the file is empty.
    """
    path = Path(path)
    if not path.exists():
        raise FileNotFoundError(f"Configuration file not found: {path}")

    try:
        with path.open("r", encoding="utf-8") as f:
            data = yaml.safe_load(f)
    except yaml.YAMLError as e:
        raise ValueError(f"Failed to parse YAML config: {e}") from e

    if data is None:
        raise ValueError("Configuration file is empty")

    return data
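This two-step flow mirrors what resolve_run_dir() and run_pipeline() do internally; a sketch with a hypothetical pre-validation override:

from terraflow.config import build_config, load_config_dict

raw = load_config_dict("config.yml")  # raw dict, not yet validated
raw["max_cells"] = 100                # hypothetical tweak before validation
cfg = build_config(raw)               # validated PipelineConfig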

pipeline

Pipeline orchestration for TerraFlow.

All outputs are written atomically to:

output_dir/runs/<run_fingerprint>/

Every run produces exactly three artifacts:

  • features.parquet — per-cell suitability features (tidy/wide schema v1)
  • manifest.json — config snapshot, run identity, input provenance
  • report.json — QA summaries, coverage metrics, step timings

A backward-compatible results.csv is also written to the same run directory so that callers relying on the previous output path can migrate at their own pace.

resolve_run_dir(config_path)

Return the run directory for a given config without running the pipeline.

Uses the same fingerprint computation as run_pipeline() so that terraflow validate always targets the correct run, not the most recently modified one.

Parameters:

  • config_path (Path | str, required): Path to a TerraFlow YAML config file.

Returns:

  • Path: output_dir/runs/<fingerprint>; the directory may not exist if the pipeline has not been run yet.

Source code in terraflow/pipeline.py
def resolve_run_dir(config_path: Path | str) -> Path:
    """Return the run directory for a given config without running the pipeline.

    Uses the same fingerprint computation as ``run_pipeline()`` so that
    ``terraflow validate`` always targets the correct run, not the most
    recently modified one.

    Parameters
    ----------
    config_path:
        Path to a TerraFlow YAML config file.

    Returns
    -------
    Path:
        ``output_dir/runs/<fingerprint>`` — may not exist if the pipeline
        has not been run yet.
    """
    config_path = Path(config_path)
    config_dict = load_config_dict(config_path)
    config_dir = config_path.resolve().parent

    for _key in ("raster_path", "climate_csv", "output_dir"):
        if _key in config_dict and config_dict[_key] is not None:
            _p = Path(str(config_dict[_key]))
            if not _p.is_absolute():
                config_dict[_key] = str((config_dir / _p).resolve())

    cfg: PipelineConfig = build_config(config_dict)
    roi_hash = _resolve_roi_hash(config_dict, config_dir)
    input_paths = _collect_input_paths(config_dict, config_dir)
    input_fps = [fingerprint_file(str(p)) for p in input_paths]
    run_fingerprint = compute_run_fingerprint(config_dict, roi_hash, input_fps)

    output_dir = Path(cfg.output_dir)
    return output_dir / "runs" / run_fingerprint
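A usage sketch for checking whether a config has already been materialized (artifact names per the pipeline docs above):

from terraflow.pipeline import resolve_run_dir

run_dir = resolve_run_dir("config.yml")
if (run_dir / "features.parquet").exists():
    print(f"Run already materialized at {run_dir}")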

run_pipeline(config_path)

Run the end-to-end pipeline and return a DataFrame of results.

Uses spatially-aware climate data matching to apply per-cell climate values based on the configured strategy (spatial interpolation or index-based matching).

Parameters:

  • config_path (str | Path, required): Path to YAML configuration file.

Returns:

  • pd.DataFrame: Results table with columns run_id, cell_id, lat, lon, v_index, mean_temp, total_rain, score, label. df.attrs["run_fingerprint"] is set so callers can locate the run directory.

Raises:

  • FileNotFoundError: If config file, raster file, or climate CSV does not exist.
  • ValueError: If configuration is invalid or no valid raster cells found in ROI.

Notes

All artifacts are written atomically under:

output_dir/runs/<run_fingerprint>/

If that directory already contains all three required artifacts (features.parquet, manifest.json, report.json) the pipeline detects the identical run and returns early (no-op rerun).

Source code in terraflow/pipeline.py
def run_pipeline(config_path: str | Path) -> pd.DataFrame:
    """Run the end-to-end pipeline and return a DataFrame of results.

    Uses spatially-aware climate data matching to apply per-cell climate values
    based on the configured strategy (spatial interpolation or index-based
    matching).

    Parameters
    ----------
    config_path:
        Path to YAML configuration file.

    Returns
    -------
    pd.DataFrame:
        Results table with columns: run_id, cell_id, lat, lon, v_index,
        mean_temp, total_rain, score, label.  ``df.attrs["run_fingerprint"]``
        is set so callers can locate the run directory.

    Raises
    ------
    FileNotFoundError:
        If config file, raster file, or climate CSV does not exist.
    ValueError:
        If configuration is invalid or no valid raster cells found in ROI.

    Notes
    -----
    All artifacts are written atomically under::

        output_dir/runs/<run_fingerprint>/

    If that directory already contains all three required artifacts
    (``features.parquet``, ``manifest.json``, ``report.json``) the pipeline
    detects the identical run and returns early (no-op rerun).
    """
    _t_total_start = time.perf_counter()
    config_path = Path(config_path)
    config_dict = load_config_dict(config_path)
    config_dir = config_path.resolve().parent

    for _key in ("raster_path", "climate_csv", "output_dir"):
        if _key in config_dict and config_dict[_key] is not None:
            _p = Path(str(config_dict[_key]))
            if not _p.is_absolute():
                config_dict[_key] = str((config_dir / _p).resolve())

    cfg: PipelineConfig = build_config(config_dict)
    logger.info(f"Loaded config from {config_path}")
    config_bytes_hash = hashlib.sha256(canonicalize_config(config_dict)).hexdigest()
    roi_hash = _resolve_roi_hash(config_dict, config_dir)
    input_paths = _collect_input_paths(config_dict, config_dir)
    input_fps = [fingerprint_file(str(path)) for path in input_paths]
    run_fingerprint = compute_run_fingerprint(config_dict, roi_hash, input_fps)
    logger.info(
        f"Computed run fingerprint {run_fingerprint} (config={config_bytes_hash}, inputs={len(input_fps)})"
    )

    output_dir = ensure_dir(cfg.output_dir)
    run_dir = ensure_dir(output_dir / "runs" / run_fingerprint)

    _required_artifacts = [
        run_dir / "features.parquet",
        run_dir / "manifest.json",
        run_dir / "report.json",
    ]
    if all(p.exists() for p in _required_artifacts):
        cached_version = _read_features_schema_version(run_dir / "features.parquet")
        if cached_version == FEATURES_SCHEMA_VERSION:
            logger.info(
                f"Detected identical run {run_fingerprint} — all artifacts present, returning cached result."
            )
            df = pd.read_parquet(run_dir / "features.parquet")
            df.attrs["run_fingerprint"] = run_fingerprint
            df.attrs["run_dir"] = str(run_dir)
            return df
        logger.warning(
            "Cached features.parquet at %s has schema_version=%r but current "
            "FEATURES_SCHEMA_VERSION=%r; invalidating cache and recomputing (#39).",
            run_dir,
            cached_version,
            FEATURES_SCHEMA_VERSION,
        )

    _t_catalog_start = time.perf_counter()
    catalog = build_data_catalog(cfg.raster_path, cfg.climate_csv)
    _t_catalog = time.perf_counter() - _t_catalog_start

    _t_load_start = time.perf_counter()
    raster = load_raster(cfg.raster_path)
    raster_crs = raster.crs
    try:
        climate_df = load_climate_csv(cfg.climate_csv)
    except Exception:
        raster.close()
        raise
    _t_load = time.perf_counter() - _t_load_start

    _crs_str = (
        f"EPSG:{raster_crs.to_epsg()}"
        if raster_crs is not None and raster_crs.to_epsg()
        else (str(raster_crs) if raster_crs else "None")
    )
    logger.info(f"Loaded raster: {cfg.raster_path} (CRS: {_crs_str})")
    logger.info(f"Loaded climate data: {cfg.climate_csv}")

    _climate_crs = CRS.from_epsg(4326)
    _climate_crs_str = "EPSG:4326"
    if raster_crs is None:
        raise CRSMismatchError(
            f"Raster '{cfg.raster_path}' has no CRS (raster_crs=None). "
            f"Expected a raster reprojectable to climate CRS '{_climate_crs_str}'."
        )
    try:
        Transformer.from_crs(raster_crs, _climate_crs, always_xy=True)
    except _PyProjCRSError as exc:
        raise CRSMismatchError(
            f"Raster CRS '{raster_crs.to_string()}' is incompatible with "
            f"climate CRS '{_climate_crs_str}': {exc}"
        ) from exc

    _t_clip_start = time.perf_counter()
    clipped_data, clipped_transform = clip_raster_to_roi(
        raster,
        cfg.roi.model_dump(),
        roi_crs=cfg.roi.roi_crs,
        band=cfg.raster_band,
    )
    _t_clip = time.perf_counter() - _t_clip_start
    logger.info("Clipped raster to ROI")

    rows, cols = clipped_data.shape
    n_total_cells = rows * cols
    valid_indices: List[tuple[int, int]] = [
        (r, c)
        for r in range(rows)
        for c in range(cols)
        if not np.ma.is_masked(clipped_data[r, c])
    ]
    n_valid_cells = len(valid_indices)
    n_nodata_cells = n_total_cells - n_valid_cells

    if not valid_indices:
        raise ValueError("No valid raster cells found in the specified ROI")

    interpolator = ClimateInterpolator(
        climate_df=climate_df,
        strategy=cfg.climate.strategy,
        interpolation_method=cfg.climate.interpolation_method,
        variogram_mode=cfg.climate.variogram_mode,
        cell_id_column=cfg.climate.cell_id_column,
        fallback_to_mean=cfg.climate.fallback_to_mean,
    )
    logger.info(
        f"Initialized climate interpolator with strategy='{cfg.climate.strategy}', method='{interpolator.interpolation_method}'"
    )

    # Derive a deterministic seed from the run fingerprint so that cell
    # selection is bit-reproducible across independent executions with
    # identical inputs — closing the reproducibility gap from v0.2.1.
    _fp_seed = int.from_bytes(
        hashlib.sha256(run_fingerprint.encode()).digest()[:8], "little"
    )
    rng = np.random.default_rng(_fp_seed)
    max_cells = min(cfg.max_cells, n_valid_cells)
    _sample_idx = rng.choice(n_valid_cells, size=max_cells, replace=False)
    sampled_indices = [valid_indices[i] for i in _sample_idx]
    logger.info(f"Sampled {max_cells} cells from {n_valid_cells} valid cells in ROI")

    cell_lats, cell_lons = _project_cells_to_wgs84(
        sampled_indices, clipped_transform, raster_crs
    )

    _t_interp_start = time.perf_counter()
    cell_climate_df = interpolator.interpolate(np.array(cell_lats), np.array(cell_lons))
    _t_interp = time.perf_counter() - _t_interp_start
    logger.info(
        f"Interpolated climate for {len(sampled_indices)} cells using strategy='{cfg.climate.strategy}'"
    )

    _t_score_start = time.perf_counter()
    df, krig_std_cols = _score_cells(
        clipped_data,
        sampled_indices,
        cell_lats,
        cell_lons,
        cell_climate_df,
        cfg,
        run_fingerprint,
    )
    df, mc_ci_cols = _apply_monte_carlo(df, krig_std_cols, cfg, rng)
    df = df[FEATURES_COLUMNS_ORDERED + krig_std_cols + mc_ci_cols]
    _t_score = time.perf_counter() - _t_score_start

    raster.close()
    logger.info("Closed raster dataset")

    _t_write_start = time.perf_counter()

    _atomic_write_parquet(
        run_dir / "features.parquet",
        df,
        schema_meta={"run_fingerprint": run_fingerprint},
    )
    _atomic_write_text(run_dir / "results.csv", df.to_csv(index=False))

    git_sha = _get_git_sha()
    input_fp_records = [
        {
            "path": fp["path"],
            "sha256": fp["sha256"],
            "size_bytes": fp["size_bytes"],
            # mtime kept for human-readable provenance; NOT part of fingerprint
            "mtime": fp.get("mtime"),
        }
        for fp in input_fps
    ]
    manifest: Dict[str, Any] = {
        "schema_version": MANIFEST_SCHEMA_VERSION,
        "run_fingerprint": run_fingerprint,
        "code_version": _get_package_version(),
        "git_sha": git_sha,
        "created_at_utc": datetime.now(timezone.utc).isoformat(),
        "config": config_dict,
        "input_fingerprints": input_fp_records,
        "catalog": catalog.to_provenance(),
        "output_files": [
            "features.parquet",
            "results.csv",
            "manifest.json",
            "report.json",
        ],
    }
    _atomic_write_text(
        run_dir / "manifest.json", json.dumps(manifest, indent=2, default=str)
    )

    _t_total = time.perf_counter() - _t_total_start
    _t_write = time.perf_counter() - _t_write_start

    timings = {
        "build_catalog": round(_t_catalog, 4),
        "load_inputs": round(_t_load, 4),
        "clip_roi": round(_t_clip, 4),
        "interpolate_climate": round(_t_interp, 4),
        "score_cells": round(_t_score, 4),
        "write_outputs": round(_t_write, 4),
        "total": round(_t_total, 4),
    }
    report = _build_report(
        run_fingerprint,
        clipped_data,
        climate_df,
        df,
        n_cells_sampled=len(df),
        n_total_cells=n_total_cells,
        n_valid_cells=n_valid_cells,
        n_nodata_cells=n_nodata_cells,
        interpolator=interpolator,
        mc_ci_cols=mc_ci_cols,
        n_mc=cfg.model_params.uncertainty_samples,
        timings=timings,
    )
    _atomic_write_text(
        run_dir / "report.json", json.dumps(report, indent=2, default=str)
    )

    logger.info(
        f"Artifacts written to {run_dir} (fingerprint={run_fingerprint}, cells={len(df)}, total={_t_total:.2f}s)"
    )

    df.attrs["run_fingerprint"] = run_fingerprint
    df.attrs["run_dir"] = str(run_dir)
    return df
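A usage sketch locating the run directory from the returned DataFrame and reading the QA report (the df.attrs keys and artifact names are per the docs above):

import json
from pathlib import Path

from terraflow.pipeline import run_pipeline

df = run_pipeline("config.yml")
run_dir = Path(df.attrs["run_dir"])
report = json.loads((run_dir / "report.json").read_text())
print(sorted(report))  # QA summaries, coverage metrics, step timings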

model

suitability_label(score)

Bucket suitability score into qualitative labels.

Parameters:

  • score (float, required): Suitability score in [0, 1] range.

Returns:

  • str: One of 'low', 'medium', or 'high' based on score thresholds.

Notes
  • 'low': score < 0.33
  • 'medium': 0.33 <= score < 0.66
  • 'high': score >= 0.66
Source code in terraflow/model.py
def suitability_label(score: float) -> str:
    """
    Bucket suitability score into qualitative labels.

    Parameters
    ----------
    score:
        Suitability score in [0, 1] range.

    Returns
    -------
    str:
        One of 'low', 'medium', or 'high' based on score thresholds.

    Notes
    -----
    - 'low': score < 0.33
    - 'medium': 0.33 <= score < 0.66
    - 'high': score >= 0.66
    """
    if score < 0.33:
        return "low"
    if score < 0.66:
        return "medium"
    return "high"

suitability_score(v_index, mean_temp, total_rain, params)

Compute a simple suitability score in [0, 1].

Combines normalized vegetation index, temperature, and rainfall using weighted linear combination. All inputs are normalized to [0, 1] range based on the parameter min/max bounds, then combined using the weights.

Parameters:

  • v_index (float, required): Vegetation index value (typically 0-1, but can be outside range).
  • mean_temp (float, required): Mean temperature in degrees Celsius.
  • total_rain (float, required): Total rainfall in millimeters.
  • params (ModelParams, required): Model parameters containing min/max bounds and weights.

Returns:

  • float: Suitability score in [0, 1] range.

Notes
  • Out-of-range inputs are clipped to [0, 1] during normalization
  • Weights should sum to 1.0 for proper combination
  • Final result is clipped to [0, 1] range
Source code in terraflow/model.py
def suitability_score(
    v_index: float,
    mean_temp: float,
    total_rain: float,
    params: ModelParams,
) -> float:
    """
    Compute a simple suitability score in [0, 1].

    Combines normalized vegetation index, temperature, and rainfall using
    weighted linear combination. All inputs are normalized to [0, 1] range
    based on the parameter min/max bounds, then combined using the weights.

    Parameters
    ----------
    v_index:
        Vegetation index value (typically 0-1, but can be outside range).
    mean_temp:
        Mean temperature in degrees Celsius.
    total_rain:
        Total rainfall in millimeters.
    params:
        Model parameters containing min/max bounds and weights.

    Returns
    -------
    float:
        Suitability score in [0, 1] range.

    Notes
    -----
    - Out-of-range inputs are clipped to [0, 1] during normalization
    - Weights should sum to 1.0 for proper combination
    - Final result is clipped to [0, 1] range
    """
    v_n = normalize(v_index, params.v_min, params.v_max)
    t_n = normalize(mean_temp, params.t_min, params.t_max)
    r_n = normalize(total_rain, params.r_min, params.r_max)

    score = params.w_v * v_n + params.w_t * t_n + params.w_r * r_n
    return max(0.0, min(1.0, score))
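A worked example under hypothetical bounds and weights: v_index=0.6 normalizes to 0.6, mean_temp=20 °C normalizes to (20-0)/(40-0)=0.5, and total_rain=500 mm normalizes to 0.5, so score = 0.5*0.6 + 0.25*0.5 + 0.25*0.5 = 0.55 ('medium').

from terraflow.config import ModelParams
from terraflow.model import suitability_label, suitability_score

params = ModelParams(v_min=0.0, v_max=1.0, t_min=0.0, t_max=40.0,
                     r_min=0.0, r_max=1000.0, w_v=0.5, w_t=0.25, w_r=0.25)
score = suitability_score(0.6, 20.0, 500.0, params)
print(score, suitability_label(score))  # 0.55 medium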

suitability_score_array(v_index, mean_temp, total_rain, params)

Vectorized suitability scoring over numpy arrays.

Equivalent to calling :func:suitability_score element-wise but operates on arbitrary-shape numpy arrays. Used internally for Monte Carlo uncertainty propagation.

Parameters:

  • v_index (ndarray, required): Vegetation index values (any shape).
  • mean_temp (ndarray, required): Mean temperature values in °C (same shape as v_index).
  • total_rain (ndarray, required): Total rainfall values in mm (same shape as v_index).
  • params (ModelParams, required): Model parameters containing min/max bounds and weights.

Returns:

  • np.ndarray: Suitability scores clipped to [0, 1], same shape as inputs.

Source code in terraflow/model.py
def suitability_score_array(
    v_index: "np.ndarray",
    mean_temp: "np.ndarray",
    total_rain: "np.ndarray",
    params: ModelParams,
) -> "np.ndarray":
    """Vectorized suitability scoring over numpy arrays.

    Equivalent to calling :func:`suitability_score` element-wise but operates
    on arbitrary-shape numpy arrays.  Used internally for Monte Carlo
    uncertainty propagation.

    Parameters
    ----------
    v_index:
        Vegetation index values (any shape).
    mean_temp:
        Mean temperature values in °C (same shape as *v_index*).
    total_rain:
        Total rainfall values in mm (same shape as *v_index*).
    params:
        Model parameters containing min/max bounds and weights.

    Returns
    -------
    np.ndarray:
        Suitability scores clipped to [0, 1], same shape as inputs.
    """

    def _norm(x: "np.ndarray", lo: float, hi: float) -> "np.ndarray":
        if hi == lo:
            return np.zeros_like(x, dtype=float)
        return np.clip((x - lo) / (hi - lo), 0.0, 1.0)

    score = (
        params.w_v * _norm(v_index, params.v_min, params.v_max)
        + params.w_t * _norm(mean_temp, params.t_min, params.t_max)
        + params.w_r * _norm(total_rain, params.r_min, params.r_max)
    )
    return np.clip(score, 0.0, 1.0)
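A minimal Monte Carlo sketch in the spirit of the pipeline's uncertainty propagation (the shapes, standard deviations, and 95% interval here are illustrative assumptions; the pipeline's own _apply_monte_carlo step is the authoritative implementation):

import numpy as np

from terraflow.config import ModelParams
from terraflow.model import suitability_score_array

params = ModelParams(v_min=0.0, v_max=1.0, t_min=0.0, t_max=40.0,
                     r_min=0.0, r_max=1000.0, w_v=0.5, w_t=0.25, w_r=0.25)
rng = np.random.default_rng(0)
n_draws, n_cells = 1000, 4

# Perturb climate inputs by assumed per-cell kriging standard deviations.
temp = rng.normal(loc=20.0, scale=1.5, size=(n_draws, n_cells))
rain = rng.normal(loc=500.0, scale=40.0, size=(n_draws, n_cells))
veg = np.full((n_draws, n_cells), 0.6)

scores = suitability_score_array(veg, temp, rain, params)
ci_low, ci_high = np.percentile(scores, [2.5, 97.5], axis=0)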