NPAtlas analysis notebook
path = "../../notebooks-output/npatlas/npatlas.json"
import json
with open(path, "r") as f:
    objs = json.load(f)
len(objs)
assert all("smiles" in obj for obj in objs)
distinct_structures = {obj["smiles"] for obj in objs}
len(distinct_structures)
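For orientation, here is a minimal sketch of the entry shape that the flattening code below assumes. The field names are taken from that code; the record itself is hypothetical illustration data, not a real NPAtlas entry, and in practice any of these fields may be missing or null.
# Hypothetical example entry, for illustration only -- not real NPAtlas data
example_entry = {
    "original_name": "examplamide A",
    "smiles": "CC(=O)OC1=CC=CC=C1C(=O)O",
    "mol_formula": "C9H8O4",
    "classyfire": {
        "superclass": {"name": "Organic acids and derivatives"},
        "class": {"name": "Carboxylic acids and derivatives"},
        "subclass": {"name": "Benzoic acids and derivatives"},
        "predicted_chebi_terms": ["benzoic acids (CHEBI:22723)"],
        "predicted_lipidmaps_terms": [],
    },
    "origin_organism": {"type": "Bacterium", "genus": "Streptomyces", "species": "sp."},
    "origin_reference": {"doi": "10.0000/example", "pmid": None},
}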
import re
from copy import copy
def flatten_entry(entry: dict) -> dict:
    """Flatten a nested NPAtlas entry into a single-level dict."""
    flattened = copy(entry)
    # ClassyFire class/subclass/superclass names (any level may be missing or null)
    cf = entry.get("classyfire") or {}
    cf_class = cf.get("class") or {}
    if cf_class:
        flattened["classyfire_class_name"] = cf_class.get("name")
    cf_subclass = cf.get("subclass") or {}
    if cf_subclass:
        flattened["classyfire_subclass_name"] = cf_subclass.get("name")
    cf_superclass = cf.get("superclass") or {}
    if cf_superclass:
        flattened["classyfire_superclass_name"] = cf_superclass.get("name")
    # Predicted ChEBI terms, e.g. "benzoic acids (CHEBI:22723)" -> {label: CURIE}
    chebi_terms = cf.get("predicted_chebi_terms") or []
    flattened["chebi_terms_flat"] = "; ".join(chebi_terms)
    chebi_term_ix = {}
    for t in chebi_terms:
        for label, curie in re.findall(r"(.*) \((CHEBI:\d+)\)", t):
            chebi_term_ix[label] = curie
    flattened["predicted_chebi_index"] = chebi_term_ix
    flattened["predicted_chebi_labels"] = tuple(chebi_term_ix.keys())
    flattened["predicted_chebi_ids"] = tuple(chebi_term_ix.values())
    # Predicted LIPID MAPS terms (note: this pattern only captures FA-prefixed IDs)
    lipidmaps_terms = cf.get("predicted_lipidmaps_terms") or []
    flattened["lipidmaps_terms_flat"] = "; ".join(lipidmaps_terms)
    lipidmaps_term_ix = {}
    for t in lipidmaps_terms:
        for label, lm_id in re.findall(r"(.*) \((FA\d+)\)", t):
            lipidmaps_term_ix[label] = lm_id
    flattened["predicted_lipidmaps_index"] = lipidmaps_term_ix
    flattened["predicted_lipidmaps_labels"] = tuple(lipidmaps_term_ix.keys())
    flattened["predicted_lipidmaps_ids"] = tuple(lipidmaps_term_ix.values())
    # Origin organism fields
    origin_organism = entry.get("origin_organism") or {}
    if origin_organism:
        flattened["origin_organism_species"] = f"{origin_organism.get('genus')} {origin_organism.get('species')}"
        flattened["origin_organism_genus"] = origin_organism.get("genus")
        flattened["origin_organism_type"] = origin_organism.get("type")
    # Split the formula string, e.g. "C10H12O5" -> {"C": 10, "H": 12, "O": 5};
    # an omitted count (e.g. the N in "C5H5N") means 1
    formula = entry.get("mol_formula")
    if formula:
        flattened.update({el: int(n) if n else 1
                          for el, n in re.findall(r"([A-Z][a-z]*)(\d*)", formula)})
    # Reference fields
    oref = entry.get("origin_reference") or {}
    flattened["doi"] = oref.get("doi")
    flattened["pmid"] = oref.get("pmid")
    return flattened
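A quick smoke test of flatten_entry against the hypothetical example_entry sketched earlier, checking the derived fields:
# assumes the hypothetical example_entry defined above
ex = flatten_entry(example_entry)
assert ex["classyfire_subclass_name"] == "Benzoic acids and derivatives"
assert ex["predicted_chebi_labels"] == ("benzoic acids",)
assert ex["predicted_chebi_ids"] == ("CHEBI:22723",)
assert ex["C"] == 9 and ex["H"] == 8 and ex["O"] == 4
assert ex["origin_organism_species"] == "Streptomyces sp."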
flattened_entries = [flatten_entry(obj) for obj in objs]
len(flattened_entries)
entries_no_cf = [e for e in flattened_entries if "classyfire_class_name" not in e or not e["classyfire_class_name"]]
len(entries_no_cf)
len([e for e in flattened_entries if "classyfire_subclass_name" not in e or not e["classyfire_subclass_name"]])
import pandas as pd
entries_df = pd.DataFrame(flattened_entries)
entries_df
entries_df[entries_df["chebi_terms_flat"] == ""]
entries_df[entries_df["lipidmaps_terms_flat"] != ""]
entries_df[["C", "classyfire_class_name", "classyfire_subclass_name", "chebi_terms_flat"]].value_counts()
entries_df[["C", "classyfire_class_name", "classyfire_subclass_name", "chebi_terms_flat", "lipidmaps_terms_flat"]].value_counts()
entries_df[["C", "classyfire_class_name", "classyfire_subclass_name"]].value_counts()
entries_df[(entries_df["C"] > 20) & (entries_df["classyfire_subclass_name"] == "Sesquiterpenoids")][["C", "classyfire_class_name", "classyfire_subclass_name"]].value_counts()
entries_df[(entries_df["C"] >= 40) & (entries_df["doi"].notna()) & (entries_df["classyfire_subclass_name"] == "Sesquiterpenoids")][["C", "classyfire_subclass_name", "doi", "pmid", "original_name", "smiles"]]
C3P Classification
from c3p.classifier import Classifier
c3p_classifier = Classifier()
# ensure the output directory used below exists
!mkdir -p ../../notebooks-output/npatlas
def classify_all(classifier, structures):
    """Run the classifier over all structures, keeping only positive matches."""
    n = 0
    results = []
    for r in classifier.classify_iter(structures):
        if r.is_match:
            results.append(r)
        n += 1
        # progress indicator
        if n % 1000 == 0:
            print(n)
    print(len(results))
    return results
from pathlib import Path
import pandas as pd
path = Path("../../notebooks-output/npatlas/c3p_results.csv")
if path.exists():
c3p_df = pd.read_csv(path)
else:
c3p_results = classify_all(c3p_classifier, distinct_structures)
c3p_df = pd.DataFrame( [r.model_dump() for r in c3p_results] )
c3p_df.to_csv(path)
c3p_df["class_name"].value_counts()
ChEBI Classification
from pathlib import Path
import pandas as pd
path = Path("../../notebooks-output/npatlas/chebi_results.csv")
if path.exists():
chebi_df = pd.read_csv(path)
else:
from c3p.chebi_classifier import ChEBIClassifier
chebi_classifier = ChEBIClassifier()
chebi_results = classify_all(chebi_classifier, distinct_structures)
chebi_df = pd.DataFrame( [r.model_dump() for r in chebi_results] )
chebi_df.to_csv(path)
from c3p.clients.chebifier import ChebifierClient
chebifier = ChebifierClient()
chebifier_results = {}
# disabled: classifying every distinct structure through the Chebifier client
# is expensive; flip to True to (re)compute
if False:
    for s in distinct_structures:
        if s not in chebifier_results:
            chebifier_results[s] = chebifier.classify(s)
# fraction of structures classified so far
len(chebifier_results) / len(distinct_structures)
chebifier_df = pd.DataFrame([r.model_dump() for rs in chebifier_results.values() for r in rs])
import pandas as pd
import matplotlib.pyplot as plt
def plot_class_distribution(df, column_name="class_name", title=None, min_count=1000, save_to=None):
    """Horizontal bar chart of value counts for column_name, keeping values with at least min_count rows."""
    class_counts = df[column_name].value_counts()
    # keep only classes with at least min_count occurrences
    class_counts = class_counts[class_counts >= min_count]
    plt.figure(figsize=(8, 14))
    class_counts.plot(kind='barh')
    plt.ylabel(column_name)
    plt.xlabel("Count")
    plt.title(title)
    if save_to:
        plt.savefig(save_to, dpi=300, bbox_inches='tight')
    plt.show()
plot_class_distribution(c3p_df, column_name="class_name",
                        title="Distribution of c3p_classes in NPAtlas",
                        min_count=1400,
                        save_to="../../notebooks-output/npatlas/npatlas-summary.png")
#plot_class_distribution(chebifier_df, column_name="class_name", title="Distribution of chebifier_classes in NPAtlas")
plot_class_distribution(entries_df, column_name="classyfire_subclass_name", title="Distribution of classyfire_subclasses in NPAtlas", min_count=100)
plot_class_distribution(entries_df, column_name="classyfire_class_name", title="Distribution of classyfire_classes in NPAtlas", min_count=100)
plot_class_distribution(entries_df, column_name="classyfire_superclass_name", title="Distribution of classyfire_superclasses in NPAtlas", min_count=5)
def merge_classifications_df(entries_df, df, renamed_column="c3p_class_name"):
    """Join classifier results to the NPAtlas entries on SMILES (inner join)."""
    df2 = df.rename(columns={"class_name": renamed_column})
    joined = df2.merge(entries_df, left_on="input_smiles", right_on="smiles")
    return joined
merged_df = merge_classifications_df(entries_df, c3p_df)
merged_df
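A toy illustration of the join semantics (synthetic data, assuming the classifier results carry an input_smiles column as used above): each classifier hit becomes one row carrying along the entry's metadata, and structures with no hits drop out of the inner join.
toy_entries = pd.DataFrame({"smiles": ["C", "CC"], "classyfire_class_name": ["X", "Y"]})
toy_hits = pd.DataFrame({"input_smiles": ["C", "C"], "class_name": ["alkane", "hydrocarbon"]})
merge_classifications_df(toy_entries, toy_hits)
#   input_smiles c3p_class_name smiles classyfire_class_name
# 0            C         alkane      C                     X
# 1            C    hydrocarbon      C                     X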
merged_df[["classyfire_subclass_name", "c3p_class_name"]].value_counts()
merged_df[["classyfire_class_name", "c3p_class_name"]].value_counts()
merged_df[["classyfire_superclass_name", "classyfire_class_name", "classyfire_subclass_name", "c3p_class_name"]].value_counts()
def make_correlation_df(df, column1, column2):
    """Long-format co-occurrence counts of two categorical columns."""
    return df[[column1, column2]].value_counts().reset_index(name="count")
correlation_df = make_correlation_df(merged_df, "classyfire_class_name", "c3p_class_name")
correlation_df
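To make the shape of this table concrete, a tiny self-contained example of the same pandas idiom (toy labels, nothing to do with NPAtlas):
toy = pd.DataFrame({
    "classyfire": ["Terpenoid", "Terpenoid", "Alkaloid"],
    "c3p": ["sesquiterpenoid", "sesquiterpenoid", "indole alkaloid"],
})
# value_counts over two columns yields a MultiIndex Series sorted by count;
# reset_index turns it into a long-format table with a named "count" column
make_correlation_df(toy, "classyfire", "c3p")
#   classyfire              c3p  count
# 0  Terpenoid  sesquiterpenoid      2
# 1   Alkaloid  indole alkaloid      1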
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.stats import chi2_contingency
def plot_correlation(df, col1, col2, N=20, M=20, mode='resid', title=None):
    """
    Plots a heatmap for the top N categories in col1 vs the top M categories in col2,
    with a choice of cell-level statistic:
      - 'count':  raw observed counts
      - 'contri': cell-wise chi-square contribution
      - 'resid':  standardized residuals (default)

    Args:
        df (pd.DataFrame): The input DataFrame.
        col1 (str): Column name for the heatmap rows.
        col2 (str): Column name for the heatmap columns.
        N (int): Number of top categories to keep from col1.
        M (int): Number of top categories to keep from col2.
        mode (str): Which metric to display in heatmap cells:
            'count', 'contri', or 'resid'.
        title (str): Optional plot title.
    """
    # 1. Identify top N categories in col1 and top M categories in col2
    top_n_categories_col1 = df[col1].value_counts().nlargest(N).index
    top_m_categories_col2 = df[col2].value_counts().nlargest(M).index
    # 2. Filter the DataFrame to those categories
    df_filtered = df[
        df[col1].isin(top_n_categories_col1) &
        df[col2].isin(top_m_categories_col2)
    ]
    # 3. Create a contingency table
    contingency_table = pd.crosstab(df_filtered[col1], df_filtered[col2])
    # 4. Perform the chi-squared test once
    chi2, p, dof, expected = chi2_contingency(contingency_table)
    print(f"Overall chi2 = {chi2:.4f}, p-value = {p:.4e}, dof = {dof}")
    # 5. Decide which cell-level statistic to plot
    if mode == 'count':
        # Raw observed counts
        data_to_plot = contingency_table
        fmt_str = 'd'
        colorbar_label = 'Observed Count'
    elif mode == 'contri':
        # Cell-level chi-square contribution: (O - E)^2 / E
        contrib = (contingency_table - expected) ** 2 / expected
        data_to_plot = pd.DataFrame(contrib,
                                    index=contingency_table.index,
                                    columns=contingency_table.columns)
        fmt_str = '.2f'
        colorbar_label = 'Chi-square Contribution'
    elif mode == 'resid':
        # Standardized residual: (O - E) / sqrt(E)
        resid = (contingency_table - expected) / np.sqrt(expected)
        data_to_plot = pd.DataFrame(resid,
                                    index=contingency_table.index,
                                    columns=contingency_table.columns)
        fmt_str = '.1f'
        colorbar_label = 'Standardized Residual'
    else:
        raise ValueError("Invalid mode. Choose from ['count', 'contri', 'resid'].")
    # 6. Plot the heatmap; center=0 visually highlights +/- deviations
    #    for 'contri' and 'resid'
    plt.figure(figsize=(10, 8))
    center_val = 0 if mode in ('contri', 'resid') else None
    sns.heatmap(
        data_to_plot,
        annot=True,
        fmt=fmt_str,
        cmap='coolwarm' if mode in ('contri', 'resid') else 'YlGnBu',
        center=center_val
    )
    plt.title(title if title else f"Top {N} {col1} x Top {M} {col2} [{mode}]")
    plt.xlabel(col2)
    plt.ylabel(col1)
    cbar = plt.gca().collections[0].colorbar
    cbar.set_label(colorbar_label)
    plt.tight_layout()
    plt.show()
    return data_to_plot
corrs = plot_correlation(merged_df, "classyfire_class_name", "c3p_class_name", title="Correlation between classyfire and c3p classes")
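As a sanity check on how to read the residuals, a minimal worked example on a 2x2 table with synthetic counts: a positive (O - E)/sqrt(E) means a pair co-occurs more often than independence predicts, a negative value less often.
import numpy as np
from scipy.stats import chi2_contingency

obs = np.array([[30, 10],
                [10, 30]])
chi2, p, dof, expected = chi2_contingency(obs)
resid = (obs - expected) / np.sqrt(expected)
print(expected)  # [[20. 20.], [20. 20.]] under independence
print(resid)     # diagonal cells ~ +2.24, off-diagonal ~ -2.24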
import pandas as pd
import numpy as np
from scipy.stats import chi2_contingency
def top_significant_pairs(df, col1, col2, N=20, M=20, top_k=20):
    """
    Return a DataFrame of the most positively significant (over-represented)
    category pairs, based on standardized residuals from the chi-square test.

    1. Filter to the top N categories in col1 and top M categories in col2 by frequency.
    2. Build a contingency table.
    3. Run chi2_contingency to get expected counts.
    4. Compute standardized residuals = (O - E) / sqrt(E).
    5. Return pairs sorted by descending standardized residual.

    Args:
        df (pd.DataFrame): input data
        col1 (str): name of the first categorical column
        col2 (str): name of the second categorical column
        N (int): keep top N categories in col1
        M (int): keep top M categories in col2
        top_k (int): how many top pairs to return (sorted by largest positive standardized residual)

    Returns:
        pd.DataFrame: A DataFrame with columns:
            [col1, col2, observed_count, expected_count, std_resid]
    """
    # 1. Identify top N categories in col1 and top M categories in col2
    top_n_categories_col1 = df[col1].value_counts().nlargest(N).index
    top_m_categories_col2 = df[col2].value_counts().nlargest(M).index
    # 2. Filter the DataFrame to only those categories
    df_filtered = df[
        df[col1].isin(top_n_categories_col1) &
        df[col2].isin(top_m_categories_col2)
    ]
    # 3. Create a contingency table (observed counts)
    contingency_table = pd.crosstab(df_filtered[col1], df_filtered[col2])
    # 4. Perform chi2_contingency to get expected counts
    chi2, p, dof, expected = chi2_contingency(contingency_table)
    print(f"Overall Chi-square: {chi2:.4f}, p-value: {p:.2e}, dof: {dof}")
    # 5. Compute standardized residuals: (O - E) / sqrt(E)
    observed = contingency_table.values
    standardized_residuals = (observed - expected) / np.sqrt(expected)
    # Flatten into a long-format DataFrame of pairs
    rows = []
    for i, row_cat in enumerate(contingency_table.index):
        for j, col_cat in enumerate(contingency_table.columns):
            rows.append({
                col1: row_cat,
                col2: col_cat,
                "observed_count": observed[i, j],
                "expected_count": expected[i, j],
                "std_resid": standardized_residuals[i, j],
            })
    result_df = pd.DataFrame(rows)
    # 6. Sort by descending standardized residual (most over-represented pairs first)
    result_df.sort_values("std_resid", ascending=False, inplace=True)
    return result_df.head(top_k).reset_index(drop=True)
top_subclass_pairs = top_significant_pairs(merged_df, "classyfire_subclass_name", "c3p_class_name", N=100, M=100, top_k=100)
top_subclass_pairs.head(100).to_csv("../../notebooks-output/npatlas/top_subclass_pairs.csv", index=False)
top_subclass_pairs
top_pairs = top_significant_pairs(merged_df, "classyfire_class_name", "c3p_class_name", N=100, M=100, top_k=100)
top_pairs.head(100)
top_pairs.head(100).to_csv("../../notebooks-output/npatlas/top_pairs.csv", index=False)
top_pairs = top_significant_pairs(merged_df, "lipidmaps_terms_flat", "c3p_class_name", N=100, M=100, top_k=100)
top_pairs.head(100)
top_pairs = top_significant_pairs(merged_df, "origin_organism_species", "c3p_class_name", N=100, M=100, top_k=100)
top_pairs.head(100)
top_pairs = top_significant_pairs(merged_df, "origin_organism_genus", "c3p_class_name", N=100, M=100, top_k=100)
top_pairs.head(100)
top_pairs = top_significant_pairs(merged_df, "origin_organism_type", "c3p_class_name", N=100, M=100, top_k=100)
top_pairs.head(100)
melt_cols = ['classyfire_class_name', 'classyfire_subclass_name']
merged_df_melted = pd.melt(
    merged_df,
    id_vars=[col for col in merged_df.columns if col not in melt_cols],
    value_vars=melt_cols,
    var_name='classyfire_level',
    value_name='classyfire_term'
)
# Strip the '_name' suffix from the classyfire_level values
merged_df_melted['classyfire_level'] = merged_df_melted['classyfire_level'].str.replace('_name', '')
merged_df_melted
merged_df_melted.groupby(['classyfire_level']).size()
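For clarity, the same melt pattern on a toy frame (synthetic labels): each input row yields one output row per ClassyFire level, so the melted frame has twice as many rows as the input.
toy = pd.DataFrame({
    "smiles": ["C", "CC"],
    "classyfire_class_name": ["Class A", "Class B"],
    "classyfire_subclass_name": ["Subclass A1", "Subclass B1"],
})
pd.melt(toy, id_vars=["smiles"],
        value_vars=["classyfire_class_name", "classyfire_subclass_name"],
        var_name="classyfire_level", value_name="classyfire_term")
#   smiles          classyfire_level classyfire_term
# 0      C     classyfire_class_name         Class A
# 1     CC     classyfire_class_name         Class B
# 2      C  classyfire_subclass_name     Subclass A1
# 3     CC  classyfire_subclass_name     Subclass B1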
top_pairs = top_significant_pairs(merged_df_melted, "classyfire_term", "c3p_class_name", N=100, M=100, top_k=100)
top_pairs.to_csv("../../notebooks-output/npatlas/top_pairs.csv", index=False)
top_pairs.head(100)
top_pairs = top_significant_pairs(merged_df_melted, "classyfire_term", "lipidmaps_terms_flat", N=100, M=100, top_k=100)
top_pairs.head(100)
top_pairs = top_significant_pairs(merged_df_melted, "predicted_chebi_terms", "c3p_class_name", N=100, M=100, top_k=100)
#top_pairs.to_csv("../../notebooks-output/npatlas/top_pairs.csv", index=False)
top_pairs.head(100)