Source code for spac._ripley

"""Functions for point patterns spatial statistics."""
from __future__ import annotations

from typing import Union  # noqa: F401
from typing import TYPE_CHECKING
from typing_extensions import Literal

from scanpy import logging as logg
from anndata import AnnData

from numpy.random import default_rng
from scipy.spatial import Delaunay, ConvexHull
from sklearn.neighbors import NearestNeighbors
from sklearn.preprocessing import LabelEncoder
from scipy.spatial.distance import pdist
from scipy.spatial.distance import cdist
import numpy as np
import pandas as pd

from squidpy._docs import d, inject_docs
from squidpy._utils import NDArrayA
from squidpy.gr._utils import _save_data, _assert_spatial_basis, _assert_categorical_obs
from squidpy._constants._constants import RipleyStat
from squidpy._constants._pkg_constants import Key

__all__ = ["ripley"]

# This code is refactored from squidpy (https://github.com/theislab/squidpy)
# Major changes:
# Calculate Ripley L for two phenotypes
# Pass in the precalculated area
# Pass in the number of observations used in the simulation
# https://github.com/theislab/squidpy/blob/main/src/squidpy/external/_ripley.py


#@d.dedent
#@inject_docs(key=Key.obsm.spatial, rp=RipleyStat)
[docs] def ripley( adata: AnnData, cluster_key: str, mode: Literal["F", "G", "L"] = "F", spatial_key: str = Key.obsm.spatial, metric: str = "euclidean", n_neigh: int = 2, n_simulations: int = 100, n_observations: int = 1000, max_dist: float | None = None, n_steps: int = 50, support: List[float] | None = None, seed: int | None = None, area: float | None = None, copy: bool = False, phenotypes: Tuple[str, str] | None = None ) -> dict[str, pd.DataFrame | NDArrayA]: r""" Calculate various Ripley's statistics for point processes. According to the `'mode'` argument, it calculates one of the following Ripley's statistics: `{rp.F.s!r}`, `{rp.G.s!r}` or `{rp.L.s!r}` statistics. `{rp.F.s!r}`, `{rp.G.s!r}` are defined as: .. math:: F(t),G(t)=P( d_{{i,j}} \le t ) Where :math:`d_{{i,j}}` represents: - distances to a random Spatial Poisson Point Process for `{rp.F.s!r}`. - distances to any other point of the dataset for `{rp.G.s!r}`. `{rp.L.s!r}` we first need to compute :math:`K(t)`, which is defined as: .. math:: K(t) = \frac{{1}}{{\lambda}} \sum_{{i \ne j}} \frac{{I(d_{{i,j}}<t)}}{{n}} and then we apply a variance-stabilizing transformation: .. math:: L(t) = (\frac{{K(t)}}{{\pi}})^{{1/2}} Parameters ---------- %(adata)s %(cluster_key)s mode Which Ripley's statistic to compute. %(spatial_key)s metric Which metric to use for computing distances. For available metrics, check out :class:`sklearn.neighbors.DistanceMetric`. n_neigh Number of neighbors to consider for the KNN graph. n_simulations How many simulations to run for computing p-values. n_observations How many observations to generate for the Spatial Poisson Point Process. max_dist Maximum distances for the support. If `None`, `max_dist=`:math:`\sqrt{{area \over 2}}`. n_steps Number of steps for the support. support list of bins (radiis) for the support. Overrides `max_dist` and `n_steps`. phenotypes For Ripley L, calculate the function for the cells of these two phenotypes. %(seed)s The seed for the random number generator. %(area)s Use passed value for area instead of the area of the convex hull. %(copy)s Returns ------- %(ripley_stat_returns)s References ---------- For reference, check out `Wikipedia <https://en.wikipedia.org/wiki/Spatial_descriptive_statistics#Ripley's_K_and_L_functions>`_ or :cite:`Baddeley2015-lm`. """ _assert_categorical_obs(adata, key=cluster_key) _assert_spatial_basis(adata, key=spatial_key) coordinates = adata.obsm[spatial_key] clusters = adata.obs[cluster_key].values mode = RipleyStat(mode) # type: ignore[assignment] if TYPE_CHECKING: assert isinstance(mode, RipleyStat) # old squidpy code # N = coordinates.shape[#0] hull = ConvexHull(coordinates) # pass in the area instead of convex hull # This is useful when using the same area in multiple ROIs if area is None: area = hull.volume logg.info(f"Area:{area}") if max_dist is None: max_dist = (area / 2) ** 0.5 if support is None: support = np.linspace(0, max_dist, n_steps) else: if not isinstance(support, list) or not all( isinstance(x, (int, float)) for x in support ): raise ValueError(f"Support expected to be a list of floats," f" got {support}.") support = np.array(support) n_steps = len(support) # Check that support is a list of floats # prepare labels le = LabelEncoder().fit(clusters) cluster_idx = le.transform(clusters) if phenotypes is None: obs_arr = np.empty((le.classes_.shape[0], n_steps)) else: obs_arr = np.empty((1, n_steps)) # Remove the diagnoal distances from the distances matrix remove_diagonal = False if phenotypes[0] == phenotypes[1]: remove_diagonal = True start = logg.info( f"Calculating Ripley's {mode} statistic for `{le.classes_.shape[0]}` clusters and `{n_simulations}` simulations" ) if phenotypes is None: for i in np.arange(np.max(cluster_idx) + 1): coord_c = coordinates[cluster_idx == i, :] if mode == RipleyStat.F: logg.warning( f"Running the Ripley F simulations for phenotype ID:{i} " f"with n_cells:{n_observations}" ) random = _ppp(hull, n_simulations=1, n_observations=n_observations, seed=seed) tree_c = NearestNeighbors(metric=metric, n_neighbors=n_neigh).fit(coord_c) distances, _ = tree_c.kneighbors(random, n_neighbors=n_neigh) bins, obs_stats = _f_g_function(distances.squeeze(), support) elif mode == RipleyStat.G: tree_c = NearestNeighbors(metric=metric, n_neighbors=n_neigh).fit(coord_c) distances, _ = tree_c.kneighbors(coordinates[cluster_idx != i, :], n_neighbors=n_neigh) bins, obs_stats = _f_g_function(distances.squeeze(), support) elif mode == RipleyStat.L: n_center = n_observations n_neighbor = n_observations distances = pdist(coord_c, metric=metric) bins, obs_stats = _l_function(distances, support, n_observations, area) else: raise NotImplementedError(f"Mode `{mode.s!r}` is not yet implemented.") obs_arr[i] = obs_stats else: if mode == RipleyStat.L: center_phenotype = phenotypes[0] neighbor_phenotype = phenotypes[1] # Index of center and neighbor cells center_idx = le.transform([center_phenotype])[0] neighbor_idx = le.transform([neighbor_phenotype])[0] # Get a boolean series center_cell_bool = cluster_idx == center_idx neighbor_cell_bool = cluster_idx == neighbor_idx n_center = center_cell_bool.sum() n_neighbor = neighbor_cell_bool.sum() center_coord = coordinates[center_cell_bool, :] neighbor_coord = coordinates[neighbor_cell_bool, :] distances = cdist(center_coord, neighbor_coord) bins, obs_stats = _l_multiple_function(distances, support, n_center, n_neighbor, area, remove_diagonal) obs_arr[0] = obs_stats else: raise NotImplementedError(f"Mode `{mode.s!r}` is not yet implemented.") sims = np.empty((n_simulations, len(bins))) pvalues = np.ones((le.classes_.shape[0], len(bins))) rng = default_rng(None if seed is None else seed) if phenotypes is None: logg.warning(f"Running the simulations with n_cells:{n_observations}") for i in range(n_simulations): random_i = _ppp( hull, n_simulations=1, n_observations=n_observations, rng=rng, seed=seed) if mode == RipleyStat.F: tree_i = NearestNeighbors(metric=metric, n_neighbors=n_neigh).fit(random_i) distances_i, _ = tree_i.kneighbors(random, n_neighbors=1) _, stats_i = _f_g_function(distances_i.squeeze(), support) elif mode == RipleyStat.G: tree_i = NearestNeighbors(metric=metric, n_neighbors=n_neigh).fit(random_i) distances_i, _ = tree_i.kneighbors(coordinates, n_neighbors=1) _, stats_i = _f_g_function(distances_i.squeeze(), support) elif mode == RipleyStat.L: distances_i = pdist(random_i, metric=metric) _, stats_i = _l_function(distances_i, support, N, area) else: raise NotImplementedError(f"Mode `{mode.s!r}` is not yet " "implemented.") # These lines for code runs for every single phenotype # to see if the simulation value was greater than the # actual value for that phenotype 'j' in obs_arry[j] for j in range(obs_arr.shape[0]): pvalues[j] += stats_i >= obs_arr[j] sims[i] = stats_i else: for i in range(n_simulations): if mode == RipleyStat.L: random_i = _ppp(hull, n_simulations=1, n_observations=n_center+n_neighbor, rng=rng, seed=seed) # Randomly select the frist n_center cells as center cells center_coord = random_i[0:n_center, :] if remove_diagonal: neighbor_coord = center_coord else: neighbor_coord = random_i[n_center:] distances_i = cdist(center_coord, neighbor_coord) _, stats_i = _l_multiple_function(distances_i, support, n_center, n_neighbor, area, remove_diagonal) sims[i] = stats_i else: raise NotImplementedError(f"Mode `{mode.s!r}` is not yet " "implemented for" "multipe phenotypes.") pvalues /= n_simulations + 1 pvalues = np.minimum(pvalues, 1 - pvalues) if phenotypes is None: obs_df = _reshape_res(obs_arr.T, columns=le.classes_, index=bins, var_name=cluster_key) else: obs_df = _reshape_res(obs_arr.T, columns=[f"{phenotypes[0]}_{phenotypes[1]}"], index=bins, var_name=cluster_key) sims_df = _reshape_res(sims.T, columns=np.arange(n_simulations), index=bins, var_name="simulations") res = { f"{mode}_stat": obs_df, "sims_stat": sims_df, "bins": bins, "pvalues": pvalues, "n_center": n_center, "n_neighbor": n_neighbor, 'area': area } if TYPE_CHECKING: assert isinstance(res, dict) if copy: logg.info("Finish", time=start) return res _save_data(adata, attr="uns", key=Key.uns.ripley(cluster_key, mode), data=res, time=start)
[docs] def _reshape_res(results: NDArrayA, columns: NDArrayA | list[str], index: NDArrayA, var_name: str) -> pd.DataFrame: df = pd.DataFrame(results, columns=columns, index=index) df.index.set_names(["bins"], inplace=True) df = df.melt(var_name=var_name, value_name="stats", ignore_index=False) df[var_name] = df[var_name].astype("category", copy=True) df.reset_index(inplace=True) return df
[docs] def _f_g_function(distances: NDArrayA, support: NDArrayA) -> tuple[NDArrayA, NDArrayA]: counts, bins = np.histogram(distances, bins=support) fracs = np.cumsum(counts) / counts.sum() return bins, np.concatenate((np.zeros((1,), dtype=float), fracs))
[docs] def _l_function(distances: NDArrayA, support: NDArrayA, n: int, area: float) -> tuple[NDArrayA, NDArrayA]: n_pairs_less_than_d = (distances < support.reshape(-1, 1)).sum(axis=1) # type: ignore[attr-defined] intensity = n / area k_estimate = ((n_pairs_less_than_d * 2) / n) / intensity l_estimate = np.sqrt(k_estimate / np.pi) return support, l_estimate
[docs] def _l_multiple_function(distances: NDArrayA, support: NDArrayA, n_center: int, n_neighbor: int, area: float, remove_diagonal: bool) -> tuple[NDArrayA, NDArrayA]: # break the line below less than 80 characters distances = distances.flatten() n_pairs_less_than_d = (distances < support.reshape(-1, 1)).sum(axis=1) if remove_diagonal: # Remove diagona except for when Radius = 0 n_pairs_less_than_d = n_pairs_less_than_d - n_center n_pairs_less_than_d[n_pairs_less_than_d < 0] = 0 k_estimate = n_pairs_less_than_d * area / n_center / n_neighbor l_estimate = np.sqrt(k_estimate / np.pi) return support, l_estimate
[docs] def _ppp( hull: ConvexHull, n_simulations: int, n_observations: int, rng: default_rng | None = None, seed: int | None = None) -> NDArrayA: """ Simulate Poisson Point Process on a polygon. Parameters ---------- hull Convex hull of the area of interest. n_simulations Number of simulated point processes. n_observations Number of observations to sample from each simulation. rng Random number generator, superseeds seed seed Random seed. Returns ------- An Array with shape ``(n_simulation, n_observations, 2)``. """ if rng is None: rng = default_rng(None if seed is None else seed) vxs = hull.points[hull.vertices] deln = Delaunay(vxs) bbox = np.array([*vxs.min(0), *vxs.max(0)]) result = np.empty((n_simulations, n_observations, 2)) for i_sim in range(n_simulations): i_obs = 0 while i_obs < n_observations: x, y = ( rng.uniform(bbox[0], bbox[2]), rng.uniform(bbox[1], bbox[3]), ) if deln.find_simplex((x, y)) >= 0: result[i_sim, i_obs] = (x, y) i_obs += 1 return result.squeeze()