Source code for memote.suite.reporting.bag

# -*- coding: utf-8 -*-

# Copyright 2017 Novo Nordisk Foundation Center for Biosustainability,
# Technical University of Denmark.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Utilities that handle a `dask.bag`."""

from __future__ import absolute_import

from builtins import dict, zip

import io
import logging
from operator import itemgetter
from os.path import exists

try:
    import simplejson as json
except ImportError:
    import json

import pandas as pd
import dask.bag as db
from colorama import Fore

LOGGER = logging.getLogger(__name__)



class ResultBagWrapper(object):
    """Report-specific wrapper around a `dask.bag`."""

    def __init__(self, files, **kwargs):
        """
        Load (JSON) documents into memory managed by a `dask.bag`.

        The order of the `files` argument determines the order of rows in
        data frames returned by other methods.

        Parameters
        ----------
        files : iterable
            A list of filenames that should contain valid JSON.

        """
        super(ResultBagWrapper, self).__init__(**kwargs)
        # Load all documents into memory and avoid strange dask JSON object
        # expectations.
        objects = list()
        for filename in files:
            if not exists(filename):
                LOGGER.warning(
                    Fore.YELLOW
                    + "Expected file %s is missing."  # noqa: W503
                    + Fore.RESET, filename)  # noqa: W503
                continue
            with io.open(filename) as file_h:
                objects.append(json.load(file_h))
        if len(objects) == 0:
            raise RuntimeError("None of the expected JSON files were found!")
        self._bag = db.from_sequence(objects, npartitions=1)
        self._index = None
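
    # Sketch of the document layout that the methods below assume (keys
    # taken from the accessors in this module, values abbreviated):
    #
    #     {
    #         "meta": {"commit_hash": "...", "timestamp": "..."},
    #         "report": {
    #             "test_basic": {...},
    #             "test_consistency": {...},
    #             "test_syntax": {...},
    #             "test_biomass": {...}
    #         }
    #     }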

    def build_index(self):
        """Build a data index from timestamps and commit hashes."""
        LOGGER.debug("Building index...")
        expected = pd.DataFrame({
            "timestamp": pd.Series(dtype="datetime64[ns]"),
            "commit_hash": pd.Series(dtype="str")
        })
        df = self._bag.pluck("meta", dict()).to_dataframe(expected).compute()
        df.set_index(
            "commit_hash", drop=True, inplace=True, verify_integrity=True)
        trunc = 5
        res = df.index.str[:trunc]
        while len(res.unique()) < len(df):
            trunc += 1
            res = df.index.str[:trunc]
        df["commit_hash"] = res.copy()
        df.sort_values("timestamp", inplace=True, kind="mergesort")
        self._index = df
        LOGGER.debug("%s", str(df))
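
    # Example of the prefix truncation in `build_index` (hypothetical
    # hashes): for the commits "abcde071..." and "abcde942...", the initial
    # five-character prefix "abcde" is shared by both rows, so the loop
    # widens it to six characters ("abcde0" vs. "abcde9") before storing the
    # shortened hashes in the index.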

    def _assert_index_presence(self):
        """Ensure that the index was built."""
        if self._index is None:
            raise ValueError(
                "No index present. Please call method `build_index` first.")

    def get_model_ids(self):
        """Get unique model IDs. Should typically be of length one."""
        return (self._bag.pluck("report").pluck("test_basic")
                .pluck("model_id").distinct().compute())

    def get_basic_dataframe(self):
        """Create basic information data frame."""
        LOGGER.debug("Collecting basic information from bag.")
        self._assert_index_presence()
        columns = ("commit", "num_genes", "num_reactions", "num_metabolites",
                   "num_metabolites_no_formula", "metabolites_no_charge",
                   "reactions_no_GPR",
                   # "metabolic_coverage",  # noqa
                   "ngam_reaction")
        data = pd.DataFrame(list(self._bag.map(_get_basics)),
                            columns=columns)
        data.set_index("commit", inplace=True)
        return self._index.join(data)

    def get_consistency_dataframe(self):
        """Create consistency information data frame."""
        LOGGER.debug("Collecting consistency information from bag.")
        self._assert_index_presence()
        columns = ("commit", "is_consistent", "unconserved_metabolites",
                   "magic_atp_production", "imbalanced_reactions",
                   "blocked_reactions", "looped_reactions")
        data = pd.DataFrame(list(self._bag.map(_get_consistency)),
                            columns=columns)
        data.set_index("commit", inplace=True)
        return self._index.join(data)

    def get_syntax_dataframe(self):
        """Create syntax information data frame."""
        LOGGER.debug("Collecting syntax information from bag.")
        self._assert_index_presence()
        columns = ("commit",
                   # "reaction_compartment_suffix",  # noqa
                   "reaction_metabolite_compartment",
                   # "untagged_normal_transport",  # noqa
                   # "untagged_abc_transport",  # noqa
                   # "uppercase_metabolites",  # noqa
                   "untagged_demand", "false_demand",
                   "untagged_sink", "false_sink",
                   "untagged_exchange", "false_exchange")
        data = pd.DataFrame(list(self._bag.map(_get_syntax)),
                            columns=columns)
        data.set_index("commit", inplace=True)
        return self._index.join(data)

    def get_biomass_dataframe(self):
        """Create biomass information data frame."""
        LOGGER.debug("Collecting biomass information from bag.")
        self._assert_index_presence()
        columns = ("commit", "biomass_ids", "biomass_sum",
                   "biomass_default_flux", "num_default_blocked_precursors",
                   "num_open_blocked_precursors", "gam_in_biomass")
        data = pd.DataFrame(
            self._bag.map(_get_biomass).fold(
                list.__iadd__, initial=list()).compute(),
            columns=columns)
        data.set_index("commit", inplace=True)
        return self._index.join(data)
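
    # Note: unlike the other extractors, `_get_biomass` (below) returns a
    # list of rows per document (one per biomass reaction), which is why the
    # bag is folded with `list.__iadd__` above to concatenate those lists
    # before the data frame is built.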


def _get_basics(elem):
    """Collect results from `test_basic`."""
    tmp = elem["report"]["test_basic"]
    return (elem["meta"]["commit_hash"],
            tmp["num_genes"],
            tmp["num_reactions"],
            tmp["num_metabolites"],
            len(tmp["metabolites_no_formula"]),
            len(tmp["metabolites_no_charge"]),
            len(tmp["reactions_no_GPR"]),
            # tmp["metabolic_coverage"],  # noqa
            tmp["ngam_reaction"])


def _get_consistency(elem):
    """Collect results from `test_consistency`."""
    tmp = elem["report"]["test_consistency"]
    looped = tmp.get("looped_reactions", float("nan"))
    return (elem["meta"]["commit_hash"],
            tmp["is_consistent"],
            len(tmp["unconserved_metabolites"]),
            tmp["magic_atp_production"],
            len(tmp["imbalanced_reactions"]),
            len(tmp["blocked_reactions"]),
            looped)


def _get_syntax(elem):
    """Collect results from `test_syntax`."""
    tmp = elem["report"]["test_syntax"]
    return (elem["meta"]["commit_hash"],
            # len(tmp["reaction_compartment_suffix"]),  # noqa
            len(tmp["reaction_metabolite_compartment"]),
            # len(tmp["untagged_normal_transport"]),  # noqa
            # len(tmp["untagged_abc_transport"]),  # noqa
            # len(tmp["uppercase_metabolites"]),  # noqa
            len(tmp["untagged_demand"]),
            len(tmp["false_demand"]),
            len(tmp["untagged_sink"]),
            len(tmp["false_sink"]),
            len(tmp["untagged_exchange"]),
            len(tmp["false_exchange"]))


def _get_biomass(elem):
    """Collect results from `test_biomass`."""
    tmp = elem["report"]["test_biomass"]
    commit = elem["meta"]["commit_hash"]
    columns = itemgetter(
        "biomass_ids", "biomass_sum", "biomass_default_flux",
        "default_blocked_precursors", "open_blocked_precursors",
        "gam_in_biomass")
    res = [
        (commit, rxn, bio_sum, def_flux, len(def_blocked),
         len(open_blocked), gam)
        for (rxn, bio_sum, def_flux, def_blocked, open_blocked, gam)
        in zip(*columns(tmp))]
    return res
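

if __name__ == "__main__":
    # Minimal usage sketch, assuming JSON result documents produced by the
    # memote test suite exist under the hypothetical file names below: load
    # them, build the commit/timestamp index, then print the model IDs and
    # the basic statistics joined against the index.
    logging.basicConfig(level=logging.DEBUG)
    wrapper = ResultBagWrapper(["abc123.json", "def456.json"])
    wrapper.build_index()
    print(wrapper.get_model_ids())
    print(wrapper.get_basic_dataframe())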