import re
import os
import pandas as pd
import anndata as ad
import numpy as np
import warnings
from sklearn.preprocessing import MinMaxScaler
from typing import Tuple
import logging
from collections import defaultdict
from spac.utils import regex_search_list, check_list_in_list, check_annotation
from anndata import AnnData
def append_annotation(
data: pd.DataFrame,
annotation: dict
) -> pd.DataFrame:
"""
Append a new annotation with a single value to
a Pandas DataFrame based on mapping rules.
Parameters
----------
data : pd.DataFrame
The input DataFrame to which the new annotation column(s) will be appended.
annotation : dict
Dictionary mapping each new annotation column name to its value.
Each pair should have this format:
<new annotation column name>:<value of the annotation>
Each value must be a single string or numeric value.
Returns
-------
pd.DataFrame
The DataFrame with the new annotation column(s) appended.
"""
if not isinstance(annotation, dict):
error_msg = "Annotation must be provided as a dictionary."
raise ValueError(error_msg)
for new_column, value in annotation.items():
if not isinstance(new_column, str):
error_msg = f"The key {new_column} is not " + \
"a single string, please check."
raise ValueError(error_msg)
if not isinstance(value, (str, int, float)):
error_msg = f"The value {value} in {new_column} is not " + \
"a single string or numeric value, please check."
raise ValueError(error_msg)
if new_column in data.columns:
error_msg = f"'{new_column}' already exists in the DataFrame."
raise ValueError(error_msg)
data[new_column] = value
return data
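# Usage sketch (hypothetical column names and values), assuming a
# DataFrame `df` is already loaded:
#   df = append_annotation(df, {"sample_id": "S1", "batch": 2})
# Every row of `df` gets the constant values "S1" and 2 in the new
# "sample_id" and "batch" columns.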
def ingest_cells(dataframe,
regex_str,
x_col=None,
y_col=None,
annotation=None):
"""
Convert the dataframe of cells into an anndata object.
The function will also initialize features and spatial coordinates.
Parameters
----------
dataframe : pandas.DataFrame
The data frame that contains cells as rows and cell information as
columns.
regex_str : str or list of str
A string or a list of strings representing python regular expressions
for the feature columns in the data frame.
x_col : str
The column name for the x coordinate of the cell.
y_col : str
The column name for the y coordinate of the cell.
annotation : str or list of str
The column name for the region that the cells belong to. If a list is passed,
multiple annotations will be created in the returned AnnData object.
Returns
-------
anndata.AnnData
The generated AnnData object
"""
if not isinstance(regex_str, list):
regex_list = [regex_str]
else:
regex_list = regex_str
all_columns = list(dataframe.columns)
all_features = []
for column in regex_list:
current_features = regex_search_list(
[column],
all_columns
)
if len(current_features) == 0:
error_message = "Provided regex pattern(s) or feature(s):\n" + \
f'"{column}"\n' + \
"does not match any in the dataset, please review the input."
raise ValueError(error_message)
all_features.extend(current_features)
features_df = dataframe[all_features]
adata = ad.AnnData(
features_df,
dtype=features_df[all_features[0]].dtype)
if annotation is not None:
if isinstance(annotation, str):
list_of_annotation = [annotation]
else:
list_of_annotation = annotation
for annotation in list_of_annotation:
# Selecting one column of the dataframe returns a Series, which AnnData
# would convert to NaN because of index misalignment, so convert it to a
# list before assignment.
adata.obs[annotation] = dataframe[annotation].tolist()
if x_col is not None and y_col is not None:
numpy_array = dataframe[[x_col, y_col]].to_numpy().astype('float32')
adata.obsm["spatial"] = numpy_array
return adata
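# Usage sketch (hypothetical column names), assuming `df` holds one cell
# per row:
#   adata = ingest_cells(
#       df,
#       regex_str="^marker_.*",
#       x_col="cell_x",
#       y_col="cell_y",
#       annotation="region"
#   )
# Columns matching "^marker_.*" become adata.X, "region" is stored in
# adata.obs, and the coordinates go to adata.obsm["spatial"].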
def concatinate_regions(regions):
"""
Concatenate data from multiple regions and create new indexes.
Parameters
----------
regions : list of anndata.AnnData
AnnData objects to be concatenated.
Returns
-------
anndata.AnnData
New AnnData object with the concatenated values in AnnData.X.
"""
all_adata = ad.concat(regions)
all_adata.obs_names_make_unique()
return all_adata
def rescale_features(features, min_quantile=0.01, max_quantile=0.99):
"""
Clip and rescale features outside the minimum and maximum quantile.
The rescaled features will be between 0 and 1.
Parameters
----------
features : pandas.DataFrame
The DataFrame of features.
min_quantile : float
The minimum quantile to be considered zero.
max_quantile : float
The maximum quantile to be considered one.
Returns
-------
pandas.DataFrame
The created DataFrame with normalized features.
"""
markers_max_quantile = features.quantile(max_quantile)
markers_min_quantile = features.quantile(min_quantile)
features_clipped = features.clip(
markers_min_quantile,
markers_max_quantile,
axis=1)
scaler = MinMaxScaler()
np_features_scaled = scaler.fit_transform(
features_clipped.to_numpy())
features_scaled = pd.DataFrame(
np_features_scaled,
columns=features_clipped.columns)
return features_scaled
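# Minimal sketch (hypothetical marker columns):
#   scaled = rescale_features(df[["CD3", "CD8"]],
#                             min_quantile=0.01, max_quantile=0.99)
# Each column is clipped at its 1st/99th percentiles and rescaled to
# the [0, 1] range.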
def add_rescaled_features(adata, min_quantile, max_quantile, layer):
"""
Clip and rescale the features matrix.
The results will be added into a new layer in the AnnData object.
Parameters
----------
adata : anndata.AnnData
The AnnData object.
min_quantile : float
The minimum quantile to rescale to zero.
max_quantile : float
The maximum quantile to rescale to one.
layer : str
The name of the new layer to add to the anndata object.
"""
original = adata.to_df()
rescaled = rescale_features(original, min_quantile, max_quantile)
adata.layers[layer] = rescaled
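# Sketch of adding a rescaled layer (the layer name is an arbitrary
# example):
#   add_rescaled_features(adata, min_quantile=0.01, max_quantile=0.99,
#                         layer="rescaled_01_99")
#   adata.layers["rescaled_01_99"]  # the clipped and rescaled matrix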
def subtract_min_per_region(adata, annotation, layer, min_quantile=0.01):
"""
Subtract the minimum quantile of every marker per region.
Parameters
----------
adata : anndata.AnnData
The AnnData object.
annotation: str
The name of the annotation in `adata` to define batches.
min_quantile : float
The minimum quantile to rescale to zero.
layer : str
The name of the new layer to add to the AnnData object.
"""
regions = adata.obs[annotation].unique().tolist()
original = adata.to_df()
new_df_list = []
for region in regions:
region_cells = original[adata.obs[annotation] == region]
new_features = subtract_min_quantile(region_cells, min_quantile)
new_df_list.append(new_features)
new_df = pd.concat(new_df_list)
adata.layers[layer] = new_df
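# Sketch of per-region background subtraction (annotation and layer
# names are examples):
#   subtract_min_per_region(adata, annotation="region",
#                           layer="min_subtracted", min_quantile=0.01)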
def subtract_min_quantile(features, min_quantile=.01):
"""
Subtract each column's minimum-quantile value from that column.
Parameters
----------
features : pandas.DataFrame
The dataframe of features.
min_quantile : float
The minimum quantile to be considered zero.
Returns
-------
pandas.DataFrame
DataFrame with the minimum-quantile values subtracted and negative values clipped to zero.
"""
columns_min_quantile = features.quantile(min_quantile)
subtracted_min = features - columns_min_quantile
# Clip negative values to zero
subtracted_min.clip(lower=0, axis=1, inplace=True)
return subtracted_min
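# Minimal sketch (hypothetical feature table): subtract each column's
# 1st-percentile value and clip negatives to zero.
#   background_removed = subtract_min_quantile(df_features,
#                                              min_quantile=0.01)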
def load_csv_files(file_names):
"""
Read the csv file(s) into a pandas dataframe.
Parameters
----------
file_names : str or list
A single csv file path or a list of csv file paths to be
combined into a single dataframe output.
Returns
-------
pandas.DataFrame
A pandas dataframe of all the csv files. The returned dataset
will have an extra column called "loaded_file_name" containing
the source file name.
"""
# meta_schema = []
dataframe_list = []
dataframe_name = []
if not isinstance(file_names, list):
if not isinstance(file_names, str):
file_name_type = type(file_names)
error_message = "file_names should be list or string" + \
", but got " + str(file_name_type) + "."
raise TypeError(error_message)
else:
file_names = [file_names]
for file_name in file_names:
# Try to load the csv into pandas DataFrame.
# Check if the file exists
if not os.path.exists(file_name):
error_message = f"The file '{file_name}' does not exist."
raise FileNotFoundError(error_message)
# Check if the file is readable
if not os.access(file_name, os.R_OK):
error_message = "The file " + file_name + \
" cannot be read due to insufficient permissions."
raise PermissionError(error_message)
try:
current_df = pd.read_csv(file_name)
except pd.errors.EmptyDataError:
error_message = "The file is empty or does not contain any data."
raise TypeError(error_message)
except pd.errors.ParserError:
error_message = "The file could not be parsed. " + \
"Please check that the file is a valid CSV."
raise TypeError(error_message)
current_df["loaded_file_name"] = file_name
# current_schema = current_df.columns.to_list()
# if len(meta_schema) == 0:
# meta_schema = current_schema
# print("Meta schema acquired. Columns are:")
# for column_name in meta_schema:
# print(column_name)
# if len(meta_schema) == len(current_schema):
# if set(meta_schema) != set(current_schema):
# error_message = "Column in current file does not match " + \
# f"the meta_schema, got:\n {current_schema}. "
# raise ValueError(error_message)
# else:
# error_message = "Column in current file does not match " + \
# f"the meta_schema, got:\n {current_schema}. "
# raise ValueError(error_message)
dataframe_list.append(current_df)
dataframe_name.append(file_name)
logging.info("CSVs are converted into dataframes and combined"
" into a list!")
logging.info("Total of " + str(len(dataframe_list)) +
" dataframes in the list.")
for i, each_file in enumerate(dataframe_list):
logging.info(f"File name: {dataframe_name[i]}")
logging.info("Info: ")
logging.info(each_file.info())
logging.info("Description: ")
logging.info(each_file.describe())
logging.info("\n")
logging.info("Combining Dataframes into Single Dataframe...")
combined_dataframe = combine_dfs(dataframe_list)
return combined_dataframe
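# Usage sketch (hypothetical file paths):
#   combined = load_csv_files(["region1.csv", "region2.csv"])
#   combined["loaded_file_name"].unique()  # trace rows to source files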
def select_values(data, annotation, values=None, exclude_values=None):
"""
Selects values from either a pandas DataFrame or an AnnData object based
on the annotation and values.
Parameters
----------
data : pandas.DataFrame or anndata.AnnData
The input data. Can be a DataFrame for tabular data or an AnnData
object.
annotation : str
The column name in a DataFrame or the annotation key in an AnnData
object to be used for selection.
values : str or list of str
List of values for the annotation to include. If None, all values are
considered for selection.
exclude_values : str or list of str
List of values for the annotation to exclude. Can't be combined with
values.
Returns
-------
pandas.DataFrame or anndata.AnnData
The filtered DataFrame or AnnData object containing only the selected
rows based on the annotation and values.
"""
# Make sure that either values or exclude_values is set, but not both
if values is not None and exclude_values is not None:
error_msg = "Only use with values to include or exclude, but not both."
logging.error(error_msg)
raise ValueError(error_msg)
# If values and exclude_values are both None, return the original data
if values is None and exclude_values is None:
print("No values or exclude_values provided. Returning original data.")
return data
# Ensure values are in a list format if not None
if values is not None and not isinstance(values, list):
values = [values]
# Ensure exclude_values are in a list format if not None
if exclude_values is not None and not isinstance(exclude_values, list):
exclude_values = [exclude_values]
if isinstance(data, pd.DataFrame):
return _select_values_dataframe(
data,
annotation,
values,
exclude_values)
elif isinstance(data, ad.AnnData):
return _select_values_anndata(data, annotation, values, exclude_values)
else:
error_msg = (
"Unsupported data type. Data must be either a pandas DataFrame"
" or an AnnData object."
)
logging.error(error_msg)
raise TypeError(error_msg)
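# Usage sketch (hypothetical annotation names and labels):
#   tumor_only = select_values(adata, annotation="phenotype",
#                              values=["Tumor"])
#   no_artifacts = select_values(df, annotation="qc_flag",
#                                exclude_values=["artifact"])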
def _select_values_dataframe(data, annotation, values, exclude_values):
possible_annotations = data.columns.tolist()
# Check if the annotation exists using check_list_in_list
check_list_in_list(
input=[annotation],
input_name="annotation",
input_type="column name/annotation key",
target_list=possible_annotations,
need_exist=True
)
# Validate provided values against unique ones, if not None
unique_values = data[annotation].astype(str).unique().tolist()
check_list_in_list(
values,
"values to include",
"label",
unique_values,
need_exist=True
)
check_list_in_list(
exclude_values,
"values to exclude",
"label",
unique_values,
need_exist=True
)
# Proceed with filtering based on values or exclude_values
if values is not None:
filtered_data = data[data[annotation].isin(values)]
elif exclude_values is not None:
filtered_data = data[~data[annotation].isin(exclude_values)]
count = filtered_data.shape[0]
logging.info(
f"Summary of returned dataset: {count} cells"
" match the selected labels."
)
return filtered_data
def _select_values_anndata(data, annotation, values, exclude_values):
possible_annotations = data.obs.columns.tolist()
# Check if the annotation exists using check_list_in_list
check_list_in_list(
input=[annotation],
input_name="annotation",
input_type="column name/annotation key",
target_list=possible_annotations,
need_exist=True
)
# Validate provided values against unique ones, if not None
unique_values = data.obs[annotation].astype(str).unique().tolist()
check_list_in_list(
values,
"values to include",
"label",
unique_values,
need_exist=True
)
check_list_in_list(
exclude_values,
"values to exclude",
"label",
unique_values,
need_exist=True
)
# Proceed with filtering based on values or exclude_values
if values is not None:
filtered_data = data[data.obs[annotation].isin(values)].copy()
elif exclude_values is not None:
filtered_data = data[~data.obs[annotation].isin(exclude_values)].copy()
count = filtered_data.n_obs
logging.info(
f"Summary of returned dataset: {count}"
" cells match the selected labels."
)
return filtered_data
def downsample_cells(input_data, annotations, n_samples=None, stratify=False,
rand=False, combined_col_name='_combined_',
min_threshold=5):
"""
Custom downsampling of data based on one or more annotations.
This function offers two primary modes of operation:
1. **Grouping (stratify=False)**:
- For a single annotation: The data is grouped by unique values of the
annotation, and 'n_samples' rows are selected from each group.
- For multiple annotations: The data is grouped based on unique
combinations of the annotations, and 'n_samples' rows are selected
from each combined group.
2. **Stratification (stratify=True)**:
- Annotations (single or multiple) are combined into a new column.
- Proportionate stratified sampling is performed based on the unique
combinations in the new column, ensuring that the downsampled dataset
maintains the proportionate representation of each combined group
from the original dataset.
Parameters
----------
input_data : pd.DataFrame
The input data frame.
annotations : str or list of str
The column name(s) to downsample on. If multiple column names are
provided, their values are combined using an underscore as a separator.
n_samples : int, default=None
The number of samples to return. Behavior differs based on the
'stratify' parameter:
- stratify=False: Returns 'n_samples' for each unique value (or
combination) of annotations.
- stratify=True: Returns a total of 'n_samples' stratified by the
frequency of every label or combined labels in the annotation(s).
stratify : bool, default=False
If true, perform proportionate stratified sampling based on the unique
combinations of annotations. This ensures that the downsampled dataset
maintains the proportionate representation of each combined group from
the original dataset.
rand : bool, default=False
If true and stratify is True, randomly select the returned cells.
Otherwise, choose the first n cells.
combined_col_name : str, default='_combined_'
Name of the column that will store combined values when multiple
annotation columns are provided.
min_threshold : int, default=5
The minimum number of samples a combined group should have in the
original dataset to be considered in the downsampled dataset. Groups
with fewer samples than this threshold will be excluded from the
stratification process. Adjusting this parameter determines the
minimum presence a combined group should have in the original dataset
to appear in the downsampled version.
Returns
-------
output_data: pd.DataFrame
The proportionately stratified downsampled data frame.
Notes
-----
This function emphasizes proportionate stratified sampling, ensuring that
the downsampled dataset is a representative subset of the original data
with respect to the combined annotations. Due to this proportionate nature,
not all unique combinations from the original dataset might be present in
the downsampled dataset, especially if a particular combination has very
few samples in the original dataset. The `min_threshold` parameter can be
adjusted to determine the minimum number of samples a combined group
should have in the original dataset to appear in the downsampled version.
"""
logging.basicConfig(level=logging.WARNING)
# Convert annotations to list if it's a string
if isinstance(annotations, str):
annotations = [annotations]
# Check if the columns to downsample on exist
missing_columns = [
col for col in annotations if col not in input_data.columns
]
if missing_columns:
raise ValueError(
f"Columns {missing_columns} do not exist in the dataframe"
)
# If n_samples is None, return the input data without processing
if n_samples is None:
return input_data.copy()
# Combine annotations into a single column if multiple annotations
if len(annotations) > 1:
input_data[combined_col_name] = input_data[annotations].apply(
lambda row: '_'.join(row.values.astype(str)), axis=1)
grouping_col = combined_col_name
else:
grouping_col = annotations[0]
# Stratify selection
if stratify:
# Calculate proportions
freqs = input_data[grouping_col].value_counts(normalize=True)
# Exclude groups with fewer samples than the min_threshold
filtered_freqs = freqs[freqs * len(input_data) >= min_threshold]
# Log warning for groups that are excluded
excluded_groups = freqs[~freqs.index.isin(filtered_freqs.index)]
# excluded_groups holds normalized frequencies; recover the raw count
for group, frequency in excluded_groups.items():
count = int(round(frequency * len(input_data)))
logging.warning(
f"Group '{group}' with count {count} "
f"(frequency: {frequency:.4f}) "
f"is excluded due to low frequency."
)
freqs = freqs[freqs.index.isin(filtered_freqs.index)]
samples_per_group = (freqs * n_samples).round().astype(int)
# Identify groups that have non-zero frequency
# but zero samples after rounding
zero_sample_groups = samples_per_group[samples_per_group == 0]
groups_with_zero_samples = zero_sample_groups.index
group_freqs = freqs[groups_with_zero_samples]
original_counts = group_freqs * len(input_data)
# Ensure each group has at least one sample if its frequency
# is non-zero
condition = samples_per_group == 0
samples_per_group[condition] = freqs[condition].apply(
lambda x: 1 if x > 0 else 0
)
# Log a warning for the adjusted groups
if not original_counts.empty:
group_count_pairs = [
f"'{group}': {count}"
for group, count in original_counts.items()
]
summary = ', '.join(group_count_pairs)
logging.warning(
f"Groups adjusted to have at least one sample"
f" due to non-zero frequency: {summary}."
)
# If have extra samples due to rounding, remove them from the
# largest groups
removed_samples = defaultdict(int)
while samples_per_group.sum() > n_samples:
max_group = samples_per_group.idxmax()
samples_per_group[max_group] -= 1
removed_samples[max_group] += 1
# Log warning about the number of samples removed from each group
for group, count in removed_samples.items():
logging.warning(
f"{count} sample(s) were removed from group '{group}'"
f" due to rounding adjustments."
)
# Sample data
sampled_data = []
for group, group_data in input_data.groupby(grouping_col):
sample_count = samples_per_group.get(group, 0)
sample_size = min(sample_count, len(group_data))
if rand:
sampled_data.append(group_data.sample(sample_size))
else:
sampled_data.append(group_data.head(sample_size))
# Concatenate all samples
output_data = pd.concat(sampled_data)
else:
output_data = input_data.groupby(grouping_col, group_keys=False).apply(
lambda x: x.head(min(n_samples, len(x)))
).reset_index(drop=True)
# Log the final counts for each label in the downsampled dataset
label_counts = output_data[grouping_col].value_counts()
for label, count in label_counts.items():
logging.info(f"Final count for label '{label}': {count}")
# Log the total number of rows in the resulting data
logging.info(f"Number of rows in the returned data: {len(output_data)}")
return output_data
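# Usage sketch (hypothetical annotation names):
#   # 100 cells from each phenotype, taking the first rows per group
#   per_group = downsample_cells(df, "phenotype", n_samples=100)
#   # 1000 cells total, proportionately stratified by phenotype x region
#   strat = downsample_cells(df, ["phenotype", "region"], n_samples=1000,
#                            stratify=True, rand=True)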
def calculate_centroid(
data,
x_min,
x_max,
y_min,
y_max,
new_x,
new_y
):
"""
Calculate the spatial coordinates of the cell centroid as the average of
min and max coordinates.
Parameters
----------
data : pd.DataFrame
The input data frame. The dataframe should contain four columns for
x_min, x_max, y_min, and y_max for centroid calculation.
x_min : str
column name with minimum x value
x_max : str
column name with maximum x value
y_min : str
column name with minimum y value
y_max : str
column name with maximum y value
new_x : str
The new column name for the x dimension of the centroid;
allowed characters are alphanumerics and underscores.
new_y : str
The new column name for the y dimension of the centroid;
allowed characters are alphanumerics and underscores.
Returns
-------
data : pd.DataFrame
The dataframe with the two new centroid columns added. Note that the
dataframe is modified in place.
"""
# Check for valid column names
invalid_chars = r'[^a-zA-Z0-9_]'
for name in [new_x, new_y]:
if re.search(invalid_chars, name):
error_string = "Column name " + str(name) + \
" contains invalid characters. " + \
"Use only alphanumeric characters and underscores."
raise ValueError(error_string)
# check if the columns exist in the dataframe
for col in [x_min,
x_max,
y_min,
y_max]:
if col not in data.columns:
raise ValueError(f"Column {col} does not exist in the dataframe.")
# Calculate the centroids
x_centroid = (data[x_min] + data[x_max]) / 2
y_centroid = (data[y_min] + data[y_max]) / 2
# Assign new centroid columns to the DataFrame in one operation
data[[new_x, new_y]] = pd.concat(
[x_centroid, y_centroid], axis=1, keys=[new_x, new_y]
)
# Return the modified DataFrame
return data
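# Usage sketch (hypothetical bounding-box columns):
#   calculate_centroid(df, x_min="XMin", x_max="XMax",
#                      y_min="YMin", y_max="YMax",
#                      new_x="centroid_x", new_y="centroid_y")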
def bin2cat(data, one_hot_annotations, new_annotation):
"""
Combine a set of columns representing
a binary one hot encoding of categories
into a new categorical column.
Parameters
----------
data : pandas.DataFrame
The pandas dataframe containing the one hot encoded annotations.
one_hot_annotations : str or list of str
A string or a list of strings representing
python regular expression of the one hot encoded annotations
columns in the data frame.
new_annotation: str
The column name for new categorical annotation to be created.
Returns
-------
pandas.DataFrame
DataFrame with new categorical column added.
Examples
--------
>>> data = pd.DataFrame({
... 'A': [1, 1, 0, 0],
... 'B': [0, 0, 1, 0]
... })
>>> one_hot_annotations = ['A', 'B']
>>> new_annotation = 'new_category'
>>> result = bin2cat(data, one_hot_annotations, new_annotation)
>>> print(result[new_annotation])
0 A
1 A
2 B
3 NaN
Name: new_category, dtype: object
"""
if isinstance(one_hot_annotations, str):
one_hot_annotations = [one_hot_annotations]
elif not isinstance(one_hot_annotations, list):
error_string = "one_hot_annotations should " + \
"be a string or a list of strings."
raise ValueError(error_string)
if new_annotation in data.columns:
raise ValueError("Column name for new annotation already exists.")
if len(one_hot_annotations) > 0:
# Use regex to find the cell label columns
all_columns = list(data.columns)
all_cell_labels = regex_search_list(
one_hot_annotations,
all_columns
)
if len(all_cell_labels) > 0:
cell_labels_df = data.loc[:, all_cell_labels]
def get_columns_with_1(row):
column_names = cell_labels_df.columns[row == 1]
if len(column_names) > 1:
raise ValueError(f"Multiple instance found:{column_names}")
elif len(column_names) == 1:
return column_names[0]
else:
return np.nan
column_names_with_1 = cell_labels_df.apply(
get_columns_with_1,
axis=1)
column_names_with_1 = column_names_with_1.tolist()
data[new_annotation] = column_names_with_1
return data
else:
error_string = "No column was found in the dataframe " + \
"with current regrex pattern(s)."
raise ValueError(error_string)
def combine_dfs(dataframes: list):
"""
Combine multiple pandas dataframes into one.
The schema of the first dataframe is considered primary.
A warning will be issued if the schema of the current dataframe
differs from the primary one.
Parameters
----------
dataframes : list[pd.DataFrame]
A list of pandas dataframes to be combined
Returns
-------
pd.DataFrame
The combined dataframe.
"""
# Check if input is list
if not isinstance(dataframes, list):
raise ValueError("Input is not a list, please check.")
# Check if the input list is empty
if not dataframes:
raise ValueError("Input list is empty, please check.")
# Initialize the combined dataframe with the first dataframe
combined_df = dataframes[0]
# Loop through the remaining dataframes and combine them
for i, df in enumerate(dataframes[1:], start=2):
if not combined_df.columns.equals(df.columns):
warning_message = f"Schema of DataFrame {i} " + \
"is different from the primary DataFrame."
warnings.warn(warning_message, UserWarning)
# Add missing columns to the combined dataframe and fill with NaN
for col in df.columns:
if col not in combined_df.columns:
combined_df[col] = np.nan
# Concatenate the dataframes vertically
combined_df = pd.concat([combined_df, df], ignore_index=True)
# Reset the index of the combined dataframe
combined_df.reset_index(drop=True, inplace=True)
return combined_df
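# Minimal sketch: the first dataframe defines the primary schema, and
# columns missing from it are added and filled with NaN.
#   combined = combine_dfs([df_region1, df_region2])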
def add_pin_color_rules(
adata,
label_color_dict: dict,
color_map_name: str = "_spac_colors",
overwrite: bool = True
) -> Tuple[dict, str]:
"""
Adds pin color rules to the AnnData object and scans for matching labels.
This function scans the unique labels in each adata.obs column and the
column names of all adata tables to find the labels defined by the pin
color rules.
Parameters
----------
adata : anndata.AnnData
The anndata object containing upstream analysis.
label_color_dict : dict
Dictionary of pin color rules with label as key and color as value.
color_map_name : str
The name to use for storing pin color rules in `adata.uns`.
overwrite : bool, optional
Whether to overwrite existing pin color rules in `adata.uns` with the
same name, by default True.
Returns
-------
label_matches : dict
Dictionary with the matching labels in each
section (obs, var, X, etc.).
result_str : str
Summary string with the matching labels in each
section (obs, var, X, etc.).
Raises
------
ValueError
If `color_map_name` already exists in `adata.uns`
and `overwrite` is False.
"""
# Check if the pin color rule already exists in adata.uns
if color_map_name in adata.uns and not overwrite:
raise ValueError(
f"`{color_map_name}` already exists in `adata.uns` "
"and `overwrite` is set to False."
)
# Add or overwrite pin color rules in adata.uns
adata.uns[color_map_name] = label_color_dict
# Initialize a dictionary to store matching labels
label_matches = {
'obs': {},
'var': {},
'X': {}
}
# Initialize the report string
result_str = "\nobs:\n"
# Scan unique labels in adata.obs
for col in adata.obs.columns:
unique_labels = adata.obs[col].unique()
matching_labels = [
label for label in unique_labels if label in label_color_dict
]
label_matches['obs'][col] = matching_labels
result_str += f"Annotation {col} in obs has matching labels: "
result_str += f"{matching_labels}\n"
result_str += "\nvar:\n"
# Scan unique labels in adata.var
for col in adata.var.columns:
unique_labels = adata.var[col].unique()
matching_labels = [
label for label in unique_labels if label in label_color_dict
]
label_matches['var'][col] = matching_labels
result_str += f"Column {col} in var has matching labels: "
result_str += f"{matching_labels}\n"
# Scan column names in adata.X
if isinstance(adata.X, pd.DataFrame):
col_names = adata.X.columns
else:
col_names = [f'feature{i+1}' for i in range(adata.X.shape[1])]
# If X is a numpy array or sparse matrix
result_str += "\nRaw data table X:\n"
matching_labels = [
label for label in col_names if label in label_color_dict
]
label_matches['X']['column_names'] = matching_labels
result_str += "Raw data table column names have matching labels: "
result_str += f"{matching_labels}\n"
result_str = "\nLabels in the analysis:\n" + result_str
# Check for labels in label_color_dict that
# do not match any labels in label_matches
unmatched_labels = set(label_color_dict.keys()) - set(
label
for section in label_matches.values()
for col in section.values()
for label in col
)
# Append warning for unmatched labels
if unmatched_labels:
for label in unmatched_labels:
result_str = f"{label}\n" + result_str
result_str = (
"\nWARNING: The following labels do not match any labels in "
"the analysis:\n" + result_str
)
for label, color in label_color_dict.items():
result_str = f"{label}: {color}\n" + result_str
result_str = "Labels with color pinned:\n" + result_str
result_str = (
f"Pin Color Rule Labels Count for `{color_map_name}`:\n" + result_str
)
adata.uns[color_map_name+"_summary"] = result_str
return label_matches, result_str
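# Usage sketch (hypothetical label/color pairs):
#   matches, summary = add_pin_color_rules(
#       adata, {"Tumor": "red", "Stroma": "blue"})
#   print(summary)  # reports where the pinned labels were found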
def combine_annotations(
adata: AnnData,
annotations: list,
separator: str,
new_annotation_name: str
) -> AnnData:
"""
Combine multiple annotations into a new annotation using a defined separator.
Parameters
----------
adata : AnnData
The input AnnData object whose .obs will be modified.
annotations : list
List of annotation column names to combine.
separator : str
Separator to use when combining annotations.
new_annotation_name : str
The name of the new annotation to be created.
Returns
-------
AnnData
The AnnData object with the combined annotation added.
"""
# Ensure annotations is a list
if not isinstance(annotations, list):
raise ValueError(
f'Annotations must be a list. Got {type(annotations)}'
)
# Check that the list is not empty
if len(annotations) == 0:
raise ValueError('Annotations list cannot be empty.')
# Validate input annotations using utility function
check_annotation(adata, annotations=annotations)
# Ensure separator is a string
if not isinstance(separator, str):
raise ValueError(
f'Separator must be a string. Got {type(separator)}'
)
# Check if new annotation name already exists
if new_annotation_name in adata.obs.columns:
raise ValueError(
f"'{new_annotation_name}' already exists in adata.obs.")
# Combine annotations into the new column
# Convert selected annotations to string type
annotations_str = adata.obs[annotations].astype(str)
# Combine annotations using the separator
combined_annotation = annotations_str.agg(separator.join, axis=1)
# Assign the combined result to the new annotation column
adata.obs[new_annotation_name] = combined_annotation
return adata
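# Usage sketch (hypothetical annotation names):
#   combine_annotations(adata, ["region", "phenotype"], separator="_",
#                       new_annotation_name="region_phenotype")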