
terraflow.core

The core modules define the configuration schema, the suitability model, and the end-to-end pipeline.

Quick Start

from terraflow.config import load_config
from terraflow.pipeline import run_pipeline

# Load and validate configuration up front
config = load_config("config.yml")

# Run the complete pipeline; returns a results DataFrame
results = run_pipeline("config.yml")

Module Organization

  • config: YAML schema validation and Pydantic models
  • pipeline: End-to-end orchestration and artifact generation
  • model: Suitability scoring algorithms and label assignment

API Reference

config

ClimateConfig

Bases: BaseModel

Climate data configuration for spatial matching and interpolation.

Supports two strategies for aligning climate observations to raster cells:

  • 'spatial': Interpolate climate values using geographic coordinates (lat/lon).
  • 'index': Match cells to climate records by row index or explicit cell ID.

Attributes:

  • strategy (Literal['spatial', 'index']): Matching strategy: 'spatial' (interpolation) or 'index' (direct matching).
  • cell_id_column (str | None): Column name in the climate CSV for the cell ID (used with the 'index' strategy). If None with the 'index' strategy, rows are matched by order.
  • fallback_to_mean (bool): If True, use the global mean climate for cells outside the interpolation range or when climate data is sparse (default True).
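
As a minimal sketch, assuming cell_id_column and fallback_to_mean carry the defaults described above (None and True), a configuration for either strategy looks like this; the column name is illustrative:

from terraflow.config import ClimateConfig

# Spatial strategy: per-cell values are interpolated from station lat/lon.
spatial_cfg = ClimateConfig(strategy="spatial")

# Index strategy: cells are matched to climate rows via an explicit ID column.
index_cfg = ClimateConfig(strategy="index", cell_id_column="cell_id")

spatial_cfg.validate_config()
index_cfg.validate_config()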

validate_config()

Validate consistency of climate configuration.

Source code in terraflow/config.py
def validate_config(self) -> None:
    """Validate consistency of climate configuration."""
    if self.strategy == "index" and self.cell_id_column is not None:
        # cell_id_column is optional; OK to have it
        pass
    elif self.strategy == "spatial" and self.cell_id_column is not None:
        # cell_id_column not used in spatial strategy
        pass

validate_strategy(v) classmethod

Ensure strategy is valid.

Source code in terraflow/config.py
@field_validator("strategy")
@classmethod
def validate_strategy(cls, v: str) -> str:
    """Ensure strategy is valid."""
    if v not in ("spatial", "index"):
        raise ValueError(f"strategy must be 'spatial' or 'index', got '{v}'")
    return v

ModelParams

Bases: BaseModel

Parameters for normalization and weighting in the suitability model.

The weights w_v, w_t, and w_r should sum to approximately 1.0 for proper normalization; this is checked by validate_ranges() (with a tolerance of 0.01) rather than enforced at construction time.

Attributes:

  • v_min, v_max: Min/max vegetation index values for normalization.
  • t_min, t_max: Min/max temperature values for normalization (in °C).
  • r_min, r_max: Min/max rainfall values for normalization (in mm).
  • w_v, w_t, w_r: Weights for vegetation, temperature, and rainfall (should sum to 1.0).
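
As a rough sketch (the bounds and weights below are illustrative placeholders, not package defaults), parameters can be constructed and range-checked explicitly:

from terraflow.config import ModelParams

params = ModelParams(
    v_min=0.0, v_max=1.0,       # vegetation index range
    t_min=0.0, t_max=35.0,      # temperature range in °C
    r_min=0.0, r_max=2000.0,    # rainfall range in mm
    w_v=0.5, w_t=0.3, w_r=0.2,  # weights summing to 1.0
)
params.validate_ranges()  # raises ValueError if any min >= max or the weights are off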

validate_max_values(v, info) classmethod

Ensure max values are reasonable numbers.

Source code in terraflow/config.py
@field_validator("v_max", "t_max", "r_max")
@classmethod
def validate_max_values(cls, v: float, info) -> float:
    """Ensure max values are reasonable numbers."""
    if not isinstance(v, (int, float)):
        raise ValueError(f"Max value must be numeric, got {type(v)}")
    return v

validate_min_values(v, info) classmethod

Ensure min values are reasonable numbers.

Source code in terraflow/config.py
@field_validator("v_min", "t_min", "r_min")
@classmethod
def validate_min_values(cls, v: float, info) -> float:
    """Ensure min values are reasonable numbers."""
    if not isinstance(v, (int, float)):
        raise ValueError(f"Min value must be numeric, got {type(v)}")
    return v

validate_ranges()

Validate that min < max for all ranges. Call after initialization.

Source code in terraflow/config.py
def validate_ranges(self) -> None:
    """Validate that min < max for all ranges. Call after initialization."""
    if self.v_min >= self.v_max:
        raise ValueError(
            f"v_min ({self.v_min}) must be less than v_max ({self.v_max})"
        )
    if self.t_min >= self.t_max:
        raise ValueError(
            f"t_min ({self.t_min}) must be less than t_max ({self.t_max})"
        )
    if self.r_min >= self.r_max:
        raise ValueError(
            f"r_min ({self.r_min}) must be less than r_max ({self.r_max})"
        )

    # Check weights are non-negative
    if self.w_v < 0 or self.w_t < 0 or self.w_r < 0:
        raise ValueError("Weights must be non-negative")

    # Check weights sum to approximately 1.0 (allow small tolerance)
    weight_sum = self.w_v + self.w_t + self.w_r
    if abs(weight_sum - 1.0) > 0.01:
        raise ValueError(
            f"Weights must sum to approximately 1.0, got {weight_sum:.3f} "
            f"(w_v={self.w_v}, w_t={self.w_t}, w_r={self.w_r})"
        )

PipelineConfig

Bases: BaseModel

Top-level pipeline configuration.

Attributes:

  • raster_path (Path): Path to the input raster file (GeoTIFF).
  • climate_csv (Path): Path to the climate data CSV file.
  • output_dir (Path): Directory for output results.
  • roi (ROI): Region of interest specification.
  • model_params (ModelParams): Suitability model parameters.
  • climate (ClimateConfig): Climate data configuration (strategy, fallback behavior, etc.).
  • max_cells (int): Maximum number of cells to sample from the ROI (default 500).
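
Assuming the YAML keys map one-to-one to the field names above, a configuration can also be validated from a plain dict; the paths and values here are illustrative:

from terraflow.config import build_config

raw = {
    "raster_path": "data/ndvi.tif",
    "climate_csv": "data/climate.csv",
    "output_dir": "outputs",
    "roi": {"type": "bbox", "xmin": 7.0, "ymin": 46.0, "xmax": 8.0, "ymax": 47.0},
    "model_params": {
        "v_min": 0.0, "v_max": 1.0,
        "t_min": 0.0, "t_max": 35.0,
        "r_min": 0.0, "r_max": 2000.0,
        "w_v": 0.5, "w_t": 0.3, "w_r": 0.2,
    },
    "climate": {"strategy": "spatial"},
    "max_cells": 200,
}

cfg = build_config(raw)  # schema validation plus validate_all() cross-field checks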

validate_all()

Validate all constraints. Call after model initialization.

Source code in terraflow/config.py
def validate_all(self) -> None:
    """Validate all constraints. Call after model initialization."""
    self.roi.validate_bounds()
    self.model_params.validate_ranges()
    self.climate.validate_config()

validate_max_cells(v) classmethod

Ensure max_cells is a positive integer.

Source code in terraflow/config.py
@field_validator("max_cells")
@classmethod
def validate_max_cells(cls, v: int) -> int:
    """Ensure max_cells is a positive integer."""
    if v <= 0:
        raise ValueError(f"max_cells must be positive, got {v}")
    return v

ROI

Bases: BaseModel

Region of interest. For now we support only a bounding box.

Attributes:

  • type (Literal['bbox']): Type of ROI (currently only 'bbox' is supported).
  • xmin, ymin, xmax, ymax: Bounding box coordinates expressed in roi_crs (default WGS 84 degrees).
  • roi_crs (str): EPSG code or WKT string for the CRS of the bounding box coordinates. Defaults to "EPSG:4326" (latitude/longitude degrees). The pipeline automatically reprojects to the raster's native CRS before clipping.
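
For example, a bounding box given in a projected CRS can be declared via roi_crs (the coordinates and EPSG code below are illustrative):

from terraflow.config import ROI

# Bounds expressed in UTM zone 32N; the pipeline reprojects them to the
# raster's native CRS before clipping.
roi = ROI(
    type="bbox",
    xmin=500000.0, ymin=5100000.0,
    xmax=520000.0, ymax=5120000.0,
    roi_crs="EPSG:32632",
)
roi.validate_bounds()  # raises ValueError if xmin >= xmax or ymin >= ymax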

validate_bounds()

Validate that bounds are sensible. Call after initialization.

Source code in terraflow/config.py
def validate_bounds(self) -> None:
    """Validate that bounds are sensible. Call after initialization."""
    if self.xmin >= self.xmax:
        raise ValueError(f"xmin ({self.xmin}) must be less than xmax ({self.xmax})")
    if self.ymin >= self.ymax:
        raise ValueError(f"ymin ({self.ymin}) must be less than ymax ({self.ymax})")

build_config(data)

Validate a raw config dict into a PipelineConfig.

Source code in terraflow/config.py
def build_config(data: dict) -> PipelineConfig:
    """Validate a raw config dict into a PipelineConfig."""
    try:
        cfg = PipelineConfig.model_validate(data)
        cfg.validate_all()
        return cfg
    except ValueError as e:
        raise ValueError(f"Configuration validation failed: {e}") from e

load_config(path)

Load YAML config from disk and validate with Pydantic.

Parameters:

  • path (str | Path, required): Path to the YAML configuration file.

Returns:

  • PipelineConfig: Validated configuration object.

Raises:

  • FileNotFoundError: If the config file does not exist.
  • yaml.YAMLError: If the YAML is malformed.
  • ValueError: If the configuration is invalid.

Source code in terraflow/config.py
def load_config(path: str | Path) -> PipelineConfig:
    """Load YAML config from disk and validate with Pydantic.

    Parameters
    ----------
    path:
        Path to the YAML configuration file.

    Returns
    -------
    PipelineConfig:
        Validated configuration object.

    Raises
    ------
    FileNotFoundError:
        If the config file does not exist.
    yaml.YAMLError:
        If the YAML is malformed.
    ValueError:
        If the configuration is invalid.
    """
    data = load_config_dict(path)
    return build_config(data)
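
A brief usage sketch (the file name is illustrative); note that, as the load_config_dict listing below shows, malformed YAML is re-raised as a ValueError, so catching FileNotFoundError and ValueError covers the common failure modes:

from terraflow.config import load_config

try:
    cfg = load_config("config.yml")
except FileNotFoundError as exc:
    print(f"Missing configuration file: {exc}")
except ValueError as exc:
    print(f"Invalid configuration: {exc}")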

load_config_dict(path)

Load YAML config from disk into a raw dict.

Parameters:

  • path (str | Path, required): Path to the YAML configuration file.

Returns:

  • dict: Parsed configuration as a Python dict.

Raises:

  • FileNotFoundError: If the config file does not exist.
  • yaml.YAMLError: If the YAML is malformed.

Source code in terraflow/config.py
def load_config_dict(path: str | Path) -> dict:
    """Load YAML config from disk into a raw dict.

    Parameters
    ----------
    path:
        Path to the YAML configuration file.

    Returns
    -------
    dict:
        Parsed configuration as a Python dict.

    Raises
    ------
    FileNotFoundError:
        If the config file does not exist.
    yaml.YAMLError:
        If the YAML is malformed.
    """
    path = Path(path)
    if not path.exists():
        raise FileNotFoundError(f"Configuration file not found: {path}")

    try:
        with path.open("r", encoding="utf-8") as f:
            data = yaml.safe_load(f)
    except yaml.YAMLError as e:
        raise ValueError(f"Failed to parse YAML config: {e}") from e

    if data is None:
        raise ValueError("Configuration file is empty")

    return data

pipeline

run_pipeline(config_path)

Run the end-to-end pipeline and return a DataFrame of results.

Uses spatially-aware climate data matching to apply per-cell climate values based on the configured strategy (spatial interpolation or index-based matching).

Parameters:

  • config_path (str | Path, required): Path to the YAML configuration file.

Returns:

  • pd.DataFrame: Results table with columns: cell_id, lat, lon, v_index, mean_temp, total_rain, score, label.

Raises:

  • FileNotFoundError: If the config file, raster file, or climate CSV does not exist.
  • ValueError: If the configuration is invalid or no valid raster cells are found in the ROI.

Source code in terraflow/pipeline.py
def run_pipeline(config_path: str | Path) -> pd.DataFrame:
    """Run the end-to-end pipeline and return a DataFrame of results.

    Uses spatially-aware climate data matching to apply per-cell climate values
    based on the configured strategy (spatial interpolation or index-based matching).

    Parameters
    ----------
    config_path:
        Path to YAML configuration file.

    Returns
    -------
    pd.DataFrame:
        Results table with columns: cell_id, lat, lon, v_index, mean_temp,
        total_rain, score, label.

    Raises
    ------
    FileNotFoundError:
        If config file, raster file, or climate CSV does not exist.
    ValueError:
        If configuration is invalid or no valid raster cells found in ROI.
    """
    config_path = Path(config_path)
    config_dict = load_config_dict(config_path)
    config_dir = config_path.resolve().parent

    # Resolve relative input/output paths against the config file's directory so
    # that configs are portable regardless of the caller's working directory.
    for _key in ("raster_path", "climate_csv", "output_dir"):
        if _key in config_dict and config_dict[_key] is not None:
            _p = Path(str(config_dict[_key]))
            if not _p.is_absolute():
                config_dict[_key] = str((config_dir / _p).resolve())

    cfg: PipelineConfig = build_config(config_dict)
    logger.info("Loaded config from %s", config_path)

    config_bytes_hash = hashlib.sha256(canonicalize_config(config_dict)).hexdigest()
    roi_hash = _resolve_roi_hash(config_dict, config_dir)
    input_paths = _collect_input_paths(config_dict, config_dir)
    input_fps = [fingerprint_file(str(path)) for path in input_paths]
    run_fingerprint = compute_run_fingerprint(config_dict, roi_hash, input_fps)
    logger.info(
        "Computed run fingerprint %s (config=%s, inputs=%d)",
        run_fingerprint,
        config_bytes_hash,
        len(input_fps),
    )

    raster = load_raster(cfg.raster_path)
    raster_crs = raster.crs  # capture before any error path closes the dataset
    try:
        climate_df = load_climate_csv(cfg.climate_csv)
    except Exception:
        raster.close()
        raise

    logger.info(
        "Loaded raster: %s (CRS: EPSG:%s)",
        cfg.raster_path,
        raster_crs.to_epsg() or "custom",
    )
    logger.info("Loaded climate data: %s", cfg.climate_csv)

    # Clip raster to ROI; roi_crs tells clip_raster_to_roi what CRS the bbox
    # coordinates are expressed in so it can reproject them if needed.
    clipped_data, clipped_transform = clip_raster_to_roi(
        raster,
        cfg.roi.model_dump(),
        roi_crs=cfg.roi.roi_crs,
    )
    logger.info("Clipped raster to ROI")

    # Initialize climate interpolator with configured strategy
    interpolator = ClimateInterpolator(
        climate_df=climate_df,
        strategy=cfg.climate.strategy,
        cell_id_column=cfg.climate.cell_id_column,
        fallback_to_mean=cfg.climate.fallback_to_mean,
    )
    logger.info(
        "Initialized climate interpolator with strategy='%s'", cfg.climate.strategy
    )

    rows: int
    cols: int
    rows, cols = clipped_data.shape

    # Collect indices of valid (non-masked) cells.
    valid_indices: List[tuple[int, int]] = [
        (r, c)
        for r in range(rows)
        for c in range(cols)
        if not np.ma.is_masked(clipped_data[r, c])
    ]

    if not valid_indices:
        raise ValueError("No valid raster cells found in the specified ROI")

    # Respect max_cells from config to avoid generating huge tables.
    # Use random sampling for unbiased spatial representation (not just top-left corner).
    max_cells = min(cfg.max_cells, len(valid_indices))
    sampled_indices = random.sample(valid_indices, max_cells)
    logger.info(
        "Sampled %d cells from %d valid cells in ROI", max_cells, len(valid_indices)
    )

    # Pre-compute cell centre coordinates in native raster CRS.
    _native_xs: List[float] = []
    _native_ys: List[float] = []
    for row, col in sampled_indices:
        x, y = xy(clipped_transform, row, col, offset="center")
        _native_xs.append(float(x))
        _native_ys.append(float(y))

    # Reproject to WGS84 (EPSG:4326) so that:
    #   - lat/lon output columns always contain geographic degrees, and
    #   - spatial climate interpolation operates in the same coordinate space
    #     as the weather station lat/lon values.
    _wgs84 = CRS.from_epsg(4326)
    if not raster_crs.equals(_wgs84):
        _coord_tf = Transformer.from_crs(raster_crs, _wgs84, always_xy=True)
        _lons, _lats = _coord_tf.transform(_native_xs, _native_ys)
        cell_lons: List[float] = list(_lons)
        cell_lats: List[float] = list(_lats)
    else:
        cell_lons = _native_xs
        cell_lats = _native_ys

    # Interpolate climate values for all cells at once
    cell_climate_df = interpolator.interpolate(np.array(cell_lats), np.array(cell_lons))
    logger.info(
        "Interpolated climate for %d cells using strategy='%s'",
        len(sampled_indices),
        cfg.climate.strategy,
    )

    records: List[Dict[str, float | int | str]] = []

    for cell_id, (row, col) in enumerate(sampled_indices):
        v_index = float(clipped_data[row, col])

        # Get pre-computed geographic coordinates
        lat = cell_lats[cell_id]
        lon = cell_lons[cell_id]

        # Get per-cell climate values from interpolator
        mean_temp = float(cell_climate_df.iloc[cell_id]["mean_temp"])
        total_rain = float(cell_climate_df.iloc[cell_id]["total_rain"])

        score = suitability_score(
            v_index=v_index,
            mean_temp=mean_temp,
            total_rain=total_rain,
            params=cfg.model_params,
        )
        label = suitability_label(score)

        records.append(
            {
                "cell_id": cell_id,
                "lat": lat,
                "lon": lon,
                "v_index": v_index,
                "mean_temp": mean_temp,
                "total_rain": total_rain,
                "score": score,
                "label": label,
            }
        )

    df = pd.DataFrame.from_records(records)

    # Ensure raster is closed to avoid resource leak
    raster.close()
    logger.info("Closed raster dataset")

    out_dir = ensure_dir(cfg.output_dir)
    out_csv = out_dir / "results.csv"
    df.to_csv(out_csv, index=False)
    logger.info("Saved results to %s", out_csv)

    df.attrs["run_fingerprint"] = run_fingerprint
    return df
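
A short usage sketch (the config path is illustrative); as the listing above shows, the run fingerprint is attached to the returned DataFrame via df.attrs:

from terraflow.pipeline import run_pipeline

df = run_pipeline("config.yml")

# Results table: cell_id, lat, lon, v_index, mean_temp, total_rain, score, label
print(df.sort_values("score", ascending=False).head())

# Provenance fingerprint computed from the config, ROI, and input files
print(df.attrs["run_fingerprint"])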

model

suitability_label(score)

Bucket suitability score into qualitative labels.

Parameters:

  • score (float, required): Suitability score in [0, 1] range.

Returns:

  • str: One of 'low', 'medium', or 'high' based on score thresholds.

Notes
  • 'low': score < 0.33
  • 'medium': 0.33 <= score < 0.66
  • 'high': score >= 0.66
Source code in terraflow/model.py
def suitability_label(score: float) -> str:
    """
    Bucket suitability score into qualitative labels.

    Parameters
    ----------
    score:
        Suitability score in [0, 1] range.

    Returns
    -------
    str:
        One of 'low', 'medium', or 'high' based on score thresholds.

    Notes
    -----
    - 'low': score < 0.33
    - 'medium': 0.33 <= score < 0.66
    - 'high': score >= 0.66
    """
    if score < 0.33:
        return "low"
    if score < 0.66:
        return "medium"
    return "high"

suitability_score(v_index, mean_temp, total_rain, params)

Compute a simple suitability score in [0, 1].

Combines normalized vegetation index, temperature, and rainfall using weighted linear combination. All inputs are normalized to [0, 1] range based on the parameter min/max bounds, then combined using the weights.

Parameters:

  • v_index (float, required): Vegetation index value (typically 0-1, but can be outside range).
  • mean_temp (float, required): Mean temperature in degrees Celsius.
  • total_rain (float, required): Total rainfall in millimeters.
  • params (ModelParams, required): Model parameters containing min/max bounds and weights.

Returns:

  • float: Suitability score in [0, 1] range.

Notes
  • Out-of-range inputs are clipped to [0, 1] during normalization
  • Weights should sum to 1.0 for proper combination
  • Final result is clipped to [0, 1] range
Source code in terraflow/model.py
def suitability_score(
    v_index: float,
    mean_temp: float,
    total_rain: float,
    params: ModelParams,
) -> float:
    """
    Compute a simple suitability score in [0, 1].

    Combines normalized vegetation index, temperature, and rainfall using
    weighted linear combination. All inputs are normalized to [0, 1] range
    based on the parameter min/max bounds, then combined using the weights.

    Parameters
    ----------
    v_index:
        Vegetation index value (typically 0-1, but can be outside range).
    mean_temp:
        Mean temperature in degrees Celsius.
    total_rain:
        Total rainfall in millimeters.
    params:
        Model parameters containing min/max bounds and weights.

    Returns
    -------
    float:
        Suitability score in [0, 1] range.

    Notes
    -----
    - Out-of-range inputs are clipped to [0, 1] during normalization
    - Weights should sum to 1.0 for proper combination
    - Final result is clipped to [0, 1] range
    """
    v_n = normalize(v_index, params.v_min, params.v_max)
    t_n = normalize(mean_temp, params.t_min, params.t_max)
    r_n = normalize(total_rain, params.r_min, params.r_max)

    score = params.w_v * v_n + params.w_t * t_n + params.w_r * r_n
    return max(0.0, min(1.0, score))
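
As a worked sketch, assuming normalize is the standard clipped min-max scaling described in the Notes, and reusing the illustrative bounds and weights from the ModelParams example above:

from terraflow.config import ModelParams
from terraflow.model import suitability_label, suitability_score

params = ModelParams(
    v_min=0.0, v_max=1.0,
    t_min=0.0, t_max=35.0,
    r_min=0.0, r_max=2000.0,
    w_v=0.5, w_t=0.3, w_r=0.2,
)

# Normalized terms: v_n = 0.6, t_n = 20/35 ≈ 0.571, r_n = 800/2000 = 0.4
# score = 0.5*0.6 + 0.3*0.571 + 0.2*0.4 ≈ 0.551
score = suitability_score(v_index=0.6, mean_temp=20.0, total_rain=800.0, params=params)
print(round(score, 3), suitability_label(score))  # 0.551 'medium'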