Machine Learning with vaex.ml


If you want to try out this notebook with a live Python kernel, use mybinder:

https://mybinder.org/v2/gh/vaexio/vaex/latest?filepath=docs%2Fsource%2Ftutorial_ml.ipynb

The vaex.ml package brings some machine learning algorithms to vaex. If you installed the individual subpackages (vaex-core, vaex-hdf5, ...) instead of the vaex metapackage, you may need to install it by running pip install vaex-ml, or conda install -c conda-forge vaex-ml.

The API of vaex.ml stays close to that of scikit-learn, while providing better performance and the ability to efficiently perform operations on data that is larger than the available RAM. This page is an overview and a brief introduction to the capabilities offered by vaex.ml.

python
import vaex
vaex.multithreading.thread_count_default = 8
import vaex.ml

import numpy as np
import matplotlib.pyplot as plt

We will use the well known Iris flower and Titanic passenger list datasets, two classical datasets for machine learning demonstrations.

python
df = vaex.datasets.iris()
df
python
df.scatter(df.petal_length, df.petal_width, c_expr=df.class_);

Preprocessing

Scaling of numerical features

vaex.ml packs the common numerical scalers:

  • vaex.ml.StandardScaler - Scale features by removing their mean and dividing by their standard deviation;
  • vaex.ml.MinMaxScaler - Scale features to a given range;
  • vaex.ml.RobustScaler - Scale features by removing their median and scaling them according to a given percentile range;
  • vaex.ml.MaxAbsScaler - Scale features by their maximum absolute value.
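
To make the four descriptions above concrete, here is a minimal plain-Python sketch of the arithmetic each scaler describes. It does not use vaex; the function names and the sample values are made up for illustration, and details such as the use of the population standard deviation or the default percentile range are assumptions rather than a statement of what vaex.ml does internally.

```python
import statistics

def standard_scale(values):
    """Subtract the mean and divide by the (population) standard deviation."""
    mean = statistics.fmean(values)
    std = statistics.pstdev(values)
    return [(v - mean) / std for v in values]

def minmax_scale(values, feature_range=(0.0, 1.0)):
    """Rescale linearly so that the values span feature_range."""
    lo, hi = min(values), max(values)
    a, b = feature_range
    return [a + (v - lo) * (b - a) / (hi - lo) for v in values]

def maxabs_scale(values):
    """Divide by the largest absolute value."""
    max_abs = max(abs(v) for v in values)
    return [v / max_abs for v in values]

def robust_scale(values, percentile_range=(25, 75)):
    """Subtract the median and divide by the spread between two percentiles."""
    cuts = statistics.quantiles(values, n=100, method='inclusive')  # 99 cut points
    low, high = cuts[percentile_range[0] - 1], cuts[percentile_range[1] - 1]
    median = statistics.median(values)
    return [(v - median) / (high - low) for v in values]

# Hypothetical sample values, not taken from the real dataset
petal_length = [1.4, 1.3, 4.7, 4.5, 6.0, 5.1]
scaled = standard_scale(petal_length)
```

After standard scaling the values have zero mean and unit standard deviation, while the robust variant is less sensitive to outliers because it relies on the median and percentiles only.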

The usage is quite similar to that of scikit-learn, in the sense that each transformer implements the .fit and .transform methods.

python
features = ['petal_length', 'petal_width', 'sepal_length', 'sepal_width']
scaler = vaex.ml.StandardScaler(features=features, prefix='scaled_')
scaler.fit(df)
df_trans = scaler.transform(df)
df_trans

The output of the .transform method of any vaex.ml transformer is a shallow copy of the DataFrame that contains the features produced by the transformation in addition to the original columns. A shallow copy means that the new DataFrame just references the original one, so no extra memory is used. In addition, the resulting features, in this case the scaled numerical features, are virtual columns, which take no memory and are computed on the fly when needed. This approach is ideal for working with very large datasets.

Encoding of categorical features

vaex.ml contains several categorical encoders:

  • vaex.ml.LabelEncoder - Encoding features with as many integers as categories, starting from 0;
  • vaex.ml.OneHotEncoder - Encoding features according to the one-hot scheme;
  • vaex.ml.MultiHotEncoder - Encoding features according to the multi-hot scheme (binary vector);
  • vaex.ml.FrequencyEncoder - Encode features by the frequency of their respective categories;
  • vaex.ml.BayesianTargetEncoder - Encode categories with the mean of their target value;
  • vaex.ml.WeightOfEvidenceEncoder - Encode categories by their weight of evidence value.

The following is a quick example using the Titanic dataset.

python
df =  vaex.datasets.titanic()
df.head(5)
python
label_encoder = vaex.ml.LabelEncoder(features=['embarked'])
one_hot_encoder = vaex.ml.OneHotEncoder(features=['embarked'])
multi_hot_encoder = vaex.ml.MultiHotEncoder(features=['embarked'])
freq_encoder = vaex.ml.FrequencyEncoder(features=['embarked'])
bayes_encoder = vaex.ml.BayesianTargetEncoder(features=['embarked'], target='survived')
woe_encoder = vaex.ml.WeightOfEvidenceEncoder(features=['embarked'], target='survived')

df = label_encoder.fit_transform(df)
df = one_hot_encoder.fit_transform(df)
df = multi_hot_encoder.fit_transform(df)
df = freq_encoder.fit_transform(df)
df = bayes_encoder.fit_transform(df)
df = woe_encoder.fit_transform(df)

df.head(5)

Notice that the transformed features are all included in the resulting DataFrame and are appropriately named. This is convenient for constructing diagnostic plots and for engineering more complex features. Because the resulting (encoded) features take no memory, one can try out or combine a variety of preprocessing steps at no extra memory cost.
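
As a rough illustration of what some of these encodings compute, the following plain-Python sketch applies label, frequency and mean-target encoding to a handful of made-up values. It does not use vaex, and the per-category mean shown here omits any prior or smoothing that a Bayesian target encoder may apply.

```python
from collections import Counter, defaultdict

# Hypothetical toy values, not taken from the real Titanic data
embarked = ['S', 'C', 'S', 'Q', 'S', 'C']
survived = [0, 1, 1, 0, 0, 1]

# Label encoding: one integer per category, starting from 0
labels = {cat: i for i, cat in enumerate(sorted(set(embarked)))}
label_encoded = [labels[c] for c in embarked]

# Frequency encoding: replace each category by its relative frequency
counts = Counter(embarked)
freq_encoded = [counts[c] / len(embarked) for c in embarked]

# Mean-target encoding: replace each category by the mean of the target
# over the rows that belong to it (no smoothing applied in this sketch)
sums = defaultdict(float)
for c, y in zip(embarked, survived):
    sums[c] += y
target_mean = {c: sums[c] / counts[c] for c in counts}
mean_encoded = [target_mean[c] for c in embarked]
```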

Feature Engineering

KBinsDiscretizer

With the KBinsDiscretizer you can convert a continuous feature into a discrete one by binning the data into specified intervals. You can specify the number of bins and the strategy that determines their size:

  • "uniform" - all bins have equal sizes;
  • "quantile" - all bins have (approximately) the same number of samples in them;
  • "kmeans" - values in each bin belong to the same 1D cluster as determined by the KMeans algorithm.
python
kbdisc = vaex.ml.KBinsDiscretizer(features=['age'], n_bins=5, strategy='quantile')
df = kbdisc.fit_transform(df)
df.head(5)
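
The difference between the "uniform" and "quantile" strategies can be sketched in plain Python as follows. This does not use vaex; the ages are made up, and the exact edge placement in KBinsDiscretizer may differ from the interpolation used by the standard library here.

```python
import bisect
import statistics

ages = [2, 5, 18, 22, 25, 29, 31, 40, 58, 80]  # hypothetical values
n_bins = 5

# "uniform": split the value range into equally wide intervals
lo, hi = min(ages), max(ages)
uniform_edges = [lo + i * (hi - lo) / n_bins for i in range(1, n_bins)]

# "quantile": choose edges so each bin holds roughly the same number of samples
quantile_edges = statistics.quantiles(ages, n=n_bins, method='inclusive')

def assign_bins(values, inner_edges):
    """Return the index of the bin that each value falls into."""
    return [bisect.bisect_right(inner_edges, v) for v in values]

uniform_bins = assign_bins(ages, uniform_edges)
quantile_bins = assign_bins(ages, quantile_edges)
```

With skewed data such as these ages, the uniform strategy leaves some bins crowded and others nearly empty, while the quantile strategy places two samples in every bin.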

GroupBy Transformer

The GroupByTransformer is a handy feature in vaex-ml that lets you perform groupby aggregations on the training data, and then use those aggregations as features in the training and test sets.

python
gbt = vaex.ml.GroupByTransformer(by='pclass', agg={'age': ['mean', 'std'],
                                                   'fare': ['mean', 'std'],
                                                  })
df = gbt.fit_transform(df)
df.head(5)
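
The fit-then-transform idea behind this can be sketched in plain Python: the aggregates are learned from the training rows only and then attached to any other set of rows by looking up the group key. The rows and the output column name `age_mean` below are hypothetical and do not reflect the naming used by vaex-ml.

```python
from collections import defaultdict
import statistics

# Hypothetical rows, not taken from the real Titanic data
train = [
    {'pclass': 1, 'age': 38.0}, {'pclass': 1, 'age': 42.0},
    {'pclass': 3, 'age': 22.0}, {'pclass': 3, 'age': 26.0},
]
test = [{'pclass': 3, 'age': 30.0}, {'pclass': 1, 'age': 50.0}]

# "fit": compute the aggregation per group on the training rows only
ages_by_class = defaultdict(list)
for row in train:
    ages_by_class[row['pclass']].append(row['age'])
mean_age = {k: statistics.fmean(v) for k, v in ages_by_class.items()}

# "transform": attach the learned aggregate to each row of any set
def add_group_feature(rows):
    return [{**row, 'age_mean': mean_age.get(row['pclass'])} for row in rows]

test_out = add_group_feature(test)
```

Because the aggregate comes from the training rows, the test rows never influence the feature values they receive.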

CycleTransformer

The CycleTransformer provides a strategy for transforming cyclical features, such as angles or time. Each feature is treated as the angle of a polar coordinate system and converted to Cartesian coordinates. This has been shown to help certain ML models achieve better performance.

python
df = vaex.from_arrays(days=[0, 1, 2, 3, 4, 5, 6])
cyctrans = vaex.ml.CycleTransformer(n=7, features=['days'])
cyctrans.fit_transform(df)
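
The underlying arithmetic can be sketched in plain Python. The sketch assumes the common convention of mapping a value v with period n to the angle 2πv/n and taking its cosine and sine; the helper names are made up and the exact convention used by vaex.ml is not confirmed here.

```python
import math

def cycle_transform(values, n):
    """Map values with period n onto the unit circle as (x, y) coordinates."""
    xs = [math.cos(2 * math.pi * v / n) for v in values]
    ys = [math.sin(2 * math.pi * v / n) for v in values]
    return xs, ys

days = [0, 1, 2, 3, 4, 5, 6]
xs, ys = cycle_transform(days, n=7)

def distance(i, j):
    """Euclidean distance between two days in the transformed space."""
    return math.hypot(xs[i] - xs[j], ys[i] - ys[j])
```

In the transformed space, day 6 is exactly as close to day 0 as day 1 is, which the raw integer encoding cannot express.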

Dimensionality reduction

Principal Component Analysis

The PCA implemented in vaex.ml can scale to a very large number of samples, even if the data we want to transform does not fit into RAM. To demonstrate this, let us do a PCA transformation on the Iris dataset. For this example, we have replicated this dataset thousands of times, such that it contains over 1 billion samples.

python
df = vaex.datasets.iris_1e9()
n_samples = len(df)
print(f'Number of samples in DataFrame: {n_samples:,}')
python
features = ['petal_length', 'petal_width', 'sepal_length', 'sepal_width']
pca = vaex.ml.PCA(features=features, n_components=4)
pca.fit(df, progress='widget')

The PCA transformer implemented in vaex.ml can be fit in well under a minute, even when the data comprises 4 columns and 1 billion rows.

python
df_trans = pca.transform(df)
df_trans

Recall that the transformed DataFrame, which includes the PCA components, takes no extra memory.
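
One way to see why PCA can work on data larger than RAM is that the covariance matrix has only as many rows and columns as there are features, and it can be accumulated in a single pass over the rows. The plain-Python sketch below does this for two made-up features and then solves the 2x2 eigenvalue problem in closed form. It is an illustration of the general idea under these assumptions, not a description of how vaex.ml implements PCA.

```python
import math

def chunks(rows, size):
    """Yield consecutive slices of rows, mimicking out-of-core batches."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]

# Hypothetical two-feature data, roughly following y = 2x
rows = [(1.0, 2.1), (2.0, 3.9), (3.0, 6.2), (4.0, 7.8), (5.0, 10.1), (6.0, 12.0)]

# One pass over the data: only a handful of running sums stay in memory
n = sx = sy = sxx = syy = sxy = 0.0
for chunk in chunks(rows, size=2):
    for x, y in chunk:
        n += 1
        sx += x
        sy += y
        sxx += x * x
        syy += y * y
        sxy += x * y

mx, my = sx / n, sy / n
cxx = sxx / n - mx * mx
cyy = syy / n - my * my
cxy = sxy / n - mx * my

# Eigenvalues of the symmetric covariance matrix [[cxx, cxy], [cxy, cyy]]
half_trace = (cxx + cyy) / 2
root = math.hypot((cxx - cyy) / 2, cxy)
eigenvalues = (half_trace + root, half_trace - root)
explained = eigenvalues[0] / sum(eigenvalues)
```

The cost of the eigendecomposition depends only on the number of features, which is why a "tall" DataFrame with few columns is the favourable case.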

Incremental PCA

The PCA implementation in vaex is very fast, but more so for "tall" DataFrames, i.e. DataFrames that have many rows, but not many columns. For DataFrames that have hundreds of columns, it is more efficient to use an Incremental PCA method. vaex.ml provides a convenient method that essentially wraps sklearn.decomposition.IncrementalPCA, the fitting of which is more efficient for "wide" DataFrames.

The usage is practically identical to the regular PCA method. Consider the following example:

python
n_samples = 100_000
n_columns = 50
data_dict = {f'feat_{i}': np.random.normal(0, i+1, size=n_samples) for i in range(n_columns)}
df = vaex.from_dict(data_dict)


features = df.get_column_names()
pca = vaex.ml.PCAIncremental(n_components=10, features=features, batch_size=42_000)
pca.fit(df, progress='widget')
pca.transform(df)

Note that scikit-learn needs to be installed only to fit the PCAIncremental transformer. The transform method does not rely on scikit-learn being installed.

Random projections

Random projection is another popular way of doing dimensionality reduction, especially when the dimensionality of the data is very high. vaex.ml conveniently wraps both sklearn.random_projection.GaussianRandomProjection and sklearn.random_projection.SparseRandomProjection in a single vaex.ml transformer.

python
rand_proj = vaex.ml.RandomProjections(features=features, n_components=10)
rand_proj.fit(df)
rand_proj.transform(df)
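
For intuition, a Gaussian random projection multiplies each row by a matrix of random numbers drawn from a normal distribution with zero mean and variance 1/n_components. The following plain-Python sketch shows that idea on made-up data; the function name and the seed handling are illustrative only and are not part of vaex.ml or scikit-learn.

```python
import random

def gaussian_random_projection(rows, n_components, seed=42):
    """Project rows onto n_components random Gaussian directions."""
    rng = random.Random(seed)
    n_features = len(rows[0])
    scale = 1.0 / n_components ** 0.5  # standard deviation of each matrix entry
    matrix = [[rng.gauss(0.0, scale) for _ in range(n_features)]
              for _ in range(n_components)]
    return [[sum(w * v for w, v in zip(direction, row)) for direction in matrix]
            for row in rows]

# Hypothetical data: 5 rows with 50 features each
rows = [[float(i + j) for j in range(50)] for i in range(5)]
projected = gaussian_random_projection(rows, n_components=10)
```

Unlike PCA, the projection matrix does not depend on the data, so no pass over the rows is needed to construct it.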

Clustering

K-Means

vaex.ml implements a fast and scalable K-Means clustering algorithm. The usage is similar to that of scikit-learn.

python
import vaex.ml.cluster

df = vaex.datasets.iris()

features = ['petal_length', 'petal_width', 'sepal_length', 'sepal_width']
kmeans = vaex.ml.cluster.KMeans(features=features, n_clusters=3, max_iter=100, verbose=True, random_state=42)
kmeans.fit(df)

df_trans = kmeans.transform(df)
df_trans

K-Means is an unsupervised algorithm, meaning that the predicted cluster labels in the transformed dataset do not necessarily correspond to the class label. We can map the predicted cluster identifiers to match the class labels, making it easier to construct diagnostic plots.

python
df_trans['predicted_kmean_map'] = df_trans.prediction_kmeans.map(mapper={0: 1, 1: 2, 2: 0})
df_trans
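
The mapper above is written by hand. A possible way to derive such a mapping, sketched here in plain Python with made-up labels, is to assign each cluster the class that occurs most often among its members. The helper name is hypothetical, and a majority vote can map two clusters to the same class when the clustering is poor.

```python
from collections import Counter, defaultdict

def majority_mapper(cluster_ids, class_labels):
    """Map each cluster id to the class label that occurs most often in it."""
    votes = defaultdict(Counter)
    for cluster, label in zip(cluster_ids, class_labels):
        votes[cluster][label] += 1
    return {cluster: counter.most_common(1)[0][0]
            for cluster, counter in votes.items()}

# Hypothetical cluster assignments and true classes
clusters = [0, 0, 0, 1, 1, 2, 2, 2, 1]
classes = [1, 1, 1, 2, 2, 0, 0, 2, 2]
mapper = majority_mapper(clusters, classes)
```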

Now we can construct simple scatter plots, and see that in the case of the Iris dataset, K-Means does a pretty good job splitting the data into 3 classes.

python
fig = plt.figure(figsize=(12, 5))

plt.subplot(121)
df_trans.scatter(df_trans.petal_length, df_trans.petal_width, c_expr=df_trans.class_)
plt.title('Original classes')

plt.subplot(122)
df_trans.scatter(df_trans.petal_length, df_trans.petal_width, c_expr=df_trans.predicted_kmean_map)
plt.title('Predicted classes')

plt.tight_layout()
plt.show()

As with any algorithm implemented in vaex.ml, K-Means can be used on billions of samples. Fitting takes under 2 minutes when applied to the oversampled Iris dataset, which contains over 1 billion samples.

python
df = vaex.datasets.iris_1e9()
n_samples = len(df)
print(f'Number of samples in DataFrame: {n_samples:,}')
python
%%time

features = ['petal_length', 'petal_width', 'sepal_length', 'sepal_width']
kmeans = vaex.ml.cluster.KMeans(features=features, n_clusters=3, max_iter=100, verbose=True, random_state=31)
kmeans.fit(df)

Supervised learning

While vaex.ml does not yet implement any supervised machine learning models, it does provide wrappers to several popular libraries such as scikit-learn, XGBoost, LightGBM and CatBoost.

The main benefit of these wrappers is that they turn the models into vaex.ml transformers. This means the models become part of the DataFrame state and thus can be serialized, and their predictions can be returned as virtual columns. This is especially useful for creating various diagnostic plots and evaluating performance metrics at no memory cost, as well as building ensembles.

Scikit-Learn example

The vaex.ml.sklearn module provides convenient wrappers to the scikit-learn estimators. In fact, these wrappers can be used with any library that follows the API convention established by scikit-learn, i.e. implements the .fit and .predict methods.

Here is an example:

python
from vaex.ml.sklearn import Predictor
from sklearn.ensemble import GradientBoostingClassifier

df = vaex.datasets.iris()

features = ['petal_length', 'petal_width', 'sepal_length', 'sepal_width']
target = 'class_'

model = GradientBoostingClassifier(random_state=42)
vaex_model = Predictor(features=features, target=target, model=model, prediction_name='prediction')

vaex_model.fit(df=df)

df = vaex_model.transform(df)
df

One can still train a predictive model on datasets that are too big to fit into memory by leveraging the on-line learners provided by scikit-learn. The vaex.ml.sklearn.IncrementalPredictor conveniently wraps these learners and provides control on how the data is passed to them from a vaex DataFrame.

Let us train a model on the oversampled Iris dataset which comprises over 1 billion samples.

python
from vaex.ml.sklearn import IncrementalPredictor
from sklearn.linear_model import SGDClassifier

df = vaex.datasets.iris_1e9()

features = ['petal_length', 'petal_width', 'sepal_length', 'sepal_width']
target = 'class_'

model = SGDClassifier(learning_rate='constant', eta0=0.0001, random_state=42)
vaex_model = IncrementalPredictor(features=features, target=target, model=model, 
                                  batch_size=500_000, partial_fit_kwargs={'classes':[0, 1, 2]})

vaex_model.fit(df=df, progress='widget')

df = vaex_model.transform(df)
df
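
The pattern behind incremental training can be sketched without any library: the data is consumed in batches, and each batch updates the model through a partial_fit call, so only one batch needs to be in memory at a time. The toy learner below (a running per-class mean on one feature) and the sample values are invented for illustration and are not related to SGDClassifier or to how IncrementalPredictor is implemented.

```python
class RunningCentroidClassifier:
    """Toy online learner: keeps a running mean per class, predicts the nearest."""

    def __init__(self):
        self.sums = {}
        self.counts = {}

    def partial_fit(self, X, y):
        # Update the running sums with one batch of samples
        for value, label in zip(X, y):
            self.sums[label] = self.sums.get(label, 0.0) + value
            self.counts[label] = self.counts.get(label, 0) + 1
        return self

    def predict(self, X):
        centroids = {k: self.sums[k] / self.counts[k] for k in self.sums}
        return [min(centroids, key=lambda k: abs(centroids[k] - v)) for v in X]

def batches(X, y, batch_size):
    """Yield consecutive (features, target) batches."""
    for start in range(0, len(X), batch_size):
        yield X[start:start + batch_size], y[start:start + batch_size]

# Hypothetical values
petal_length = [1.4, 1.5, 4.5, 4.7, 5.9, 6.1, 1.3, 4.4, 6.0]
class_ = [0, 0, 1, 1, 2, 2, 0, 1, 2]

model = RunningCentroidClassifier()
for X_batch, y_batch in batches(petal_length, class_, batch_size=4):
    model.partial_fit(X_batch, y_batch)  # only one batch is processed at a time
predictions = model.predict([1.6, 4.6, 5.8])
```

Since an early batch may not contain every class, an online classifier generally has to be told the full set of classes up front, which is the role of the 'classes' entry in partial_fit_kwargs above.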

XGBoost example

Libraries such as XGBoost provide more options, such as validation during training and early stopping. We provide wrappers that keep close to the native API of these libraries, in addition to the scikit-learn API.

While the following example showcases the XGBoost wrapper, vaex.ml implements similar wrappers for LightGBM and CatBoost.

python
from vaex.ml.xgboost import XGBoostModel

df = vaex.datasets.iris_1e5()
df_train, df_test = df.ml.train_test_split(test_size=0.2, verbose=False)

features = ['petal_length', 'petal_width', 'sepal_length', 'sepal_width']
target = 'class_'

params = {'learning_rate': 0.1,
          'max_depth': 3, 
          'num_class': 3, 
          'objective': 'multi:softmax',
          'subsample': 1,
          'random_state': 42,
          'n_jobs': -1}


booster = XGBoostModel(features=features, target=target, num_boost_round=500, params=params)
booster.fit(df=df_train, evals=[(df_train, 'train'), (df_test, 'test')], early_stopping_rounds=5)

# Add the predictions to the held-out test set
df_test = booster.transform(df_test)
df_test

CatBoost example

The CatBoost library supports summing up models. With this feature, we can use CatBoost to train a model using data that is otherwise too large to fit in memory. The idea is to train a single CatBoost model per chunk of data, and then sum up the individual models to create a master model. To use this feature via vaex.ml, just specify the batch_size argument in the CatBoostModel wrapper. One can also specify additional options, such as the strategy used to sum up the individual models, or how they should be weighted.

python
import numpy as np
from vaex.ml.catboost import CatBoostModel

df = vaex.datasets.iris_1e8()
df_train, df_test = df.ml.train_test_split(test_size=0.2, verbose=False)

features = ['petal_length', 'petal_width', 'sepal_length', 'sepal_width']
target = 'class_'

params = {
    'leaf_estimation_method': 'Gradient',
    'learning_rate': 0.1,
    'max_depth': 3,
    'bootstrap_type': 'Bernoulli',
    'subsample': 0.8,
    'sampling_frequency': 'PerTree',
    'colsample_bylevel': 0.8,
    'reg_lambda': 1,
    'objective': 'MultiClass',
    'eval_metric': 'MultiClass',
    'random_state': 42,
    'verbose': 0,
}

booster = CatBoostModel(features=features, target=target, num_boost_round=23, 
                        params=params, prediction_type='Class', batch_size=11_000_000)
booster.fit(df=df_train, progress='widget')

# Add the predictions to the held-out test set
df_test = booster.transform(df_test)
df_test

Keras example

Keras is the most popular high-level API for building neural network models, with TensorFlow as its backend. Neural networks can have very diverse and complicated architectures, and their training loops can be both simple and sophisticated. This is why, at least for now, we leave users to train their Keras models as they normally would, while vaex-ml provides a simple wrapper for serialization and lazy evaluation of those models. In addition, vaex-ml provides a convenience method to turn a DataFrame into a generator suitable for training Keras models. See the example below.

python
import vaex.ml.tensorflow
import tensorflow.keras as K

df = vaex.example()
df_train, df_valid, df_test = df.split_random([0.8, 0.1, 0.1], random_state=42)

features = ['x', 'y', 'z', 'vx', 'vy', 'vz']
target = 'FeH'

# Scaling the features
df_train = df_train.ml.minmax_scaler(features=features)
features = df_train.get_column_names(regex='^minmax_')

# Apply preprocessing to the validation
state_prep = df_train.state_get()
df_valid.state_set(state_prep)

# Generators for the train and validation sets
gen_train = df_train.ml.tensorflow.to_keras_generator(features=features, target=target, batch_size=512)
gen_valid = df_valid.ml.tensorflow.to_keras_generator(features=features, target=target, batch_size=512)

# Create and fit a simple Sequential Keras model
nn_model = K.Sequential()
nn_model.add(K.layers.Dense(3, activation='tanh'))
nn_model.add(K.layers.Dense(1, activation='linear'))
nn_model.compile(optimizer='sgd', loss='mse')
nn_model.fit(x=gen_train, validation_data=gen_valid, epochs=11, steps_per_epoch=516, validation_steps=65)

# Serialize the model
keras_model = vaex.ml.tensorflow.KerasModel(features=features, prediction_name='keras_pred', model=nn_model)
df_train = keras_model.transform(df_train)

# Apply all the transformations to the test set
state = df_train.state_get()
df_test.state_set(state)

# Preview the results
df_test.head(5)
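
The steps_per_epoch and validation_steps values used above follow from the number of rows and the batch size: a generator that yields batches has no natural end, so the number of batches that make up one pass over the data is computed by hand. The sketch below reproduces that arithmetic, assuming the example dataset has 330,000 rows and that the split fractions are applied exactly; both are assumptions not stated in this tutorial.

```python
import math

n_rows = 330_000   # assumed size of the vaex.example() dataset
batch_size = 512

# 80% of the rows for training, 10% for validation (integer arithmetic)
n_train = n_rows * 8 // 10
n_valid = n_rows * 1 // 10

# Number of batches needed to see every row once, the last batch may be partial
steps_per_epoch = math.ceil(n_train / batch_size)
validation_steps = math.ceil(n_valid / batch_size)
```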

River example

River is an up-and-coming library for online learning that provides a variety of models which can learn incrementally. While most River models currently support only per-sample training, a few do support mini-batch training, which is extremely fast and a great fit for machine learning with vaex.

python
from vaex.ml.incubator.river import RiverModel
from river.linear_model import LinearRegression
from river import optim


df = vaex.datasets.iris_1e9()
df_train, df_test = df.ml.train_test_split(test_size=0.2, verbose=False)

features = ['petal_length', 'petal_width', 'sepal_length', 'sepal_width']
target = 'class_'

river_model = RiverModel(features=features,
                         target=target,
                         model=LinearRegression(optimizer=optim.SGD(0.001), intercept_lr=0.001),
                         prediction_name='prediction_raw',
                         batch_size=500_000)
river_model.fit(df_train, progress='widget')
river_model.transform(df_test)

Metrics

vaex-ml also provides several of the most common evaluation metrics for classification and regression tasks. These metrics are implemented in vaex-ml and thus are evaluated out-of-core, so you do not need to materialize the target and predicted columns.

Here is a list of the currently supported metrics:

  • Classification (binary, and macro-average for multiclass problems):
    • Accuracy
    • Precision
    • Recall
    • F1-score
    • Confusion matrix
    • Classification report (a convenience method, which prints out the accuracy, precision, recall, and F1-score at the same time)
    • Matthews Correlation Coefficient
  • Regression
    • Mean Absolute Error
    • Mean Squared Error
    • R² Correlation Score

Here is a simple example:

python
import vaex.ml.metrics
from vaex.ml.sklearn import Predictor
from sklearn.linear_model import LogisticRegression

df = vaex.datasets.iris()
df_train, df_test = df.split_random([0.8, 0.2], random_state=55)

features = ['petal_length', 'petal_width', 'sepal_length', 'sepal_width']
target = 'class_'

model = LogisticRegression(random_state=42)
vaex_model = Predictor(features=features, target=target, model=model, prediction_name='pred')

vaex_model.fit(df=df_train)

df_test = vaex_model.transform(df_test)

print(df_test.ml.metrics.classification_report(df_test.class_, df_test.pred, average='macro'))
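
For reference, macro averaging computes each metric per class and then takes the unweighted mean over the classes. The plain-Python sketch below shows this on made-up labels; it illustrates the definitions only and is not the out-of-core implementation in vaex-ml. The function name is hypothetical.

```python
def macro_scores(y_true, y_pred):
    """Return accuracy plus macro-averaged precision, recall and F1-score."""
    labels = sorted(set(y_true) | set(y_pred))
    pairs = list(zip(y_true, y_pred))
    precisions, recalls, f1s = [], [], []
    for label in labels:
        tp = sum(t == label and p == label for t, p in pairs)
        fp = sum(t != label and p == label for t, p in pairs)
        fn = sum(t == label and p != label for t, p in pairs)
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = (2 * precision * recall / (precision + recall)
              if precision + recall else 0.0)
        precisions.append(precision)
        recalls.append(recall)
        f1s.append(f1)
    n = len(labels)
    return {
        'accuracy': sum(t == p for t, p in pairs) / len(pairs),
        'precision': sum(precisions) / n,
        'recall': sum(recalls) / n,
        'f1': sum(f1s) / n,
    }

# Hypothetical labels: one sample of class 1 is predicted as class 2
y_true = [0, 0, 1, 1, 2, 2]
y_pred = [0, 0, 1, 2, 2, 2]
scores = macro_scores(y_true, y_pred)
```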

State transfer - pipelines made easy

Each vaex DataFrame consists of two parts: data and state. The data is immutable, and any operation such as filtering, adding new columns, or applying transformers or predictive models just modifies the state. This is an extremely powerful concept and can completely redefine how we imagine machine learning pipelines.

As an example, let us once again create a model based on the Iris dataset. Here, we will create a couple of new features, do a PCA transformation, and finally train a predictive model.

python
# Load data and split it in train and test sets
df = vaex.datasets.iris()
df_train, df_test = df.ml.train_test_split(test_size=0.2, verbose=False)

# Create new features
df_train['petal_ratio'] = df_train.petal_length / df_train.petal_width
df_train['sepal_ratio'] = df_train.sepal_length / df_train.sepal_width

# Do a PCA transformation
features = ['petal_length', 'petal_width', 'sepal_length', 'sepal_width', 'petal_ratio', 'sepal_ratio']
pca = vaex.ml.PCA(features=features, n_components=6)
df_train = pca.fit_transform(df_train)

# Display the training DataFrame at this stage
df_train

At this point, we are ready to train a predictive model. In this example, let's use LightGBM with its scikit-learn API.

python
import lightgbm

features = df_train.get_column_names(regex='^PCA')

booster = lightgbm.LGBMClassifier()

vaex_model = Predictor(model=booster, features=features, target='class_')

vaex_model.fit(df=df_train)
df_train = vaex_model.transform(df_train)

df_train

The final df_train DataFrame contains all the features we created, including the predictions right at the end. Now, we would like to apply the same transformations to the test set. All we need to do is extract the state from df_train and apply it to df_test. This will propagate all the changes that were made to the training set to the test set.

python
state = df_train.state_get()

df_test.state_set(state)
df_test

And just like that, df_test contains all the columns, transformations and the prediction we modelled on the training set. The state can be easily serialized to disk in the form of a JSON file. This makes deployment of a machine learning model as trivial as copying a JSON file from one environment to another.

python
df_train.state_write('./iris_model.json')

df_test.state_load('./iris_model.json')
df_test
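
The separation of data and state can be illustrated with a toy model in plain Python: the state is a serializable list of column definitions, and applying it to a different set of columns recreates the same derived features. The state layout, the operation registry and the helper name below are invented for this sketch and do not reflect the actual format of a vaex state file.

```python
import json

# Toy "state": an ordered list of derived columns, each defined by an
# operation on columns that already exist (hypothetical layout)
state = [
    {'name': 'petal_ratio', 'op': 'divide', 'args': ['petal_length', 'petal_width']},
    {'name': 'sepal_ratio', 'op': 'divide', 'args': ['sepal_length', 'sepal_width']},
]
OPS = {'divide': lambda a, b: a / b}

def apply_state(columns, state):
    """Return a new dict of columns with every derived column in state added."""
    out = dict(columns)
    for step in state:
        left, right = (out[name] for name in step['args'])
        out[step['name']] = [OPS[step['op']](a, b) for a, b in zip(left, right)]
    return out

serialized = json.dumps(state)      # what would be written to disk
restored = json.loads(serialized)   # what another environment would read back

# Hypothetical test data that never saw the original definitions
test_columns = {
    'petal_length': [1.4, 4.5], 'petal_width': [0.2, 1.5],
    'sepal_length': [5.0, 6.4], 'sepal_width': [2.5, 3.2],
}
test_out = apply_state(test_columns, restored)
```

The data itself is never modified or copied into the state, which is why the serialized form stays small regardless of the size of the DataFrame.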