Source code for tacco.utils._math

import numpy as np
import pandas as pd
from scipy.sparse import issparse, csr_matrix, csc_matrix
from sklearn.utils.sparsefuncs import inplace_column_scale, inplace_row_scale
import numba
import mkl
from numba import njit, prange

@njit(fastmath=True, cache=True)
def _divide_serial(a,b,out):
    for i in prange(len(a)):
        out[i] = a[i] / b[i]

@njit(fastmath=True, parallel=True, cache=True)
def _divide_parallel(a,b,out):
    for i in prange(len(a)):
        out[i] = a[i] / b[i]

[docs] def divide( a, b, out=None, parallel=True, ): """\ Calculates division `out = a / b`. Parameters ---------- a A :class:`~numpy.ndarray` with at least 1 dimension b A :class:`~numpy.ndarray` with at least 1 dimension of the same shape as `a` out A :class:`~numpy.ndarray` with at least 1 dimension of the same shape as `a`. If `None`, the result is returned. Returns ------- A :class:`~numpy.ndarray` containing the result. """ if out is None: out = np.empty_like(a) assert(a.shape==b.shape) assert(a.shape==out.shape) if parallel: _divide_parallel(a,b,out) else: _divide_serial(a,b,out) return out
@njit(fastmath=True, parallel=True, cache=True) def _sparse_result_gemmT(A, B, out_row, out_col, out_data): n = A.shape[1] for rc in prange(len(out_row)): r = out_row[rc] c = out_col[rc] temp = 0.0 for k in range(n): temp += A[r,k] * B[c,k] out_data[rc] += temp @njit(fastmath=True, cache=True) def _sparse_result_gemmT_serial(A, B, out_row, out_col, out_data): n = A.shape[1] for rc in prange(len(out_row)): r = out_row[rc] c = out_col[rc] temp = 0.0 for k in range(n): temp += A[r,k] * B[c,k] out_data[rc] += temp
[docs] def sparse_result_gemmT( A, B, sparse_out, parallel=True, inplace=True, update_out=False, ): """\ Perform a dense matrix-matrix multiplication A @ B.T for the case when only a sparse subset of the result is needed. Note that the second matrix is transposed. Parameters ---------- A A 2d numpy array.. B A 2d numpy array with the same second dimension as `A`. sparse_out A `scipy.sparse` matrix to contain the result and to provide the sparsity structure of the result. parallel Whether to work on multiple cores. inplace Whether to write the result directly into `sparse_out` or return a copy. `inplace` is only possible if `sparse_out` is a :class:`~scipy.sparse.coo_matrix`. update_out Whether to add the result to the existing values of `sparse_out` or to overwrite them. Returns ------- Returns the :class:`~scipy.sparse.coo_matrix` containing the result. """ result_shape = (A.shape[0],B.shape[0]) inner_size = A.shape[1] if B.shape[1] != A.shape[1]: raise ValueError("`A` and `B` dont have the same second dimension! The shapes are %s and %s!" % (A.shape, B.shape)) if result_shape != sparse_out.shape: raise ValueError("The result shape implied by `A` and `B` %s dont fit to the one supplied in `sparse_out`! The corresponding shapes are %s and %s!" % (result_shape, sparse_out.shape)) _sparse_out = sparse_out.tocoo() if inplace and _sparse_out is not sparse_out: raise ValueError("`inplace` is `True` but the supplied sparse result matrix is not a coo matrix!") elif not inplace and _sparse_out is sparse_out: _sparse_out = _sparse_out.copy() if not update_out:[:] = 0 if parallel: _sparse_result_gemmT(A, B, _sparse_out.row, _sparse_out.col, else: _sparse_result_gemmT_serial(A, B, _sparse_out.row, _sparse_out.col, return _sparse_out
@njit(fastmath=True, parallel=True, cache=True) def _csrcsr_gemm_dense(n_row, n_col, Ap, Aj, Ax, Bp, Bj, Bx): C = np.zeros((n_row,n_col)) for i in prange(n_row): jj_start = Ap[i]; jj_end = Ap[i+1]; for jj in range(jj_start, jj_end): j = Aj[jj]; v = Ax[jj]; kk_start = Bp[j]; kk_end = Bp[j+1]; for kk in range(kk_start, kk_end): k = Bj[kk]; C[i,k] += v*Bx[kk]; return C @njit(fastmath=True, parallel=True, cache=True) def _densecsr_gemm_dense(n_row, n_col, A, Bp, Bj, Bx): nk = A.shape[1] C = np.zeros((n_row,n_col)) for i in prange(n_row): for j in range(nk): v = A[i,j]; kk_start = Bp[j]; kk_end = Bp[j+1]; for kk in range(kk_start, kk_end): k = Bj[kk]; C[i,k] += v*Bx[kk]; return C @njit(fastmath=True, parallel=True, cache=True) def _cscdense_gemm_dense(n_row, n_col, Ap, Ai, Ax, B): nk = B.shape[0] C = np.zeros((n_col,n_row)) for j in prange(n_col): for k in range(nk): v = B[k,j] ii_start = Ap[k]; ii_end = Ap[k+1]; for ii in range(ii_start, ii_end): i = Ai[ii]; C[j,i] += v*Ax[ii]; return C.T def cast_down_common(A,B): if not pd.api.types.is_float_dtype(A): A = A.astype(np.float64) if not pd.api.types.is_float_dtype(B): B = B.astype(np.float64) if A.dtype == B.dtype: return A,B elif A.dtype == np.float64 and B.dtype == np.float32: return A.astype(np.float32), B elif A.dtype == np.float32 and B.dtype == np.float64: return A, B.astype(np.float32) else: return A,B
[docs] def gemmT( A, B, parallel=True, sparse_result=False, ): """\ Perform a matrix-matrix multiplication A @ B.T for arbitrary sparseness of A and B in parallel. Uses `sparse_dot_mkl` if available. Parameters ---------- A A 2d :class:`~numpy.ndarray` or a `scipy` sparse matrix. B A 2d :class:`~numpy.ndarray` or a `scipy` sparse matrix with the same second dimension as `A`. parallel Whether to run the multiplication in parallel. sparse_result Whether to return a sparse result when both inputs are sparse Returns ------- Depending on `sparse_result` returns either a :class:`~numpy.ndarray` or a\ scipy sparse matrix containing the result. """ if not issparse(A) and not isinstance(A,np.ndarray): raise ValueError("`A` can only be a scipy sparse matrix or a numpy array!") if not issparse(B) and not isinstance(B,np.ndarray): raise ValueError("`B` can only be a scipy sparse matrix or a numpy array!") if issparse(A) and not isinstance(A, (csr_matrix, csc_matrix)): A = A.tocsr() if issparse(B) and not isinstance(B, (csr_matrix, csc_matrix)): B = B.tocsr() A,B = cast_down_common(A,B) numba_threads = numba.get_num_threads() mkl_threads = mkl.get_max_threads() if not parallel: numba.set_num_threads(1) mkl.set_num_threads(1) result_shape = (A.shape[0],B.shape[0]) inner_size = A.shape[1] if B.shape[1] != A.shape[1]: raise ValueError("`A` and `B` dont have the same second dimension! The shapes are %s and %s!" % (A.shape, B.shape)) try: # if mkl is available, use it. import sparse_dot_mkl return sparse_dot_mkl.dot_product_mkl(A, B.T, dense=(not sparse_result)); except ImportError: # use custom numba implementation print('sparse_dot_mkl is not found, so a (slower) fallback is used.') if issparse(A) and issparse(B): if not sparse_result: A = A.tocsr() BT = B.tocsc() # avoids intermediate during transposition: .tocsc() == .T.tocsr() return _csrcsr_gemm_dense(result_shape[0], result_shape[1], A.indptr, A.indices,, BT.indptr, BT.indices, else: return A@(B.T) elif issparse(A) and not issparse(B): A = A.tocsc() return _cscdense_gemm_dense(result_shape[0], result_shape[1], A.indptr, A.indices,, B.T) elif not issparse(A) and issparse(B): BT = B.tocsc() # avoids intermediate during transposition: .tocsc() == .T.tocsr() return _densecsr_gemm_dense(result_shape[0], result_shape[1], A, BT.indptr, BT.indices, else: return A@(B.T) if not parallel: numba.set_num_threads(numba_threads) mkl.set_num_threads(mkl_threads)
[docs] def row_scale( X, rescaling_factors, round=False, ): """\ Rescales rows of dense or sparse matrix inplace. Parameters ---------- X A 2d :class:`~numpy.ndarray` array or a `scipy` sparse matrix. rescaling_factors A 1d :class:`~numpy.ndarray` containing the row-wise rescaling factors. round Whether to round the result to integer values Returns ------- `None`. This is an inplace operation. """ if hasattr(rescaling_factors, 'to_numpy'): rescaling_factors = rescaling_factors.to_numpy() rescaling_factors = rescaling_factors.flatten() if issparse(X): inplace_row_scale(X, rescaling_factors) if round: np.around(, else: X *= rescaling_factors[:,None] if round: np.around(X, out=X)
[docs] def col_scale( X, rescaling_factors, round=False, ): """\ Rescales columns of dense or sparse matrix inplace. Parameters ---------- X A 2d :class:`~numpy.ndarray` array or a `scipy` sparse matrix. rescaling_factors A 1d :class:`~numpy.ndarray` containing the column-wise rescaling factors. round Whether to round the result to integer values Returns ------- `None`. This is an inplace operation. """ if not isinstance(X, np.ndarray) and not issparse(X): raise ValueError(f'`X` must be a numpy array or a scipy sparse matrix!') if hasattr(rescaling_factors, 'to_numpy'): rescaling_factors = rescaling_factors.to_numpy() rescaling_factors = rescaling_factors.flatten() if issparse(X): inplace_column_scale(X, rescaling_factors) if round: np.around(, else: X *= rescaling_factors if round: np.around(X, out=X)
@njit(fastmath=True, parallel=True, cache=True) def _log1p(x): for i in prange(len(x)): x[i] = np.log1p(x[i])
[docs] def log1p( X, ): """\ Calculates log1p inplace and in parallel. Parameters ---------- X A :class:`~numpy.ndarray` with more than 1 dimension, a `scipy` sparse matrix, or something which has an attribute `.X` which fits this description, e.g. an :class:`~anndata.AnnData` Returns ------- `None`. This is an inplace operation. """ if hasattr(X, 'X'): X = X.X if issparse(X): _log1p( else: _log1p(X)
@njit(fastmath=True, parallel=True, cache=True) def _log(x): for i in prange(len(x)): x[i] = np.log(x[i])
[docs] def log( X, ): """\ Calculates log inplace and in parallel. Parameters ---------- X A :class:`~numpy.ndarray` with more than 1 dimension, a `scipy` sparse matrix, or something which has an attribute `.X` which fits this description, e.g. an :class:`~anndata.AnnData` Returns ------- `None`. This is an inplace operation. """ if hasattr(X, 'X'): X = X.X if issparse(X): _log( else: _log(X)
@njit(fastmath=True, parallel=True, cache=True) def _sqrt(x): for i in prange(len(x)): x[i] = np.sqrt(x[i])
[docs] def sqrt( X, ): """\ Calculates sqrt inplace and in parallel. Parameters ---------- X A :class:`~numpy.ndarray` with more than 1 dimension, a `scipy` sparse matrix, or something which has an attribute `.X` which fits this description, e.g. an :class:`~anndata.AnnData` Returns ------- `None`. This is an inplace operation. """ if hasattr(X, 'X'): X = X.X if issparse(X): _sqrt( else: _sqrt(X)
def integrate_mean(y,x): # using trapezoidal rule heights = 0.5 * (y[1:] + y[:-1]) widths = x[1:] - x[:-1] ranges = x[1:] - x[0] means = (heights * widths).cumsum() / ranges # prepend start value to get same length result return np.concatenate([[y[0]],means])
[docs] def get_sum( X, axis, dtype=None, ): """\ Calculates the sum of a sparse matrix or array-like in a specified axis and returns a flattened result. Parameters ---------- X A 2d :class:`~numpy.ndarray` or a `scipy` sparse matrix. axis The axis along which to calculate the sum dtype The dtype used in the accumulators and the result. Returns ------- The flattened sums as 1d :class:`~numpy.ndarray`. """ if issparse(X): result = X.sum(axis=axis, dtype=dtype).A.flatten() else: result = X.sum(axis=axis, dtype=dtype) import anndata._core.views if isinstance(result, anndata._core.views.ArrayView): result = result.toarray() return result