Source code for yatel.qbj.functions

#!/usr/bin/env python
# -*- coding: utf-8 -*-

#===============================================================================
# DOCS
#===============================================================================

"""QBJ functions domain.

"""

#===============================================================================
# IMPORTS
#===============================================================================

import inspect, collections
import datetime as dt

from yatel import stats
from yatel import db
from yatel.cluster import kmeans as _kmeans


#===============================================================================
# MAP
#===============================================================================

FUNCTIONS = collections.OrderedDict()

PRIVATE_FUNC_DATA = ["func"]


#==============================================================================
# CLASS
#==============================================================================

#: doc
QBJFunction = collections.namedtuple(
    "QBJFunction", ["name", "doc", "func"]
)


#===============================================================================
# REGISTER FUNCTION
#===============================================================================

def qbjfunction(name=None, doc=None):

    def _dec(func):
        qbjfunc = QBJFunction(
            name=name or func.__name__,
            doc=doc or func.__doc__,
            func=func
        )
        FUNCTIONS[qbjfunc.name] = qbjfunc
        if doc is not None:
            func.__doc__ = doc
        return func

    return _dec


def pformat_data(fname):
    fdata = FUNCTIONS[fname]
    data = {}
    for k, v in fdata._asdict().items():
        if k not in PRIVATE_FUNC_DATA:
            data[k] = v
    return data


def execute(name, nw, *args, **kwargs):
    return FUNCTIONS[name].func(nw, *args, **kwargs)


#==============================================================================
# HELP FUNCTION
#==============================================================================

@qbjfunction()
[docs]def ping(nw): """Always return True """ return True
@qbjfunction()
[docs]def help(nw, fname=None): """Returns a list of all functions if ``fname`` is not specified or ``None``, otherwise documentation for ``fname``. """ if fname is None: return list(FUNCTIONS.keys()) return pformat_data(fname) #============================================================================== # YATEL NETWORK #==============================================================================
@qbjfunction(doc=db.YatelNetwork.haplotypes.__doc__)
[docs]def haplotypes(nw): return nw.haplotypes()
@qbjfunction(doc=db.YatelNetwork.haplotype_by_id.__doc__)
[docs]def haplotype_by_id(nw, hap_id): return nw.haplotype_by_id(hap_id)
@qbjfunction(doc=db.YatelNetwork.haplotypes_by_environment.__doc__)
[docs]def haplotypes_by_environment(nw, env=None, **kwargs): return nw.haplotypes_by_environment(env=env, **kwargs)
@qbjfunction(doc=db.YatelNetwork.edges.__doc__)
[docs]def edges(nw): return nw.edges()
@qbjfunction(doc=db.YatelNetwork.edges_by_haplotype.__doc__)
[docs]def edges_by_haplotype(nw, hap): return nw.edges_by_haplotype(hap)
@qbjfunction(doc=db.YatelNetwork.edges_by_environment.__doc__)
[docs]def edges_by_environment(nw, env=None, **kwargs): return nw.edges_by_environment(env=env, **kwargs)
@qbjfunction(doc=db.YatelNetwork.facts.__doc__)
[docs]def facts(nw): return nw.facts()
@qbjfunction(doc=db.YatelNetwork.facts_by_haplotype.__doc__)
[docs]def facts_by_haplotype(nw, hap): return nw.facts_by_haplotype(hap)
@qbjfunction(doc=db.YatelNetwork.facts_by_environment.__doc__)
[docs]def facts_by_environment(nw, env=None, **kwargs): return nw.facts_by_environment(env=env, **kwargs)
@qbjfunction(doc=db.YatelNetwork.describe.__doc__)
[docs]def describe(nw): return nw.describe()
@qbjfunction(doc=db.YatelNetwork.environments.__doc__)
[docs]def environments(nw, facts_attrs=None): return nw.environments(facts_attrs=facts_attrs) #============================================================================== # STATS #==============================================================================
@qbjfunction(doc=stats.amax.__doc__)
[docs]def amax(nw, env=None, **kwargs): return stats.amax(nw, env=env, **kwargs)
@qbjfunction(doc=stats.amin.__doc__)
[docs]def amin(nw, env=None, **kwargs): return stats.amin (nw, env=env, **kwargs)
@qbjfunction(doc=stats.average.__doc__)
[docs]def average(nw, env=None, **kwargs): return stats.average(nw, env=env, **kwargs)
@qbjfunction(doc=stats.env2weightarray.__doc__)
[docs]def env2weightarray(nw, env=None, **kwargs): return stats.env2weightarray(nw, env=env, **kwargs)
@qbjfunction(doc=stats.kurtosis.__doc__)
[docs]def kurtosis(nw, env=None, **kwargs): return stats.kurtosis(nw, env=env, **kwargs)
@qbjfunction(doc=stats.max.__doc__)
[docs]def max(nw, env=None, **kwargs): return stats.max(nw, env=env, **kwargs)
@qbjfunction(doc=stats.median.__doc__)
[docs]def median(nw, env=None, **kwargs): return stats.median(nw, env=env, **kwargs)
@qbjfunction(doc=stats.min.__doc__)
[docs]def min(nw, env=None, **kwargs): return stats.min(nw, env=env, **kwargs)
@qbjfunction(doc=stats.mode.__doc__)
[docs]def mode(nw, env=None, **kwargs): return stats.mode(nw, env=env, **kwargs)
@qbjfunction(doc=stats.percentile.__doc__)
[docs]def percentile(nw, q, env=None, **kwargs): return stats.percentile(nw, q, env=env, **kwargs)
@qbjfunction(doc=stats.range.__doc__)
[docs]def range(nw, env=None, **kwargs): return stats.range(nw, env=env, **kwargs)
@qbjfunction(doc=stats.env2weightarray.__doc__)
[docs]def std(nw, env=None, **kwargs): return stats.std(nw, env=env, **kwargs)
@qbjfunction(doc=stats.sum.__doc__)
[docs]def sum(nw, env=None, **kwargs): return stats.sum(nw, env=env, **kwargs)
@qbjfunction(doc=stats.var.__doc__)
[docs]def var(nw, env=None, **kwargs): return stats.var(nw, env=env, **kwargs)
@qbjfunction(doc=stats.variation.__doc__)
[docs]def variation(nw, env=None, **kwargs): return stats.variation(nw, env=env, **kwargs) #============================================================================== # DM #==============================================================================
@qbjfunction(doc=_kmeans.kmeans)
[docs]def kmeans(nw, envs, k_or_guess, whiten=False, coords=None, *args, **kwargs): """Performs k-means on a set of all environments defined by ``fact_attrs`` of a network. Parameters ---------- nw : :py:class:`yatel.db.YatelNetwork` Network source of environments to classify. envs : iterable of :py:class:`yatel.dom.Environments` or dicts Represents all the environments to be clustered. k_or_guess : int or ndarray The number of centroids to generate. A code is assigned to each centroid, which is also the row index of the centroid in the code_book matrix generated. The initial k centroids are chosen by randomly selecting observations from the observation matrix. Alternatively, passing a k by N array specifies the initial k centroids. whiten : bool execute ``scipy.cluster.vq.whiten`` function over the observation array before executing subjacent *scipy kmeans*. coordc : None or callable If ``coordc`` is ``None`` generates use `hap_in_env_coords` function. Otherwise `coordc` must be a callable with 2 arguments: - ``nw`` network source of environments to classify. - ``env`` the environment to calculate the coordinates and must return an array of coordinates for the given network environment. args : arguments for scipy kmeans kwargs : keywords arguments for scipy kmeans Returns ------- coodebook : an array kxn of k centroids A k by N array of k centroids. The i’th centroid codebook[i] is represented with the code i. The centroids and codes generated represent the lowest distortion seen, not necessarily the globally minimal distortion. distortion : the value of the distortion The distortion between the observations passed and the centroids generated. """ def dcoords(nw, env): return coords[env] coordc = dcoords if coords else None return _kmeans.kmeans( nw=nw, envs=envs, k_or_guess=k_or_guess, whiten=whiten, coordc=coordc, *args, **kwargs ) #=============================================================================== # GENERIC ITERATION #===============================================================================
@qbjfunction()
[docs]def slice(nw, iterable, f, t=None): """Returns ``iterable`` from F-th element to T-th element. Parameters ---------- nw : :py:class:`yatel.db.YatelNetwork` network source of data. iterable : iterator iterable object. f : int Starting point. t : int or None Finishing point. """ if t is None: return iterable[f:] return iterable[f:t]
@qbjfunction()
[docs]def size(nw, iterable): """Returns size of ``iterable``. """ return len(iterable)
@qbjfunction()
[docs]def sort(nw, iterable, key=None, dkey=None, reverse=False): """Sorts ``iterable`` using one of it's keys as reference. Parameters ---------- nw : :py:class:`yatel.db.YatelNetwork` network source of data. iterable : iterator iterable object. key : str or None A key of objects in ``iterable`` to sort. dkey : str or None Defalut key to sort. """ def keyextractor(elem): if isinstance(elem, collections.Mapping): return elem.get(key, dkey) return getattr(elem, key, dkey) if key is None: return sorted(iterable, reverse=reverse) return sorted(iterable, key=keyextractor, reverse=reverse)
@qbjfunction()
[docs]def index(nw, iterable, value, start=None, end=None): """Return the lowest index in ``iterable`` where ``value`` is found. Returns ``-1`` if not found. Parameters ---------- nw : :py:class:`yatel.db.YatelNetwork` network source of data. iterable : iterator iterable object. value : Value to look for. start : int or None Starting point. end : int or None Finishing point. """ try: if start is None and end is None: return iterable.index(value) if end is None: return iterable.index(value, start) return iterable.index(value, start, end) except: return -1 #============================================================================== # DATE AND TIME #==============================================================================
@qbjfunction()
[docs]def now(nw, *args, **kwargs): """Return the current local date and time.""" return dt.datetime.now()
@qbjfunction()
[docs]def utcnow(nw, *args, **kwargs): """Return the current UTC date and time.""" return dt.datetime.utcnow()
@qbjfunction()
[docs]def today(nw, *args, **kwargs): """Return the current local date.""" return dt.date.today()
@qbjfunction()
[docs]def utctoday(nw, *args, **kwargs): """Return ``date`` object of current UTC date and time.""" return dt.datetime.utcnow().date()
@qbjfunction()
[docs]def time(nw, *args, **kwargs): """Return ``time`` object of current local date and time..""" return dt.datetime.now().time()
@qbjfunction()
[docs]def utctime(nw, *args, **kwargs): """Return ``time`` object of current UTC date and time.""" return dt.datetime.utcnow().time()
@qbjfunction def get_from_time(nw, datetime_instance, dtformat, *args, **kwargs): """Return a str from a `datetime_instance` that is a ``datetime`` object, according to specified format in ``dtformat``. About `format https://docs.python.org/2/library/datetime.html#strftime-strptime-behavior`_ """ return datetime_instance.strftime(dtformat) #============================================================================== # ARITMETICS #============================================================================== @qbjfunction()
[docs]def minus(nw, minuend, sust): """Return subtraction of ``sust`` from ``minuend``.""" return minuend - sust
@qbjfunction()
[docs]def times(nw, t0, t1): """Return multiplication of ``t0`` by ``t1`` """ return t0 * t1
@qbjfunction()
[docs]def div(nw, dividend, divider): """Return division of ``dividend`` by ``divider``.""" return dividend / float(divider)
@qbjfunction()
[docs]def floor(nw, dividend, divider): """Return mod from division operation between ``dividend`` and ``divider``.""" return dividend % float(divider)
@qbjfunction()
[docs]def pow(nw, radix, exp): """Return exponentiation of ``radix`` to ``exp``. """ return radix ** exp
@qbjfunction()
[docs]def xroot(nw, radix, root): """Computes nth root with given ``radix`` and ``root``.""" return radix ** (1/float(root))
@qbjfunction()
[docs]def count(nw, iterable, to_count): """Returns the number of occurrences of ``to_count`` in ``iterable``.""" return collections.Counter(iterable)[to_count] #============================================================================== # STRING #==============================================================================
@qbjfunction()
[docs]def split(nw, string, s=None, maxsplit=None): """Return a list of the words of ``string``. With ``s`` as separator and maximum number of split by ``maxsplit``. Parameters ---------- string : str String to split. s : str ot None Used as separator if given, if ``None`` uses whitespace characters as separators. maxsplit : int or None Maximum number of split on `string` and the remainder of the string is returned as the final element of the list, if ``None`` no limit. """ if s is None and maxsplit is None: return string.split() elif maxsplit is None: return string.split(s) return string.split(s, maxsplit)
@qbjfunction()
[docs]def rsplit(nw, string, s=None, maxsplit=None): """Return a list of the words of ``string``, scanning ``s`` from the end. With ``s`` as separator and maximum number of split by ``maxsplit``. Parameters ---------- string : str String to split. s : str ot None Used as separator if given, if ``None`` uses whitespace characters as separators. maxsplit : int or None Maximum number of split on ``string`` and the remainder of the string is returned as the first element of the list, if ``None`` no limit. """ if s is None and maxsplit is None: return string.rsplit() elif maxsplit is None: return string.rsplit(s) return string.rsplit(s, maxsplit)
@qbjfunction()
[docs]def strip(nw, string, chars=None): """Return a copy of ``string`` with leading and trailing characters removed. if ``chars`` is ``None`` whitespaces are removed otherwise the characters in the string will be stripped from the both ends. """ if chars is None: return string.strip() return string.strip(chars)
@qbjfunction()
[docs]def lstrip(nw, string, chars=None): """Return a copy of ``string`` with leading characters removed. If ``chars`` is omitted or ``None``, whitespace characters are removed. If given and not ``None``, ``chars`` must be a string; the characters in the string will be stripped from the beginning of the string. """ if chars is None: return string.lstrip() return string.lstrip(chars)
@qbjfunction()
[docs]def rstrip(nw, string, chars=None): """Return a copy of `string` with trailing characters removed. If ``chars`` is omitted or ``None``, whitespace characters are removed. If given and not ``None``, ``chars`` must be a string; the characters in the string will be stripped from the end of the string. """ if chars is None: return string.rstrip() return string.rstrip(chars)
@qbjfunction()
[docs]def join(nw, joiner, to_join): """Concatenate a list or tuple of words with intervening occurrences of ``joiner``. """ return joiner.join(to_join)
@qbjfunction()
[docs]def upper(nw, string): """Return a copy of ``string``, with lower case letters converted to upper case. """ return string.upper()
@qbjfunction()
[docs]def lower(nw, string): """Return a copy of ``string``, with upper case letters converted to lower case. """ return string.lower()
@qbjfunction()
[docs]def title(nw, string): """Returns a copy of ``string`` in which first characters of all the words are capitalized. """ return string.title()
@qbjfunction()
[docs]def capitalize(nw, string): """Return a copy of ``string`` with its first character capitalized and the rest lowercased. """ return string.capitalize()
@qbjfunction()
[docs]def isalnum(nw, string): """Return true if all characters in ``string`` are alphanumeric and there is at least one character, false otherwise. """ return string.isalnum()
@qbjfunction()
[docs]def isalpha(nw, string): """Return true if all characters in ``string`` are alphabetic and there is at least one character, false otherwise. """ return string.isalpha()
@qbjfunction()
[docs]def isdigit(nw, string): """Return true if all characters in ``string`` are digits and there is at least one character, false otherwise. """ return string.isdigit()
@qbjfunction()
[docs]def startswith(nw, string, prefix, start=None, end=None): """Return True if ``string`` starts with the ``prefix``, otherwise return False. ``prefix`` can also be a tuple of prefixes to look for. With optional ``start``, test ``string`` beginning at that position. With optional ``end``, stop comparing string at that position. """ if start is None and end is None: return string.startswith(prefix) if end is None: return string.startswith(prefix, start) return string.startswith(prefix, start, end)
@qbjfunction()
[docs]def endswith(nw, string, suffix, start=None, end=None): """Return True if ``string`` ends with the specified ``suffix``, otherwise return False. ``suffix`` can also be a tuple of suffixes to look for. With optional ``start``, test beginning at that position. With optional ``end``, stop comparing at that position. """ if start is None and end is None: return string.endswith(suffix) if end is None: return string.endswith(suffix, start) return string.endswith(suffix, start, end)
@qbjfunction()
[docs]def istitle(nw, string): """Return true if ``string`` is a titlecased string and there is at least one character.""" return string.istitle()
@qbjfunction()
[docs]def isupper(nw, string): """Return ``true`` if all cased characters in ``string`` are uppercase and there is at least one cased character, ``false`` otherwise. """ return string.isupper()
@qbjfunction()
[docs]def isspace(nw, string): """Return ``true`` if there are only whitespace characters in ``string`` and there is at least one character, ``false`` otherwise. """ return string.isspace()
@qbjfunction()
[docs]def islower(nw, string): """Return ``true`` if all cased characters in ``string`` are lowercase and there is at least one cased character, ``false`` otherwise. """ return string.islower()
@qbjfunction()
[docs]def swapcase(nw, string): """Return a copy of ``string``, with lower case letters converted to upper case and vice versa. """ return string.swapcase()
@qbjfunction()
[docs]def replace(nw, string, old, new, count=None): """Return a copy of ``string`` with all occurrences of ``old`` replaced by ``new``. If ``count`` is given, the first ``count`` occurrences are replaced. """ if count is None: return string.replace(old, new) return string.replace(old, new, count)
@qbjfunction()
[docs]def find(nw, string, subs, start=None, end=None): """Return the lowest index in ``string`` where the substring ``sub`` is found such that ``sub`` is wholly contained in ``string[start:end]``. Return -1 on failure. Defaults for ``start`` and ``end`` and interpretation of negative values is the same as for slices. """ try: if start is None and end is None: return string.find(subs) if end is None: return string.find(subs, start) return string.find(subs, start, end) except: return -1
@qbjfunction()
[docs]def rfind(nw, string, subs, start=None, end=None): """Return the highest index in ``string`` where the substring ``sub`` is found. """ try: if start is None and end is None: return string.rfind(subs) if end is None: return string.rfind(subs, start) return string.rfind(subs, start, end) except: return -1 # TODO: Regex, trigonometric, and math contants SUPPORT #=============================================================================== # MAIN #===============================================================================
if __name__ == "__main__": print(__doc__)