Source code for cellphonedb.utils.db_utils

from typing import Tuple
import pandas as pd
import numpy as np
import os
import io
import re
import time
import zipfile
import itertools
import pathlib
from cellphonedb.utils.file_utils import dbg
from cellphonedb.utils import file_utils, unique_id_generator
import urllib.request
import urllib.error
import urllib.parse
from zipfile import ZipFile
from cellphonedb.src.core.exceptions.DatabaseCreationException import DatabaseCreationException

MULTIDATA_TABLE_BOOLEAN_COLS = ['receptor', 'other', 'secreted_highlight',
                                'transmembrane', 'secreted', 'peripheral', 'integrin', 'is_complex']

PROTEIN_INFO_FIELDS_FOR_WEB = ['transmembrane', 'secreted', 'secreted_desc', 'receptor', 'integrin', 'other_desc']
COMPLEX_INFO_FIELDS_FOR_WEB = ['transmembrane', 'peripheral', 'secreted', 'secreted_desc', 'receptor', 'integrin',
                               'other_desc']
COMPLEX_CROSSREFERENCE_FIELDS_FOR_WEB = ['reactome_reaction', 'reactome_complex', 'complexPortal_complex',
                                         'rhea_reaction']

INPUT_FILE_NAMES = ['complex_input', 'gene_input', 'interaction_input', 'protein_input', 'transcription_factor_input']
# This is used to indicate CellPhoneDB released data (as opposed to user-added data
# when they create their own CellPhoneDB file)
CORE_CELLPHONEDB_DATA = "CellPhoneDBcore"


def get_protein_and_complex_data_for_web(cpdb_file_path) -> Tuple[dict, dict, dict, dict]:
    # Extract csv files from db_files_path/cellphonedb.zip into dbTableDFs
    dbTableDFs = extract_dataframes_from_db(cpdb_file_path)
    mtTable = dbTableDFs['multidata_table'].copy()
    cpxTable = dbTableDFs['complex_table']
    proteinTable = dbTableDFs['protein_table']

    for col in set(PROTEIN_INFO_FIELDS_FOR_WEB + COMPLEX_INFO_FIELDS_FOR_WEB):
        mtTable = mtTable.astype({col: 'str'})
        mtTable.loc[mtTable[col] == "True", col] = col.capitalize()
        mtTable.loc[mtTable[col] == "False", col] = np.nan
        if col in ['other_desc']:
            # Sanitize values for displaying to the user
            mtTable[col] = mtTable[col].str.replace("_", " ").str.capitalize()

    mtp = mtTable[~mtTable['is_complex']]
    aux = pd.merge(mtp, proteinTable, left_on='id_multidata', right_on='protein_multidata_id')
    proteinAcc2Name = dict(zip(aux['name'], aux['protein_name']))

    mtc = mtTable[mtTable['is_complex']]

    aux = dict(zip(mtp['name'], mtp[PROTEIN_INFO_FIELDS_FOR_WEB].values))
    protein2Info = {k: [x for x in aux[k] if str(x) != 'nan'] for k in aux}
    aux = dict(zip(mtc['name'], mtc[COMPLEX_INFO_FIELDS_FOR_WEB].values))
    complex2Info = {k: [x for x in aux[k] if str(x) != 'nan'] for k in aux}

    aux = pd.merge(mtc, cpxTable, left_on='id_multidata', right_on='complex_multidata_id')
    complex_cols = list(aux.columns.values.tolist())
    resource2Complex2Acc = {}
    for col in COMPLEX_CROSSREFERENCE_FIELDS_FOR_WEB:
        if col in complex_cols:
            # The above test is in case the user created their own CellphoneDB database and had chosen to remove
            # COMPLEX_CROSSREFERENCE_FIELDS_FOR_WEB fields from complex_input.csv
            aux1 = aux.loc[pd.notna(aux[col])]
            resource2Complex2Acc[col.replace("_", " ").capitalize()] = dict(zip(aux1['name'], aux1[col]))

    return protein2Info, complex2Info, resource2Complex2Acc, proteinAcc2Name


[docs]def get_interactions_genes_complex(cpdb_file_path) -> \ Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame, dict, dict]: """ Returns a tuple of four DataFrames containing data from <cpdb_dir>/cellphonedb.zip. Parameters ---------- cpdb_file_path: str CellphoneDB database file path Returns ------- Tuple - interactions: pd.DataFrame - genes: pd.DataFrame - complex_composition: pd.DataFrame - complex_expanded: pd.DataFrame - gene_synonym2gene_name: dict - receptor2tfs: dict """ # Extract csv files from db_files_path/cellphonedb.zip into dbTableDFs dbTableDFs = extract_dataframes_from_db(cpdb_file_path) # Convert dbTableDFs into interactions, genes, complex_composition, complex_expanded data frames # and gene_synonym2gene_name dict gene_synonym2gene_name = {} # Cater for DB version-dependent input files if 'gene_synonym_to_gene_name' in dbTableDFs: gs2gn = dbTableDFs['gene_synonym_to_gene_name'] gene_synonym2gene_name = dict(zip(gs2gn['Gene Synonym'], gs2gn['Gene Name'])) mtTable = dbTableDFs['multidata_table'] dbg(mtTable.dtypes) # Convert all MULTIDATA_TABLE_BOOLEAN_COLS from Integer (0/1) to Boolean for col in MULTIDATA_TABLE_BOOLEAN_COLS: mtTable[col] = mtTable[col].astype(bool) # Read genes 'table' - c.f. old CellphoneDB: GeneRepository.get_all_expanded() # Drop 'protein_name' column from dbTableDFs['gene_table'] as dbTableDFs['protein_table'] already has it if 'protein_name' in dbTableDFs['gene_table'].columns: dbTableDFs['gene_table'] = dbTableDFs['gene_table'].drop('protein_name', axis=1) # First filter out entries where gene_name = X with no hgnc_symbol, e.g. in the case of IGF2, filter out ENSG00000284779 dbTableDFs['gene_table'] = dbTableDFs['gene_table'][~dbTableDFs['gene_table']['hgnc_symbol'].isnull()] # Now merge gene_table with protein_table and multidata_table genes = pd.merge(dbTableDFs['gene_table'], dbTableDFs['protein_table'], left_on='protein_id', right_on='id_protein') genes = pd.merge(genes, mtTable, left_on='protein_multidata_id', right_on='id_multidata') dbg("genes columns: ", genes.columns) # Read complex compositions complex_composition = dbTableDFs['complex_composition_table'] # Read 'interactions' - c.f. old CellphoneDB: InteractionRepository.get_all_expanded() suffixes = ('_1', '_2') includeGene = False if includeGene: multidata_simple = genes.copy() else: multidata_simple = pd.merge(dbTableDFs['protein_table'], mtTable, left_on='protein_multidata_id', right_on='id_multidata') multidata_complex = pd.merge(mtTable, dbTableDFs['complex_table'], left_on='id_multidata', right_on='complex_multidata_id') if multidata_complex.empty: multidata_expanded = multidata_simple else: multidata_expanded = pd.concat([multidata_simple, multidata_complex], ignore_index=True, sort=True) # C.f. old CellphoneDB: InteractionRepository.get_all_expanded() multidata_expanded = multidata_expanded.astype({'id_multidata': 'int64'}) dbg(multidata_expanded.columns) interactions = pd.merge(dbTableDFs['interaction_table'], multidata_expanded, how='left', left_on=['multidata_1_id'], right_on=['id_multidata']) interactions = pd.merge(interactions, multidata_expanded, how='left', left_on=['multidata_2_id'], right_on=['id_multidata'], suffixes=suffixes) dbg("interactions columns: ", interactions.columns) # Generate complex_expanded - c.f. ComplexRepository.get_all_expanded() complex_expanded = pd.merge(dbTableDFs['complex_table'], mtTable, left_on='complex_multidata_id', right_on='id_multidata') # dbg(complexes_expanded.columns) # index interactions and complex data frames # C.f. old CellphoneDB: method_launcher.get_interactions_genes_complex() interactions.set_index('id_interaction', drop=True, inplace=True) complex_composition.set_index('id_complex_composition', inplace=True, drop=True) # Cater for DB version-dependent input files receptor2tfs = {} if 'receptor_to_transcription_factor' in dbTableDFs: receptor_to_tf_df = dbTableDFs['receptor_to_transcription_factor'][['Receptor', 'TF']] for receptor, tf in receptor_to_tf_df.values: receptor2tfs.update({receptor: receptor2tfs.get(receptor, []) + [tf]}) return interactions, genes, complex_composition, complex_expanded, gene_synonym2gene_name, receptor2tfs
def extract_dataframes_from_db(cpdb_file_path): dfs = {} start = time.time() for tuple in unzip(cpdb_file_path): file_name = tuple[0] file_handle = tuple[1] dbg("Retrieving from zip file: " + file_name) dfs[file_name.replace('.csv', '')] = pd.read_csv(file_handle) duration = time.time() - start dbg("Loaded DB into memory in " + str(round(duration, 2)) + "s") return dfs def unzip(zip_file_path): """ Download a ZIP file and extract its contents in memory yields (filename, file-like object) pairs """ with open(zip_file_path, 'br') as file: # Note - eval unescapes the double-quotes content = file.read() dbg(type(content)) with zipfile.ZipFile(io.BytesIO(content)) as thezip: for zipinfo in thezip.infolist(): with thezip.open(zipinfo) as thefile: yield zipinfo.filename, thefile
[docs]def get_db_path(user_dir_root, db_version): """ Retrieves the path to the local database file corresponding to db_version Parameters ---------- user_dir_root: str The directory in which user stores CellphoneDB files db_version: str CellphoneDB version (and the name of the subdirectory containing the curated input files from https://github.com/ventolab/cellphonedb-data) Returns ------- str The path to the local database file corresponding to db_version """ return os.path.join(user_dir_root, "releases", db_version)
# Cater for DB version-dependent column names def get_column_names_for_db_version(complex_db_df, interactions_df, protein_df) -> tuple: protein_column_names = ['uniprot_1', 'uniprot_2', 'uniprot_3', 'uniprot_4'] interaction_column_names1 = [] interaction_column_names2 = [] complex_columns = [] version = [] if 'directionality' in interactions_df.columns: interaction_column_names1 = ['directionality', 'classification'] interaction_column_names2 = ['is_ppi', 'curator'] if 'uniprot_5' in complex_db_df.columns: protein_column_names += ['uniprot_5'] complex_columns = COMPLEX_CROSSREFERENCE_FIELDS_FOR_WEB if 'version' in protein_df.columns: version = ['version'] return (protein_column_names, interaction_column_names1, interaction_column_names2, version, complex_columns) def collect_protein_data(data_dfs: dict) -> pd.DataFrame: # Collect protein data protein_db_df = data_dfs['protein_input'][['protein_name', 'tags', 'tags_reason', 'tags_description', 'uniprot']] num_proteins = protein_db_df.shape[0] multidata_id_list_so_far = list(range(num_proteins)) protein_db_df.insert(0, 'id_protein', multidata_id_list_so_far, False) protein_db_df.insert(len(protein_db_df.columns), 'protein_multidata_id', protein_db_df['id_protein'].tolist(), True) # dbg(protein_db_df.info) return protein_db_df, multidata_id_list_so_far def collect_gene_data(data_dfs: dict, protein_db_df: pd.DataFrame) -> pd.DataFrame: gene_db_df = data_dfs['gene_input'][['ensembl', 'gene_name', 'hgnc_symbol', 'uniprot']] num_genes = gene_db_df.shape[0] gene_db_df.insert(0, 'id_gene', list(range(num_genes)), False) # Assign values from protein_db_df['protein_multidata_id'] into gene_db_df['protein_id'] # via join between 'uniprot' and 'protein_name' gene_db_df = pd.merge(gene_db_df, protein_db_df[['protein_name', 'protein_multidata_id', 'uniprot']], on='uniprot') gene_db_df = gene_db_df.drop('uniprot', axis=1) protein_db_df = protein_db_df.drop('uniprot', axis=1) gene_db_df.rename(columns={'protein_multidata_id': 'protein_id'}, inplace=True) # print(gene_db_df.info) return gene_db_df def collect_receptor_to_tf_mapping(data_dfs: dict) -> pd.DataFrame: receptor_to_tf_df = None # Cater for DB version-dependent input files if data_dfs['transcription_factor_input'] is not None: receptor_to_tf_df = data_dfs['transcription_factor_input'][['receptor_id', 'TF_symbol']].copy() receptor_to_tf_df.columns = ['Receptor', 'TF'] # Strip any leading or trailing spaces receptor_to_tf_df.replace(r'\s*(.*?)\s*', r'\1', regex=True, inplace=True) return receptor_to_tf_df def collect_gene_synonym_to_gene_name_mapping(data_dfs: dict, gene_db_df: pd.DataFrame) -> pd.DataFrame: gene_synonym_to_gene_name_db_df = None # Cater for DB version-dependent input files if data_dfs['gene_synonyms_input'] is not None: gene_synonym_to_gene_name = {} for gene_names in data_dfs['gene_synonyms_input']\ .filter(regex=("Gene Names.*")).dropna().agg(' '.join, axis=1).tolist(): gene_names_arr = re.split(';\\s*|\\s+', gene_names) for gene_name in gene_db_df[gene_db_df['gene_name'].isin(gene_names_arr)]['gene_name'].tolist(): for gene_synonym in gene_names_arr: if gene_synonym != gene_name: gene_synonym_to_gene_name[gene_synonym] = gene_name gene_synonym_to_gene_name_db_df = pd.DataFrame(gene_synonym_to_gene_name.items(), columns=['Gene Synonym', 'Gene Name']) return gene_synonym_to_gene_name_db_df
[docs]def create_db(target_dir) -> None: """ Creates CellphoneDB databases file (cellphonedb.zip) in <target_dir> directory. The assumption is that <target_dir> contains the four *_input.csv files required to create the database file. This simple zip file contains a number of CSV files that CellphoneDB package reads into memory and uses to drive its analysis of user data. Parameters ---------- target_dir: str The directory in which to place the database file (and which contains the four *_input.csv files required to create the database file Returns ------- """ gene_input = os.path.join(target_dir, "gene_input.csv") protein_input = os.path.join(target_dir, "protein_input.csv") complex_input = os.path.join(target_dir, "complex_input.csv") interaction_input = os.path.join(target_dir, "interaction_input.csv") transcription_factor_input = os.path.join(target_dir, "transcription_factor_input.csv") gene_synonyms_input = os.path.join(target_dir, "sources/uniprot_synonyms.tsv") pathlib.Path(target_dir).mkdir(parents=True, exist_ok=True) data_dfs = get_dfs(gene_input=gene_input, protein_input=protein_input, complex_input=complex_input, interaction_input=interaction_input, transcription_factor_input=transcription_factor_input, gene_synonyms_input=gene_synonyms_input) (protein_column_names, interaction_column_names1, interaction_column_names2, version, complex_columns) = \ get_column_names_for_db_version( data_dfs['complex_input'], data_dfs['interaction_input'], data_dfs['protein_input']) # Perform sanity tests on *_input files and report any issues to the user as warnings run_sanity_tests(data_dfs, protein_column_names, version) # Collect protein data protein_db_df, multidata_id_list_so_far = collect_protein_data(data_dfs) # Collect gene data gene_db_df = collect_gene_data(data_dfs, protein_db_df) # Collect mapping: (receptor) gene name -> TF gene name (in transcription_factor_input.tsv) receptor_to_tf_df = collect_receptor_to_tf_mapping(data_dfs) # Collect mapping: gene synonym (not in gene_input.csv) -> gene name (in gene_input.csv) gene_synonym_to_gene_name_db_df = collect_gene_synonym_to_gene_name_mapping(data_dfs, gene_db_df) # Collect complex data cols = protein_column_names + ['pdb_structure', 'pdb_id', 'stoichiometry', 'comments_complex'] + complex_columns complex_db_df = data_dfs['complex_input'][cols] # Note that uniprot_* cols will be dropped after complex_composition_df has been constructed num_complexes = complex_db_df.shape[0] complex_db_df.insert(0, 'id_complex', list(range(num_complexes)), False) next_md_id = multidata_id_list_so_far[-1] + 1 complex_multidata_ids = list(range(next_md_id, next_md_id + num_complexes)) complex_db_df.insert(1, 'complex_multidata_id', complex_multidata_ids, False) multidata_id_list_so_far.extend(complex_multidata_ids) # dbg(multidata_id_list_so_far[-1]) # dbg(complex_db_df.info) # Collect multidata # Insert proteins into multidata multidata_db_df = \ data_dfs['protein_input'][['uniprot', 'receptor', 'receptor_desc', 'other', 'other_desc', 'secreted_highlight', 'secreted_desc', 'transmembrane', 'secreted', 'peripheral', 'integrin']].copy() multidata_db_df.rename(columns={'uniprot': 'name'}, inplace=True) multidata_ids = pd.merge( data_dfs['protein_input'][['protein_name']], protein_db_df[['protein_name', 'protein_multidata_id']], on='protein_name')['protein_multidata_id'].tolist() multidata_db_df.insert(0, 'id_multidata', multidata_ids, False) multidata_db_df.insert(len(multidata_db_df.columns), 'is_complex', list(itertools.repeat(False, multidata_db_df.shape[0])), True) dbg(multidata_db_df.shape, multidata_db_df.index, multidata_db_df.columns) # Insert complexes into multidata cols = ['complex_name', 'receptor', 'receptor_desc', 'other', 'other_desc', 'secreted_highlight', 'secreted_desc', 'transmembrane', 'secreted', 'peripheral', 'integrin'] complex_aux_df = data_dfs['complex_input'][cols].copy() complex_aux_df.rename(columns={'complex_name': 'name'}, inplace=True) complex_aux_df.insert(0, 'id_multidata', complex_multidata_ids, False) complex_aux_df.insert(len(complex_aux_df.columns), 'is_complex', list(itertools.repeat(True, complex_aux_df.shape[0])), True) dbg(complex_aux_df.shape, complex_aux_df.index, complex_aux_df.columns) # Append complex_aux_df to multidata_db_df multidata_db_df = pd.concat([multidata_db_df, complex_aux_df], ignore_index=True, verify_integrity=True) dbg(multidata_db_df.shape, multidata_db_df.index, multidata_db_df.columns) # First collect total_protein counts for each complex in complex_db_df total_protein_cnt_list = np.apply_along_axis( lambda s: sum(isinstance(x, str) for x in s), 1, complex_db_df[protein_column_names].values).tolist() complex_db_df.insert(len(complex_db_df.columns), 'total_protein', total_protein_cnt_list, True) dbg(complex_db_df.info) # Next collect all complex_composition data into cc_list cc_list = [] pos = len(protein_column_names) for r in complex_db_df[protein_column_names + ['complex_multidata_id', 'total_protein']].values.tolist(): for acc in filter(lambda x: isinstance(x, str), r): protein_multidata_id = \ multidata_db_df.loc[(~multidata_db_df['is_complex']) & (multidata_db_df['name'] == acc), ['id_multidata']].iat[0, 0] complex_multidata_id = r[pos] total_protein = r[pos+1] cc_list.append([complex_multidata_id, protein_multidata_id, total_protein]) complex_composition_df = pd.DataFrame(cc_list, columns=['complex_multidata_id', 'protein_multidata_id', 'total_protein']) complex_composition_df.insert(0, 'id_complex_composition', list(range(len(cc_list))), False) dbg(complex_composition_df.shape, complex_composition_df.index, complex_composition_df.columns, complex_composition_df.info) # Next drop the auxiliary columns from complex_db_df: protein_column_names and 'total_protein' for col in protein_column_names + ['total_protein']: complex_db_df = complex_db_df.drop(col, axis=1) # Collect interaction data interactions_aux_df = pd.merge(data_dfs['interaction_input'], multidata_db_df, left_on=['partner_a'], right_on=['name']) interactions_aux_df = pd.merge(interactions_aux_df, multidata_db_df, left_on=['partner_b'], right_on=['name'], suffixes=['_x', '_y']) dbg(interactions_aux_df.shape) # Remove interactions non-CPDB interactors interactions_aux_df = interactions_aux_df[ interactions_aux_df.apply( lambda interaction: file_utils.is_cellphonedb_interactor(interaction, ('_x', '_y')), axis=1)] interactions_aux_df['id_cp_interaction'] = interactions_aux_df.apply( lambda interaction: unique_id_generator.interaction(interaction, ('_x', '_y')), axis=1) dbg(interactions_aux_df.info) dbg(interactions_aux_df.columns) interactions_df = interactions_aux_df[['id_cp_interaction', 'id_multidata_x', 'id_multidata_y', 'source', 'annotation_strategy'] + interaction_column_names2 + interaction_column_names1].copy() interactions_df.rename(columns={'id_multidata_x': 'multidata_1_id', 'id_multidata_y': 'multidata_2_id'}, inplace=True) interactions_df.insert(0, 'id_interaction', list(range(interactions_df.shape[0])), False) dbg(interactions_df.shape, interactions_df.index, interactions_df.columns) # Save all DFs as csv files inside a DB zip file zip_buffer = io.BytesIO() with zipfile.ZipFile(zip_buffer, "a", zipfile.ZIP_DEFLATED, False) as zip_file: zip_file.writestr('protein_table.csv', protein_db_df.to_csv(index=False, sep=',').encode('utf-8')) zip_file.writestr('gene_table.csv', gene_db_df.to_csv(index=False, sep=',').encode('utf-8')) zip_file.writestr('complex_table.csv', complex_db_df.to_csv(index=False, sep=',').encode('utf-8')) zip_file.writestr('complex_composition_table.csv', complex_composition_df.to_csv(index=False, sep=',').encode('utf-8')) zip_file.writestr('multidata_table.csv', multidata_db_df.to_csv(index=False, sep=',').encode('utf-8')) zip_file.writestr('interaction_table.csv', interactions_df.to_csv(index=False, sep=',').encode('utf-8')) if gene_synonym_to_gene_name_db_df is not None: # Cater for DB version-dependent input files zip_file.writestr('gene_synonym_to_gene_name.csv', gene_synonym_to_gene_name_db_df.to_csv(index=False, sep=',').encode('utf-8')) if receptor_to_tf_df is not None: # Cater for DB version-dependent input files zip_file.writestr('receptor_to_transcription_factor.csv', receptor_to_tf_df.to_csv(index=False, sep=',').encode('utf-8')) file_suffix = file_utils.get_timestamp_suffix() file_path = os.path.join(target_dir, 'cellphonedb_{}.zip'.format(file_suffix)) with open(file_path, 'wb') as f: f.write(zip_buffer.getvalue()) print("Created {} successfully".format(file_path))
def download_database(target_dir, cpdb_version): download_released_files(target_dir, cpdb_version, "cellphonedb.zip|_input|sources\\/uniprot_synonyms") def download_released_files(target_dir, cpdb_version, regex): r = urllib.request.urlopen('https://github.com/ventolab/cellphonedb-data/archive/refs/tags/{}.zip'.format(cpdb_version)) zipContent = ZipFile(io.BytesIO(r.read())) for fpath in zipContent.namelist(): if re.search(regex, fpath): fname = fpath.split("/")[-1] if fname: if re.search("sources", fpath): target_dir = os.path.join(target_dir, "sources") pathlib.Path(target_dir).mkdir(parents=True, exist_ok=True) with open(os.path.join(target_dir, fname), 'wb') as f: f.write(zipContent.read(fpath)) print("Downloaded {} into {}".format(fname, target_dir)) def get_dfs(gene_input=None, protein_input=None, complex_input=None, interaction_input=None, transcription_factor_input=None, gene_synonyms_input=None): dfs = {} dfs['gene_input'] = file_utils.read_data_table_from_file(gene_input) dfs['protein_input'] = file_utils.read_data_table_from_file(protein_input) dfs['complex_input'] = file_utils.read_data_table_from_file(complex_input) dfs['interaction_input'] = file_utils.read_data_table_from_file(interaction_input) dfs['transcription_factor_input'] = file_utils.read_data_table_from_file(transcription_factor_input, optional=True) dfs['gene_synonyms_input'] = file_utils.read_data_table_from_file(gene_synonyms_input, optional=True) return dfs def sanity_test_uniprot_accessions_map_to_multiple_gene_names(gene_db_df: pd.DataFrame): gene_names_uniprot_df = gene_db_df[['gene_name', 'uniprot']].copy() gene_names_uniprot_df.drop_duplicates(inplace=True) dups = gene_names_uniprot_df[gene_names_uniprot_df['uniprot'].duplicated()] if not dups.empty: # data_errors_found = True print("WARNING: The following UniProt ids map to multiple gene names (it is expected that " + "they should map to only one):") print(", ".join(dups['uniprot'].tolist())) def sanity_test_report_complex_duplicates(complex_db_df: pd.DataFrame): test_complex_db_df = complex_db_df.set_index('complex_name') if test_complex_db_df.index.has_duplicates: print("WARNING: complex_input.csv has the following duplicates:") print("\n".join(complex_db_df[test_complex_db_df.index.duplicated(keep='first')]['complex_name'].tolist()) + "\n") def sanity_test_report_complexes_with_same_participants( complex_db_df: pd.DataFrame, version: str, protein_column_names: list): participants_set_to_complex_names = {} participants_set_to_data_sources = {} cols = ['complex_name'] + version + protein_column_names start_idx = cols.index('uniprot_1') for row in complex_db_df[cols].itertuples(index=False): participants_set = frozenset([i for i in row[start_idx:] if str(i) != 'nan']) complex_name = row[0] data_source = row[1] m = re.search(r"^{}".format(CORE_CELLPHONEDB_DATA), data_source) if m: # Store in data_source only CORE_CELLPHONEDB_DATA (i.e. exclude any version information) # Here we just need to know that this complex was added by the CellphoneDB team data_source = m[0] if participants_set not in participants_set_to_complex_names: participants_set_to_complex_names[participants_set] = [complex_name] participants_set_to_data_sources[participants_set] = set([data_source]) else: participants_set_to_complex_names[participants_set].append(complex_name) participants_set_to_data_sources[participants_set].add(data_source) complex_dups = "" for participants_set in participants_set_to_complex_names: data_sources = list(participants_set_to_data_sources[participants_set]) if (len(data_sources) > 1 or not data_sources or data_sources[0] != CORE_CELLPHONEDB_DATA): complex_names = participants_set_to_complex_names[participants_set] if len(complex_names) > 1: complex_dups += ", ".join(complex_names) + " : " + ", ".join(participants_set) + "\n" if len(complex_dups) > 0: # data_errors_found = True print("WARNING: The following multiple complexes (left) appear to have the same composition (right):") print(complex_dups) def sanity_test_report_interactions_with_same_participants(interaction_db_df: pd.DataFrame): partner_sets = [set([i for i in row]) for row in interaction_db_df[['partner_a', 'partner_b']].itertuples(index=False)] # Find duplicate sets of partners seen = set() duplicate_partner_sets = [x for x in partner_sets if x in seen or seen.add(frozenset(x))] if duplicate_partner_sets: # data_errors_found = True print("WARNING: The following sets of interaction partners appear in multiple rows of interaction_input.csv file:") for dup in set([frozenset(x) for x in duplicate_partner_sets]): print(','.join(dup)) print() def sanity_test_report_uniprot_accession_duplicates(protein_db_df: pd.DataFrame): test_protein_db_df = protein_db_df.set_index('uniprot') if test_protein_db_df.index.has_duplicates: print("WARNING: protein_input.csv has the following UniProt accession duplicates:") print("\n".join(protein_db_df[test_protein_db_df.index.duplicated(keep='first')]['uniprot'].tolist()) + "\n") def sanity_test_report_orphan_complexes(complex_db_df: pd.DataFrame, interaction_db_df: pd.DataFrame) -> set: all_complexes_set = set(complex_db_df['complex_name'].tolist()) interaction_participants_set = set(interaction_db_df['partner_a'].tolist() + interaction_db_df['partner_b'].tolist()) orphan_complexes = all_complexes_set - interaction_participants_set if orphan_complexes: print("WARNING: The following complexes are not found in interaction_input.txt:") print("\n".join(orphan_complexes)) print() return orphan_complexes def sanity_test_report_orphan_proteins( protein_db_df: pd.DataFrame, complex_db_df: pd.DataFrame, interaction_db_df: pd.DataFrame, protein_column_names: list, orphan_complexes: set): all_proteins_set = set(protein_db_df['uniprot'].tolist()) interaction_participants_set = set( interaction_db_df['partner_a'].tolist() + interaction_db_df['partner_b'].tolist()) proteins_in_complexes_participating_in_interactions = [] for colName in protein_column_names: proteins_in_complexes_participating_in_interactions += \ complex_db_df[~complex_db_df['complex_name'].isin(orphan_complexes)][colName].tolist() orphan_proteins = all_proteins_set - interaction_participants_set - \ set(proteins_in_complexes_participating_in_interactions) if orphan_proteins: print("WARNING: The following proteins are not found in interaction_input.txt (either directly " + "or via complexes they are part of):") print("\n".join(orphan_proteins)) def sanity_test_report_unknown_interactors( protein_db_df: pd.DataFrame, complex_db_df: pd.DataFrame, interaction_db_df: pd.DataFrame ): unknown_interactors = set() for col in ['partner_a', 'partner_b']: aux_df = pd.merge(interaction_db_df, protein_db_df, left_on=col, right_on='uniprot', how='outer') unknown_interactor_proteins = set(aux_df[pd.isnull(aux_df['uniprot'])][col].tolist()) aux_df = pd.merge(interaction_db_df, complex_db_df, left_on=col, right_on='complex_name', how='outer') unknown_interactor_complexes = set(aux_df[pd.isnull(aux_df['complex_name'])][col].tolist()) unknown_interactors = unknown_interactors.union( unknown_interactor_proteins.intersection(unknown_interactor_complexes)) if unknown_interactors: print("WARNING: The following interactors in interaction_input.txt could not be found in either " + "protein_input.csv or complex_indput.csv:") print("\n".join(sorted(unknown_interactors)) + "\n") def sanity_test_report_unknown_proteins( protein_db_df: pd.DataFrame, complex_db_df: pd.DataFrame, protein_column_names: list): unknown_proteins = set() for col in protein_column_names: aux_df = pd.merge(complex_db_df, protein_db_df, left_on=col, right_on='uniprot', how='outer') unknown_complex_proteins = set(aux_df[pd.isnull(aux_df['uniprot']) & ~pd.isnull(aux_df[col])][col].tolist()) unknown_proteins = unknown_proteins.union(unknown_complex_proteins) if unknown_proteins: print("WARNING: The following proteins in complex_input.txt could not be found in protein_input.csv:") print("\n".join(sorted(unknown_proteins)) + "\n") def sanity_test_report_proteins_not_in_genes_file( protein_db_df: pd.DataFrame, gene_db_df: pd.DataFrame): proteins = set(protein_db_df['uniprot'].tolist()) unknown_proteins = proteins.difference(set(gene_db_df['uniprot'].tolist())) if unknown_proteins: print("WARNING: The following proteins in protein_input.txt could not be found in gene_input.csv:") print("\n".join(sorted(unknown_proteins)) + "\n") print() def sanity_test_report_tfs_not_in_gene_or_complex_files( gene_db_df: pd.DataFrame, complex_db_df: pd.DataFrame, tf_input_df: pd.DataFrame): if tf_input_df is not None: # Cater for DB version-dependent input files for (bioentity, df) in {"gene": gene_db_df, "complex": complex_db_df}.items(): if bioentity == "gene": complex_filter = ~tf_input_df['receptor_id'].str.match('.*_.*') else: complex_filter = tf_input_df['receptor_id'].str.match('.*_.*') bioentities_in_tf_input = set([i.strip() for i in tf_input_df[complex_filter]['receptor_id'].values.tolist()]) bioentities_in_input = set(df['{}_name'.format(bioentity)].values.tolist()) # Below: bioentities in bioentities_in_tf_input but not in bioentities_in_input bioentities_not_in_input = bioentities_in_tf_input.difference(bioentities_in_input) if bioentities_not_in_input: print("WARNING: The following receptors in transcription_factor_input could not be found in " + "{}_input.csv:".format(bioentity)) print("\n".join(set(bioentities_not_in_input))) print() def run_sanity_tests(data_dfs, protein_column_names, version): data_errors_found = False protein_db_df = data_dfs['protein_input'] complex_db_df = data_dfs['complex_input'] gene_db_df = data_dfs['gene_input'] interaction_db_df = data_dfs['interaction_input'] tf_input_df = None # Cater for DB version-dependent input files if 'transcription_factor_input' in data_dfs: tf_input_df = data_dfs['transcription_factor_input'] # 1. Report any uniprot accessions that map to multiple gene_names sanity_test_uniprot_accessions_map_to_multiple_gene_names(gene_db_df) # 2. Warn about complex name duplicates in complex_db_df sanity_test_report_complex_duplicates(complex_db_df) # 3. Report complexes with (possibly) different names, but with the same uniprot # accession participants (though not necessarily in the same order - hence the use of set below) # NB. Use set below as we don't care about the order of participants when looking for duplicates # NB. Report duplicate complexes _only if_ at least one duplicate's complex_db_df['version'] # does not start with CORE_CELLPHONEDB_DATA) sanity_test_report_complexes_with_same_participants(complex_db_df, version, protein_column_names) # 4. Report interactions with (possibly) a different name, but with the same participants # (though not necessarily in the same order - hence the use of set below) sanity_test_report_interactions_with_same_participants(interaction_db_df) # 5. Warn about uniprot accession duplicates in protein_db_df sanity_test_report_uniprot_accession_duplicates(protein_db_df) # 6. Warn the user if some complexes don't participate in any interactions orphan_complexes = sanity_test_report_orphan_complexes(complex_db_df, interaction_db_df) # 7. Warn the user if some proteins don't participate in any interactions directly, # or are part of some complex in orphan_complexes sanity_test_report_orphan_proteins(protein_db_df, complex_db_df, interaction_db_df, protein_column_names, orphan_complexes) # 8. Warn the user if some interactions contain interactors that are neither # in complex_input.csv or protein_input.csv sanity_test_report_unknown_interactors(protein_db_df, complex_db_df, interaction_db_df) # 9. Warn if some complexes contain proteins not in protein_input.csv sanity_test_report_unknown_proteins(protein_db_df, complex_db_df, protein_column_names) # 10. Warn if some proteins in protein_input.csv are not in gene_input.csv sanity_test_report_proteins_not_in_genes_file(protein_db_df, gene_db_df) # 11. Warn if some receptor ids in tf_input_df are in neither gene_input.csv or complex_input.csv sanity_test_report_tfs_not_in_gene_or_complex_files(gene_db_df, complex_db_df, tf_input_df) if data_errors_found: raise DatabaseCreationException