Source code for skrough.homogeneity

"""Functions related to homogeneity/heterogeneity of decision tables."""

from typing import Tuple

import numba
import numpy as np
import numpy.typing as npt

import skrough.typing as rght
from skrough.structs.group_index import GroupIndex
from skrough.unique import get_uniques_and_compacted


@numba.njit
def get_homogeneity(
    distribution: npt.NDArray[np.int64],
) -> npt.NDArray[np.int8]:
    """Flag which rows of a distribution are homogenous.

    The function is mainly used to assess homogeneity of decision attributes.
    The ``distribution`` input is a 2D array where:

    - each row is a separate context, e.g., a group of objects or an equivalence
      class,
    - the columns of a row hold a discrete distribution, i.e., the number of
      occurrences of each distinct decision attribute value in that context.

    A row is reported as homogenous (``1``) when at most one of its entries is
    greater than ``0`` and as non-homogenous (``0``) as soon as a second positive
    entry is found. Entries that are not greater than ``0`` are ignored.

    Args:
        distribution: A 2D array representing a distribution.

    Raises:
        ValueError: If ``distribution`` is not a two-dimensional array.

    Returns:
        A 1D ``int8`` array with one element per row of ``distribution``: ``1`` for
        a homogenous row and ``0`` for a non-homogenous one.

    Examples:
        >>> get_homogeneity(
        ...     np.asarray(
        ...         [
        ...             [0, 0],
        ...             [1, 1],
        ...             [0, 3],
        ...             [5, 0],
        ...         ]
        ...     )
        ... )
        array([1, 0, 1, 1], dtype=int8)
    """
    if distribution.ndim != 2:
        raise ValueError("input `distribution` should be 2D")

    nrows, ncols = distribution.shape
    # start optimistic: every row counts as homogenous until a second positive
    # entry proves otherwise
    flags: npt.NDArray[np.int8] = np.ones(nrows, dtype=np.int8)
    # NOTE(review): the decorator does not pass ``parallel=True``, so ``prange``
    # presumably behaves like a plain ``range`` here - confirm against numba docs
    for row in numba.prange(nrows):  # pylint: disable=not-an-iterable
        positives_seen = 0
        for col in range(ncols):
            if distribution[row, col] > 0:
                positives_seen += 1
                if positives_seen > 1:
                    # second positive entry: the verdict for this row is final
                    flags[row] = 0
                    break
    return flags
# Maximum number of ``distribution`` columns accepted by ``get_heterogeneity``.
# That function packs one bit per column into a signed ``np.int64`` value, which
# offers 63 non-sign bits.
HETEROGENEITY_MAX_COLS = 63
@numba.njit
def get_heterogeneity(
    distribution: npt.NDArray[np.int64],
) -> npt.NDArray[np.int64]:
    """Encode the kind of heterogeneity of each row of a distribution.

    The function is mainly used to assess heterogeneity of decision attributes.
    The ``distribution`` input is a 2D array where:

    - each row is a separate context, e.g., a group of objects or an equivalence
      class,
    - the columns of a row hold a discrete distribution, i.e., the number of
      occurrences of each distinct decision attribute value in that context.

    For every row the function builds a bit mask with one bit per column, where a
    bit is set when the corresponding entry is greater than ``0``. The first
    column maps to the most significant bit and the last column to the least
    significant one. The mask is reported only for rows with at least two
    positive entries (heterogenous rows); every other row is reported as ``0``
    (non-heterogenous/homogenous). As a consequence, rows that are heterogenous in
    different ways, e.g., positive entries on positions ``0`` and ``1`` versus
    positions ``1`` and ``2``, get different values :code:`>=1`.

    Args:
        distribution: A 2D array representing a distribution.

    Raises:
        ValueError: If ``distribution`` is not a two-dimensional array.
        ValueError: If the number of columns in the ``distribution`` input
            argument is greater than ``63``.

    Returns:
        A 1D ``int64`` array with one element per row of ``distribution``: ``0``
        for a non-heterogenous/homogenous row and the row's bit mask
        (:code:`>=1`) for a heterogenous one.

    Examples:
        >>> get_heterogeneity(
        ...     np.asarray(
        ...         [
        ...             [0, 0, 0],
        ...             [1, 0, 0],
        ...             [0, 1, 0],
        ...             [0, 0, 1],
        ...             [1, 1, 0],
        ...             [1, 9, 0],
        ...             [9, 1, 0],
        ...             [1, 0, 1],
        ...             [1, 0, 9],
        ...             [9, 0, 1],
        ...             [0, 1, 1],
        ...             [0, 9, 1],
        ...             [0, 1, 9],
        ...             [1, 1, 1],
        ...             [1, 8, 9],
        ...             [8, 9, 1],
        ...         ]
        ...     )
        ... )
        array([0, 0, 0, 0, 6, 6, 6, 5, 5, 5, 3, 3, 3, 7, 7, 7])
    """
    if distribution.ndim != 2:
        raise ValueError("input `distribution` should be 2D")
    if distribution.shape[1] > HETEROGENEITY_MAX_COLS:
        raise ValueError("number of columns in `distribution` is too large")

    nrows, ncols = distribution.shape
    codes: npt.NDArray[np.int64] = np.zeros(nrows, dtype=np.int64)
    # NOTE(review): the decorator does not pass ``parallel=True``, so ``prange``
    # presumably behaves like a plain ``range`` here - confirm against numba docs
    for row in numba.prange(nrows):  # pylint: disable=not-an-iterable
        positives = 0
        mask = np.int64(0)
        for col in range(ncols):
            # shift the bits collected so far one position to the left and append
            # the bit of the current column
            if distribution[row, col] > 0:
                positives += 1
                mask = 2 * mask + 1
            else:
                mask = 2 * mask
        # rows with fewer than two positive entries keep the initial ``0``
        if positives > 1:
            codes[row] = mask
    return codes
@numba.njit
def _groups_decisions_replace(
    group_index: np.ndarray,
    y: np.ndarray,
    y_count: int,
    replace: np.ndarray,
) -> np.ndarray:
    """Replace decisions in groups.

    Replace decisions of objects according to the given ``replace`` mapping. The
    function interprets the ``replace`` argument as a mapping, where:

    - positions represent group ids and they are used as keys in the mapping
    - values represent new decision values and they are used as values in the
      mapping

    So, effectively, it maps :code:`group ids -> new decisions`. A value of ``0``
    in the mapping has a special meaning: it is an instruction to preserve the
    original decision of an object.

    Args:
        group_index: Sequence of group ids, one per object, that represents the
            split of the objects into groups.
        y: Factorized decision values for the objects represented by the input
            ``group_index``. The values should be given in a form of
            integer-location based indexing sequence of the factorized decision
            values, i.e., 0-based values that index distinct decisions.
        y_count: Number of distinct decision attribute values.
        replace: A mapping of object groups to decisions, given as a sequence
            where positions represent group ids. The following rules are applied:

            - if the group of a given object (in terms of ``group_index``) maps
              to ``0`` then the original decision of the object is preserved
            - otherwise, the object is assigned a new decision using the
              following expression::

                  y_count - 1 + replace[group_index[i]]

              i.e., a value placed behind the original range of decisions
              ``0..(y_count-1)``

    Returns:
        New decision values, in an array created with ``np.empty_like(y)``, built
        from the input ``y`` changed according to the ``replace`` mapping.
    """
    result = np.empty_like(y)
    # replacement ids start at ``1``, so adding ``y_count - 1`` places them right
    # behind the original range of decisions ``0..(y_count-1)``
    shift = y_count - 1
    for i in numba.prange(len(y)):  # pylint: disable=not-an-iterable
        replacement = replace[group_index[i]]
        if replacement == 0:
            # ``0`` is reserved for "keep the original decision"
            result[i] = y[i]
        else:
            result[i] = shift + replacement
    return result
def heterogeneous_groups_decisions_replace(
    x: np.ndarray,
    x_counts: np.ndarray,
    y: np.ndarray,
    y_count: int,
    attrs: rght.LocationsLike,
    distinguish_generalized_decisions: bool = False,
) -> Tuple[np.ndarray, int]:
    """Return consistent decision values.

    Prepare new decision values in a way that makes data consistent (in the
    meaning of a consistent decision table). The groups (equivalence classes in
    the context of the indiscernibility relation) are induced from the given
    dataset ``x`` and a subset of attributes ``attrs``. Original decisions ``y``
    are then processed to prepare new consistent decision values. It is done by
    preserving decision values for homogenous groups and replacing decisions for
    objects from heterogenous ones.

    The ``distinguish_generalized_decisions`` boolean flag controls whether
    heterogenous groups should be distinguished from each other
    (:code:`distinguish_generalized_decisions is True`) or treated equally
    (:code:`distinguish_generalized_decisions is False`). Distinguishing the
    heterogenous groups means that objects from groups of different
    characteristics (a different subset of decision values appearing in a group,
    cf. :func:`~skrough.homogeneity.get_heterogeneity`) are assigned different new
    decision values. When heterogenous groups are not to be distinguished then
    objects from all heterogenous groups are assigned the same new decision value.

    Args:
        x: Factorized data table representing conditional features/attributes for
            the objects the computation should be performed on. The values in
            each column should be given in a form of integer-location based
            indexing sequence of the factorized conditional attribute values,
            i.e., 0-based values that index distinct values of the conditional
            attribute.
        x_counts: Number of distinct attribute values given for each conditional
            attribute. The argument is expected to be given as a 1D array.
        y: Factorized decision values for the objects represented by the input
            :obj:`x` argument. The values should be given in a form of
            integer-location based indexing sequence of the factorized decision
            values, i.e., 0-based values that index distinct decisions.
        y_count: Number of distinct decision attribute values.
        attrs: A subset of conditional attributes used to induce the groups. It
            should be given in a form of a sequence of integer-location based
            indexing of the selected conditional attributes from ``x``. The
            argument is required (it has no default value) and it is passed
            unchanged to ``GroupIndex.from_data``.
        distinguish_generalized_decisions: A flag to control whether heterogenous
            groups should be distinguished from each other or not. Defaults to
            :obj:`False`.

    Returns:
        New decision values returned in a form of 2-element tuple with the
        following elements

        - factorized decision attribute returned in form of 1d array
        - decision attribute domain size

        The new decision values together with the input data ``x`` and
        ``x_counts`` form a consistent decision table. When ``x`` is empty the
        input ``y`` and ``y_count`` are returned unchanged.

    Examples:
        >>> from skrough.dataprep import (
        ...     prepare_factorized_array,
        ...     prepare_factorized_vector
        ... )
        >>> x, x_counts = prepare_factorized_array(np.asarray([[8, 8, 8],
        ...                                                    [8, 8, 8],
        ...                                                    [1, 7, 8],
        ...                                                    [1, 8, 8],
        ...                                                    [1, 1, 8],
        ...                                                    [1, 1, 1]]))
        >>> y, y_count = prepare_factorized_vector(np.asarray([3, 4, 8, 9, 4, 5]))
        >>> y, y_count
        (array([0, 1, 2, 3, 1, 4]), 5)
        >>> heterogeneous_groups_decisions_replace(
        ...     x,
        ...     x_counts,
        ...     y,
        ...     y_count,
        ...     attrs=[0, 1],
        ...     distinguish_generalized_decisions=False,
        ... )
        (array([5, 5, 2, 3, 5, 5]), 6)
        >>> heterogeneous_groups_decisions_replace(
        ...     x,
        ...     x_counts,
        ...     y,
        ...     y_count,
        ...     attrs=[0, 1],
        ...     distinguish_generalized_decisions=True,
        ... )
        (array([6, 6, 2, 3, 5, 5]), 7)
    """
    if len(x) == 0:
        # no objects, hence no groups to inspect: nothing to replace
        return y, y_count

    group_index = GroupIndex.from_data(x, x_counts, attrs)
    dec_distribution = group_index.get_distribution(y, y_count)

    if distinguish_generalized_decisions:
        heterogeneity = get_heterogeneity(dec_distribution)
    else:
        heterogeneity = 1 - get_homogeneity(dec_distribution)

    # values ``0`` (if present) mean non-heterogenous groups, i.e., homogenous groups
    # values > ``0`` (if present) mean heterogenous groups

    # get_uniques_and_compacted returns unique elements as sorted (ascending) sequence
    heterogeneity_unique_values, heterogeneity_compacted = get_uniques_and_compacted(
        heterogeneity
    )

    # let's compute the number of distinct new decision values that are needed
    heterogenous_groups_count = len(heterogeneity_unique_values)
    if heterogeneity_unique_values[0] == 0:
        # the value ``0`` (homogenous groups) is among the unique values and it
        # does not introduce a new decision, so it is not counted
        heterogenous_groups_count -= 1
    else:
        # there are no homogenous groups, so the compacted value ``0`` now stands
        # for an actual heterogenous group; shift all compacted values by one to
        # keep ``0`` reserved for "preserve the original decision"
        heterogeneity_compacted += 1

    result = _groups_decisions_replace(
        group_index=group_index.index,
        y=y,
        y_count=y_count,
        replace=heterogeneity_compacted,
    )
    return result, (y_count + heterogenous_groups_count)