Source code for dcor._rowwise

"""
Functions to compute a pairwise dependency measure.
"""
from __future__ import annotations

from typing import Any, Callable, TypeVar

import numpy as np

from . import _dcor
from ._fast_dcov_avl import _rowwise_distance_covariance_sqr_avl_generic
from ._utils import ArrayType, RowwiseMode as RowwiseMode, _sqrt, array_namespace

Array = TypeVar("Array", bound=ArrayType)


def _generate_rowwise_distance_covariance_sqr(unbiased):
    def rowwise_distance_covariance_sqr(
            x, y, exponent=1, *,
            method=_dcor.DistanceCovarianceMethod.AUTO,
            **kwargs):

        if not _dcor._can_use_fast_algorithm(x[0], y[0],
                                             exponent=exponent):
            return NotImplemented

        if (method in (_dcor.DistanceCovarianceMethod.AUTO,
                       _dcor.DistanceCovarianceMethod.AVL)):
            return _rowwise_distance_covariance_sqr_avl_generic(
                x, y, unbiased=unbiased, **kwargs)
        else:
            return NotImplemented

    return rowwise_distance_covariance_sqr


_dcor.distance_covariance_sqr.rowwise_function = (
    _generate_rowwise_distance_covariance_sqr(unbiased=False))

_dcor.u_distance_covariance_sqr.rowwise_function = (
    _generate_rowwise_distance_covariance_sqr(unbiased=True))


def _rowwise_distance_covariance(*args, **kwargs):

    res_covs = _dcor.distance_covariance_sqr.rowwise_function(*args, **kwargs)
    if res_covs is NotImplemented:
        return NotImplemented

    return _sqrt(res_covs)


_dcor.distance_covariance.rowwise_function = _rowwise_distance_covariance


def _generate_rowwise_distance_correlation_sqr(unbiased):
    def rowwise_distance_correlation_sqr(x, y, **kwargs):

        cov_fun = (_dcor.u_distance_covariance_sqr if unbiased
                   else _dcor.distance_covariance_sqr)

        n_comps = len(x)

        concat_x = np.concatenate((x, x, y))
        concat_y = np.concatenate((y, x, y))

        res_covs = cov_fun.rowwise_function(concat_x, concat_y, **kwargs)
        if res_covs is NotImplemented:
            return NotImplemented

        cov = res_covs[:n_comps]
        x_std = _sqrt(res_covs[n_comps:2 * n_comps])
        y_std = _sqrt(res_covs[2 * n_comps:])

        with np.errstate(divide='ignore', invalid='ignore'):
            corr_sqr = cov / x_std / y_std

        corr_sqr[np.isnan(corr_sqr)] = 0

        return corr_sqr

    return rowwise_distance_correlation_sqr


_dcor.distance_correlation_sqr.rowwise_function = (
    _generate_rowwise_distance_correlation_sqr(unbiased=False))

_dcor.u_distance_correlation_sqr.rowwise_function = (
    _generate_rowwise_distance_correlation_sqr(unbiased=True))


def _rowwise_distance_correlation(*args, **kwargs):

    res_corrs = _dcor.distance_correlation_sqr.rowwise_function(
        *args, **kwargs)
    if res_corrs is NotImplemented:
        return NotImplemented

    return _sqrt(res_corrs)


_dcor.distance_correlation.rowwise_function = _rowwise_distance_correlation


[docs]def rowwise( function: Callable[..., Array], x: Array, y: Array, *, rowwise_mode: RowwiseMode = RowwiseMode.AUTO, **kwargs: Any, ) -> Array: """ Compute a dependency measure between pairs of elements. It will use an optimized implementation if one is available. Parameters: function: Dependency measure function. x: First list of random vectors. The columns of each vector correspond with the individual random variables while the rows are individual instances of the random vector. y: Second list of random vectors. The columns of each vector correspond with the individual random variables while the rows are individual instances of the random vector. rowwise_mode: Mode of rowwise computations. kwargs: Additional options necessary. Returns: A length :math:`n` vector where the :math:`i`-th entry is the dependency between :math:`x[i]` and :math:`y[i]`. Examples: >>> import numpy as np >>> import dcor The following example shows two computations of distance covariance between random variables. This has an optimized implementation using multiple cores if available. >>> a = [np.array([1., 2., 3., 4., 5. ,6.]), ... np.array([7., 8., 9., 10., 11., 12.]) ... ] >>> b = [np.array([1., 4., 9., 16., 25., 36.]), ... np.array([1., 3., 6., 8., 10., 12.]) ... ] >>> dcor.rowwise(dcor.distance_covariance, a, b) array([3.45652005, 1.95789002]) The following example shows two computations of distance correlation between random vectors of length 2. Currently there is no optimized implementation for the random vector case, so it will be equivalent to calling map. >>> a = [np.array([[1., 1.], ... [2., 4.], ... [3., 8.], ... [4., 16.]]), ... np.array([[9., 10.], ... [11., 12.], ... [13., 14.], ... [15., 16.]]) ... ] >>> b = [np.array([[0., 1.], ... [3., 1.], ... [6., 2.], ... [9., 3.]]), ... np.array([[5., 1.], ... [8., 1.], ... [13., 1.], ... [21., 1.]]) ... ] >>> dcor.rowwise(dcor.distance_correlation, a, b) array([0.98182263, 0.98320103]) """ if rowwise_mode is not RowwiseMode.NAIVE: rowwise_function = getattr(function, 'rowwise_function', None) if rowwise_function: result = rowwise_function(x, y, **kwargs) if result is not NotImplemented: return result if rowwise_mode is RowwiseMode.OPTIMIZED: raise NotImplementedError( "There is not an optimized rowwise implementation") xp = array_namespace(x, y) return xp.asarray( [function(x_elem, y_elem, **kwargs) for x_elem, y_elem in zip(x, y)], )