Source code for cardinal_pythonlib.athena_ohdsi

#!/usr/bin/env python
# cardinal_pythonlib/athena_ohdsi.py

"""
===============================================================================

    Original code copyright (C) 2009-2022 Rudolf Cardinal (rudolf@pobox.com).

    This file is part of cardinal_pythonlib.

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        https://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.

===============================================================================

**Functions to assist with the Athena OHDSI vocabularies.**

See https://athena.ohdsi.org/.

"""

import csv
import logging
from typing import Collection, Generator, Iterable, List

from cardinal_pythonlib.logs import BraceStyleAdapter
from cardinal_pythonlib.reprfunc import simple_repr
from cardinal_pythonlib.snomed import SnomedConcept

log = BraceStyleAdapter(logging.getLogger(__name__))


# =============================================================================
# Athena OHDSI mapping
# =============================================================================

# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------


[docs]class AthenaVocabularyId(object): """ Constant-holding class for Athena vocabulary IDs that we care about. """ ICD10CM = "ICD10CM" ICD9CM = "ICD9CM" OPCS4 = "OPCS4" SNOMED = "SNOMED"
[docs]class AthenaRelationshipId(object): r""" Constant-holding class for Athena relationship IDs that we care about. To show all (there are lots!): .. code-block:: bash awk 'BEGIN {FS="\t"}; {print $3}' CONCEPT_RELATIONSHIP.csv | sort -u """ IS_A = "Is a" # "is a child of" MAPS_TO = "Maps to" # converting between vocabularies MAPPED_FROM = "Mapped from" # converting between vocabularies SUBSUMES = "Subsumes" # "is a parent of"
# ----------------------------------------------------------------------------- # TSV row info classes # -----------------------------------------------------------------------------
[docs]class AthenaConceptRow(object): """ Simple information-holding class for ``CONCEPT.csv`` file from https://athena.ohdsi.org/ vocabulary download. """ HEADER = [ "concept_id", "concept_name", "domain_id", "vocabulary_id", "concept_class_id", "standard_concept", "concept_code", "valid_start_date", "valid_end_date", "invalid_reason", ] def __init__( self, concept_id: str, concept_name: str, domain_id: str, vocabulary_id: str, concept_class_id: str, standard_concept: str, concept_code: str, valid_start_date: str, valid_end_date: str, invalid_reason: str, ) -> None: """ Argument order is important. Args: concept_id: Athena concept ID concept_name: Concept name in the originating system domain_id: e.g. "Observation", "Condition" vocabulary_id: e.g. "SNOMED", "ICD10CM" concept_class_id: e.g. "Substance", "3-char nonbill code" standard_concept: ?; e.g. "S" concept_code: concept code in the vocabulary (e.g. SNOMED-CT concept code like "3578611000001105" if vocabulary_id is "SNOMED"; ICD-10 code like "F32.2" if vocabulary_is is "ICD10CM"; etc.) valid_start_date: date in YYYYMMDD format valid_end_date: date in YYYYMMDD format invalid_reason: ? (but one can guess) """ self.concept_id = int(concept_id) self.concept_name = concept_name self.domain_id = domain_id self.vocabulary_id = vocabulary_id self.concept_class_id = concept_class_id self.standard_concept = standard_concept self.concept_code = concept_code self.valid_start_date = valid_start_date self.valid_end_date = valid_end_date self.invalid_reason = invalid_reason # self.sort_context_concept_to_match = None def __repr__(self) -> str: return simple_repr(self, self.HEADER) def __str__(self) -> str: return ( f"Vocabulary {self.vocabulary_id}, concept {self.concept_code} " f"({self.concept_name}) -> Athena concept {self.concept_id}" ) # I looked at sorting them to find the best. Not wise; would need human # review. Just use all valid codes. _ = ''' def set_sort_context_concept_to_match(self, concept: "AthenaConceptRow") -> None: self.sort_context_concept_to_match = concept def __lt__(self, other: "AthenaConceptRow") -> bool: """ Compares using "less than" being equivalent to "preferable to". So, returns True if "self" is better than other, and False if "self" is worse than other; that is, all tests look like "return self is better than other". BINNED. We will use human judgement. """ invalid_s = bool(self.invalid_reason) invalid_o = bool(other.invalid_reason) if invalid_s != invalid_o: # better not to have an "invalid" reason; # empty strings are "less than" full ones return invalid_s < invalid_o if self.valid_end_date != other.valid_end_date: # better to have a later end date return self.valid_end_date > other.valid_end_date if self.valid_start_date != other.valid_start_date: # better to have an earlier start date return self.valid_start_date < other.valid_end_date if self.sort_context_concept_to_match: # Which is closer to our target context? c = self.sort_context_concept_to_match sp = self.match_tuple(c) op = other.match_tuple(c) log.info( "Tie-breaking to match {c}: {s} ({sp} points) vs " "{o} ({op} points)", s=self, sp=sp, o=other, op=op, c=c ) # More matching points is better return self.match_tuple(c) > other.match_tuple(c) log.warning("Tie-breaking {} and {} by ID", self, other) # Arbitrarily, better to have an earlier (lower) concept ID. return self.concept_id < other.concept_id def match_tuple(self, target: "AthenaConceptRow") -> Tuple[float, float]: """ Returns a score reflecting our similarity to the target. See - https://stackoverflow.com/questions/8897593/similarity-between-two-text-documents - https://stackoverflow.com/questions/2380394/simple-implementation-of-n-gram-tf-idf-and-cosine-similarity-in-python - https://spacy.io/usage/vectors-similarity -- data not included - https://radimrehurek.com/gensim/index.html - https://radimrehurek.com/gensim/tut3.html - https://scikit-learn.org/stable/ - https://www.nltk.org/ BINNED. We will use human judgement. """ # noqa self_words = set(x.lower() for x in self.concept_name.split()) other_words = set(x.lower() for x in target.concept_name.split()) # More matching words better n_matching_words = len(self_words & other_words) # More words better (often more specific) n_words = len(self_words) return float(n_matching_words), float(n_words) '''
[docs] def snomed_concept(self) -> SnomedConcept: """ Assuming this Athena concept reflects a SnomedConcept, returns it. (Asserts if it isn't.) """ assert self.vocabulary_id == AthenaVocabularyId.SNOMED return SnomedConcept(int(self.concept_code), self.concept_name)
[docs]class AthenaConceptRelationshipRow(object): """ Simple information-holding class for ``CONCEPT_RELATIONSHIP.csv`` file from https://athena.ohdsi.org/ vocabulary download. """ HEADER = [ "concept_id_1", "concept_id_2", "relationship_id", "valid_start_date", "valid_end_date", "invalid_reason", ] def __init__( self, concept_id_1: str, concept_id_2: str, relationship_id: str, valid_start_date: str, valid_end_date: str, invalid_reason: str, ) -> None: """ Argument order is important. Args: concept_id_1: Athena concept ID #1 concept_id_2: Athena concept ID #2 relationship_id: e.g. "Is a", "Has legal category" valid_start_date: date in YYYYMMDD format valid_end_date: date in YYYYMMDD format invalid_reason: ? (but one can guess) """ self.concept_id_1 = int(concept_id_1) self.concept_id_2 = int(concept_id_2) self.relationship_id = relationship_id self.valid_start_date = valid_start_date self.valid_end_date = valid_end_date self.invalid_reason = invalid_reason def __repr__(self) -> str: return simple_repr(self, self.HEADER) def __str__(self) -> str: return ( f"Athena concept relationship {self.concept_id_1} " f"{self.relationship_id!r} {self.concept_id_2}" )
# ----------------------------------------------------------------------------- # Fetch data from TSV files # ----------------------------------------------------------------------------- # noinspection DuplicatedCode
[docs]def get_athena_concepts( tsv_filename: str = "", cached_concepts: Iterable[AthenaConceptRow] = None, vocabulary_ids: Collection[str] = None, concept_codes: Collection[str] = None, concept_ids: Collection[int] = None, not_vocabulary_ids: Collection[str] = None, not_concept_codes: Collection[str] = None, not_concept_ids: Collection[int] = None, encoding: str = "utf-8", ) -> List[AthenaConceptRow]: """ From the Athena ``CONCEPT.csv`` tab-separated value file, return a list of concepts matching the restriction criteria. Args: tsv_filename: filename cached_concepts: alternative to tsv_filename vocabulary_ids: permissible ``vocabulary_id`` values, or None or an empty list for all concept_codes: permissible ``concept_code`` values, or None or an empty list for all concept_ids: permissible ``concept_id`` values, or None or an empty list for all not_vocabulary_ids: impermissible ``vocabulary_id`` values, or None or an empty list for none not_concept_codes: impermissible ``concept_code`` values, or None or an empty list for none not_concept_ids: impermissible ``concept_id`` values, or None or an empty list for none encoding: encoding for input files Returns: list: of :class:`AthenaConceptRow` objects Test and timing code: .. code-block:: python import logging import timeit logging.basicConfig(level=logging.DEBUG) from cardinal_pythonlib.athena_ohdsi import ( get_athena_concepts, get_athena_concept_relationships, ) concept_filename = "CONCEPT.csv" cr_filename = "CONCEPT_RELATIONSHIP.csv" testcode = "175898006" testid = 46067884 concept_testcode = ''' get_athena_concepts(concept_filename, concept_codes=[testcode]) ''' cr_testcode = ''' get_athena_concept_relationships(cr_filename, concept_id_1_values=[testid]) ''' timeit.timeit(cr_testcode, number=1, globals=globals()) # Initial method: 33.6 s (for 9.9m rows on a Windows laptop). # Chain of generators: 21.5 s. Better. timeit.timeit(concept_testcode, number=1, globals=globals()) # After speedup: 3.9 s for 1.1m rows. """ # noqa assert bool(tsv_filename) != bool( cached_concepts ), "Specify either tsv_filename or cached_concepts" n_rows_read = 0 def gen_rows() -> Generator[AthenaConceptRow, None, None]: nonlocal n_rows_read with open(tsv_filename, "r", encoding=encoding) as tsvin: reader = csv.reader(tsvin, delimiter="\t") header = next(reader, None) if header != AthenaConceptRow.HEADER: raise ValueError( f"Athena concept file has unexpected header: {header!r}; " f"expected {AthenaConceptRow.HEADER!r}" ) for row in reader: n_rows_read += 1 concept = AthenaConceptRow(*row) yield concept def filter_vocab( concepts_: Iterable[AthenaConceptRow], ) -> Generator[AthenaConceptRow, None, None]: for concept in concepts_: if concept.vocabulary_id in vocabulary_ids: yield concept def filter_code( concepts_: Iterable[AthenaConceptRow], ) -> Generator[AthenaConceptRow, None, None]: for concept in concepts_: if concept.concept_code in concept_codes: yield concept def filter_id( concepts_: Iterable[AthenaConceptRow], ) -> Generator[AthenaConceptRow, None, None]: for concept in concepts_: if concept.concept_id in concept_ids: yield concept def filter_not_vocab( concepts_: Iterable[AthenaConceptRow], ) -> Generator[AthenaConceptRow, None, None]: for concept in concepts_: if concept.vocabulary_id not in not_vocabulary_ids: yield concept def filter_not_code( concepts_: Iterable[AthenaConceptRow], ) -> Generator[AthenaConceptRow, None, None]: for concept in concepts_: if concept.concept_code not in not_concept_codes: yield concept def filter_not_id( concepts_: Iterable[AthenaConceptRow], ) -> Generator[AthenaConceptRow, None, None]: for concept in concepts_: if concept.concept_id not in not_concept_ids: yield concept # Build up the fastest pipeline we can. if tsv_filename: log.info(f"Loading Athena concepts from file: {tsv_filename}") gen = gen_rows() else: log.info("Using cached Athena concepts") gen = cached_concepts # Positive checks if vocabulary_ids: gen = filter_vocab(gen) if concept_codes: gen = filter_code(gen) if concept_ids: gen = filter_id(gen) # Negative checks if not_vocabulary_ids: gen = filter_not_vocab(gen) if not_concept_codes: gen = filter_not_code(gen) if not_concept_ids: gen = filter_not_id(gen) concepts = list(concept for concept in gen) log.debug(f"Retrieved {len(concepts)} concepts from {n_rows_read} rows") return concepts
# noinspection DuplicatedCode
[docs]def get_athena_concept_relationships( tsv_filename: str = "", cached_concept_relationships: Iterable[ AthenaConceptRelationshipRow ] = None, # noqa concept_id_1_values: Collection[int] = None, concept_id_2_values: Collection[int] = None, relationship_id_values: Collection[str] = None, not_concept_id_1_values: Collection[int] = None, not_concept_id_2_values: Collection[int] = None, not_relationship_id_values: Collection[str] = None, encoding: str = "utf-8", ) -> List[AthenaConceptRelationshipRow]: """ From the Athena ``CONCEPT_RELATIONSHIP.csv`` tab-separated value file, return a list of relationships matching the restriction criteria. Args: tsv_filename: filename cached_concept_relationships: alternative to tsv_filename concept_id_1_values: permissible ``concept_id_1`` values, or None or an empty list for all concept_id_2_values: permissible ``concept_id_2`` values, or None or an empty list for all relationship_id_values: permissible ``relationship_id`` values, or None or an empty list for all not_concept_id_1_values: impermissible ``concept_id_1`` values, or None or an empty list for none not_concept_id_2_values: impermissible ``concept_id_2`` values, or None or an empty list for none not_relationship_id_values: impermissible ``relationship_id`` values, or None or an empty list for none encoding: encoding for input files Returns: list: of :class:`AthenaConceptRelationshipRow` objects """ assert bool(tsv_filename) != bool( cached_concept_relationships ), "Specify either tsv_filename or cached_concept_relationships" n_rows_read = 0 def gen_rows() -> Generator[AthenaConceptRelationshipRow, None, None]: nonlocal n_rows_read with open(tsv_filename, "r", encoding=encoding) as tsvin: reader = csv.reader(tsvin, delimiter="\t") header = next(reader, None) if header != AthenaConceptRelationshipRow.HEADER: raise ValueError( f"Athena concept relationship file has unexpected header: " f"{header!r}; expected " f"{AthenaConceptRelationshipRow.HEADER!r}" ) for row in reader: n_rows_read += 1 rel = AthenaConceptRelationshipRow(*row) yield rel def filter_rel( rels: Iterable[AthenaConceptRelationshipRow], ) -> Generator[AthenaConceptRelationshipRow, None, None]: for rel in rels: if rel.relationship_id in relationship_id_values: yield rel def filter_c1( rels: Iterable[AthenaConceptRelationshipRow], ) -> Generator[AthenaConceptRelationshipRow, None, None]: for rel in rels: if rel.concept_id_1 in concept_id_1_values: yield rel def filter_c2( rels: Iterable[AthenaConceptRelationshipRow], ) -> Generator[AthenaConceptRelationshipRow, None, None]: for rel in rels: if rel.concept_id_2 in concept_id_2_values: yield rel def filter_not_rel( rels: Iterable[AthenaConceptRelationshipRow], ) -> Generator[AthenaConceptRelationshipRow, None, None]: for rel in rels: if rel.relationship_id not in not_relationship_id_values: yield rel def filter_not_c1( rels: Iterable[AthenaConceptRelationshipRow], ) -> Generator[AthenaConceptRelationshipRow, None, None]: for rel in rels: if rel.concept_id_1 not in not_concept_id_1_values: yield rel def filter_not_c2( rels: Iterable[AthenaConceptRelationshipRow], ) -> Generator[AthenaConceptRelationshipRow, None, None]: for rel in rels: if rel.concept_id_2 not in not_concept_id_2_values: yield rel # Build up the fastest pipeline we can. if tsv_filename: log.info( f"Loading Athena concept relationships from file: " f"{tsv_filename}" ) gen = gen_rows() else: log.info("Using cached Athena concept relationships") gen = cached_concept_relationships # Positive checks if relationship_id_values: gen = filter_rel(gen) if concept_id_1_values: gen = filter_c1(gen) if concept_id_2_values: gen = filter_c2(gen) # Negative checks if not_relationship_id_values: gen = filter_not_rel(gen) if not_concept_id_1_values: gen = filter_not_c1(gen) if not_concept_id_2_values: gen = filter_not_c2(gen) relationships = list(rel for rel in gen) log.debug( f"Retrieved {len(relationships)} relationships from " f"{n_rows_read} rows" ) return relationships