import numpy as np
import scanpy as sc
import pandas as pd
import anndata
import warnings
import logging
import scanpy.external as sce
from spac.utils import check_table, check_annotation, check_feature
from scipy import stats
import umap as umap_lib
from scipy.sparse import issparse
from typing import List, Union, Optional
from numpy.lib import NumpyVersion
import multiprocessing
import parmap
from spac.utag_functions import utag
# Configure logging
logging.basicConfig(level=logging.INFO,
format='SPAC:%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def phenograph_clustering(
adata,
features,
layer=None,
k=50,
seed=None,
output_annotation="phenograph",
associated_table=None,
**kwargs):
"""
Calculate automatic phenotypes using phenograph.
    The function will add these two attributes to `adata`:
    `.obs["phenograph"]`
        The cluster label assigned by phenograph, stored as a
        categorical.
    `.uns["phenograph_features"]`
        The features used to calculate the phenograph clusters.
Parameters
----------
adata : anndata.AnnData
The AnnData object.
    features : list of str
        The features to include when creating the phenograph clusters.
    layer : str, optional
        The layer to be used in calculating the phenograph clusters.
    k : int, optional
        The number of nearest neighbors to use in creating the graph.
    seed : int, optional
        Random seed for reproducibility.
    output_annotation : str, optional
        The name of the annotation in `adata.obs` where the clusters
        are stored.
    associated_table : str, optional
        If set, use the corresponding key in `adata.obsm` to calculate
        the phenograph clusters. Takes priority over the layer argument.
Returns
-------
adata : anndata.AnnData
Updated AnnData object with the phenograph clusters
stored in `adata.obs[output_annotation]`
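    Examples
    --------
    A minimal usage sketch; the marker names below are placeholders for
    features present in your data:

    >>> adata = your_anndata_object
    >>> phenograph_clustering(adata, features=["CD3", "CD8"], k=30, seed=0)
    >>> adata.obs["phenograph"].value_counts()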
"""
_validate_transformation_inputs(
adata=adata,
layer=layer,
associated_table=associated_table,
features=features
)
if not isinstance(k, int) or k <= 0:
raise ValueError("`k` must be a positive integer")
if seed is not None:
np.random.seed(seed)
data = _select_input_features(
adata=adata,
layer=layer,
associated_table=associated_table,
features=features
)
phenograph_out = sce.tl.phenograph(data,
clustering_algo="leiden",
k=k,
seed=seed,
**kwargs)
adata.obs[output_annotation] = pd.Categorical(phenograph_out[0])
adata.uns["phenograph_features"] = features
def get_cluster_info(adata, annotation, features=None, layer=None):
"""
Retrieve information about clusters based on specific annotation.
Parameters
----------
adata : anndata.AnnData
The AnnData object.
annotation : str
Annotation in adata.obs for cluster info.
features : list of str, optional
Features (e.g., markers) for cluster metrics.
Defaults to all features in adata.var_names.
layer : str, optional
The layer to be used in the aggregate summaries.
If None, uses adata.X.
Returns
-------
pd.DataFrame
        DataFrame with metrics for each cluster, including each
        cluster's percentage of all cells in the sample.
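    Examples
    --------
    A minimal usage sketch; assumes "phenograph" is an existing
    annotation in `adata.obs`:

    >>> info = get_cluster_info(adata, annotation="phenograph")
    >>> info[["phenograph", "Number of Cells", "Percentage"]].head()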
"""
# Use utility functions for input validation
check_annotation(adata, annotations=annotation)
if features is None:
features = list(adata.var_names)
else:
check_feature(adata, features=features)
    # Validate the layer and convert the data matrix (or the specified
    # layer) to a DataFrame
    if layer:
        check_table(adata, tables=layer)
        data_df = adata.to_df(layer=layer)
    else:
        data_df = adata.to_df()
# Count cells in each cluster
cluster_counts = adata.obs[annotation].value_counts().reset_index()
cluster_counts.columns = [annotation, "Number of Cells"]
# Calculate the percentage of cells in each cluster
total_cells = adata.obs.shape[0]
cluster_counts['Percentage'] = (
cluster_counts['Number of Cells'] / total_cells
) * 100
# Initialize DataFrame for cluster metrics
cluster_metrics = pd.DataFrame({annotation: cluster_counts[annotation]})
# Add cluster annotation
data_df[annotation] = adata.obs[annotation].values
# Calculate statistics for each feature in each cluster
for feature in features:
grouped = data_df.groupby(annotation)[feature]\
.agg(["mean", "median"])\
.reset_index()
grouped.columns = [
f"{col}_{feature}" if col != annotation else annotation
for col in grouped.columns
]
cluster_metrics = cluster_metrics.merge(
grouped, on=annotation, how="left"
)
# Merge cluster counts and percentage
cluster_metrics = pd.merge(
cluster_metrics, cluster_counts,
on=annotation, how="left"
)
return cluster_metrics
def tsne(adata, layer=None, **kwargs):
"""
Perform t-SNE transformation on specific layer information.
Parameters
----------
adata : anndata.AnnData
The AnnData object.
    layer : str, optional
        The layer to use for the t-SNE computation. If None, `adata.X`
        is used.
**kwargs
Parameters for scanpy.tl.tsne function.
Returns
-------
adata : anndata.AnnData
Updated AnnData object with t-SNE coordinates.
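    Examples
    --------
    A minimal usage sketch; assumes "counts" is an existing layer:

    >>> adata = tsne(adata, layer="counts")
    >>> adata.obsm["X_tsne"].shape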
"""
    # If a layer is provided, copy it into `.obsm` so scanpy.tl.tsne can
    # use it via `use_rep`; otherwise scanpy defaults to the `.X` matrix.
if not isinstance(adata, anndata.AnnData):
raise ValueError("adata must be an AnnData object.")
if layer is not None:
if layer not in adata.layers:
raise ValueError(f"Layer '{layer}' not found in adata.layers.")
tsne_obsm_name = layer + "_tsne"
X_tsne = adata.to_df(layer=layer)
adata.obsm[tsne_obsm_name] = X_tsne
else:
tsne_obsm_name = None
    # Pass user-provided kwargs through to scanpy (as documented above),
    # keeping the fixed default random_state of 7 unless overridden
    kwargs.setdefault("random_state", 7)
    sc.tl.tsne(adata, use_rep=tsne_obsm_name, **kwargs)
return adata
def run_umap(
adata,
n_neighbors=75,
min_dist=0.1,
n_components=2,
metric='euclidean',
random_state=0,
transform_seed=42,
layer=None,
output_derived_feature='X_umap',
associated_table=None,
**kwargs
):
"""
Perform UMAP analysis on the specific layer of the AnnData object
or the default data.
Parameters
----------
adata : AnnData
Annotated data matrix.
n_neighbors : int, default=75
Number of neighbors to consider when constructing the UMAP. This
influences the balance between preserving local and global structures
in the data.
min_dist : float, default=0.1
Minimum distance between points in the UMAP space. Controls how
tightly the embedding is allowed to compress points together.
n_components : int, default=2
Number of dimensions for embedding.
metric : str, optional
Metric to compute distances in high dimensional space.
Check `https://umap-learn.readthedocs.io/en/latest/api.html` for
options. The default is 'euclidean'.
random_state : int, default=0
        Seed used by the random number generator (RNG) during UMAP fitting.
transform_seed : int, default=42
RNG seed during UMAP transformation.
layer : str, optional
Layer of AnnData object for UMAP. Defaults to `adata.X`.
    output_derived_feature : str, default='X_umap'
        The name of the key in `adata.obsm` that will contain the
        UMAP coordinates.
    associated_table : str, optional
        If set, use the corresponding key in `adata.obsm` to calculate
        the UMAP. Takes priority over the layer argument.
Returns
-------
adata : anndata.AnnData
Updated AnnData object with UMAP coordinates stored in the `obsm`
attribute. The key for the UMAP embedding in `obsm` is "X_umap" by
default.
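    Examples
    --------
    A minimal usage sketch; assumes "normalized" is an existing layer:

    >>> adata = run_umap(adata, layer="normalized", n_neighbors=30)
    >>> adata.obsm["X_umap"].shape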
"""
_validate_transformation_inputs(
adata=adata,
layer=layer,
associated_table=associated_table
)
data = _select_input_features(
adata=adata,
layer=layer,
associated_table=associated_table
)
    # Cast to float32 and wrap in a DataFrame to reduce memory usage
    data = pd.DataFrame(data.astype(np.float32))
# Create and configure the UMAP model
umap_model = umap_lib.UMAP(
n_neighbors=n_neighbors,
min_dist=min_dist,
n_components=n_components,
metric=metric,
low_memory=True,
random_state=random_state,
transform_seed=transform_seed,
**kwargs
)
# Fit and transform the data with the UMAP model
embedding = umap_model.fit_transform(data)
# Store the UMAP coordinates back into the AnnData object under the
# output_derived_feature key
adata.obsm[output_derived_feature] = embedding
return adata
def batch_normalize(adata, annotation, output_layer,
input_layer=None, method="median", log=False):
"""
Adjust the features of every marker using a normalization method.
The normalization methods are summarized here:
https://www.ncbi.nlm.nih.gov/pmc/articles/PMC8723144/
Parameters
----------
adata : anndata.AnnData
The AnnData object.
annotation: str
The name of the annotation in `adata` to define batches.
output_layer : str
The name of the new layer to add to the anndata object for storing
normalized data.
input_layer : str, optional
The name of the layer from which to read data. If None, read from `.X`.
method : {"median", "Q50", "Q75", "z-score"}, default "median"
The normalization method to use.
    log : bool, default False
        If True, log2-transform the features (as log2(1 + x)) before
        normalization. Must be a boolean.
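    Examples
    --------
    A minimal usage sketch; assumes "batch" is an existing annotation in
    `adata.obs`:

    >>> batch_normalize(adata, annotation="batch",
    ...                 output_layer="median_normalized", method="median")
    >>> adata.layers["median_normalized"]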
"""
# Use utility functions for input validation
check_annotation(adata, annotations=annotation)
if input_layer:
check_table(adata, tables=input_layer)
if not isinstance(log, bool):
logger.error("Argument 'log' must be of type bool.")
raise ValueError("Argument 'log' must be of type bool.")
allowed_methods = ["median", "Q50", "Q75", "z-score"]
if method not in allowed_methods:
raise ValueError(
f"Unsupported normalization method '{method}', "
f"allowed methods = {allowed_methods}"
)
# Create a copy of the input layer or '.X'
if input_layer:
original = pd.DataFrame(adata.layers[input_layer],
index=adata.obs.index).copy()
else:
original = pd.DataFrame(adata.X, index=adata.obs.index).copy()
# Logarithmic transformation if required
if log:
original = np.log2(1+original)
logger.info("Data transformed with log2")
# Initialize the batch annotation values
batches = adata.obs[annotation].unique().tolist()
if method == "median" or method == "Q50":
all_batch_quantile = original.quantile(q=0.5)
logger.info("Median for al cells: %s", all_batch_quantile)
elif method == "Q75":
all_batch_quantile = original.quantile(q=0.75)
logger.info("Q75 for all cells: %s", all_batch_quantile)
elif method == "z-score":
logger.info("Z-score setup is handled in batch processing loop.")
# Normalize each batch
for batch in batches:
batch_cells = original[adata.obs[annotation] == batch]
logger.info(f"Processing batch: {batch}, "
f"original values:\n{batch_cells}")
if method == "median":
batch_median = batch_cells.quantile(q=0.5)
logger.info(f"Median for {batch}: %s", batch_median)
original.loc[
(adata.obs[annotation] == batch)
] = batch_cells + (all_batch_quantile - batch_median)
elif method == "Q50":
batch_50quantile = batch_cells.quantile(q=0.5)
logger.info(f"Q50 for {batch}: %s", batch_50quantile)
original.loc[adata.obs[annotation] == batch] = (
batch_cells * all_batch_quantile / batch_50quantile
)
elif method == "Q75":
batch_75quantile = batch_cells.quantile(q=0.75)
logger.info(f"Q75 for {batch}: %s", batch_75quantile)
original.loc[adata.obs[annotation] == batch] = (
batch_cells * all_batch_quantile / batch_75quantile
)
elif method == "z-score":
mean = batch_cells.mean()
std = batch_cells.std(ddof=0) # DataFrame.std() by default ddof=1
logger.info(f"mean for {batch}: %s", mean)
logger.info(f"std for {batch}: %s", std)
# Ensure std is not zero by using a minimal threshold (e.g., 1e-8)
std = np.maximum(std, 1e-8)
original.loc[adata.obs[annotation] == batch] = \
(batch_cells - mean) / std
# Store normalized data in the specified output layer
adata.layers[output_layer] = original
    logger.info(f"Normalization completed. Data in layer '{output_layer}'.")
def rename_annotations(adata, src_annotation, dest_annotation, mappings):
"""
Rename labels in a given annotation in an AnnData object based on a
provided dictionary. This function modifies the adata object in-place
and creates a new annotation column.
Parameters
----------
adata : anndata.AnnData
The AnnData object.
src_annotation : str
Name of the column in adata.obs containing the original
labels of the source annotation.
dest_annotation : str
The name of the new column to be created in the AnnData object
containing the renamed labels.
mappings : dict
A dictionary mapping the original annotation labels to
the new labels.
Examples
--------
>>> adata = your_anndata_object
>>> src_annotation = "phenograph"
>>> mappings = {
... "0": "group_8",
... "1": "group_2",
... "2": "group_6",
... # ...
... "37": "group_5",
... }
>>> dest_annotation = "renamed_annotations"
>>> adata = rename_annotations(
... adata, src_annotation, dest_annotation, mappings)
"""
# Use utility functions for input validation
check_annotation(adata, annotations=src_annotation)
# Inform the user about the data type of the original column
original_dtype = adata.obs[src_annotation].dtype
print(f"The data type of the original column '{src_annotation}' is "
f"{original_dtype}.")
# Convert the keys in mappings to the same data type as the unique values
unique_values = adata.obs[src_annotation].unique()
mappings = {
type(unique_values[0])(key): value for key, value in mappings.items()
}
# Identify and handle unmapped labels
missing_mappings = [
key for key in unique_values if key not in mappings.keys()
]
if missing_mappings:
warnings.warn(
f"Missing mappings for the following labels: {missing_mappings}. "
f"They will be set to NaN in the '{dest_annotation}' column."
)
for missing in missing_mappings:
mappings[missing] = np.nan # Assign NaN for missing mappings
# Create a new column based on the mappings
adata.obs[dest_annotation] = (
adata.obs[src_annotation].map(mappings).astype("category")
)
def normalize_features(
adata,
low_quantile=0.02,
high_quantile=0.98,
interpolation="linear",
input_layer=None,
output_layer="normalized_feature",
per_batch=False,
annotation=None
):
"""
Normalize the features stored in an AnnData object.
    Any entry lower than the value corresponding to low_quantile of its
    column is assigned that low-quantile value, and any entry greater
    than the value corresponding to high_quantile is assigned that
    high-quantile value.
    Other entries are normalized with
    (values - quantile min) / (quantile max - quantile min).
    Each resulting column has values in the range [0, 1]. If per_batch
    is True, normalization is applied separately within each batch
    defined by `annotation`.
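    Examples
    --------
    A minimal usage sketch with the default quantiles:

    >>> adata = normalize_features(adata, low_quantile=0.02,
    ...                            high_quantile=0.98,
    ...                            output_layer="normalized_feature")
    >>> adata.layers["normalized_feature"]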
"""
# Check if the provided input_layer exists in the AnnData object
if input_layer is not None:
check_table(adata, tables=input_layer)
# Check if output_layer already exists and issue a warning if it does
if output_layer in adata.layers:
warnings.warn(
f"Layer '{output_layer}' already exists. It will be overwritten "
"with the new transformed data."
)
if not isinstance(high_quantile, (int, float)):
raise TypeError(
"The high quantile should be a numeric value, currently got {}"
.format(type(high_quantile))
)
if not isinstance(low_quantile, (int, float)):
raise TypeError(
"The low quantile should be a numeric value, currently got {}"
.format(type(low_quantile))
)
if low_quantile >= high_quantile:
raise ValueError(
"The low quantile should be smaller than the high quantile, "
"current values are: low quantile: {}, high_quantile: {}"
.format(low_quantile, high_quantile)
)
if high_quantile <= 0 or high_quantile > 1:
raise ValueError(
"The high quantile value should be within (0, 1], current value:"
" {}".format(high_quantile)
)
if low_quantile < 0 or low_quantile >= 1:
raise ValueError(
"The low quantile value should be within [0, 1), current value: {}"
.format(low_quantile)
)
if interpolation not in ["nearest", "linear"]:
raise ValueError(
"interpolation must be either 'nearest' or 'linear', passed value"
" is: {}".format(interpolation)
)
# Extract the data to transform
if input_layer is not None:
data_to_transform = adata.layers[input_layer]
else:
data_to_transform = adata.X
# Ensure data is in numpy array format
data_to_transform = (data_to_transform.toarray()
if issparse(data_to_transform)
else data_to_transform)
if per_batch:
if annotation is None:
raise ValueError(
"annotation must be provided if per_batch is True."
)
transformed_data = apply_per_batch(
data_to_transform, adata.obs[annotation].values,
method='normalize_features',
low_quantile=low_quantile,
high_quantile=high_quantile,
interpolation=interpolation
)
else:
transformed_data = normalize_features_core(
data_to_transform,
low_quantile=low_quantile,
high_quantile=high_quantile,
interpolation=interpolation
)
# Store the transformed data in the specified output_layer
adata.layers[output_layer] = pd.DataFrame(
transformed_data,
index=adata.obs_names,
columns=adata.var_names
)
return adata
def normalize_features_core(data, low_quantile=0.02, high_quantile=0.98,
interpolation='linear'):
"""
Normalize the features in a numpy array.
    Any entry lower than the value corresponding to low_quantile of its
    column is assigned that low-quantile value, and any entry greater
    than the value corresponding to high_quantile is assigned that
    high-quantile value.
    Other entries are normalized with
    (values - quantile min) / (quantile max - quantile min).
    Each resulting column has values in the range [0, 1].
Parameters
----------
data : np.ndarray
The data to be normalized.
    low_quantile : float, optional (default: 0.02)
        The lower quantile to use for normalization. Determines the
        minimum value after normalization.
        Must be a float in the range [0, 1).
    high_quantile : float, optional (default: 0.98)
        The higher quantile to use for normalization. Determines the
        maximum value after normalization.
        Must be a float in the range (0, 1].
    interpolation : str, optional (default: "linear")
        The interpolation method to use when selecting the values at
        the low and high quantiles. Values can be "nearest" or "linear".
Returns
-------
np.ndarray
The normalized data.
Raises
------
TypeError
If low_quantile or high_quantile are not numeric.
ValueError
If low_quantile is not less than high_quantile, or if they are
out of the range [0, 1] and (0, 1], respectively.
ValueError
If interpolation is not one of the allowed values.
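    Examples
    --------
    A small worked example; with quantiles 0 and 1, each column is
    scaled by its own minimum and maximum:

    >>> import numpy as np
    >>> data = np.array([[1.0, 10.0], [2.0, 20.0], [3.0, 30.0]])
    >>> normalize_features_core(data, low_quantile=0.0, high_quantile=1.0)
    array([[0. , 0. ],
           [0.5, 0.5],
           [1. , 1. ]])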
"""
if not isinstance(high_quantile, (int, float)):
raise TypeError(
"The high quantile should be a numeric value, "
f"currently got {str(type(high_quantile))}")
if not isinstance(low_quantile, (int, float)):
raise TypeError(
"The low quantile should be a numeric value, "
f"currently got {str(type(low_quantile))}")
if low_quantile < high_quantile:
if high_quantile <= 0 or high_quantile > 1:
raise ValueError(
"The high quantile value should be within "
f"(0, 1], current value: {high_quantile}")
if low_quantile < 0 or low_quantile >= 1:
raise ValueError(
"The low quantile value should be within "
f"[0, 1), current value: {low_quantile}")
else:
raise ValueError(
"The low quantile should be smaller than "
"the high quantile, current values are:\n"
f"low quantile: {low_quantile}\n"
f"high quantile: {high_quantile}")
if interpolation not in ["nearest", "linear"]:
raise ValueError(
"Interpolation must be either 'nearest' or 'linear', "
f"passed value is: {interpolation}")
# Version check for numpy using NumpyVersion
numpy_version = np.__version__
if NumpyVersion(numpy_version) >= NumpyVersion('1.22.0'):
# Use 'method' argument for newer versions
quantiles = np.quantile(
data, [low_quantile, high_quantile], axis=0,
method=interpolation
)
else:
# Use 'interpolation' argument for older versions
quantiles = np.quantile(
data, [low_quantile, high_quantile], axis=0,
interpolation=interpolation
)
qmin = quantiles[0]
qmax = quantiles[1]
# Calculate range_values and identify zero ranges
range_values = qmax - qmin
zero_range_mask = range_values == 0
# Prevent division by zero by setting zero ranges to 1 temporarily
range_values = range_values.astype(float)
range_values[zero_range_mask] = 1.0
# Clip raw values to the quantile range before normalization
clipped_data = np.clip(data, qmin, qmax)
# Normalize the clipped values
normalized_data = (clipped_data - qmin) / range_values
# Correct normalized data for zero ranges
normalized_data[:, zero_range_mask] = 0.0
return normalized_data
def z_score_normalization(adata, output_layer, input_layer=None, **kwargs):
"""
Compute z-scores for the provided AnnData object.
Parameters
----------
adata : anndata.AnnData
The AnnData object containing the data to normalize.
output_layer : str
The name of the layer to store the computed z-scores.
input_layer : str, optional
The name of the layer in the AnnData object to normalize.
If None, the main data matrix .X is used.
**kwargs : dict, optional
Additional arguments to pass to scipy.stats.zscore.
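    Examples
    --------
    A minimal usage sketch; z-scores are computed per feature (column):

    >>> z_score_normalization(adata, output_layer="z_scores")
    >>> adata.layers["z_scores"]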
"""
# Check if the provided input_layer exists in the AnnData object
if input_layer:
check_table(adata, tables=input_layer)
data_to_normalize = adata.layers[input_layer]
else:
data_to_normalize = adata.X
# Ensure data is in numpy array format
data_to_normalize = data_to_normalize.toarray() \
if issparse(data_to_normalize) else data_to_normalize
# Compute z-scores using scipy.stats.zscore
normalized_data = stats.zscore(
data_to_normalize, axis=0, nan_policy='omit', **kwargs
)
# Store the computed z-scores in the specified output_layer
adata.layers[output_layer] = pd.DataFrame(
normalized_data,
index=adata.obs_names,
columns=adata.var_names
)
# Print a message indicating that normalization is complete
print(
f'Z-score normalization completed. '
f'Data stored in layer "{output_layer}".'
)
def apply_per_batch(data, annotation, method, **kwargs):
"""
Apply a given function to data per batch, with additional parameters.
Parameters
----------
data : np.ndarray
The data to transform.
annotation : np.ndarray
Batch annotations for each row in the data.
method : str
The function to apply to each batch. Options: 'arcsinh_transformation'
or 'normalize_features'.
kwargs:
Additional parameters to pass to the function.
Returns
-------
np.ndarray
The transformed data.
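    Examples
    --------
    A small worked example; each batch is normalized independently, and
    the single-row batch "b" collapses to zeros because its quantile
    range is zero:

    >>> import numpy as np
    >>> data = np.array([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])
    >>> batches = np.array(["a", "a", "b"])
    >>> apply_per_batch(data, batches, method="normalize_features",
    ...                 low_quantile=0.0, high_quantile=1.0)
    array([[0., 0.],
           [1., 1.],
           [0., 0.]])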
"""
# Check data integrity
if not isinstance(data, np.ndarray) or not isinstance(annotation,
np.ndarray):
raise ValueError("data and annotation must be numpy arrays")
if len(data) != len(annotation):
raise ValueError("data and annotation must have the same number of "
"rows")
if data.ndim != 2:
raise ValueError("data must be a 2D array")
method_dict = {
'arcsinh_transformation': arcsinh_transformation_core,
'normalize_features': normalize_features_core
}
if method not in method_dict:
raise ValueError("method must be 'arcsinh_transformation' or "
"'normalize_features'")
transform_function = method_dict[method]
transformed_data = np.zeros_like(data)
unique_batches = np.unique(annotation)
for batch in unique_batches:
batch_indices = np.where(annotation == batch)[0]
batch_data = data[batch_indices]
normalized_batch_data = transform_function(batch_data, **kwargs)
transformed_data[batch_indices] = normalized_batch_data
return transformed_data
# utag clustering
def run_utag_clustering(
adata,
features=None,
k=15,
resolution=1,
max_dist=20,
n_pcs=10,
random_state=42,
n_jobs=1,
n_iterations=5,
slide_key="Slide",
layer=None,
output_annotation="UTAG",
associated_table=None,
parallel=False,
**kwargs
):
"""
Run UTAG clustering on the AnnData object.
Parameters
----------
adata : anndata.AnnData
The AnnData object.
    features : list of str, optional
        List of features to use for clustering or for PCA. Default
        (None) is to use all features.
    k : int
        The number of nearest neighbors to use in creating the graph.
        Default is 15.
    resolution : float
        Resolution parameter for the clustering; higher resolution
        produces more clusters. Default is 1.
    max_dist : float
        Maximum distance to cut edges within a graph. Default is 20.
    n_pcs : int
        Number of principal components to use for clustering.
        Default is 10.
    random_state : int
        Random state for reproducibility. Default is 42.
    n_jobs : int
        Number of jobs to run in parallel. Default is 1.
    n_iterations : int
        Number of iterations for the clustering. Default is 5.
    slide_key : str
        Key of adata.obs containing information on the batch structure
        of the data. In general, for image data this will often be a
        variable indicating the image, so image-specific effects are
        removed from the data. Default is "Slide".
    layer : str, optional
        The layer to be used in the clustering. If None, `adata.X` is
        used.
    output_annotation : str
        The name of the annotation in `adata.obs` where the clusters
        are stored. Default is "UTAG".
    associated_table : str, optional
        If set, use the corresponding key in `adata.obsm` to calculate
        the clusters. Takes priority over the layer argument.
    parallel : bool
        Whether to run UTAG in parallel. Default is False.
Returns
-------
adata : anndata.AnnData
Updated AnnData object with clustering results.
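    Examples
    --------
    A minimal usage sketch; UTAG expects spatial coordinates in
    `adata.obsm["spatial"]` and a slide annotation in `adata.obs`:

    >>> run_utag_clustering(adata, k=15, resolution=1, max_dist=20,
    ...                     slide_key="Slide")
    >>> adata.obs["UTAG"].value_counts()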
"""
resolutions = [resolution]
_validate_transformation_inputs(
adata=adata,
layer=layer,
associated_table=associated_table,
features=features
)
    # Validate the number of nearest neighbors
if not isinstance(k, int) or k <= 0:
raise ValueError(f"`k` must be a positive integer, but received {k}.")
if random_state is not None:
np.random.seed(random_state)
if features is not None:
data = _select_input_features(
adata=adata,
layer=layer,
associated_table=associated_table,
            features=features,
        )
adata_utag = adata[:, features].copy()
adata_utag.X = data
else:
adata_utag = adata.copy()
utag_results = utag(
adata_utag,
slide_key=slide_key,
max_dist=max_dist,
normalization_mode='l1_norm',
apply_clustering=True,
clustering_method="leiden",
resolutions=resolutions,
leiden_kwargs={"n_iterations": n_iterations,
"random_state": random_state},
n_pcs=n_pcs,
parallel=parallel,
processes=n_jobs,
k=k,
)
    # Extract the cluster labels for the requested resolution
    cluster_column = 'UTAG Label_leiden_' + str(resolution)
    cluster_list = utag_results.obs[cluster_column].copy()
    adata.obs[output_annotation] = cluster_list.copy()
adata.uns["utag_features"] = features