import base64
import tempfile
import os.path
import numpy as np
from tensorflow import keras as K
import traitlets
import vaex
import vaex.serialize
import vaex.ml.state
import vaex.ml.generate
[docs]
class DataFrameAccessorTensorflow():
    """TensorFlow/Keras helpers bound to the DataFrame held by an ``ml`` accessor."""

    def __init__(self, ml):
        """Keep a reference to the ``ml`` accessor and to the DataFrame it exposes as ``ml.df``."""
        self.ml = ml
        self.df = self.ml.df

    def to_keras_generator(self, features, target=None, batch_size=1024, parallel=True, shuffle=True, infinite=True, verbose=True):
        """Return a batch generator suitable as a Keras datasource. Note that by default the generator is infinite,
        i.e. it loops continuously over the data. Thus you need to specify the "steps_per_epoch" arg when fitting a Keras model,
        the "validation_steps" when using it for validation, and "steps" when calling the "predict" method of a keras model.

        :param features: A list of features.
        :param target: The dependent or target column or a list of columns, if any.
        :param batch_size: Number of samples per chunk of data. This can be thought of as the batch size.
        :param parallel: If True, vaex will evaluate the data chunks in parallel.
        :param shuffle: If True, shuffle the data before every pass.
        :param infinite: If True, the generator is infinite, i.e. it loops continuously over the data. If False, the generator does only one pass over the data.
        :param verbose: If True, show an info on the recommended "steps_per_epoch" based on the total number of samples and "batch_size".
        :return: A generator yielding ``(X, y)`` tuples when `target` is given, and ``(X,)`` tuples otherwise.
            In both arrays the column axis is the last one.

        Example:

        >>> import vaex
        >>> import vaex.ml
        >>> import tensorflow.keras as K

        >>> df = vaex.example()
        >>> features = ['x', 'y', 'z', 'vx', 'vy', 'vz']
        >>> target = 'FeH'

        >>> # Scaling the features
        >>> df = df.ml.minmax_scaler(features=features)
        >>> features = df.get_column_names(regex='^minmax_')

        >>> # Create a training generator
        >>> train_generator = df.ml.tensorflow.to_keras_generator(features=features, target=target, batch_size=512)
        Recommended "steps_per_epoch" arg: 645.0

        >>> # Build a neural network model
        >>> nn_model = K.Sequential()
        >>> nn_model.add(K.layers.Dense(4, activation='tanh'))
        >>> nn_model.add(K.layers.Dense(4, activation='tanh'))
        >>> nn_model.add(K.layers.Dense(1, activation='linear'))
        >>> nn_model.compile(optimizer='sgd', loss='mse')
        >>> nn_model.fit(x=train_generator, epochs=3, steps_per_epoch=645)
        Epoch 1/3
        645/645 [==============================] - 3s 5ms/step - loss: 0.2068
        Epoch 2/3
        645/645 [==============================] - 3s 5ms/step - loss: 0.1706
        Epoch 3/3
        645/645 [==============================] - 3s 5ms/step - loss: 0.1705
        """
        if verbose:
            # Printed as a float (e.g. "645.0") because np.ceil returns a float.
            steps_per_epoch = np.ceil(len(self.df) / batch_size)
            print(f'Recommended "steps_per_epoch" arg: {steps_per_epoch}')

        def _columns_last(columns):
            # Stack the per-column arrays on a new leading axis, then move that
            # axis to the end: (n_columns, n_samples, ...) -> (n_samples, ..., n_columns).
            return np.moveaxis(np.array(columns), 0, -1)

        def _generator(features, target, chunk_size, parallel, shuffle, infinite):
            has_target = target is not None
            if has_target:
                target = vaex.utils._ensure_list(target)
                target = vaex.utils._ensure_strings_from_expressions(target)
                n_target_cols = len(target)
                column_names = features + target
            else:
                column_names = features

            while True:
                # One (optionally shuffled) copy per pass; nothing is prepared
                # ahead of the loop, so no shuffle/copy is thrown away.
                df = self.df.shuffle().copy() if shuffle else self.df.copy()

                for _, _, chunk in df.evaluate_iterator(column_names, chunk_size=chunk_size, parallel=parallel, array_type='numpy'):
                    if has_target:
                        X = _columns_last(chunk[:-n_target_cols])
                        # np.asarray behaves the same on NumPy 1.x and 2.x, so no
                        # version-dependent `copy=` handling is needed here.
                        y = np.asarray(chunk[-n_target_cols:]).T
                        yield (X, y)
                    else:
                        yield (_columns_last(chunk), )

                if not infinite:
                    break

        return _generator(features=features, target=target, chunk_size=batch_size, parallel=parallel, shuffle=shuffle, infinite=infinite)
[docs]
@vaex.serialize.register
@vaex.ml.generate.register
class KerasModel(vaex.ml.state.HasState):
    '''A simple class that makes a Keras model serializable in the Vaex state, as well as enables lazy transformations of the predictions.

    For more information on how to use the Keras library, please visit https://keras.io/.

    Example:

    >>> import vaex
    >>> import vaex.ml.tensorflow
    >>> import tensorflow.keras as K

    >>> df = vaex.example()
    >>> df_train, df_test = df.ml.train_test_split(0.2)
    >>> features = ['vx', 'vy', 'vz']
    >>> target = 'FeH'

    >>> gen_train = df_train.ml.tensorflow.to_keras_generator(features=features, target=target, batch_size=512)
    Recommended "steps_per_epoch" arg: 516.0

    >>> nn_model = K.Sequential()
    >>> nn_model.add(K.layers.Dense(3, activation='tanh'))
    >>> nn_model.add(K.layers.Dense(1, activation='linear'))
    >>> nn_model.compile(optimizer='sgd', loss='mse')
    >>> nn_model.fit(x=gen_train, epochs=11, steps_per_epoch=516, verbose=0)

    >>> keras_model = vaex.ml.tensorflow.KerasModel(features=features, prediction_name='pred', model=nn_model)
    >>> df_test = keras_model.transform(df_test)
    >>> print(df_test.head(3)['vx', 'vy', 'vz', 'FeH', 'pred'])
    # vx vy vz FeH pred
    0 301.155 174.059 27.4275 -1.00539 array([-1.5861175], dtype=float32)
    1 -195 170.472 142.53 -1.70867 array([-1.6035867], dtype=float32)
    2 -48.6342 171.647 -2.07944 -1.83361 array([-1.5978462], dtype=float32)
    '''
    features = traitlets.List(traitlets.Unicode(), help='List of features to use when applying the KerasModel.')
    prediction_name = traitlets.Unicode(default_value='keras_prediction', help='The name of the virtual column housing the predictions.')
    model = traitlets.Any(help='A fitted Keras Model')
    # Tagged with `vaex.ml.state.serialize_pickle`; passed to `load_model` when the state is restored.
    custom_objects = traitlets.Dict(help='Optional dictionary mapping names (strings) to custom classes or functions to be used during deserialization. See "tf.keras.models.load_model" for more details.').tag(**vaex.ml.state.serialize_pickle)

    def __call__(self, *args):
        '''Predict from per-feature arrays.

        Each positional argument is converted to a float64 array; the arrays are
        stacked along a new last axis and handed to ``self.model.predict``.

        :param args: One array-like per feature.
        :return: Whatever ``self.model.predict`` returns for the stacked input.
        '''
        columns = [np.asarray(column, np.float64) for column in args]
        stacked = np.stack(columns, axis=-1)
        return self.model.predict(stacked)

    def fit(self, df):
        '''Not implemented: A placeholder method, put here for potential future development.

        :param df: Unused.
        :raises NotImplementedError: Always.
        '''
        raise NotImplementedError('The `fit` method is not implemented. To satisfy the large number of use-cases and for maximum flexiblity, please fit the model using the `tensorflow.keras` API.')

    def state_get(self):
        '''Return the state of this object, with the Keras model embedded as text.

        The model is saved to a ``model.keras`` file in a temporary directory, and
        the bytes of that file are stored base64 encoded under the ``'model'`` key.

        :return: The state produced by the parent class, with ``'model'`` replaced by the encoded model.
        '''
        state = super().state_get()
        with tempfile.TemporaryDirectory() as workdir:
            archive_path = os.path.join(workdir, 'model.keras')
            self.model.save(archive_path)
            with open(archive_path, 'rb') as handle:
                raw_bytes = handle.read()
        # The saved model file is kept as base64 text (ascii) in the state.
        encoded = base64.encodebytes(raw_bytes)
        state['model'] = encoded.decode('ascii')
        return state

    def state_set(self, state, trusted=True):
        '''Restore this object, including the Keras model, from a state created by :meth:`state_get`.

        :param state: Mapping with a base64 encoded ``'model'`` entry plus the entries handled by the parent class. It is not modified; a copy is used.
        :param trusted: Accepted, but not used by this method.
        '''
        remaining = state.copy()
        encoded_model = remaining.pop('model')
        # Restore the other traits first: `custom_objects` is read when loading the model below.
        super().state_set(remaining)

        raw_bytes = base64.decodebytes(encoded_model.encode('ascii'))
        with tempfile.TemporaryDirectory() as workdir:
            archive_path = os.path.join(workdir, 'model.keras')
            with open(archive_path, 'wb') as handle:
                handle.write(raw_bytes)
            # Load while the temporary directory still exists.
            self.model = K.models.load_model(archive_path, custom_objects=self.custom_objects)