# Source code for vaex.agg

import os
import numpy as np

from .column import str_type
from .stat import _Statistic
from vaex import encoding


# When the documentation is built on Read the Docs (READTHEDOCS == 'True'),
# the compiled aggregation extension is not imported.
on_rtd = os.getenv('READTHEDOCS') == 'True'
if not on_rtd:
    import vaex.superagg


# Registry of aggregation factories, keyed by their public name.
aggregates = {}


def register(f, name=None):
    """Add ``f`` to the :data:`aggregates` registry and hand it back unchanged.

    :param f: the factory function to register
    :param name: registry key; falls back to ``f.__name__`` when falsy
    :returns: ``f`` itself, so this can be used as a decorator
    """
    key = name if name else f.__name__
    aggregates[key] = f
    return f


@encoding.register('aggregation')
class aggregation_encoding:
    """Encoding hooks that (de)serialize aggregator descriptors.

    A spec is a dict with an ``'aggregation'`` key naming a function in the
    ``aggregates`` registry; the remaining keys are passed to that function
    as keyword arguments (with the special cases handled in ``decode``).
    """
    @staticmethod
    def encode(encoding, agg):
        """Return the spec dict produced by ``agg.encode(encoding)``."""
        return agg.encode(encoding)

    @staticmethod
    def decode(encoding, agg_spec):
        """Rebuild an aggregator descriptor from its spec dict.

        :param encoding: the encoding context (not used here)
        :param agg_spec: spec dict; it is copied, the caller's dict is not mutated
        :returns: the descriptor returned by the registered factory
        :raises KeyError: if the spec has no ``'aggregation'`` key, or names an
            aggregation that is not registered
        """
        agg_spec = agg_spec.copy()
        agg_type = agg_spec.pop('aggregation')
        try:
            factory = aggregates[agg_type]
        except KeyError:
            raise KeyError('unknown aggregation: {!r}'.format(agg_type)) from None
        args = []
        if agg_type == '_sum_moment' and 'parameters' in agg_spec:
            # renaming between spec ('parameters' = agg_args) and implementation ('moment')
            agg_spec['moment'] = agg_spec.pop('parameters')[0]
        if agg_type == 'first':
            # 'first' takes (expression, order_expression) positionally
            expressions = agg_spec.pop('expression')
            # guard against a single string, which would otherwise be splatted per character
            args = [expressions] if isinstance(expressions, str) else list(expressions)
        return factory(*args, **agg_spec)


class AggregatorDescriptor:
    """Common base for aggregator descriptors.

    Subclasses set ``short_name`` and ``expression``; ``__repr__`` reads both.
    """
    def __repr__(self):
        return 'vaex.agg.%s(%r)' % (self.short_name, str(self.expression))

    def finish(self, value):
        """Post-process a computed result; the base class returns it unchanged."""
        return value

class AggregatorDescriptorBasic(AggregatorDescriptor):
    """Aggregator that maps onto a single compiled ``vaex.superagg`` operation.

    :param name: base name of the operation class; the concrete class is looked
        up in ``vaex.superagg`` from ``name + "_"`` and the input dtype
    :param expression: input expression, ``'*'`` for no input expression, or
        (with ``multi_args=True``) a sequence of expressions
    :param short_name: public aggregation name, used for encoding, repr and naming
    :param multi_args: if True, ``expression`` is a sequence of expressions
    :param agg_args: extra positional arguments passed to the operation constructor
    :param selection: optional selection to aggregate over
    :param edges: if True, keep the edge bins of the result grid
    """
    def __init__(self, name, expression, short_name, multi_args=False, agg_args=None, selection=None, edges=False):
        self.name = name
        self.short_name = short_name
        self.expression = str(expression)
        # None instead of a shared mutable default; an explicitly passed list is kept as-is
        self.agg_args = [] if agg_args is None else agg_args
        self.edges = edges
        self.selection = selection
        if multi_args:
            self.expressions = expression
        elif self.expression == '*':
            self.expressions = []
        else:
            self.expressions = [self.expression]

    def encode(self, encoding):
        """Serialize to a spec dict (keys: aggregation, expression, selection, edges, parameters)."""
        spec = {'aggregation': self.short_name}
        if len(self.expressions) == 0:
            pass
        elif len(self.expressions) == 1:
            spec['expression'] = self.expression
        else:
            spec['expression'] = [str(k) for k in self.expressions]
        if self.selection:
            spec['selection'] = self.selection
        if self.edges:
            spec['edges'] = True
        if self.agg_args:
            spec['parameters'] = self.agg_args
        return spec

    def pretty_name(self, id=None):
        """Return ``'<id>_<short_name>'``; ``id`` defaults to the joined expressions."""
        id = id or "_".join(map(str, self.expressions))
        return '{0}_{1}'.format(id, self.short_name)

    def _prepare_types(self, df):
        """Set ``dtype_in``/``dtype_out`` from the first input expression of ``df``."""
        if self.expression == '*':
            self.dtype_in = np.dtype('int64')
            self.dtype_out = np.dtype('int64')
        else:
            self.dtype_in = df[str(self.expressions[0])].dtype
            self.dtype_out = self.dtype_in
            if self.short_name == "count":
                self.dtype_out = np.dtype('int64')
            # sums (including sums of moments, registered as '_sum_moment') are upcast
            if self.short_name in ('sum', 'summoment', '_sum_moment'):
                self.dtype_out = vaex.utils.upcast(self.dtype_in)

    def add_operations(self, agg_task, **kwargs):
        """Register this aggregation on ``agg_task``; returns a delayed, finished result."""
        df = agg_task.df
        self._prepare_types(df)
        value = agg_task.add_aggregation_operation(self, **kwargs)

        @vaex.delayed
        def finish(value):
            return self.finish(value)
        return finish(value)

    def _create_operation(self, df, grid):
        """Instantiate the dtype-specific ``vaex.superagg`` operation on ``grid``."""
        agg_op_type = vaex.utils.find_type_from_dtype(vaex.superagg, self.name + "_", self.dtype_in)
        agg_op = agg_op_type(grid, *self.agg_args)
        return agg_op

    def get_result(self, agg_operation):
        """Return the operation's grid as an array, dropping edge bins unless ``edges``."""
        grid = np.asarray(agg_operation)
        if not self.edges:
            grid = vaex.utils.extract_central_part(grid)
        return grid


class AggregatorDescriptorNUnique(AggregatorDescriptorBasic):
    """Aggregator counting the number of unique values per bin.

    :param dropmissing: if truthy, do not count missing values
    :param dropnan: if truthy, do not count nan values
    """
    def __init__(self, name, expression, short_name, dropmissing, dropnan, selection=None, edges=False):
        super().__init__(name, expression, short_name, selection=selection, edges=edges)
        self.dropmissing = dropmissing
        self.dropnan = dropnan

    def encode(self, encoding):
        """Serialize like the base class, plus the truthy drop flags."""
        spec = super().encode(encoding)
        if self.dropmissing:
            spec['dropmissing'] = self.dropmissing
        if self.dropnan:
            spec['dropnan'] = self.dropnan
        return spec

    def _prepare_types(self, df):
        # The result is a count whatever the input dtype, so set int64 here,
        # before add_operations registers the operation, not only in _create_operation.
        self.dtype_in = df[str(self.expressions[0])].dtype
        self.dtype_out = np.dtype('int64')

    def _create_operation(self, df, grid):
        """Instantiate the dtype-specific unique-count operation on ``grid``."""
        self._prepare_types(df)
        agg_op_type = vaex.utils.find_type_from_dtype(vaex.superagg, self.name + "_", self.dtype_in)
        agg_op = agg_op_type(grid, self.dropmissing, self.dropnan)
        return agg_op


class AggregatorDescriptorMulti(AggregatorDescriptor):
    """Uses multiple operations/aggregations to calculate the final aggregation.

    :param name: internal name of the aggregation
    :param expression: the input expression (stored as a string)
    :param short_name: public aggregation name, used for encoding, repr and naming
    :param selection: optional selection to aggregate over
    :param edges: if True, keep the edge bins of the result grid
    """
    def __init__(self, name, expression, short_name, selection=None, edges=False):
        self.name = name
        self.short_name = short_name
        self.expression = str(expression)
        self.expressions = [self.expression]
        self.selection = selection
        self.edges = edges

    def encode(self, encoding):
        """Serialize to a spec dict, so mean/var/std can go through ``aggregation_encoding``."""
        spec = {'aggregation': self.short_name, 'expression': self.expression}
        if self.selection:
            spec['selection'] = self.selection
        if self.edges:
            spec['edges'] = True
        return spec

    def pretty_name(self, id=None):
        """Return ``'<id>_<short_name>'``; ``id`` defaults to the joined expressions."""
        id = id or "_".join(map(str, self.expressions))
        return '{0}_{1}'.format(id, self.short_name)
class AggregatorDescriptorMean(AggregatorDescriptorMulti):
    """Mean aggregation computed as a sum aggregation divided by a count aggregation."""
    def __init__(self, name, expression, short_name="mean", selection=None, edges=False):
        super(AggregatorDescriptorMean, self).__init__(name, expression, short_name, selection=selection, edges=edges)

    def add_operations(self, agg_task, **kwargs):
        """Register the sum and count sub-aggregations; returns a delayed mean."""
        expression = agg_task.df[str(self.expression)]
        # ints, floats and bools are upcast to float64 before summing and counting
        if expression.dtype.kind in "buif":
            expression = expression.astype('float64')

        sum_agg = sum(expression, selection=self.selection, edges=self.edges)
        count_agg = count(expression, selection=self.selection, edges=self.edges)

        task_sum = sum_agg.add_operations(agg_task, **kwargs)
        task_count = count_agg.add_operations(agg_task, **kwargs)
        self.dtype_in = sum_agg.dtype_in
        self.dtype_out = sum_agg.dtype_out

        @vaex.delayed
        def finish(total, counts):
            total = np.array(total)
            dtype = total.dtype
            if dtype.kind == 'M':
                # datetime64: divide the raw integer representation
                total = total.view('uint64')
                counts = counts.view('uint64')
            # empty bins give 0/0; let them become nan silently
            with np.errstate(divide='ignore', invalid='ignore'):
                mean = total / counts
            if dtype.kind != mean.dtype.kind:
                # TODO: not sure why view does not work
                mean = mean.astype(dtype)
            return self.finish(mean)
        return finish(task_sum, task_count)
class AggregatorDescriptorVar(AggregatorDescriptorMulti):
    """Variance aggregation built from a sum of squares, a sum and a count.

    Per bin it computes ``sum(x**2)/n - (sum(x)/n)**2``. When ``ddof`` is
    non-zero, the result is scaled by ``n / (n - ddof)``.

    :param ddof: delta degrees of freedom (0 gives the population variance)
    """
    def __init__(self, name, expression, short_name="var", ddof=0, selection=None, edges=False):
        super(AggregatorDescriptorVar, self).__init__(name, expression, short_name, selection=selection, edges=edges)
        self.ddof = ddof

    def encode(self, encoding):
        """Serialize to a spec dict, including a non-zero ``ddof``."""
        spec = {'aggregation': self.short_name, 'expression': self.expression}
        if self.selection:
            spec['selection'] = self.selection
        if self.edges:
            spec['edges'] = True
        if self.ddof:
            spec['ddof'] = self.ddof
        return spec

    def add_operations(self, agg_task, **kwargs):
        """Register the sum-of-squares, sum and count sub-aggregations; returns a delayed variance."""
        # the input is always cast to float64 before aggregating
        expression = agg_task.df[str(self.expression)].astype('float64')
        sum_moment = _sum_moment(str(expression), 2, selection=self.selection, edges=self.edges)
        sum_ = sum(str(expression), selection=self.selection, edges=self.edges)
        count_ = count(str(expression), selection=self.selection, edges=self.edges)

        task_sum_moment = sum_moment.add_operations(agg_task, **kwargs)
        task_sum = sum_.add_operations(agg_task, **kwargs)
        task_count = count_.add_operations(agg_task, **kwargs)
        self.dtype_in = sum_.dtype_in
        self.dtype_out = sum_.dtype_out

        @vaex.delayed
        def finish(sum_moment, total, counts):
            total = np.array(total)
            dtype = total.dtype
            if dtype.kind == 'M':
                # datetime64: work on the raw integer representation
                total = total.view('uint64')
                sum_moment = sum_moment.view('uint64')
                counts = counts.view('uint64')
            # empty bins give 0/0; let them become nan silently
            with np.errstate(divide='ignore', invalid='ignore'):
                mean = total / counts
                raw_moments2 = sum_moment / counts
                # NOTE(review): E[x^2] - E[x]^2 can lose precision for large means
                variance = raw_moments2 - mean**2
                if self.ddof:
                    # float counts avoid unsigned underflow in counts - ddof
                    n = np.asarray(counts, dtype=np.float64)
                    variance = variance * n / (n - self.ddof)
            if dtype.kind != mean.dtype.kind:
                # TODO: not sure why view does not work
                variance = variance.astype(dtype)
            return self.finish(variance)
        return finish(task_sum_moment, task_sum, task_count)
class AggregatorDescriptorStd(AggregatorDescriptorVar):
    """Standard deviation: the square root of the variance from the parent class."""

    def finish(self, value):
        """Take the square root of the finished variance ``value``."""
        root = value ** 0.5
        return root
@register
def count(expression='*', selection=None, edges=False):
    """Create a count aggregation.

    :param expression: expression to count; the default ``'*'`` uses no input expression
    :param selection: optional selection to aggregate over
    :param edges: if True, keep the edge bins of the result grid
    """
    return AggregatorDescriptorBasic(
        name='AggCount',
        expression=expression,
        short_name='count',
        selection=selection,
        edges=edges,
    )
@register
def sum(expression, selection=None, edges=False):
    """Create a sum aggregation.

    :param expression: expression to sum
    :param selection: optional selection to aggregate over
    :param edges: if True, keep the edge bins of the result grid
    """
    return AggregatorDescriptorBasic(
        name='AggSum',
        expression=expression,
        short_name='sum',
        selection=selection,
        edges=edges,
    )
@register
def mean(expression, selection=None, edges=False):
    """Create a mean aggregation.

    :param expression: expression to average
    :param selection: optional selection to aggregate over
    :param edges: if True, keep the edge bins of the result grid
    """
    descriptor = AggregatorDescriptorMean(
        name='mean',
        expression=expression,
        short_name='mean',
        selection=selection,
        edges=edges,
    )
    return descriptor
@register
def min(expression, selection=None, edges=False):
    """Create a min aggregation.

    :param expression: expression whose minimum is taken
    :param selection: optional selection to aggregate over
    :param edges: if True, keep the edge bins of the result grid
    """
    return AggregatorDescriptorBasic(
        name='AggMin',
        expression=expression,
        short_name='min',
        selection=selection,
        edges=edges,
    )
@register
def _sum_moment(expression, moment, selection=None, edges=False):
    """Create an aggregation summing ``expression`` raised to ``moment``.

    :param expression: input expression
    :param moment: the moment, passed as the single operation argument
    :param selection: optional selection to aggregate over
    :param edges: if True, keep the edge bins of the result grid
    """
    operation_args = [moment]
    return AggregatorDescriptorBasic(
        name='AggSumMoment',
        expression=expression,
        short_name='_sum_moment',
        agg_args=operation_args,
        selection=selection,
        edges=edges,
    )
@register
def max(expression, selection=None, edges=False):
    """Create a max aggregation.

    :param expression: expression whose maximum is taken
    :param selection: optional selection to aggregate over
    :param edges: if True, keep the edge bins of the result grid
    """
    return AggregatorDescriptorBasic(
        name='AggMax',
        expression=expression,
        short_name='max',
        selection=selection,
        edges=edges,
    )
@register
def first(expression, order_expression, selection=None, edges=False):
    '''Creates a first aggregation.

    For each bin, takes the value of ``expression`` selected by
    ``order_expression``. Both are passed to the operation as a pair of
    expressions.

    :param expression: expression whose value is returned
    :param order_expression: expression used to order the rows
    :param selection: optional selection to aggregate over
    :param edges: if True, keep the edge bins of the result grid
    '''
    return AggregatorDescriptorBasic('AggFirst', [expression, order_expression], 'first', multi_args=True, selection=selection, edges=edges)
@register
def std(expression, ddof=0, selection=None, edges=False):
    """Create a standard deviation aggregation.

    :param expression: input expression
    :param ddof: delta degrees of freedom, forwarded to the descriptor
    :param selection: optional selection to aggregate over
    :param edges: if True, keep the edge bins of the result grid
    """
    return AggregatorDescriptorStd(
        name='std',
        expression=expression,
        short_name='std',
        ddof=ddof,
        selection=selection,
        edges=edges,
    )
@register
def var(expression, ddof=0, selection=None, edges=False):
    """Create a variance aggregation.

    :param expression: input expression
    :param ddof: delta degrees of freedom, forwarded to the descriptor
    :param selection: optional selection to aggregate over
    :param edges: if True, keep the edge bins of the result grid
    """
    return AggregatorDescriptorVar(
        name='var',
        expression=expression,
        short_name='var',
        ddof=ddof,
        selection=selection,
        edges=edges,
    )
@register
def nunique(expression, dropna=False, dropnan=False, dropmissing=False, selection=None, edges=False):
    """Aggregator that calculates the number of unique items per bin.

    :param expression: Expression for which to calculate the unique items
    :param dropmissing: do not count missing values
    :param dropnan: do not count nan values
    :param dropna: shorthand that turns on both of the above (see :func:`Expression.isna`)
    """
    if dropna:
        # dropna overrides both individual flags
        dropnan = dropmissing = True
    return AggregatorDescriptorNUnique(
        name='AggNUnique',
        expression=expression,
        short_name='nunique',
        dropmissing=dropmissing,
        dropnan=dropnan,
        selection=selection,
        edges=edges,
    )
# @register
# def covar(x, y):
#     '''Creates a covariance aggregation'''
#     return _Statistic('covar', x, y)
#
# @register
# def correlation(x, y):
#     '''Creates a correlation aggregation'''
#     return _Statistic('correlation', x, y)