Source code for spac.phenotyping


import pandas as pd


[docs] def is_binary_0_1(column): """ Check if a pandas Series contains only binary values (0 and 1). Parameters ---------- column : pandas.Series The pandas Series to check. Returns ------- bool True if the Series contains only 0 and 1, False otherwise. Notes ----- The function considers a Series to be binary if it contains exactly the values 0 and 1, and no other values. """ unique_values = set(column.unique()) return unique_values == {0, 1}
[docs] def decode_phenotype(data, phenotype_code, **kwargs): """ Convert a phenotype code into a dictionary mapping feature (marker) names to values for that marker's classification as '+' or '-'. Parameters ---------- data : pandas.DataFrame The DataFrame containing the columns that will be used to decode the phenotype. phenotype_code : str The phenotype code string, which should end with '+' or '-'. **kwargs : keyword arguments Optional keyword arguments to specify prefix and suffix to be added to the column names. - prefix : str, optional Prefix to be added to the column names for the feature classification. Default is ''. - suffix : str, optional Suffix to be added to the column names for the feature classification. Default is ''. Returns ------- dict A dictionary where the keys are column names and the values are the corresponding phenotype classification. Raises ------ ValueError If the phenotype code does not end with '+' or '-' or if any columns specified in the phenotype code do not exist in the DataFrame. Notes ----- The function splits the phenotype code on '+' and '-' characters to determine the phenotype columns and values. It checks if the columns exist in the DataFrame and whether they are binary or string types to properly map values. """ import re # The phenotype code should end with '+' or '-' if not (phenotype_code.endswith('+') or phenotype_code.endswith('-')): raise ValueError( ( f'The passed phenotype code "{phenotype_code}"' ' should end with "+" or "-"' ) ) # Split the phenotype definition on '+' and '-' characters phenotypes = re.split(r'\+|-', phenotype_code) phenotypes.remove('') prefix = kwargs.get("prefix", '') suffix = kwargs.get("suffix", '') phenotypes_columns = [f"{prefix}{name}{suffix}" for name in phenotypes] existing_columns = data.columns for value in phenotypes_columns: if value not in existing_columns: raise ValueError( ( f'The feature "{value}" does not exist in the input table.' f' Existing columns are "{existing_columns.tolist()}"' ) ) phenotype_values = re.findall(r'[A-Za-z0-9]+[+-]', phenotype_code) phenotype_dict = {} for value, column in zip(phenotype_values, phenotypes_columns): if pd.api.types.is_string_dtype(data[column]): phenotype_dict[column] = value if is_binary_0_1(data[column]): if value.endswith('+'): phenotype_dict[column] = 1 elif value.endswith('-'): phenotype_dict[column] = 0 return phenotype_dict
[docs] def generate_phenotypes_dict(data_df, phenotypes_df, prefix='', suffix=''): """ Generate a dictionary of phenotype names to their corresponding decoding rules. Parameters ---------- data_df : pandas.DataFrame The DataFrame containing the columns that will be used to decode the phenotypes. phenotypes_df : pandas.DataFrame A DataFrame containing phenotype definitions with columns: - "phenotype_name" : str The name of the phenotype. - "phenotype_code" : str The code used to decode the phenotype. prefix : str, optional Prefix to be added to the column names. Default is ''. suffix : str, optional Suffix to be added to the column names. Default is ''. Returns ------- dict A dictionary where the keys are phenotype names and the values are dictionaries mapping column names to values. Notes ----- The function iterates over each row in the `phenotypes_df` DataFrame and decodes the phenotype using the `decode_phenotype` function. """ all_phenotypes = {} for index, row in phenotypes_df.iterrows(): phenotype_name = row["phenotype_name"] phenotype_code = row["phenotype_code"] all_phenotypes[phenotype_name] = decode_phenotype( data_df, phenotype_code, prefix=prefix, suffix=suffix ) return all_phenotypes
[docs] def apply_phenotypes(data_df, phenotypes_dic): """ Add binary columns to the DataFrame indicating if each cell matches a phenotype. Parameters ---------- data_df : pandas.DataFrame The DataFrame to which binary phenotype columns will be added. phenotypes_dic : dict A dictionary where the keys are phenotype names and the values are dictionaries mapping column names to values. Returns ------- dict A dictionary where the keys are phenotype names and the values are the counts of rows that match each phenotype. Notes ----- The function creates binary columns in the DataFrame for each phenotype and counts the number of rows matching each phenotype. """ return_dic = {} for phenotype, rule in phenotypes_dic.items(): matching_rows = ( data_df[list(rule.keys())] .eq(list(rule.values())) .all(axis=1) ) count_matching_rows = matching_rows.sum() return_dic[phenotype] = count_matching_rows data_df.loc[matching_rows, phenotype] = 1 data_df.loc[~matching_rows, phenotype] = 0 return return_dic
[docs] def combine_phenotypes(data_df, phenotype_columns, multiple=True): """ Combine multiple binary phenotype columns into a new column in a vectorized manner. Parameters ---------- data_df : pandas.DataFrame DataFrame containing the phenotype columns. phenotype_columns : list of str List of binary phenotype column names. multiple : bool, optional Whether to concatenate the names of multiple positive phenotypes. If False, all multiple positive phenotypes are labeled as "no_label". Default is True. Returns ------- pandas.Series A Series representing the combined phenotype for each row. """ # Create a mask for each phenotype column where values are 1 (positive) phenotype_masks = data_df[phenotype_columns].astype(bool) # Create a series of phenotype names with a comma and space # after each name. That series will be used to join names # of positive phenotypes in the vectorized operation below. phenotypes_series = pd.Index(phenotype_columns) + ", " # For each row, join the names of positive phenotypes by doing # a dot product between the mask and the series of phenotype names. combined_phenotypes = \ phenotype_masks.dot(phenotypes_series).str.rstrip(", ") # Set all with 0 positive phenotypes to "no_label" counts_positive = phenotype_masks.sum(axis=1) combined_phenotypes[counts_positive == 0] = "no_label" # Handle the case when multiple is False: if not multiple: # set all with >1 positive phenotypes to "no_label" combined_phenotypes[counts_positive > 1] = "no_label" return combined_phenotypes
[docs] def assign_manual_phenotypes( data_df, phenotypes_df, annotation="manual_phenotype", prefix='', suffix='', multiple=True, drop_binary_code=True): """ Assign manual phenotypes to the DataFrame and generate summaries. Parameters ---------- data_df : pandas.DataFrame The DataFrame to which manual phenotypes will be assigned. phenotypes_df : pandas.DataFrame A DataFrame containing phenotype definitions with columns: - "phenotype_name" : str The name of the phenotype. - "phenotype_code" : str The code used to decode the phenotype. annotation : str, optional The name of the column to store the combined phenotype. Default is "manual_phenotype". prefix : str, optional Prefix to be added to the column names. Default is ''. suffix : str, optional Suffix to be added to the column names. Default is ''. multiple : bool, optional Whether to concatenate the names of multiple positive phenotypes. Default is True. drop_binary_code : bool, optional Whether to drop the binary phenotype columns. Default is True. Returns ------- dict A dictionary with the following keys: - "phenotypes_counts": dict Counts of cells matching each defined phenotype. - "assigned_phenotype_counts": dict Counts of cells matching different numbers of phenotypes. - "multiple_phenotypes_summary": pandas.DataFrame Summary of cells with multiple phenotypes. Notes ----- The function generates a combined phenotype column, prints summaries of cells matching multiple phenotypes, and returns a dictionary with detailed counts and summaries. Examples -------- Suppose `data_df` is a DataFrame with binary phenotype columns and `phenotypes_df` contains the following definitions: >>> data_df = pd.DataFrame({ ... 'cd4_phenotype': [0, 1, 0, 1], ... 'cd8_phenotype': [0, 0, 1, 1] ... }) >>> phenotypes_df = pd.DataFrame([ ... {"phenotype_name": "cd4_cells", "phenotype_code": "cd4+"}, ... {"phenotype_name": "cd8_cells", "phenotype_code": "cd8+"}, ... {"phenotype_name": "cd4_cd8", "phenotype_code": "cd4+cd8+"} ... ]) >>> result = assign_manual_phenotypes( ... data_df, ... phenotypes_df, ... annotation="manual", ... prefix='', ... suffix='_phenotype', ... multiple=True ... ) The `data_df` DataFrame will be edited in place to include a new column `"manual"` with the combined phenotype labels: >>> print(data_df) cd4_phenotype cd8_phenotype manual 0 0 0 no_label 1 1 0 cd4_cells 2 0 1 cd8_cells 3 1 1 cd8_cells, cd4_cd8 The result dictionary contains counts and summaries as follows: >>> print(result["phenotypes_counts"]) {'cd4_cells': 1, 'cd8_cells': 2, 'cd4_cd8': 1} >>> print(result["assigned_phenotype_counts"]) 0 1 1 2 2 1 Name: num_phenotypes, dtype: int64 >>> print(result["multiple_phenotypes_summary"]) manual count 0 cd8_cells, cd4_cd8 1 """ phenotypes_dic = generate_phenotypes_dict( data_df, phenotypes_df, prefix, suffix ) phenotypes_counts = apply_phenotypes(data_df, phenotypes_dic) print("\n#####################################\n") # Print the counts of cells in every phenotype for phenotype, count in phenotypes_counts.items(): print(f"{phenotype}: {count} cell(s)") phenotypes_columns = phenotypes_dic.keys() data_df[annotation] = combine_phenotypes( data_df, phenotypes_columns, multiple) number_phenotypes_columns = "num_phenotypes" data_df[number_phenotypes_columns] = ( data_df[phenotypes_columns].sum(axis=1).astype(int) ) summary = data_df[number_phenotypes_columns].value_counts().sort_index() print("\n#####################################\n") print("Summary of cells that matched multiple phenotypes:") for num_phenotypes, count in summary.items(): print( f"Cells that matched {num_phenotypes}" f" phenotype(s): {count} cells" ) print("\n#####################################\n") print(f'Summary of cells with multiple phenotypes in "{annotation}"\n\n') multiple_phenotypes = ( data_df[data_df[number_phenotypes_columns] > 1] .groupby(annotation) .size() .sort_values(ascending=False) ) multiple_phenotypes_df = multiple_phenotypes.reset_index(name='count') print(multiple_phenotypes_df.to_string(index=False)) return_dic = {} return_dic["assigned_phenotype_counts"] = summary return_dic["multiple_phenotypes_summary"] = multiple_phenotypes_df return_dic["phenotypes_counts"] = phenotypes_counts if drop_binary_code is True: # Remove the columns defined by the keys of the dictionary phenotypes_counts data_df.drop(columns=phenotypes_counts.keys(), inplace=True) return return_dic