"""
Vaex is a library for dealing with larger than memory DataFrames (out of core).
The most important class (datastructure) in vaex is the :class:`.DataFrame`. A DataFrame is obtained by either opening
the example dataset:
>>> import vaex
>>> df = vaex.example()
Or using :func:`open` to open a file.
>>> df1 = vaex.open("somedata.hdf5")
>>> df2 = vaex.open("somedata.fits")
>>> df3 = vaex.open("somedata.arrow")
>>> df4 = vaex.open("somedata.csv")
Or connecting to a remote server:
>>> df_remote = vaex.open("http://try.vaex.io/nyc_taxi_2015")
A few strong features of vaex are:
* Performance: works with huge tabular data, process over a billion (> 10\\ :sup:`9`\\ ) rows/second.
* Expression system / Virtual columns: compute on the fly, without wasting ram.
* Memory efficient: no memory copies when doing filtering/selections/subsets.
* Visualization: directly supported, a one-liner is often enough.
* User friendly API: you will only need to deal with a DataFrame object, and tab completion + docstring will help you out: `ds.mean<tab>`, feels very similar to Pandas.
* Very fast statistics on N dimensional grids such as histograms, running mean, heatmaps.
Follow the tutorial at https://docs.vaex.io/en/latest/tutorial.html to learn how to use vaex.
""" # -*- coding: utf-8 -*-
from __future__ import print_function
import glob
import six
import vaex.dataframe
import vaex.dataset
from vaex.functions import register_function
from . import stat
# import vaex.file
# import vaex.export
from .delayed import delayed
from .groupby import *
from . import agg
import vaex.datasets
# import vaex.plot
# from vaex.dataframe import DataFrame
# del ServerRest, DataFrame
import vaex.settings
import logging
import pkg_resources
import os
from functools import reduce
# The version module is required for the metadata below. The error message
# suggests it is produced by the git hooks; when it is missing the problem is
# reported on stderr and the import error is re-raised.
try:
    from . import version
except:
    import sys
    print("version file not found, please run git/hooks/post-commit or git/hooks/post-checkout and/or install them as hooks (see git/README)", file=sys.stderr)
    raise

# Public version metadata, all derived from the version module.
__version__ = version.versionstring
# __pre_release_name__ = version.pre_release
__version_tuple__ = version.versiontuple
__program_name__ = "vaex"
# __version_name__ = version.versiontring
# __release_name_ = version.versiontring[:]
# __clean_release__ = "%d.%d.%d" % (__version_tuple__)
__full_name__ = __program_name__ + "-" + __version__  # program name and version joined by "-"
# __clean_name__ = __program_name__ + "-" + __clean_release__
__build_name__ = __full_name__ + "-" + version.osname  # full name followed by version.osname
def app(*args, **kwargs):
    """Create a vaex app; the QApplication mainloop must be started separately.

    Positional and keyword arguments are accepted but not used.

    In an IPython notebook/Jupyter do the following:

    >>> import vaex.ui.main # this causes the qt api level to be set properly
    >>> import vaex

    Next cell:

    >>> %gui qt

    Next cell:

    >>> app = vaex.app()

    From now on, you can run the app along with Jupyter.

    :return: a new ``vaex.ui.main.VaexApp`` instance
    """
    from vaex.ui import main as ui_main
    return ui_main.VaexApp()
def _convert_name(filenames, shuffle=False):
    '''Derive the name of the hdf5 file that belongs to one or more filenames.

    :param filenames: a single filename, or a list/tuple of filenames (the first one is used as base)
    :param bool shuffle: when True, ``-shuffle`` is appended to the base name
    :return: ``<base>.hdf5`` for one file, ``<base>_and_<n>_more.hdf5`` when there are n additional files
    '''
    names = filenames if isinstance(filenames, (list, tuple)) else [filenames]
    stem = names[0]
    if shuffle:
        stem = stem + '-shuffle'
    extra_count = len(names) - 1
    if extra_count > 0:
        return stem + "_and_{}_more.hdf5".format(extra_count)
    return stem + ".hdf5"
def open(path, convert=False, shuffle=False, copy_index=True, *args, **kwargs):
    """Open a DataFrame from file given by path.

    Example:

    >>> df = vaex.open('sometable.hdf5')
    >>> df = vaex.open('somedata*.csv', convert='bigdata.hdf5')

    :param str or list path: local or absolute path to file, or glob string, or list of paths.
        A string may also be an alias, an ``http://``/``ws://`` url of a vaex server, or a ``cluster`` path.
    :param convert: convert files to an hdf5 file for optimization, can also be a path.
        NOTE(review): a path is only used as output name when more than one file is opened;
        for a single file the name is always derived from the file name.
    :param bool shuffle: shuffle converted DataFrame or not
    :param bool copy_index: copy index when source is read via pandas
    :param args: extra arguments for file readers that need it
    :param kwargs: extra keyword arguments
    :return: the opened DataFrame
    :rtype: DataFrame
    :raises IOError: when no file matches, or when the file could not be opened
    :raises KeyError: when the requested DataFrame does not exist on a remote server

    S3 support:

    Vaex supports streaming in hdf5 files from Amazon AWS object storage S3.
    Files are by default cached in $HOME/.vaex/file-cache/s3 such that successive access
    is as fast as native disk access. The following url parameters control S3 options:

     * anon: Use anonymous access or not (false by default). (Allowed values are: true,True,1,false,False,0)
     * use_cache: Use the disk cache or not, only set to false if the data should be accessed once. (Allowed values are: true,True,1,false,False,0)
     * profile_name and other arguments are passed to :py:class:`s3fs.core.S3FileSystem`

    All arguments can also be passed as kwargs, but then arguments such as `anon` can only be a boolean, not a string.

    Examples:

    >>> df = vaex.open('s3://vaex/taxi/yellow_taxi_2015_f32s.hdf5?anon=true')
    >>> df = vaex.open('s3://vaex/taxi/yellow_taxi_2015_f32s.hdf5', anon=True)  # Note that anon is a boolean, not the string 'true'
    >>> df = vaex.open('s3://mybucket/path/to/file.hdf5?profile_name=myprofile')
    """
    import vaex
    try:
        if isinstance(path, six.string_types):
            # Aliases, remote servers and clusters only apply to a single string;
            # doing these checks on a list of paths would raise before any file is opened.
            if path in aliases:
                path = aliases[path]
            if path.startswith("http://") or path.startswith("ws://"):  # TODO: think about https and wss
                server_url, name = path.rsplit("/", 1)
                url = urlparse(path)
                if '?' in name:
                    name = name[:name.index('?')]
                # only the first value of each query parameter is used
                extra_args = {key: values[0] for key, values in parse_qs(url.query).items()}
                if 'token' in extra_args:
                    kwargs['token'] = extra_args['token']
                if 'token_trusted' in extra_args:
                    kwargs['token_trusted'] = extra_args['token_trusted']
                remote = vaex.server(server_url, **kwargs)
                dataframe_map = remote.datasets(as_dict=True)
                if name not in dataframe_map:
                    raise KeyError("no such DataFrame '%s' at server, possible names: %s" % (name, " ".join(dataframe_map.keys())))
                return dataframe_map[name]
            if path.startswith("cluster"):
                import vaex.distributed
                return vaex.distributed.open(path, *args, **kwargs)
            paths = [path]
        else:
            paths = list(path)

        import vaex.file
        import glob
        filenames = []
        for pattern in paths:
            # TODO: can we do glob with s3?
            if pattern.startswith('s3://'):
                filenames.append(pattern)
            else:
                # sort to get predictable behaviour (useful for testing)
                filenames.extend(sorted(glob.glob(pattern)))
        ds = None
        if len(filenames) == 0:
            raise IOError('Could not open file: {}, it does not exist'.format(path))
        if len(filenames) == 1:
            path = filenames[0]
            # strip url parameters (e.g. the S3 options) before looking at the extension
            naked_path = path
            if '?' in naked_path:
                naked_path = naked_path[:naked_path.index('?')]
            ext = os.path.splitext(naked_path)[1]
            filename_hdf5 = _convert_name(filenames, shuffle=shuffle)
            if convert and os.path.exists(filename_hdf5):  # also check mtime?
                # a previous conversion exists, reuse it
                ds = vaex.file.open(filename_hdf5)
            else:
                if ext == '.csv' or naked_path.endswith(".csv.bz2"):  # special support for csv.. should probably approach it a different way
                    ds = from_csv(path, copy_index=copy_index, **kwargs)
                else:
                    ds = vaex.file.open(path, *args, **kwargs)
                if convert and ds:
                    ds.export_hdf5(filename_hdf5, shuffle=shuffle)
                    ds = vaex.file.open(filename_hdf5)  # argument were meant for pandas?
            if ds is None:
                if os.path.exists(path):
                    raise IOError('Could not open file: {}, did you install vaex-hdf5? Is the format supported?'.format(path))
                raise IOError('Could not open file: {}, it does not exist?'.format(path))
        else:
            if convert not in [True, False]:
                # convert is the path of the hdf5 file to create
                filename_hdf5 = convert
            else:
                filename_hdf5 = _convert_name(filenames, shuffle=shuffle)
            if os.path.exists(filename_hdf5) and convert:  # also check mtime
                ds = open(filename_hdf5)
            else:
                # every file is opened (and converted) on its own, forwarding the
                # same options, and the results are concatenated
                dataframes = [
                    open(filename, convert=bool(convert), shuffle=shuffle, copy_index=copy_index, **kwargs)
                    for filename in filenames
                ]
                ds = vaex.dataframe.DataFrameConcatenated(dataframes)
                if convert:
                    ds.export_hdf5(filename_hdf5, shuffle=shuffle)
                    ds = vaex.file.open(filename_hdf5, *args, **kwargs)

        if ds is None:
            raise IOError('Unknown error opening: {}'.format(path))
        return ds
    except Exception:
        # lazy logger arguments: "%r" % path would fail when path is a tuple
        logging.getLogger("vaex").error("error opening %r", path)
        raise
def open_many(filenames):
    """Open a list of filenames, and return a DataFrame with all DataFrames concatenated.

    Surrounding whitespace is stripped from every entry; empty entries and
    entries starting with ``#`` are skipped.

    :param list[str] filenames: list of filenames/paths
    :rtype: DataFrame
    """
    stripped_names = (filename.strip() for filename in filenames)
    dfs = [open(name) for name in stripped_names if name and name[0] != "#"]
    return vaex.dataframe.DataFrameConcatenated(dfs=dfs)
def from_samp(username=None, password=None):
    """Connect to a SAMP Hub and wait for a single table load event, disconnect, download the table and return the DataFrame.

    Useful if you want to send a single table from say TOPCAT to vaex in a python console or notebook.

    :param username: passed on to ``vaex.samp.single_table``
    :param password: passed on to ``vaex.samp.single_table``
    :return: the result of :func:`from_astropy_table` for the received table
    """
    print("Waiting for SAMP message...")
    from vaex import samp
    received = samp.single_table(username=username, password=password)
    astropy_table = received.to_table()
    return from_astropy_table(astropy_table)
def from_astropy_table(table):
    """Create a vaex DataFrame from an Astropy Table.

    :param table: the Astropy table, handed to ``vaex.file.other.DatasetAstropyTable``
    :return: the ``DatasetAstropyTable`` built from the table
    """
    from vaex.file.other import DatasetAstropyTable
    return DatasetAstropyTable(table=table)
def from_dict(data):
    """Create an in memory dataset from a dict with column names as keys and list/numpy-arrays as values.

    Example

    >>> data = {'A':[1,2,3],'B':['a','b','c']}
    >>> vaex.from_dict(data)
      #    A    B
      0    1   'a'
      1    2   'b'
      2    3   'c'

    :param data: A dict of {column:[value, value,...]}; it is expanded into
        keyword arguments for :func:`from_arrays`
    :rtype: DataFrame
    """
    columns = data
    return vaex.from_arrays(**columns)
def from_items(*items):
    """Create an in memory DataFrame from numpy arrays, in contrast to from_arrays this keeps the order of columns intact (for Python < 3.6).

    Example

    >>> import vaex, numpy as np
    >>> x = np.arange(5)
    >>> y = x ** 2
    >>> vaex.from_items(('x', x), ('y', y))
      #    x    y
      0    0    0
      1    1    1
      2    2    4
      3    3    9
      4    4   16

    :param items: list of [(name, numpy array), ...]; every array goes through ``np.asanyarray``
    :rtype: DataFrame
    """
    import numpy as np
    frame = vaex.dataframe.DataFrameArrays("array")
    for column_name, values in items:
        frame.add_column(column_name, np.asanyarray(values))
    return frame
def from_arrays(**arrays):
    """Create an in memory DataFrame from numpy arrays.

    Example

    >>> import vaex, numpy as np
    >>> x = np.arange(5)
    >>> y = x ** 2
    >>> vaex.from_arrays(x=x, y=y)
      #    x    y
      0    0    0
      1    1    1
      2    2    4
      3    3    9
      4    4   16
    >>> some_dict = {'x': x, 'y': y}
    >>> vaex.from_arrays(**some_dict)  # in case you have your columns in a dict
      #    x    y
      0    0    0
      1    1    1
      2    2    4
      3    3    9
      4    4   16

    :param arrays: keyword arguments with arrays; ``Column`` instances are added
        as they are, anything else is first passed through ``np.asanyarray``
    :rtype: DataFrame
    """
    import numpy as np
    from .column import Column
    frame = vaex.dataframe.DataFrameArrays("array")
    for column_name, values in arrays.items():
        if not isinstance(values, Column):
            values = np.asanyarray(values)
        frame.add_column(column_name, values)
    return frame
def from_arrow_table(table):
    """Create a vaex DataFrame from an arrow Table.

    :param table: the arrow table, passed to ``vaex_arrow.convert.vaex_df_from_arrow_table``
    :rtype: DataFrame
    """
    from vaex_arrow import convert
    return convert.vaex_df_from_arrow_table(table=table)
def from_scalars(**kwargs):
    """Similar to from_arrays, but convenient for a DataFrame of length 1.

    Every value is wrapped in a one-element numpy array before being handed
    to :func:`from_arrays`.

    Example:

    >>> import vaex
    >>> df = vaex.from_scalars(x=1, y=2)

    :rtype: DataFrame
    """
    import numpy as np
    single_row_columns = {}
    for key, value in kwargs.items():
        single_row_columns[key] = np.array([value])
    return from_arrays(**single_row_columns)
def from_pandas(df, name="pandas", copy_index=True, index_name="index"):
    """Create an in memory DataFrame from a pandas DataFrame.

    Columns that cannot be added as they are, are converted to (byte) strings;
    when that fails as well the column is skipped and a message is printed.

    :param pandas.DataFrame df: Pandas DataFrame
    :param name: unique for the DataFrame
    :param bool copy_index: when True, the pandas index is added as a column as well
    :param str index_name: name of the column that holds the copied index

    >>> import vaex, pandas as pd
    >>> df_pandas = pd.read_csv('test.csv')
    >>> df = vaex.from_pandas(df_pandas)

    :rtype: DataFrame
    """
    import pandas as pd
    import numpy as np
    vaex_df = vaex.dataframe.DataFrameArrays(name)

    def _add_series(column_name, series):
        data = series.values
        if isinstance(data, pd.core.arrays.integer.IntegerArray):
            # represent the pandas integer array as a numpy masked array
            data = np.ma.array(data._data, mask=data._mask)
        try:
            vaex_df.add_column(column_name, data)
        except Exception as error:
            print("could not convert column %s, error: %r, will try to convert it to string" % (column_name, error))
        else:
            return
        # fallback: retry with the values converted to byte strings
        try:
            vaex_df.add_column(column_name, data.astype("S"))
        except Exception as error:
            print("Giving up column %s, error: %r" % (column_name, error))

    for column_name in df.columns:
        _add_series(column_name, df[column_name])
    if copy_index:
        _add_series(index_name, df.index)
    return vaex_df
def from_ascii(path, seperator=None, names=True, skip_lines=0, skip_after=0, **kwargs):
    """
    Create an in memory DataFrame from an ascii file (whitespace separated by default).

    >>> ds = vaex.from_ascii("table.asc")
    >>> ds = vaex.from_ascii("table.csv", seperator=",", names=["x", "y", "z"])

    :param path: file path
    :param seperator: value separator, by default whitespace, use "," for comma separated values.
    :param names: If True, the first line is used for the column names, otherwise provide a list of strings with names
    :param skip_lines: skip lines at the start of the file
    :param skip_after: skip lines at the end of the file
    :param kwargs: extra keyword arguments passed to ``vaex.ext.readcol.readcol``
    :rtype: DataFrame
    """
    import vaex.ext.readcol as rc
    ds = vaex.dataframe.DataFrameArrays(path)
    explicit_names = None
    if names not in [True, False]:
        # an explicit list of names was given: readcol must not look for names itself
        explicit_names, names = names, False
    data = rc.readcol(path, fsep=seperator, asdict=explicit_names is None, names=names, skipline=skip_lines, skipafter=skip_after, **kwargs)
    # with explicit names the data is paired column by column, otherwise readcol returned a dict
    columns = zip(explicit_names, data.T) if explicit_names else data.items()
    for column_name, values in columns:
        ds.add_column(column_name, values)
    return ds
def from_json(path_or_buffer, orient=None, precise_float=False, lines=False, copy_index=True, **kwargs):
    """Read a JSON file using pandas, and convert it to a DataFrame directly.

    :param str path_or_buffer: a valid JSON string or file-like, default: None
        The string could be a URL. Valid URL schemes include http, ftp, s3,
        gcs, and file. For file URLs, a host is expected. For instance, a local
        file could be ``file://localhost/path/to/table.json``
    :param str orient: Indication of expected JSON string format. Allowed values are
        ``split``, ``records``, ``index``, ``columns``, and ``values``.
    :param bool precise_float: Set to enable usage of higher precision (strtod) function when
        decoding string to double values. Default (False) is to use fast but less precise builtin functionality
    :param bool lines: Read the file as a json object per line.
    :param bool copy_index: passed on to :func:`from_pandas`
    :param kwargs: extra keyword arguments for ``pandas.read_json``
    :raises ValueError: for ``typ='series'``, ``numpy=True`` or a ``chunksize`` that is not None
    :rtype: DataFrame
    """
    # Options that would not result in a single pandas DataFrame are rejected up front
    typ = kwargs.get('typ')
    if typ == 'series':
        raise ValueError('`typ` must be set to `"frame"`.')
    use_numpy = kwargs.get('numpy')
    if use_numpy == True:
        raise ValueError('`numpy` must be set to `False`.')
    chunksize = kwargs.get('chunksize')
    if chunksize is not None:
        raise ValueError('`chunksize` must be `None`.')

    import pandas as pd
    pandas_df = pd.read_json(path_or_buffer, orient=orient, precise_float=precise_float, lines=lines, **kwargs)
    return from_pandas(pandas_df, copy_index=copy_index)
def from_csv(filename_or_buffer, copy_index=True, **kwargs):
    """Shortcut to read a csv file using pandas and convert to a DataFrame directly.

    :param filename_or_buffer: passed to ``pandas.read_csv``
    :param bool copy_index: passed on to :func:`from_pandas`
    :param kwargs: extra keyword arguments for ``pandas.read_csv``
    :rtype: DataFrame
    """
    import pandas as pd
    pandas_df = pd.read_csv(filename_or_buffer, **kwargs)
    return from_pandas(pandas_df, copy_index=copy_index)
def read_csv(filepath_or_buffer, **kwargs):
    '''Alias to from_csv: all arguments are passed on unchanged.'''
    dataframe = from_csv(filepath_or_buffer, **kwargs)
    return dataframe
def read_csv_and_convert(path, shuffle=False, copy_index=True, **kwargs):
    '''Convert a path (or glob pattern) to a single hdf5 file, will open the hdf5 file if exists.

    Example:

    >>> vaex.read_csv_and_convert('test-*.csv', shuffle=True)  # this may take a while
    >>> vaex.read_csv_and_convert('test-*.csv', shuffle=True)  # 2nd time it is instant

    :param str path: path of file or glob pattern for multiple files
    :param bool shuffle: shuffle DataFrame when converting to hdf5
    :param bool copy_index: by default pandas will create an index (row number), set to false if you want to drop that
    :param kwargs: parameters passed to pandas' read_csv
    :return: the DataFrame opened from the (possibly just created) hdf5 file
    :raises IOError: when no file matches ``path``
    '''
    # Sorted, because the name of the hdf5 file is derived from the first
    # filename: it must not depend on the order of the directory listing,
    # or an earlier conversion would not be found again.
    filenames = sorted(glob.glob(path))
    if not filenames:
        raise IOError('Could not open file: {}, it does not exist'.format(path))
    if len(filenames) > 1:
        filename_hdf5 = _convert_name(filenames, shuffle=shuffle)
        filename_hdf5_noshuffle = _convert_name(filenames, shuffle=False)
        if not os.path.exists(filename_hdf5):
            if not os.path.exists(filename_hdf5_noshuffle):
                # convert every file on its own, then combine the converted files
                for filename in filenames:
                    read_csv_and_convert(filename, shuffle=shuffle, copy_index=copy_index, **kwargs)
                ds = open_many([_convert_name(k, shuffle=shuffle) for k in filenames])
            else:
                # an unshuffled conversion already exists, use it as source
                ds = open(filename_hdf5_noshuffle)
            ds.export_hdf5(filename_hdf5, shuffle=shuffle)
        return open(filename_hdf5)
    else:
        filename = filenames[0]
        filename_hdf5 = _convert_name(filename, shuffle=shuffle)
        filename_hdf5_noshuffle = _convert_name(filename, shuffle=False)
        if not os.path.exists(filename_hdf5):
            if not os.path.exists(filename_hdf5_noshuffle):
                import pandas as pd
                df = pd.read_csv(filename, **kwargs)
                ds = from_pandas(df, copy_index=copy_index)
            else:
                ds = open(filename_hdf5_noshuffle)
            ds.export_hdf5(filename_hdf5, shuffle=shuffle)
        return open(filename_hdf5)
# Mapping from alias to path; open() looks a path up in here before opening it.
# NOTE(review): presumably kept between sessions by vaex.settings -- confirm.
aliases = vaex.settings.main.auto_store_dict("aliases")
# py2/py3 compatibility: urlparse and parse_qs live in a different module on Python 2
try:
    from urllib.parse import urlparse, parse_qs
except ImportError:
    from urlparse import urlparse, parse_qs
def server(url, **kwargs):
    """Connect to a server supporting the vaex web api.

    :param str url: url of the server, the scheme has to be ``ws`` or ``http``;
        hostname, port and path are taken from it
    :param kwargs: extra keyword arguments for ``ServerRest``
    :return vaex.dataframe.ServerRest: returns a server object, note that it does not connect to the server yet, so this will always succeed
    :rtype: ServerRest
    """
    from vaex.remote import ServerRest
    url = urlparse(url)
    websocket = url.scheme == "ws"
    assert url.scheme in ["ws", "http"]
    return ServerRest(url.hostname, base_path=url.path, port=url.port, websocket=websocket, **kwargs)
def example(download=True):
    """Returns an example DataFrame which comes with vaex for testing/learning purposes.

    :param bool download: when the data file is not found locally, fetch it
        through ``vaex.datasets`` (otherwise None is returned in that case)
    :rtype: DataFrame
    """
    from . import utils
    local_path = utils.get_data_file("helmi-dezeeuw-2000-10p.hdf5")
    if local_path is None and download:
        return vaex.datasets.helmi_de_zeeuw_10percent.fetch()
    if local_path:
        return open(local_path)
    return None
def zeldovich(dim=2, N=256, n=-2.5, t=None, scale=1, seed=None):
    """Creates a zeldovich DataFrame.

    ``dim``, ``N``, ``n``, ``t`` and ``scale`` are passed as keyword arguments
    to ``vaex.file.other.Zeldovich``.

    NOTE(review): ``seed`` is accepted but not passed on -- confirm whether
    ``Zeldovich`` takes a seed before forwarding it.

    :return: the ``Zeldovich`` object
    """
    # Import the submodule explicitly (as from_astropy_table does): importing
    # only vaex.file does not guarantee that vaex.file.other is loaded.
    import vaex.file.other
    return vaex.file.other.Zeldovich(dim=dim, N=N, n=n, t=t, scale=scale)
def set_log_level_debug():
    """Set the level of the "vaex" logger to DEBUG."""
    vaex_logger = logging.getLogger("vaex")
    vaex_logger.setLevel(logging.DEBUG)
def set_log_level_info():
    """Set the level of the "vaex" logger to INFO."""
    vaex_logger = logging.getLogger("vaex")
    vaex_logger.setLevel(logging.INFO)
def set_log_level_warning():
    """Set the level of the "vaex" logger to WARNING."""
    vaex_logger = logging.getLogger("vaex")
    vaex_logger.setLevel(logging.WARNING)
def set_log_level_exception():
    """Set the level of the "vaex" logger to FATAL, so only the most severe messages are handled."""
    vaex_logger = logging.getLogger("vaex")
    vaex_logger.setLevel(logging.FATAL)
def set_log_level_off():
    """Disable logging: calls ``logging.disable`` for CRITICAL and below, which affects all loggers."""
    highest_level = logging.CRITICAL
    logging.disable(highest_level)
# Log line layout used for the root logger (note: this module-level name
# shadows the builtin ``format`` within this module).
format = "%(levelname)s:%(threadName)s:%(name)s:%(message)s"
# Configure the root logger when the package is imported.
logging.basicConfig(level=logging.INFO, format=format)
# logging.basicConfig(level=logging.DEBUG)
# The "vaex" logger itself starts at WARNING.
set_log_level_warning()
# Optional per-user start-up script; when present it is executed in this
# module's namespace while the package is imported.
import_script = os.path.expanduser("~/.vaex/vaex_import.py")
if os.path.exists(import_script):
    try:
        # ``open`` in this module is vaex.open (it shadows the builtin), so the
        # script has to be read through io.open. Reading bytes lets compile()
        # honour a source encoding declaration.
        import io
        with io.open(import_script, 'rb') as f:
            code = compile(f.read(), import_script, 'exec')
        exec(code)
    except Exception:
        # Best effort: a broken user script must not make importing vaex fail,
        # but report the actual error rather than the current call stack.
        import traceback
        traceback.print_exc()
logger = logging.getLogger('vaex')
def register_dataframe_accessor(name, cls=None, override=False):
    """Registers a new accessor for a dataframe

    Can be called directly with a class, or without ``cls`` to be used as a
    class decorator. The accessor object is created on first access, by
    calling ``cls`` with the DataFrame, and is kept in the DataFrame's
    ``__dict__`` afterwards.

    See vaex.geo for an example.

    :param str name: name of the property added to ``vaex.dataframe.DataFrame``
    :param cls: the accessor class, or None to get a decorator
    :param bool override: allow replacing an attribute that already exists
    :raises ValueError: when DataFrame already has an attribute with this name and override is False
    :return: the accessor class, or the decorator when ``cls`` is None
    """
    def wrapper(cls):
        existing = getattr(vaex.dataframe.DataFrame, name, None)
        if existing is not None and override is False:
            raise ValueError("DataFrame already has a property/accessor named %r (%r)" % (name, existing) )

        def get_accessor(self):
            instance_dict = self.__dict__
            if name not in instance_dict:
                # first access on this DataFrame: create and remember the accessor
                instance_dict[name] = cls(self)
            return instance_dict[name]

        setattr(vaex.dataframe.DataFrame, name, property(get_accessor))
        return cls

    return wrapper if cls is None else wrapper(cls)
# Deprecated plugin mechanism: every entry point in the 'vaex.namespace' group
# is loaded and called once while the package is imported.
for entry in pkg_resources.iter_entry_points(group='vaex.namespace'):
    logger.warning('(DEPRECATED, use vaex.dataframe.accessor) adding vaex namespace: ' + entry.name)
    try:
        add_namespace = entry.load()
        add_namespace()
    except Exception:
        # a failing entry point is logged with its traceback and does not abort the import
        logger.exception('issue loading ' + entry.name)
# scope (e.g. 'geo') -> list of (name, scope, loader) for the accessors nested under that scope
_df_lazy_accessors = {}


class _lazy_accessor(object):
    """Property getter that creates an accessor the first time it is used.

    The accessor class is obtained by calling ``loader``; the accessor object
    is stored in the ``__dict__`` of the object it is accessed on. Accessors
    registered under the full name of this one are attached to the loaded
    class as lazy properties at that moment.
    """

    def __init__(self, name, scope, loader):
        """When adding an accessor geo.cone, scope=='geo', name='cone', scope may be falsy"""
        self.loader = loader
        self.name = name
        self.scope = scope

    def __call__(self, obj):
        instance_dict = obj.__dict__
        if self.name not in instance_dict:
            accessor_cls = self.loader()
            instance_dict[self.name] = accessor_cls(obj)
            fullname = self.scope + '.' + self.name if self.scope else self.name
            # attach the accessors that live one level below this one
            for name, scope, loader in _df_lazy_accessors.get(fullname, ()):
                assert fullname == scope
                setattr(accessor_cls, name, property(_lazy_accessor(name, scope, loader)))
        return instance_dict[self.name]
def _add_lazy_accessor(name, loader, target_class=vaex.dataframe.DataFrame):
    """Internal use see tests/internal/accessor_test.py for usage

    This enables us to have df.foo.bar accessors that lazily loads the modules.

    :param str name: accessor name, dots separate the levels (e.g. ``'foo.bar'``)
    :param loader: callable without arguments that returns the accessor class
    :param target_class: class that gets the property for a top level accessor
        (a name without dots); nested accessors are only recorded in
        ``_df_lazy_accessors`` and attached when their parent is first used
    """
    parts = name.split('.')
    if len(parts) == 1:
        # honour the target_class argument instead of always using DataFrame
        setattr(target_class, parts[0], property(_lazy_accessor(name, None, loader)))
    else:
        scope = ".".join(parts[:-1])
        _df_lazy_accessors.setdefault(scope, []).append((parts[-1], scope, loader))
# Register every entry point in the 'vaex.dataframe.accessor' group as a lazy
# accessor; entry.load() only runs when the loader is called.
for entry in pkg_resources.iter_entry_points(group='vaex.dataframe.accessor'):
    logger.debug('adding vaex accessor: ' + entry.name)
    # the default argument binds the current entry, so every loader keeps its own entry point
    def loader(entry=entry):
        return entry.load()
    _add_lazy_accessor(entry.name, loader)
# Every entry point in the 'vaex.plugin' group is loaded and called once while
# the package is imported.
for entry in pkg_resources.iter_entry_points(group='vaex.plugin'):
    logger.debug('adding vaex plugin: ' + entry.name)
    try:
        add_namespace = entry.load()
        add_namespace()
    except Exception:
        # a failing plugin is logged with its traceback and does not abort the import
        logger.exception('issue loading ' + entry.name)
def concat(dfs):
    '''Concatenate a list of DataFrames.

    The DataFrames are combined from left to right by calling ``concat`` on
    the result so far with the next DataFrame.

    :param dfs: the DataFrames to concatenate, at least one is required
    :rtype: DataFrame
    '''
    def _join(combined, following):
        return combined.concat(following)

    return reduce(_join, dfs)
def vrange(start, stop, step=1, dtype='f8'):
    """Creates a virtual column which is the equivalent of numpy.arange, but uses 0 memory

    :return: a ``ColumnVirtualRange`` constructed from start, stop, step and dtype
    """
    from . import column
    return column.ColumnVirtualRange(start, stop, step, dtype)
def string_column(strings):
    """Turn strings into a column by way of a pyarrow array.

    :param strings: the values, passed to ``pyarrow.array``
    :return: the result of ``vaex_arrow.convert.column_from_arrow_array`` for that array
    """
    from vaex_arrow import convert
    import pyarrow as pa
    arrow_array = pa.array(strings)
    return convert.column_from_arrow_array(arrow_array)