"""Module containing a custom accessor and helpers for querying lyDATA.
Because of the special three-level header of the lyDATA tables, it is sometimes
cumbersome and lengthy to access the columns. While this is certainly necessary to
access e.g. the contralateral involvement of LNL II as observed on CT images
(``df["CT", "contra", "II"]``), for simple patient information such as age and HPV
status, it is more convenient to use short names, which we implement in this module.
The main class in this module is the :py:class:`LyDataAccessor` class, which provides
the above mentioned functionality. That way, accessing the age of all patients is now
as easy as typing ``df.ly.age``.
Beyond that, the module implements a convenient way to query the
:py:class:`pd.DataFrame`: The :py:class:`Q` object, that was inspired by Django's
``Q`` object. It allows for more readable and modular queries, which can be combined
with logical operators and reused across different DataFrames.
The :py:class:`Q` objects can be passed to the :py:meth:`LyDataAccessor.query` and
:py:meth:`LyDataAccessor.portion` methods to filter the DataFrame or compute the
:py:class:`QueryPortion` of rows that satisfy the query.
Further, we implement methods like :py:meth:`LyDataAccessor.combine`,
:py:meth:`LyDataAccessor.infer_sublevels`, and
:py:meth:`LyDataAccessor.infer_superlevels` to compute additional columns from the
lyDATA tables. This is sometimes necessary, because not all data contains all the
possibly necessary columns. E.g., in some cohorts we do have detailed sublevel
information (i.e., IIa and IIb), while in others only the superlevel (II) is reported.
"""
from __future__ import annotations
import warnings
from collections.abc import Callable
from dataclasses import dataclass
from itertools import product
from typing import Any, Literal
import numpy as np
import pandas as pd
import pandas.api.extensions as pd_ext
from lydata.utils import (
ModalityConfig,
get_default_column_map,
get_default_modalities,
)
from lydata.validator import construct_schema
# Importing this module installs a global filter that silences every pandas
# ``PerformanceWarning`` for the rest of the process.
# NOTE(review): presumably needed because columns are inserted one by one into
# multi-level DataFrames further below -- confirm before removing.
warnings.simplefilter(action="ignore", category=pd.errors.PerformanceWarning)
def _get_all_true(df: pd.DataFrame) -> pd.Series:
    """Return a boolean mask that is ``True`` for every row of ``df``.

    The mask carries the index of ``df``. This matters because pandas aligns
    boolean masks on the index, both when indexing (``df[mask]``) and when
    combining masks with ``&`` or ``|``. A mask with a fresh ``RangeIndex`` would
    be misaligned for any DataFrame that was filtered or re-indexed before.

    >>> _get_all_true(pd.DataFrame({'x': [1, 2]}, index=[7, 9]))
    7    True
    9    True
    dtype: bool
    """
    # The explicit dtype keeps the mask boolean even for an empty DataFrame.
    return pd.Series(True, index=df.index, dtype=bool)
class CombineQMixin:
    """Mixin class for combining queries.

    Four operators are defined for combining queries:

    1. ``&`` for logical AND operations.
        The returned object is an :py:class:`AndQ` instance and - when executed -
        returns a boolean mask where both queries are satisfied. When the right-hand
        side is ``None``, the left-hand side query object is returned unchanged.
    2. ``|`` for logical OR operations.
        The returned object is an :py:class:`OrQ` instance and - when executed -
        returns a boolean mask where either query is satisfied. When the right-hand
        side is ``None``, the left-hand side query object is returned unchanged.
    3. ``~`` for inverting a query.
        The returned object is a :py:class:`NotQ` instance and - when executed -
        returns a boolean mask where the query is not satisfied.
    4. ``==`` for checking if two queries are equal.
        Two queries are equal if the right-hand side is an instance of the left-hand
        side's class and all their public attributes are equal. For a simple query
        these are the column name, operator, and value; for combined queries these
        are the wrapped sub-queries. Note that this does not check if the queries
        are semantically equal, i.e., if they would return the same result when
        executed.

    Because ``__eq__`` is overridden without ``__hash__``, query objects are not
    hashable.
    """

    def __and__(self, other: QTypes | None) -> AndQ | CombineQMixin:
        """Combine two queries with a logical AND (``None`` leaves ``self`` as is)."""
        return self if other is None else AndQ(self, other)

    def __or__(self, other: QTypes | None) -> OrQ | CombineQMixin:
        """Combine two queries with a logical OR (``None`` leaves ``self`` as is)."""
        return self if other is None else OrQ(self, other)

    def __invert__(self) -> NotQ:
        """Negate the query."""
        return NotQ(self)

    @staticmethod
    def _public_state(query: Any) -> dict[str, Any]:
        """Return the public instance attributes that define a query's identity."""
        # Private attributes (e.g. a cached column map) are implementation details
        # and must not take part in the comparison.
        return {
            name: attr
            for name, attr in vars(query).items()
            if not name.startswith("_")
        }

    def __eq__(self, value: object) -> bool:
        """Check if two queries are equal.

        Comparing the public attributes generically makes this work for every class
        using the mixin, not only for those that define ``colname``, ``operator``,
        and ``value``.
        """
        if not isinstance(value, self.__class__):
            return False

        return self._public_state(self) == self._public_state(value)
class Q(CombineQMixin):
    """Combinable query object for filtering a DataFrame.

    The syntax for this object is similar to Django's ``Q`` object. It can be used to
    define queries in a more readable and modular way.

    .. caution::

        The column names are not checked upon instantiation. This is only done when the
        query is executed. In fact, the ``Q`` object does not even know about the
        :py:class:`~pandas.DataFrame` it will be applied to in the beginning. On the
        flip side, this means a query may be reused for different DataFrames.
    """

    # Maps the operator string to a function computing the mask from the column and
    # the comparison value.
    _OPERATOR_MAP: dict[str, Callable[[pd.Series, Any], pd.Series]] = {
        "==": lambda series, value: series == value,
        "<": lambda series, value: series < value,
        "<=": lambda series, value: series <= value,
        ">": lambda series, value: series > value,
        ">=": lambda series, value: series >= value,
        "!=": lambda series, value: series != value,  # same as ~Q("col", "==", value)
        "in": lambda series, value: series.isin(value),  # value is a list
        # value is a str; ``na=False`` keeps the mask strictly boolean when the
        # column has missing entries, so it can be used for indexing and negated.
        "contains": lambda series, value: series.str.contains(value, na=False),
    }

    def __init__(
        self,
        column: str,
        operator: Literal["==", "<", "<=", ">", ">=", "!=", "in", "contains"],
        value: Any,
    ) -> None:
        """Create query object that can compare a ``column`` with a ``value``.

        If ``value`` is callable, it is called with the column upon execution and
        its return value is used as the mask. The ``operator`` is ignored then.
        """
        self.colname = column
        self.operator = operator
        self.value = value
        self._column_map = get_default_column_map()

    def __repr__(self) -> str:
        """Return a string representation of the query."""
        return f"Q({self.colname!r}, {self.operator!r}, {self.value!r})"

    def execute(self, df: pd.DataFrame) -> pd.Series:
        """Return a boolean mask where the query is satisfied for ``df``.

        Raises a ``KeyError`` if the column is not in ``df`` or if the operator is
        not one of the supported ones.

        >>> df = pd.DataFrame({'col1': [1, 2, 3], 'col2': ['foo', 'bar', 'baz']})
        >>> Q('col1', '<=', 2).execute(df)
        0     True
        1     True
        2    False
        Name: col1, dtype: bool
        >>> Q('col2', 'contains', 'ba').execute(df)
        0    False
        1     True
        2     True
        Name: col2, dtype: bool
        """
        # Translate a short column name to its long form; anything that is not a
        # known short name is used as the column key unchanged.
        try:
            colname = self._column_map.from_short[self.colname].long
        except KeyError:
            colname = self.colname

        column = df[colname]

        if callable(self.value):
            return self.value(column)

        return self._OPERATOR_MAP[self.operator](column, self.value)
class AndQ(CombineQMixin):
    """Query object for combining two queries with a logical AND.

    >>> df = pd.DataFrame({'col1': [1, 2, 3], 'col2': ['foo', 'bar', 'baz']})
    >>> q1 = Q('col1', '!=', 3)
    >>> q2 = Q('col2', 'contains', 'ba')
    >>> and_q = q1 & q2
    >>> print(and_q)
    Q('col1', '!=', 3) & Q('col2', 'contains', 'ba')
    >>> isinstance(and_q, AndQ)
    True
    >>> and_q.execute(df)
    0    False
    1     True
    2    False
    dtype: bool
    >>> all((q1 & None).execute(df) == q1.execute(df))
    True

    Operands that are OR-combinations are put in parentheses, such that the
    representation reads the same way the query is evaluated:

    >>> print((q1 | q2) & q1)
    (Q('col1', '!=', 3) | Q('col2', 'contains', 'ba')) & Q('col1', '!=', 3)
    """

    def __init__(self, q1: QTypes, q2: QTypes) -> None:
        """Combine two queries with a logical AND."""
        self.q1 = q1
        self.q2 = q2

    @staticmethod
    def _operand_repr(operand: QTypes) -> str:
        """Return the operand's representation, parenthesized if it is an OR."""
        # ``&`` binds tighter than ``|``. Without parentheses, ``(a | b) & c`` would
        # be printed as ``a | b & c``, which reads as a different query.
        text = repr(operand)
        return f"({text})" if isinstance(operand, OrQ) else text

    def __repr__(self) -> str:
        """Return a string representation of the query."""
        return f"{self._operand_repr(self.q1)} & {self._operand_repr(self.q2)}"

    def execute(self, df: pd.DataFrame) -> pd.Series:
        """Return a boolean mask where both queries are satisfied."""
        return self.q1.execute(df) & self.q2.execute(df)
class OrQ(CombineQMixin):
    """Logical disjunction of two query objects.

    Instances are normally created via the ``|`` operator of two queries.

    >>> df = pd.DataFrame({'col1': [1, 2, 3]})
    >>> q1 = Q('col1', '==', 1)
    >>> q2 = Q('col1', '==', 3)
    >>> or_q = q1 | q2
    >>> print(or_q)
    Q('col1', '==', 1) | Q('col1', '==', 3)
    >>> isinstance(or_q, OrQ)
    True
    >>> or_q.execute(df)
    0     True
    1    False
    2     True
    Name: col1, dtype: bool
    >>> all((q1 | None).execute(df) == q1.execute(df))
    True
    """

    def __init__(self, q1: QTypes, q2: QTypes) -> None:
        """Store the two operands ``q1`` and ``q2`` of the OR combination."""
        self.q1, self.q2 = q1, q2

    def __repr__(self) -> str:
        """Show both operands joined by the ``|`` operator."""
        return " | ".join(repr(operand) for operand in (self.q1, self.q2))

    def execute(self, df: pd.DataFrame) -> pd.Series:
        """Compute the mask of rows of ``df`` matching at least one operand."""
        left_mask = self.q1.execute(df)
        right_mask = self.q2.execute(df)
        return left_mask | right_mask
class NotQ(CombineQMixin):
    """Query object for negating a query.

    >>> df = pd.DataFrame({'col1': [1, 2, 3]})
    >>> q = Q('col1', '==', 2)
    >>> not_q = ~q
    >>> print(not_q)
    ~Q('col1', '==', 2)
    >>> isinstance(not_q, NotQ)
    True
    >>> not_q.execute(df)
    0     True
    1    False
    2     True
    Name: col1, dtype: bool

    A negated combination is put in parentheses, such that the representation reads
    the same way the query is evaluated:

    >>> print(~(q & q))
    ~(Q('col1', '==', 2) & Q('col1', '==', 2))
    """

    def __init__(self, q: QTypes) -> None:
        """Negate the given query ``q``."""
        self.q = q

    def __repr__(self) -> str:
        """Return a string representation of the query."""
        inner = repr(self.q)

        # ``~`` binds tighter than ``&`` and ``|``. Without parentheses, ``~(a & b)``
        # would be printed as ``~a & b``, which reads as a different query.
        if isinstance(self.q, (AndQ, OrQ)):
            inner = f"({inner})"

        return f"~{inner}"

    def execute(self, df: pd.DataFrame) -> pd.Series:
        """Return a boolean mask where the query is not satisfied."""
        return ~self.q.execute(df)
class NoneQ(CombineQMixin):
    """Neutral query that selects every row. Handy as a default query."""

    def __repr__(self) -> str:
        """Show the query as a constructor call without arguments."""
        return "NoneQ()"

    def execute(self, df: pd.DataFrame) -> pd.Series:
        """Build a mask for ``df`` in which every entry is ``True``."""
        select_everything = _get_all_true(df)
        return select_everything
# ``NoneQ`` is part of the union because it is a valid query object as well: it
# offers ``execute`` and can be combined like every other query.
QTypes = Q | AndQ | OrQ | NotQ | NoneQ | None
"""Type for a query object or a combination of query objects."""
class C:
    """Column reference that turns comparisons into :py:class:`Q` objects.

    .. caution::

        As with :py:class:`Q`, the validity of the column name is not verified when
        the object is created, but only once the resulting query is executed.
    """

    def __init__(self, *column: str) -> None:
        """Remember which column subsequent comparisons refer to.

        Multi-level columns may be given either as several arguments or as one
        tuple, i.e., ``C('col1', 'col2')`` and ``C(('col1', 'col2'))`` are the same.

        >>> (C('col1', 'col2') == 1) == (C(('col1', 'col2')) == 1)
        True
        """
        if len(column) == 1:
            # A single argument is stored as is, be it a string or a tuple.
            (self.column,) = column
        else:
            self.column = column

    def _compare(self, operator: str, value: Any) -> Q:
        """Create the query relating this column to ``value`` via ``operator``."""
        return Q(self.column, operator, value)

    def __eq__(self, value: Any) -> Q:
        """Build an equality query.

        >>> C('foo') == 'bar'
        Q('foo', '==', 'bar')
        """
        return self._compare("==", value)

    def __lt__(self, value: Any) -> Q:
        """Build a "less than" query.

        >>> C('foo') < 42
        Q('foo', '<', 42)
        """
        return self._compare("<", value)

    def __le__(self, value: Any) -> Q:
        """Build a "less than or equal" query.

        >>> C('foo') <= 42
        Q('foo', '<=', 42)
        """
        return self._compare("<=", value)

    def __gt__(self, value: Any) -> Q:
        """Build a "greater than" query.

        >>> C('foo') > 42
        Q('foo', '>', 42)
        """
        return self._compare(">", value)

    def __ge__(self, value: Any) -> Q:
        """Build a "greater than or equal" query.

        >>> C('foo') >= 42
        Q('foo', '>=', 42)
        """
        return self._compare(">=", value)

    def __ne__(self, value: Any) -> Q:
        """Build an inequality query.

        >>> C('foo') != 'bar'
        Q('foo', '!=', 'bar')
        """
        return self._compare("!=", value)

    def isin(self, value: list[Any]) -> Q:
        """Build a query testing membership of the column values in a list.

        >>> C('foo').isin([1, 2, 3])
        Q('foo', 'in', [1, 2, 3])
        """
        return self._compare("in", value)

    def contains(self, value: str) -> Q:
        """Build a query testing whether the column values contain a string.

        >>> C('foo').contains('bar')
        Q('foo', 'contains', 'bar')
        """
        return self._compare("contains", value)
@dataclass
class QueryPortion:
    """Dataclass for storing the portion of a query.

    ``match`` is the number of matching rows and ``total`` is the number of rows
    that were considered.
    """

    match: int
    total: int

    def __post_init__(self) -> None:
        """Check that the portion is valid.

        Raises a ``ValueError`` if ``match`` or ``total`` are negative or if
        ``match`` exceeds ``total``.

        >>> QueryPortion(5, 2)
        Traceback (most recent call last):
            ...
        ValueError: Match must be less than or equal to total.
        """
        if self.total < 0:
            raise ValueError("Total must be non-negative.")
        if self.match < 0:
            raise ValueError("Match must be non-negative.")
        if self.match > self.total:
            raise ValueError("Match must be less than or equal to total.")

    @property
    def fail(self) -> int:
        """Get the number of failures.

        >>> QueryPortion(2, 5).fail
        3
        """
        return self.total - self.match

    @property
    def ratio(self) -> float:
        """Get the ratio of matches over the total.

        An empty portion (``total == 0``) is valid, but has no defined ratio. In
        that case ``nan`` is returned, regardless of whether the counts are Python
        or NumPy integers.

        >>> QueryPortion(2, 5).ratio
        0.4
        >>> QueryPortion(0, 0).ratio
        nan
        """
        # Python ints would raise a ZeroDivisionError here, while NumPy ints would
        # return NaN together with a RuntimeWarning. Handle both uniformly.
        if self.total == 0:
            return float("nan")

        return self.match / self.total

    @property
    def percent(self) -> float:
        """Get the percentage of matches over the total.

        >>> QueryPortion(2, 5).percent
        40.0
        """
        return self.ratio * 100

    def invert(self) -> QueryPortion:
        """Return the inverted portion.

        >>> QueryPortion(2, 5).invert()
        QueryPortion(match=3, total=5)
        """
        return QueryPortion(match=self.fail, total=self.total)
def align_diagnoses(
    dataset: pd.DataFrame,
    modalities: list[str],
) -> list[pd.DataFrame]:
    """Collect one diagnosis table per modality, all sharing rows and columns.

    For every entry of ``modalities`` the sub-table ``dataset[modality]`` is taken
    (without a top-level ``"info"`` column, if present) and outer-aligned with all
    tables collected before. A modality that cannot be found in ``dataset`` is
    skipped with a warning.
    """
    aligned_tables: list[pd.DataFrame] = []

    for modality in modalities:
        try:
            table = dataset[modality].copy().drop(columns=["info"], errors="ignore")
        except KeyError:
            warnings.warn(f"Did not find modality {modality}, cannot align. Skipping.")  # noqa
            continue

        # Align the new table with each table collected so far. Both sides are
        # replaced, so that afterwards all tables cover the union of labels.
        for position in range(len(aligned_tables)):
            table, aligned_tables[position] = table.align(
                aligned_tables[position],
                join="outer",
            )

        aligned_tables.append(table)

    return aligned_tables
def _stack_to_float_matrix(diagnosis_stack: list[pd.DataFrame]) -> np.ndarray:
    """Convert diagnosis stack to 3D array of floats with ``Nones`` as ``np.nan``.

    The tables in ``diagnosis_stack`` are stacked along a new first axis. Missing
    entries (as detected by :py:func:`pandas.isna`) become ``np.nan``, everything
    else is cast to ``float``.
    """
    diagnosis_matrix = np.array(diagnosis_stack)
    # Replace all kinds of missing values (e.g. ``None``) by NaN, since only the
    # latter can be cast to float.
    diagnosis_matrix[pd.isna(diagnosis_matrix)] = np.nan
    # Use the array method instead of ``np.astype``: The module-level function is
    # only available in NumPy 2.1 and newer.
    return diagnosis_matrix.astype(float)
def _evaluate_likelihood_ratios(
    diagnosis_matrix: np.ndarray,
    sensitivities: np.ndarray,
    specificities: np.ndarray,
    method: Literal["max_llh", "rank"],
) -> np.ndarray:
    """Decide per patient and level whether involvement is at least as likely as not.

    ``diagnosis_matrix`` is expected to be a 3D array of shape ``(n_modalities,
    n_patients, n_levels)``, while ``sensitivities`` and ``specificities`` are 1D
    arrays of shape ``(n_modalities,)``. With ``method="max_llh"`` the likelihoods
    of all modalities are multiplied for each patient and level. With
    ``method="rank"`` only the largest likelihood among the modalities is used.
    In both cases NaN entries are ignored by the aggregation.

    Returns a boolean array that is ``True`` where the likelihood of the diagnoses
    given involvement is greater than or equal to that given no involvement. An
    unknown ``method`` raises a ``ValueError``.
    """
    sens = sensitivities[:, None, None]
    spec = specificities[:, None, None]
    negative = 1 - diagnosis_matrix

    # Likelihood of each observed diagnosis if the level is truly involved ...
    llh_given_involved = sens * diagnosis_matrix + (1 - sens) * negative
    # ... and if the level is truly healthy.
    llh_given_healthy = spec * negative + (1 - spec) * diagnosis_matrix

    aggregators = {"max_llh": np.nanprod, "rank": np.nanmax}
    if method not in aggregators:
        raise ValueError(f"Unknown method {method}")

    aggregate = aggregators[method]
    return aggregate(llh_given_involved, axis=0) >= aggregate(llh_given_healthy, axis=0)
def _expand_mapping(
    short_map: dict[str, Any],
    colname_map: dict[str | tuple[str, str, str], Any] | None = None,
) -> dict[tuple[str, str, str], Any]:
    """Expand the column map to full column names.

    Every key of ``short_map`` that is found in ``colname_map`` is replaced by the
    ``long`` attribute of the corresponding entry. Keys that are not found (or
    whose entry has no ``long`` attribute) are kept as they are. If ``colname_map``
    is ``None``, the short names of the default column map are used.

    >>> _expand_mapping({'age': 'foo', 'hpv': 'bar'})
    {('patient', '#', 'age'): 'foo', ('patient', '#', 'hpv_status'): 'bar'}
    """
    # Compare with ``None`` explicitly: an empty mapping is a legitimate argument
    # and must not be replaced by the defaults.
    if colname_map is None:
        colname_map = get_default_column_map().from_short

    return {
        getattr(colname_map.get(colname), "long", colname): func
        for colname, func in short_map.items()
    }
# Mapping from a column name (either a plain string or a three-level column tuple)
# to a function that receives a column as ``pd.Series`` and returns a ``pd.Series``.
AggFuncType = dict[str | tuple[str, str, str], Callable[[pd.Series], pd.Series]]
@pd_ext.register_dataframe_accessor("ly")
class LyDataAccessor:
    """Custom accessor for handling lymphatic involvement data.

    This aims to provide an easy and user-friendly interface to the most commonly needed
    operations on the lymphatic involvement data we publish in the lydata project.
    """

    # Sublevels of each superlevel that are used when the caller specifies none.
    # This mapping is only read, never modified.
    _DEFAULT_SUBDIVISIONS = {
        "I": ["a", "b"],
        "II": ["a", "b"],
        "V": ["a", "b"],
    }

    def __init__(self, obj: pd.DataFrame) -> None:
        """Initialize the accessor with a DataFrame."""
        self._obj = obj
        self._column_map = get_default_column_map()

    def __contains__(self, key: str) -> bool:
        """Check if a column is contained in the DataFrame.

        >>> df = pd.DataFrame({("patient", "#", "age"): [61, 52, 73]})
        >>> "age" in df.ly
        True
        >>> "foo" in df.ly
        False
        >>> ("patient", "#", "age") in df.ly
        True
        """
        _key = self._get_safe_long(key)
        return _key in self._obj

    def __getitem__(self, key: str) -> pd.Series:
        """Allow column access by short name, too."""
        _key = self._get_safe_long(key)
        return self._obj[_key]

    def __getattr__(self, name: str) -> Any:
        """Access columns also by short name.

        Names starting with an underscore are never looked up as columns. They
        are still accessible via ``df.ly[name]``.

        >>> df = pd.DataFrame({("patient", "#", "age"): [61, 52, 73]})
        >>> df.ly.age
        0    61
        1    52
        2    73
        Name: (patient, #, age), dtype: int64
        >>> df.ly.foo
        Traceback (most recent call last):
            ...
        AttributeError: Attribute 'foo' not found.
        """
        # This method is only called when the normal lookup fails. For private
        # attributes like ``_obj`` or ``_column_map`` this means they are not set
        # (yet). Looking them up as columns would need exactly these attributes and
        # therefore recurse endlessly.
        if name.startswith("_"):
            raise AttributeError(f"Attribute {name!r} not found.")

        try:
            return self[name]
        except KeyError as key_err:
            raise AttributeError(f"Attribute {name!r} not found.") from key_err

    def _get_safe_long(self, key: Any) -> tuple[str, str, str]:
        """Get the long column name or return the input."""
        return getattr(self._column_map.from_short.get(key), "long", key)

    def validate(self, modalities: list[str] | None = None) -> pd.DataFrame:
        """Validate the DataFrame against the lydata schema.

        The schema is constructed by the :py:func:`construct_schema` function using
        the ``modalities`` provided or it will :py:func:`get_default_modalities` if
        none are provided.
        """
        modalities = modalities or list(get_default_modalities().keys())
        lydata_schema = construct_schema(modalities=modalities)
        return lydata_schema.validate(self._obj)

    def get_modalities(self, _filter: list[str] | None = None) -> list[str]:
        """Return the modalities present in this DataFrame.

        .. warning::

            This method assumes that all top-level columns are modalities, except for
            some predefined non-modality columns. For some custom dataset, this may not
            be correct. In that case, you should provide a list of columns to
            ``_filter``, i.e., the columns that are *not* modalities.
        """
        non_modality_cols = _filter or [
            "patient",
            "tumor",
            "total_dissected",
            "positive_dissected",
            "enbloc_dissected",
            "enbloc_positive",
        ]
        top_level_cols = self._obj.columns.get_level_values(0).unique().tolist()
        return [col for col in top_level_cols if col not in non_modality_cols]

    def query(self, query: QTypes = None) -> pd.DataFrame:
        """Return a DataFrame with rows that satisfy the ``query``.

        A query is a :py:class:`Q` object that can be combined with logical operators.
        See this class' documentation for more information.

        As a shorthand for creating these :py:class:`Q` objects, you can use the
        :py:class:`C` object as in the example below, where we query all entries where
        ``x`` is greater than 1 and not less than 3:

        >>> df = pd.DataFrame({'x': [1, 2, 3]})
        >>> df.ly.query((C('x') > 1) & ~(C('x') < 3))
           x
        2  3
        """
        mask = (query or NoneQ()).execute(self._obj)
        return self._obj[mask]

    def portion(self, query: QTypes = None, given: QTypes = None) -> QueryPortion:
        """Compute how many rows satisfy a ``query``, ``given`` some other conditions.

        This returns a :py:class:`QueryPortion` object that contains the number of rows
        satisfying the ``query`` and ``given`` :py:class:`Q` object divided by the
        number of rows satisfying only the ``given`` condition.

        >>> df = pd.DataFrame({'x': [1, 2, 3]})
        >>> df.ly.portion(query=C('x') == 2, given=C('x') > 1)
        QueryPortion(match=np.int64(1), total=np.int64(2))
        >>> df.ly.portion(query=C('x') == 2, given=C('x') > 3)
        QueryPortion(match=np.int64(0), total=np.int64(0))
        """
        given_mask = (given or NoneQ()).execute(self._obj)
        query_mask = (query or NoneQ()).execute(self._obj)

        return QueryPortion(
            match=query_mask[given_mask].sum(),
            total=given_mask.sum(),
        )

    def stats(
        self,
        agg_funcs: AggFuncType | None = None,
        use_shortnames: bool = True,
        out_format: str = "dict",
    ) -> Any:
        """Compute statistics.

        The ``agg_funcs`` argument is a mapping of column names to functions that
        receive a :py:class:`pd.Series` and return a :py:class:`pd.Series`. The default
        is a useful selection of statistics for the most common columns. E.g., for the
        column ``('patient', '#', 'age')`` (or its short column name ``age``), the
        default function returns the value counts.

        The ``use_shortnames`` argument determines whether the output should use the
        short column names or the long ones. The default is to use the short names.

        With ``out_format`` one can specify the output format. Available options are
        those formats for which pandas has a ``to_<format>`` method.

        >>> df = pd.DataFrame({
        ...     ('patient', '#', 'age'): [61, 52, 73, 61],
        ...     ('patient', '#', 'hpv_status'): [True, False, None, True],
        ...     ('tumor', '1', 't_stage'): [2, 3, 1, 2],
        ... })
        >>> df.ly.stats()   # doctest: +NORMALIZE_WHITESPACE
        {'age': {61: 2, 52: 1, 73: 1},
         'hpv': {True: 2, False: 1, None: 1},
         't_stage': {2: 2, 3: 1, 1: 1}}
        """
        # The entries of the default column map are themselves used as the default
        # aggregation functions. User-provided functions override or extend them.
        _agg_funcs = self._column_map.from_short.copy()
        _agg_funcs.update(agg_funcs or {})
        stats = {}

        for colname, func in _agg_funcs.items():
            if colname not in self:
                continue

            column = self[colname]
            if use_shortnames and colname in self._column_map.from_long:
                colname = self._column_map.from_long[colname].short

            stats[colname] = getattr(func(column), f"to_{out_format}")()

        return stats

    def _filter_and_sort_modalities(
        self,
        modalities: dict[str, ModalityConfig] | None = None,
    ) -> dict[str, ModalityConfig]:
        """Return only those ``modalities`` that are present in the data.

        If no ``modalities`` are given, the default modalities are used. The entries
        keep the order of the provided mapping.
        """
        modalities = modalities or get_default_modalities()
        # Determine the present modalities once, not once per candidate.
        present_modalities = set(self.get_modalities())

        return {
            modality_name: modality_config
            for modality_name, modality_config in modalities.items()
            if modality_name in present_modalities
        }

    def combine(
        self,
        modalities: dict[str, ModalityConfig] | None = None,
        method: Literal["max_llh", "rank"] = "max_llh",
    ) -> pd.DataFrame:
        """Combine diagnoses of ``modalities`` using ``method``.

        The order of the provided ``modalities`` does not matter, because the
        diagnoses and the sensitivities and specificities are collected from the
        same mapping. With ``method="max_llh"``, the most likely true state of
        involvement is inferred based on all available diagnoses for each patient
        and level. With ``method="rank"``, only the most trustworthy diagnosis is
        chosen for each patient and level based on the sensitivity and specificity
        of the given ``modalities``.

        The result contains only the combined columns and has the same row index as
        the original DataFrame. The intended use is to
        :py:meth:`~pandas.DataFrame.update` the original DataFrame with the result.

        >>> df = pd.DataFrame({
        ...     ('CT'       , 'ipsi', 'I'): [False, True , False, True, None],
        ...     ('MRI'      , 'ipsi', 'I'): [False, True , True , None, None],
        ...     ('pathology', 'ipsi', 'I'): [True , None , None, False, None],
        ... })
        >>> df.ly.combine()   # doctest: +NORMALIZE_WHITESPACE
             ipsi
                I
        0    True
        1    True
        2   False
        3   False
        4    None
        """
        modalities = self._filter_and_sort_modalities(modalities)

        diagnosis_stack = align_diagnoses(self._obj, list(modalities.keys()))
        diagnosis_matrix = _stack_to_float_matrix(diagnosis_stack)
        # Where no modality reports anything, the combined result is unknown, too.
        all_nan_mask = np.all(np.isnan(diagnosis_matrix), axis=0)

        result = _evaluate_likelihood_ratios(
            diagnosis_matrix=diagnosis_matrix,
            sensitivities=np.array([mod.sens for mod in modalities.values()]),
            specificities=np.array([mod.spec for mod in modalities.values()]),
            method=method,
        )
        # Cast to object, such that ``None`` can be stored alongside the booleans.
        # The array method is used because ``np.astype`` needs NumPy 2.1 or newer.
        result = result.astype(object)
        result[all_nan_mask] = None

        # Keep the row labels of the aligned tables. Otherwise, the result could
        # not be aligned with a DataFrame that has a non-default index.
        return pd.DataFrame(
            result,
            index=diagnosis_stack[0].index,
            columns=diagnosis_stack[0].columns,
        )

    def _build_inferred_frame(self, inferred: dict[tuple, np.ndarray]) -> pd.DataFrame:
        """Create a DataFrame from ``inferred`` columns with the original row index."""
        if not inferred:
            # Nothing was inferred: return a frame without columns that still has
            # the rows and the column levels of the original.
            return self._obj.drop(columns=self._obj.columns)

        # Building the frame in one go avoids inserting the columns one by one.
        result = pd.DataFrame(inferred, index=self._obj.index)
        if result.columns.nlevels == self._obj.columns.nlevels:
            result.columns = result.columns.set_names(self._obj.columns.names)

        return result

    def infer_sublevels(
        self,
        modalities: list[str] | None = None,
        sides: list[Literal["ipsi", "contra"]] | None = None,
        subdivisions: dict[str, list[str]] | None = None,
    ) -> pd.DataFrame:
        """Determine involvement status of an LNL's sublevels (e.g., IIa and IIb).

        Some LNLs have sublevels, e.g., IIa and IIb. The involvement of these sublevels
        is not always reported, but only the superlevel's status. This function infers
        the status of the sublevels from the superlevel: If the superlevel is healthy,
        so are its sublevels. In all other cases, the sublevels' status is unknown.

        The sublevel's status is computed for the specified ``modalities``. If and what
        sublevels a superlevel has, is specified in ``subdivisions``. The default
        ``subdivisions`` argument looks like this:

        .. code-block:: python

            {
                "I": ["a", "b"],
                "II": ["a", "b"],
                "V": ["a", "b"],
            }

        The resulting DataFrame will only contain the newly inferred sublevel columns
        and only for those sublevels that were not already present in the DataFrame.
        Thus, one can simply :py:meth:`~pandas.DataFrame.join` the original DataFrame
        with the result.

        >>> df = pd.DataFrame({
        ...     ('MRI', 'ipsi'  , 'I' ): [True , False, False, None],
        ...     ('MRI', 'contra', 'I' ): [False, True , False, None],
        ...     ('MRI', 'ipsi'  , 'II'): [False, False, True , None],
        ...     ('MRI', 'ipsi'  , 'IV'): [False, False, True , None],
        ...     ('CT' , 'ipsi'  , 'I' ): [True , False, False, None],
        ... })
        >>> df.ly.infer_sublevels(modalities=["MRI"])   # doctest: +NORMALIZE_WHITESPACE
             MRI
            ipsi                contra
              Ia     Ib    IIa    IIb     Ia     Ib
        0   None   None  False  False  False  False
        1  False  False  False  False   None   None
        2  False  False   None   None  False  False
        3   None   None   None   None   None   None
        """
        modalities = modalities or list(get_default_modalities().keys())
        sides = sides or ["ipsi", "contra"]
        subdivisions = subdivisions or self._DEFAULT_SUBDIVISIONS

        inferred = {}
        loop_combinations = product(modalities, sides, subdivisions.items())

        for modality, side, (superlevel, subids) in loop_combinations:
            try:
                is_healthy = self._obj[modality, side, superlevel] == False  # noqa
            except KeyError:
                # Without the superlevel there is nothing to infer from.
                continue

            healthy_mask = is_healthy.fillna(False).to_numpy(dtype=bool)

            for subid in subids:
                sublevel_col = (modality, side, superlevel + subid)
                if sublevel_col in self._obj.columns:
                    # Reported sublevels take precedence over inferred ones.
                    continue

                values = np.full(len(self._obj), None, dtype=object)
                values[healthy_mask] = False
                inferred[sublevel_col] = values

        return self._build_inferred_frame(inferred)

    def infer_superlevels(
        self,
        modalities: list[str] | None = None,
        sides: list[Literal["ipsi", "contra"]] | None = None,
        subdivisions: dict[str, list[str]] | None = None,
    ) -> pd.DataFrame:
        """Determine involvement status of an LNL's superlevel (e.g., II).

        Some LNLs have sublevels, e.g., IIa and IIb. In real data, sometimes the
        sublevels are reported, sometimes only the superlevel. This function infers the
        status of the superlevel from the sublevels: The superlevel is involved if any
        sublevel is involved. It is healthy if all sublevels are reported healthy. In
        all other cases (no involved sublevel, but at least one with unknown status),
        the status of the superlevel is unknown.

        The superlevel's status is computed for the specified ``modalities``. If and
        what sublevels a superlevel has, is specified in ``subdivisions``.

        The resulting DataFrame will only contain the newly inferred superlevel columns
        and only for those superlevels that were not already present in the DataFrame.
        This way, it is straightforward to :py:meth:`~pandas.DataFrame.join` it with the
        original DataFrame.

        >>> df = pd.DataFrame({
        ...     ('MRI', 'ipsi'  , 'Ia' ): [True , False, False, None],
        ...     ('MRI', 'ipsi'  , 'Ib' ): [False, True , False, None],
        ...     ('MRI', 'contra', 'IIa'): [False, False, None , None],
        ...     ('MRI', 'contra', 'IIb'): [False, True , True , None],
        ...     ('CT' , 'ipsi'  , 'I'  ): [True , False, False, None],
        ... })
        >>> df.ly.infer_superlevels(modalities=["MRI"]) # doctest: +NORMALIZE_WHITESPACE
             MRI
            ipsi contra
               I     II
        0   True  False
        1   True   True
        2  False   True
        3   None   None
        """
        modalities = modalities or list(get_default_modalities().keys())
        sides = sides or ["ipsi", "contra"]
        subdivisions = subdivisions or self._DEFAULT_SUBDIVISIONS

        inferred = {}
        loop_combinations = product(modalities, sides, subdivisions.items())

        for modality, side, (superlevel, subids) in loop_combinations:
            superlevel_col = (modality, side, superlevel)
            if superlevel_col in self._obj.columns:
                # Reported superlevels take precedence over inferred ones.
                continue

            sublevel_cols = [(modality, side, superlevel + subid) for subid in subids]

            try:
                sublevels = self._obj[sublevel_cols]
            except KeyError:
                # At least one sublevel is missing, so nothing can be inferred.
                continue

            is_any_involved = sublevels.any(axis=1).to_numpy(dtype=bool)
            has_unknown = sublevels.isna().any(axis=1).to_numpy(dtype=bool)
            # A healthy superlevel requires *all* sublevels to be known healthy.
            are_all_healthy = ~is_any_involved & ~has_unknown

            values = np.full(len(self._obj), None, dtype=object)
            values[are_all_healthy] = False
            values[is_any_involved] = True
            inferred[superlevel_col] = values

        return self._build_inferred_frame(inferred)
def _main() -> None:
    """Serve as a placeholder entry point that does nothing."""
    return None
def _run_doctests() -> None:
    """Execute the doctests via :py:func:`doctest.testmod`."""
    from doctest import testmod

    testmod()
# Executing this module as a script runs its doctests.
if __name__ == "__main__":
    _run_doctests()