# Source code for tacco.preprocessing._platform
import numpy as np
import pandas as pd
import anndata as ad
import scanpy as sc
import scipy.sparse
import scipy.optimize
from numba import njit,prange
from numpy.random import Generator, PCG64
from .. import get
from . _reference import construct_reference_profiles
from .. import utils
from ..utils._split import split_beads
from ..utils._utils import _infer_annotation_key, _get_unique_keys
# [docs]
def subsample_annotation(
    adata,
    annotation_key=None,
    modification=None,
    range_factor=1,
    seed=42,
):
    """\
    Subsamples the observations to change the annotation fractions.

    Parameters
    ----------
    adata
        An :class:`~anndata.AnnData` containing categorical annotation to
        subsample.
    annotation_key
        The `.obs` key with the categorical annotation. Is determined
        automatically if possible.
    modification
        A dict-like mapping the annotation categories to a float giving the
        fraction of this category to be kept. Unlisted categories are
        unchanged. Instead of the full name of a category, also unambiguous
        shortcuts are possible, i.e. the first few letters.
    range_factor
        A float giving a factor to scale down all categories a priori. A
        factor larger than `1` (e.g. `2`) reduces all category observation
        counts and makes values larger than `1` in `modification` possible
        without duplicated observations.
    seed
        The random seed to use

    Returns
    -------
    Returns the subsampled subset of `adata` with the modified annotation\
    fractions.
    """

    annotation_key = _infer_annotation_key(adata, annotation_key)
    if annotation_key not in adata.obs.columns:
        raise Exception('Only categorical `.obs` annotation columns are supported for subsampling!')

    if modification is None:
        modification = {}

    params = pd.DataFrame({'modification':pd.Series(modification)})
    counts = pd.DataFrame({'all':adata.obs[annotation_key].value_counts()})

    # resolve every (possibly shortened) modification key to a full category name:
    # exact matches take precedence over prefix matches
    params['map'] = [[ s for s in counts.index if str(s) == str(p)] for p in params.index]
    params['map_short'] = [[ s for s in counts.index if str(s).startswith(str(p))] for p in params.index]
    params['map'] = [ m if len(m) == 1 else s for m, s in zip(params['map'],params['map_short'])]
    if (params['map'].map(len) > 1).any():
        params['error'] = params['map'].map(len) > 1
        raise Exception('There is a not unique mapping of shortcuts to labels! Here is what has been mapped:\n%s' % params)
    if (params['map'].map(len) == 0).any():
        params['error'] = params['map'].map(len) == 0
        raise Exception('There are shortcuts without mapping labels! Here is what has been mapped:\n%s' % params)
    if len(params['map']) > 0:
        # each mapping list has exactly one element now
        params['map'] = params['map'].str[0]

    # unlisted categories keep their full (range_factor-scaled) size
    counts['modification'] = params.set_index('map')['modification']
    counts['modification'] = counts['modification'].fillna(1.0)
    counts['new'] = ((counts['all'] * counts['modification']) / range_factor).astype(int)

    # draw the requested number of observations per category
    selection = np.concatenate([
        utils.complete_choice(df.index, counts.loc[l,'new'], seed=seed)
        for l,df in adata.obs.groupby(annotation_key)
    ])

    return adata[selection]
def _scale_genes(adata, rescaling_factors, gene_keys=None, counts_location=None, round=False):
    """\
    Scales the counts (and optionally per-gene annotations) of `adata` inplace
    by per-gene factors.

    Parameters
    ----------
    adata
        An :class:`~anndata.AnnData` whose counts are rescaled inplace.
    rescaling_factors
        Per-gene rescaling factors: either a :class:`~pandas.Series` indexed
        by gene (reordered to the counts' gene order, missing genes get factor
        `0`), or an array-like already in the counts' gene order.
    gene_keys
        String or list of strings specifying additional count-like `.var` and
        `.varm` annotations to scale along. If `True`, scale all `.var` and
        `.varm` keys on a best-effort basis.
    counts_location
        Where the count matrix is stored; see :func:`~tacco.get.counts`.
    round
        Whether to round the rescaled counts to integer values.
    """
    counts = get.counts(adata, counts_location=counts_location)
    try:
        # if we get a pd.Series, reorder it to the gene order of the counts
        rescaling_factors = rescaling_factors.reindex(counts.var.index).fillna(0).to_numpy()
    except AttributeError:
        pass # already an array-like in the correct gene order
    utils.scale_counts(adata, rescaling_factors, counts_location=counts_location, round=round)
    if isinstance(gene_keys, bool):
        if gene_keys:
            for gene_key in adata.varm:
                try: # best effort for all varms: 2d, scale along the gene axis
                    adata.varm[gene_key] *= rescaling_factors.astype(adata.varm[gene_key].iloc[0].dtype)[:,None]
                except Exception:
                    pass
            for gene_key in adata.var:
                try: # best effort for all vars: 1d Series, one factor per gene
                    adata.var[gene_key] *= rescaling_factors.astype(adata.var[gene_key].dtype)
                except Exception:
                    pass
    elif gene_keys is not None:
        if isinstance(gene_keys, str):
            gene_keys = [gene_keys]
        for gene_key in gene_keys:
            if gene_key in adata.varm:
                adata.varm[gene_key] *= rescaling_factors.astype(adata.varm[gene_key].iloc[0].dtype)[:,None]
        for gene_key in gene_keys:
            if gene_key in adata.var.columns:
                # .var columns are 1d Series: scale with the plain 1d factors
                # (a (n,1)-shaped factor array would fail to broadcast)
                adata.var[gene_key] *= rescaling_factors.astype(adata.var[gene_key].dtype)
def _get_random_platform_factors(adata, platform_log10_mean=0, platform_log10_std=0, seed=42):
rg = Generator(PCG64(seed=seed))
rescaling_factors = np.power(10,rg.laplace(loc=platform_log10_mean,scale=platform_log10_std/np.sqrt(2),size=len(adata.var.index)))
return pd.Series(rescaling_factors, index=adata.var.index)
# [docs]
def apply_random_platform_effect(
    adata,
    platform_log10_mean=0,
    platform_log10_std=0.6,
    seed=42,
    round=True,
    counts_location=None,
    gene_keys=None,
):
    """\
    Applies a random log-laplace distributed rescaling per gene inplace.

    Parameters
    ----------
    adata
        An :class:`~anndata.AnnData` containing the expression and/or profiles
        to rescale inplace.
    platform_log10_mean
        log10 mean of the Laplace distribution
    platform_log10_std
        log10 of the standard deviation of the Laplace distribution
    seed
        The random seed to use
    round
        Whether to round the resulting expression matrix to integer counts
        after rescaling
    counts_location
        A string or tuple specifying where the count matrix is stored, e.g.
        `'X'`, `('raw','X')`, `('raw','obsm','my_counts_key')`,
        `('layer','my_counts_key')`, ... For details see
        :func:`~tacco.get.counts`.
    gene_keys
        String or list of strings specifying additional count-like `.var` and
        `.varm` annotations to scale along with the platform effect. If `True`,
        take all `.var` and `.varm` keys.

    Returns
    -------
    Returns a :class:`~pandas.Series` with the random rescaling factors.
    """
    factors = _get_random_platform_factors(
        adata,
        platform_log10_mean=platform_log10_mean,
        platform_log10_std=platform_log10_std,
        seed=seed,
    )
    _scale_genes(adata, factors.to_numpy(), gene_keys=gene_keys, counts_location=counts_location, round=round)
    if round and scipy.sparse.issparse(adata.X):
        # rounding can leave explicit zeros in the sparse matrix; drop them
        adata.X.eliminate_zeros()
    return factors
@njit(parallel=True, fastmath=True, cache=True)
def _platform_objective_fdf(x, profilesA, profilesB, reg_a, reg_b):
    """\
    Loss and gradient of the least squares platform normalization problem
    `profilesB[i,j] ~ b[i] * profilesA[i,j] * a[j]`, for use with
    :func:`scipy.optimize.minimize` with `jac=True`.

    `x` is the flat parameter vector: the first `m` entries are the per-gene
    factors `a`, the remaining `n` entries the per-profile factors `b`.
    `profilesA` and `profilesB` are `(n,m)` matrices with profiles in the
    first and genes in the second dimension; `reg_a` and `reg_b` weight the
    quadratic regularization terms `(a-1)^2` and `(b-1)^2`.

    Returns a `(loss, gradient)` tuple with the gradient in the layout of `x`.
    """
    n,m = profilesA.shape
    a = x[:m]
    b = x[m:]
    der = np.zeros(m+n)
    dera = der[:m]
    derb = der[m:]
    loss = 0.0
    # NOTE(review): `dera[j]` is updated from different prange iterations;
    # presumably numba treats `der` as an array reduction here — TODO confirm
    for i in prange(n):
        for j in range(m):
            delta = b[i] * profilesA[i,j] * a[j] - profilesB[i,j]
            loss += delta**2
            dera[j] += 2 * delta * b[i] * profilesA[i,j]
            derb[i] += 2 * delta * profilesA[i,j] * a[j]
    if reg_a > 0:
        reg_a = reg_a * n # make the terms comparable
        for j in range(m):
            loss += reg_a * (a[j]-1)**2
            dera[j] += reg_a * 2 * (a[j]-1)
    if reg_b > 0:
        reg_b = reg_b * m # make the terms comparable
        for i in range(n):
            loss += reg_b * (b[i]-1)**2
            derb[i] += reg_b * 2 * (b[i]-1)
    return loss, der
def get_platform_normalization_factors(
    PA,
    PB,
    reg_a=0.0,
    reg_b=0.0,
    tol=1e-12,
):
    """\
    Find platform normalization factors `a_g` per gene and `b_p` per profiles
    for scaling A to conform to B by solving `PB_pg = b_p PA_pg a_g` in least
    squares sense with `PA` and `PB` containing counts per profile and gene.
    The profiles can be e.g. celltypes or mixtures with defined celltype
    composition. `a_g` and `b_p` can be interpreted as relative capture
    efficiencies between two methods per gene and per profile (e.g. per cell
    type). This assumes that the underlying cell type frequencies are identical
    between the two sets of profiles.

    Parameters
    ----------
    PA
        A 2d :class:`~numpy.ndarray` containing counts with profiles in the
        first dimension and genes in the second.
    PB
        A 2d :class:`~numpy.ndarray` with identical dimensions as `PA`.
    reg_a
        Weight of the regularization term `(a_g-1)^2` to put cost on deviations
        from 1 for the per gene rescaling factors
    reg_b
        Weight of the regularization term `(b_p-1)^2` to put cost on deviations
        from 1 for the per profiles rescaling factors
    tol
        Solver tolerance.

    Returns
    -------
    A tuple of two :class:`~numpy.ndarray` containing the per-gene factors\
    `a_g` and the per-profile factors `b_p`.
    """
    if PA.shape != PB.shape:
        raise ValueError(f'PA.shape != PB.shape: {PA.shape} != {PB.shape}')
    if not isinstance(PA, np.ndarray):
        raise ValueError(f'PA has to be a numpy ndarray, but is a {PA.__class__}!')
    if not isinstance(PB, np.ndarray):
        raise ValueError(f'PB has to be a numpy ndarray, but is a {PB.__class__}!')
    # scipy only works with doubles and the numba kernel only works with
    # homogeneous types; np.array copies unconditionally, so the caller's
    # arrays are never modified and not copied twice when a cast is needed
    PA = np.array(PA, dtype=np.float64)
    PB = np.array(PB, dtype=np.float64)
    n,m = PA.shape
    # initial guess: per-gene total count ratios, per-profile factors of 1
    guess = np.ones(m+n)
    guess[:m] = PB.sum(axis=0) / PA.sum(axis=0)
    result = scipy.optimize.minimize(_platform_objective_fdf, guess, args=(PA, PB, reg_a, reg_b), method='L-BFGS-B', jac=True, tol=tol, bounds=[(0,np.inf)]*len(guess)).x
    # fix gauge degree of freedom (a -> a*c, b -> b/c leaves the loss
    # invariant) by normalizing the mean of the per-profile factors to 1
    factor = result[m:].mean()
    result[m:] /= factor
    result[:m] *= factor
    return result[:m], result[m:]
# [docs]
def normalize_platform(
    adata,
    reference,
    annotation_key=None,
    reference_annotation_key=None,
    counts_location=None,
    gene_keys=None,
    inplace=True,
    return_rescaling_factors=False,
    reg_a=0.0,
    reg_b=0.0,
    tol=1e-12,
    verbose=1,
):
    """\
    Normalize the expression data and/or profiles in an
    :class:`~anndata.AnnData` to conform with the expression in another
    dataset. In effect rescales all genes.

    Parameters
    ----------
    adata
        An :class:`~anndata.AnnData` containing expression, annotation,
        and/or profiles. This data will be rescaled.
    reference
        Another :class:`~anndata.AnnData` containing expression, annotation,
        and/or profiles to be used as reference.
    annotation_key
        The `.obs`/`.obsm`/`.varm` annotation key for `adata`, or alternatively
        annotation fractions for `adata`, which have to correspond to the
        profiles given in `reference.varm[reference_annotation_key]`. If
        `None`, no annotation information is used and the overall expression
        determines the rescaling.
    reference_annotation_key
        The `.obs`/`.obsm`/`.varm` annotation key for `reference`, or
        alternatively annotation fractions for `reference`, which have to
        correspond to the profiles given in `adata.varm[annotation_key]`. If
        `None`, no annotation information is used and the overall expression
        determines the rescaling.
    counts_location
        A string or tuple specifying where the count matrix is stored, e.g.
        `'X'`, `('raw','X')`, `('raw','obsm','my_counts_key')`,
        `('layer','my_counts_key')`, ... For details see
        :func:`~tacco.get.counts`.
    gene_keys
        String or list of strings specifying additional count-like `.var` and
        `.varm` annotations to scale along with the platform normalization. The
        `annotation_key` is included automatically. Note that any
        normalization is generally destroyed for these keys and they have to be
        renormalized if necessary. If `True`, take all `.var` and `.varm` keys.
    inplace
        Whether to update the input :class:`~anndata.AnnData` or return a copy.
    return_rescaling_factors
        Whether to return the rescaling factors instead of the rescaled data.
    reg_a
        Weight of per gene rescaling factor regularization
    reg_b
        Weight of per profile rescaling factor regularization
    tol
        Solver tolerance.
    verbose
        Level of verbosity, with `0` (no output), `1` (some output), ...

    Returns
    -------
    Returns an :class:`~anndata.AnnData` containing the rescaled `adata`,\
    depending on `inplace` either as copy or as a reference to the original\
    `adata`. If `return_rescaling_factors`, no rescaling is performed and the\
    factors to do so are returned instead.
    """

    if not inplace:
        adata = adata.copy()

    if isinstance(gene_keys, bool) and gene_keys:
        pass # gene_keys == True means take all .var and .varm keys
    else:
        gene_keys = _get_unique_keys(annotation_key, gene_keys)

    # annotation can be given as .obs (categorical), .obsm (fractions), or .varm (profiles)
    got_adata = annotation_key in adata.obs or annotation_key in adata.obsm or annotation_key in adata.varm
    got_other = reference_annotation_key in reference.obs or reference_annotation_key in reference.obsm or reference_annotation_key in reference.varm

    if got_adata and got_other:
        # build per-annotation-category count profiles for both datasets,
        # trying the available annotation containers in order
        profilesA = None
        profilesB = None
        if annotation_key in adata.obs: # construct counts-weighted profiles from expression and categorical annotation
            dums = pd.get_dummies(adata.obs[annotation_key],dtype=adata.X.dtype)
            profilesA = adata.X.T @ dums.to_numpy()
            profilesA = pd.DataFrame(profilesA, index=adata.var.index, columns=dums.columns)
        if profilesA is None and annotation_key in adata.varm: # reconstruct counts-weighted profiles from buffered (normalized) profiles
            # nnls fit of the per-gene total expression to the normalized profiles yields per-profile weights
            weights = utils.parallel_nnls(adata.varm[annotation_key].to_numpy(),(utils.get_sum(adata.X,axis=0)[None,:]))
            profilesA = adata.varm[annotation_key].to_numpy()
            utils.col_scale(profilesA, weights.flatten())
            profilesA = pd.DataFrame(profilesA, index=adata.var.index, columns=adata.varm[annotation_key].columns)
        if reference_annotation_key in reference.obs: # construct counts-weighted profiles from expression and categorical annotation
            dums = pd.get_dummies(reference.obs[reference_annotation_key],dtype=reference.X.dtype)
            profilesB = reference.X.T @ dums.to_numpy()
            profilesB = pd.DataFrame(profilesB, index=reference.var.index, columns=dums.columns)
        if profilesB is None and reference_annotation_key in reference.varm: # reconstruct counts-weighted profiles from buffered (normalized) profiles
            weights = utils.parallel_nnls(reference.varm[reference_annotation_key].to_numpy(),(utils.get_sum(reference.X,axis=0)[None,:]))
            profilesB = reference.varm[reference_annotation_key].to_numpy()
            utils.col_scale(profilesB, weights.flatten())
            profilesB = pd.DataFrame(profilesB, index=reference.var.index, columns=reference.varm[reference_annotation_key].columns)
        if profilesA is None and annotation_key in adata.obsm:
            if profilesB is not None: # reconstruct counts-weighted profiles from bead splitting with reference profiles
                profilesA = split_beads(adata, bead_type_map=adata.obsm[annotation_key], type_profiles=profilesB, scaling_jobs=None, return_split_profiles=True)
            elif reference_annotation_key in reference.varm: # reconstruct counts-weighted profiles from bead splitting with buffered reference profiles
                profilesA = split_beads(adata, bead_type_map=adata.obsm[annotation_key], type_profiles=reference.varm[reference_annotation_key], scaling_jobs=None, return_split_profiles=True)
        if profilesB is None and reference_annotation_key in reference.obsm:
            if profilesA is not None: # reconstruct counts-weighted profiles from bead splitting with the other dataset's profiles
                profilesB = split_beads(reference, bead_type_map=reference.obsm[reference_annotation_key], type_profiles=profilesA, scaling_jobs=None, return_split_profiles=True)
            elif annotation_key in adata.varm: # reconstruct counts-weighted profiles from bead splitting with buffered profiles
                profilesB = split_beads(reference, bead_type_map=reference.obsm[reference_annotation_key], type_profiles=adata.varm[annotation_key], scaling_jobs=None, return_split_profiles=True)
        if profilesA is None or profilesB is None:
            raise ValueError(f'The annotation information is not sufficient to run platform normalization with annotation! You can specify explicitly not to use annotation by setting `annotation_key` to `None`.')
    else:
        # without annotation the per-gene total expression acts as a single "profile"
        profilesA = utils.get_sum(adata.X, axis=0)[:,None]
        profilesB = utils.get_sum(reference.X, axis=0)[:,None]

    if isinstance(profilesA, pd.DataFrame) and isinstance(profilesB, pd.DataFrame):
        # make sure that both are ordered identically
        profilesA = profilesA.reindex_like(profilesB)
    if isinstance(profilesA, pd.DataFrame):
        profilesA = profilesA.to_numpy()
    if isinstance(profilesB, pd.DataFrame):
        profilesB = profilesB.to_numpy()

    # solve PB = b * PA * a in least squares sense; only the per-gene factors a are used
    gene_rescaling_factor = get_platform_normalization_factors(profilesA.T, profilesB.T, reg_a=reg_a, reg_b=reg_b, tol=tol)[0]

    if verbose > 0:
        print('mean,std( rescaling(gene) ) ', gene_rescaling_factor.mean(), gene_rescaling_factor.std())

    if return_rescaling_factors:
        return gene_rescaling_factor

    _scale_genes(adata, gene_rescaling_factor, gene_keys=gene_keys, counts_location=counts_location)

    return adata