Manual/Manual_How_to_create_user_minibatch_sources.ipynb
In order to make use of CNTK's (distributed) training functionality, one has to provide input data as an instance of MinibatchSource. CNTK offers several ways to provide minibatch sources; this manual explains one of them: how to create a user minibatch source in Python.
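A minibatch source is responsible for providing meta information about its data streams, the minibatch data itself, and a checkpoint state that allows an interrupted data feed to be resumed.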
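Correspondingly, a user minibatch source needs to implement the following four methods (see UserMinibatchSource for details):

1. stream_infos(): returns the meta information of the data streams,
2. next_minibatch(num_samples, number_of_workers, worker_rank, device): returns the next minibatch of data,
3. get_checkpoint_state(): returns the current state of the data access, and
4. restore_from_checkpoint(state): restores the data access from a saved state.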
Now let's go through the implementation of these four methods step by step, using an example that implements the UserMinibatchSource interface.
First, let's import the necessary packages:
```python
import numpy as np
import cntk as C
from cntk.io import UserMinibatchSource, StreamInformation, MinibatchData
```
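Secondly, let's assume that we have a data set in the following text format: each line starts with a sequence ID, followed by streams that are each introduced by ``|`` and the stream name (``x`` for the sparse features in ``index:value`` form, ``y`` for the dense label vector). Our toy data set contains 4 sequences: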
```python
sample_data = r'''0 |x 560:1 |y 1 0 0 0 0
0 |x 0:1
0 |x 0:1
1 |x 560:1 |y 0 1 0 0 0
1 |x 0:1
1 |x 0:1
1 |x 424:1
2 |x 160:1 |y 0 0 1 0 0
2 |x 5:1
2 |x 6:1
3 |x 460:1 |y 0 0 0 1 0
3 |x 3:1
3 |x 3:1
3 |x 425:1
'''
```
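For illustration, the toy format can be parsed into per-sequence features and labels with a few lines of plain Python. This is a minimal sketch assuming exactly the layout above (one sparse ``x`` entry per line, an optional dense ``y`` label vector); the helper name ``parse_sequences`` is hypothetical, and unlike the example's own ``_prepare_data()`` it keeps labels as a plain list of floats rather than a NumPy array.

```python
def parse_sequences(text):
    """Group the lines of the toy text format by sequence ID (hypothetical helper).

    Returns {seq_id: {'features': [vocab_idx, ...], 'labels': [float, ...]}}.
    Assumes one sparse 'x' entry ("index:value") per line and an optional
    dense 'y' stream holding the label vector.
    """
    data = {}
    for line in text.splitlines():
        line = line.strip()
        if not line:
            continue  # skip blank lines, e.g. at the end of the string
        seq_part, *streams = line.split('|')
        seq = data.setdefault(int(seq_part), {'features': []})
        for stream in streams:
            name, *values = stream.split()
            if name == 'x':
                # Only the index matters for a one-hot feature; the value is 1.
                seq['features'].append(int(values[0].split(':')[0]))
            elif name == 'y':
                seq['labels'] = [float(v) for v in values]
    return data
```

Applied to ``sample_data``, sequence 1 would yield the feature indices ``[560, 0, 0, 424]`` and the label ``[0.0, 1.0, 0.0, 0.0, 0.0]``.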
To implement our example user minibatch source, we first prepare the data access and its meta information:

1. Provide access to the data, e.g.:

```python
features = self.data[seq_id]['features']
labels = self.data[seq_id]['labels']
```
This is done by a private method ``_prepare_data()`` in the example below. We omit the details of parsing the text format here, as they are irrelevant to understanding the UserMinibatchSource interface. However, the parsing mechanism should keep track of the current data access point so that the data feeding process can be restored at any point. In the example, we track the sequence index.
2. Define the meta information of the data, e.g.:

```python
self.fsi = StreamInformation("features", 0, 'sparse', np.float32, (self.f_dim,))
self.lsi = StreamInformation("labels", 1, 'dense', np.float32, (self.l_dim,))
```
``self.fsi`` and ``self.lsi`` define the meta information (see [StreamInformation](https://cntk.ai/pythondocs/cntk.io.html#cntk.io.StreamInformation) for the definition) of the features and labels, respectively. For example, ``StreamInformation("features", 0, 'sparse', np.float32, (self.f_dim,))`` specifies that:

a) the "features" data stream is identified by ID $0$ (every data stream must be identified by a unique ID),
b) it is sparse,
c) its data type is np.float32, and
d) its dimension is (self.f_dim,).
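3. Initialize the state that records the current data access point, e.g.:

```python
self.next_seq_idx = 0
```

Putting these steps together, the constructor looks like this: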
```python
class MyMultiWorkerDataSource(UserMinibatchSource):
    def __init__(self, f_dim, l_dim):
        self.f_dim, self.l_dim = f_dim, l_dim

        self._prepare_data()

        # stream meta information
        self.fsi = StreamInformation("features", 0, 'sparse', np.float32, (self.f_dim,))
        self.lsi = StreamInformation("labels", 1, 'dense', np.float32, (self.l_dim,))

        # setting the state
        self.sequences = sorted(self.data)
        self.next_seq_idx = 0

        super(MyMultiWorkerDataSource, self).__init__()
```
Do not forget to call the superclass constructor: ``super(MyMultiWorkerDataSource, self).__init__()``.
After the preparation is done by the constructor, we can implement stream_infos() simply by returning the list of stream information instances:
```python
def stream_infos(self):
    return [self.fsi, self.lsi]
```
With this method implemented, the underlying minibatch source framework will be able to refer to the meta information by the names "features" and "labels".
Let us first review the function signature of the next_minibatch method:
```python
def next_minibatch(self, num_samples, number_of_workers, worker_rank, device)
```
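This method is invoked by the outer CNTK learning loops with four parameters:

- num_samples: the number of samples requested for the minibatch,
- number_of_workers: the number of workers participating in (distributed) training,
- worker_rank: the rank of the worker requesting the minibatch, and
- device: the device on which the minibatch data should be created.

In other words, it is the user minibatch source's responsibility to understand these parameters and provide minibatch data accordingly. The minibatch source needs to ensure that:

- the minibatch contains at most num_samples samples (as far as sequence boundaries allow),
- each worker receives its own slice of the data, based on number_of_workers and worker_rank, and
- the data is created on the requested device.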
To keep the essential requirements in view, the example below encapsulates the details in a private method, _prepare_nextbatch():
```python
def _prepare_nextbatch(self, num_samples, number_of_workers, worker_rank):
    # details....
    return features, f_sample_count, labels, l_sample_count, sweep_end
```
This method ensures that features and labels contain at most num_samples samples; the only exception is a single sequence longer than num_samples, which is still returned whole. The sample counts are returned as f_sample_count and l_sample_count, respectively. Note that different data streams might contain different numbers of samples. In addition, sweep_end indicates whether this minibatch is at the end of a sweep over the whole data set.
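The sample-count bookkeeping can be illustrated without CNTK. The following is a simplified sketch of the packing rule in ``_prepare_nextbatch()`` for a single stream, ignoring workers and sweep wrap-around; ``pack_sequences`` is a hypothetical helper that takes sequence lengths instead of the data itself.

```python
def pack_sequences(lengths, num_samples):
    """Greedily pick sequence indices until the next one would exceed num_samples.

    The first sequence is always taken, even if it alone exceeds num_samples,
    so that an oversized sequence cannot stall the data feed.
    Returns (chosen_indices, total_sample_count).
    """
    chosen, count = [], 0
    for idx, length in enumerate(lengths):
        if chosen and count + length > num_samples:
            break
        chosen.append(idx)
        count += length
    return chosen, count
```

With the feature lengths of the toy data set (3, 4, 3 and 4 samples) and ``num_samples=7``, the sketch takes sequences 0 and 1 for exactly 7 samples, which matches the ``mb_size = 7`` used in the training example at the end.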
To define a user minibatch source that can be used with distributed learners, e.g. BlockMomentum, we need to use number_of_workers to cut the data into slices and then return the slice that belongs to the worker_rank that requested the next minibatch. In this private method, we implement a naive logic to distribute the data to the workers: a sequence is skipped if its sequence index modulo the number of workers does not equal the worker rank:
```python
if (seq_id % number_of_workers) != worker_rank:
    continue
```
This is only for demonstration purposes. In practice, the distribution of data to workers should be based on a more efficient mechanism, e.g. on how costly it is for a specific worker to access a specific subset of the data, and on the randomization mechanism.
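The modulo rule can be checked in isolation. This minimal sketch uses a hypothetical helper, ``shard_for_worker``, to show that each sequence ID is assigned to exactly one worker, so the workers' slices are disjoint and together cover the data set.

```python
def shard_for_worker(seq_ids, number_of_workers, worker_rank):
    """Return the sequence IDs that worker_rank keeps under modulo partitioning."""
    if not 0 <= worker_rank < number_of_workers:
        raise ValueError("worker_rank must be in [0, number_of_workers)")
    return [s for s in seq_ids if s % number_of_workers == worker_rank]


# With the four toy sequences and two workers, the shards are disjoint
# and together contain every sequence exactly once.
shards = [shard_for_worker(range(4), 2, rank) for rank in range(2)]
```

Here worker 0 keeps sequences 0 and 2 and worker 1 keeps sequences 1 and 3; with a single worker, every sequence is kept.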
After the data is prepared, we need to convert it into values that CNTK operators can operate on efficiently. This is done by creating cntk.Value instances of the appropriate types:
```python
feature_data = C.Value.one_hot(batch=features, num_classes=self.f_dim, device=device)
label_data = C.Value(batch=np.asarray(labels, dtype=np.float32), device=device)
```
In this example, the feature data are a special type of sparse data created through the cntk.Value.one_hot function: each element within a sequence is a one-hot vector. The label data are dense data created through the cntk.Value constructor. Note that in these CNTK value constructors, we explicitly specify the device on which the values should be constructed. Recall that the device parameter is provided by the outer learning loops.
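To make the one-hot interpretation concrete, the following plain-Python sketch expands a sequence of class indices into the dense rows it stands for. It is for illustration only: ``one_hot_dense`` is a hypothetical helper, whereas cntk.Value.one_hot keeps the feature data in its sparse one-hot form instead of materializing such rows.

```python
def one_hot_dense(sequence, num_classes):
    """Expand a sequence of class indices into dense one-hot rows (illustration only)."""
    rows = []
    for idx in sequence:
        if not 0 <= idx < num_classes:
            raise ValueError(f"index {idx} out of range for {num_classes} classes")
        row = [0.0] * num_classes
        row[idx] = 1.0  # exactly one non-zero entry per element
        rows.append(row)
    return rows
```

For the toy sequence 0, ``one_hot_dense([560, 0, 0], 1000)`` would produce three rows of length 1000, each holding a single 1.0, which is why storing only the indices is far cheaper.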
Finally, we need to create MinibatchData instances and return them in a dictionary with the corresponding StreamInformation instances as keys:
```python
res = {
    self.fsi: MinibatchData(feature_data, num_seq, feature_sample_count, sweep_end),
    self.lsi: MinibatchData(label_data, num_seq, label_sample_count, sweep_end)}
return res
```
The constructor of MinibatchData takes 1) the data, already in the form of cntk.Value (i.e. feature_data and label_data here), 2) the number of sequences in the minibatch, 3) the number of samples, and 4) whether the minibatch is at the end of a sweep over the whole data set.
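In the toy data set, the two streams count samples differently: every element of a feature sequence is one sample, while each sequence carries a single label. A minimal sketch of the counts passed to MinibatchData, assuming this one-label-per-sequence layout (``minibatch_counts`` is a hypothetical helper, not part of CNTK):

```python
def minibatch_counts(batch):
    """Return (num_seq, feature_sample_count, label_sample_count) for a list of sequences.

    Each sequence is a dict with a 'features' list (one sample per element);
    the label counts as one sample per sequence, as in the toy data set.
    """
    num_seq = len(batch)
    feature_samples = sum(len(seq['features']) for seq in batch)
    label_samples = num_seq  # one label vector per sequence
    return num_seq, feature_samples, label_samples


toy_batch = [
    {'features': [560, 0, 0]},       # sequence 0
    {'features': [560, 0, 0, 424]},  # sequence 1
]
```

For these two sequences the sketch reports 2 sequences, 7 feature samples and 2 label samples, so the same minibatch legitimately carries different sample counts per stream.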
Altogether, we've implemented our next_minibatch() method to provide minibatches with the specified properties for the outer learning loops to consume.
To support checkpointing, we need to define the state of our user minibatch source so that the data feeding process can be restored from the exact point where it stopped. In our simple example, we only need the next sequence index to restore the data feeding process, which is saved and restored by the following two checkpoint methods:
```python
def get_checkpoint_state(self):
    return {'next_seq_idx': self.next_seq_idx}

def restore_from_checkpoint(self, state):
    self.next_seq_idx = state['next_seq_idx']
```
A checkpoint state is a dictionary from string keys to the corresponding state values. In this example, the only state value is the next sequence index.
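The save/restore contract can be tried out without CNTK. The class below is a hypothetical stand-in for the data-access part of a minibatch source: it tracks only ``next_seq_idx`` and mirrors the two checkpoint methods above, so that a fresh instance restored from a saved state continues with exactly the same sequences.

```python
class CursorOverSequences:
    """Hypothetical stand-in tracking only the data access point."""

    def __init__(self, sequences):
        self.sequences = list(sequences)
        self.next_seq_idx = 0

    def next_sequence(self):
        seq = self.sequences[self.next_seq_idx]
        # Wrap around at the end of a sweep over the data set.
        self.next_seq_idx = (self.next_seq_idx + 1) % len(self.sequences)
        return seq

    def get_checkpoint_state(self):
        return {'next_seq_idx': self.next_seq_idx}

    def restore_from_checkpoint(self, state):
        self.next_seq_idx = state['next_seq_idx']
```

For example, after reading sequences 0 and 1, the saved state is ``{'next_seq_idx': 2}``; a new instance restored from it yields sequences 2, 3 and then 0, just like the original would have.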
Putting it all together, the complete user minibatch source implementation is as follows:
```python
class MyMultiWorkerDataSource(UserMinibatchSource):
    def __init__(self, f_dim, l_dim):
        self.f_dim, self.l_dim = f_dim, l_dim

        self._prepare_data()

        # stream meta information
        self.fsi = StreamInformation("features", 0, 'sparse', np.float32, (self.f_dim,))
        self.lsi = StreamInformation("labels", 1, 'dense', np.float32, (self.l_dim,))

        # setting the state
        self.sequences = sorted(self.data)
        self.next_seq_idx = 0

        super(MyMultiWorkerDataSource, self).__init__()

    def _prepare_data(self):
        """
        Parse the text and load the data into self.data.

        self.data is of the following structure:
           sequence id -> "features" -> list of features
        and
           sequence id -> "labels" -> label
        """
        self.data = {}
        for line in sample_data.split('\n'):
            line = line.strip()
            if not line:
                continue
            seq_id, data = line.split('|', 1)
            data = data.split("|")
            seq_id = int(seq_id.strip())

            if seq_id not in self.data:
                self.data[seq_id] = {'features': []}

            # Processing features - expecting one per line.
            features = data[0].split(" ")
            vocab_idx = int(features[1].split(":")[0])
            self.data[seq_id]['features'].append(vocab_idx)

            # Process label, if exists
            if len(data) == 2:
                labels = np.asarray([data[1].split(" ")[1:]], dtype=np.float32)
                self.data[seq_id]['labels'] = labels

    def _prepare_nextbatch(self, num_samples, number_of_workers, worker_rank):
        features = []
        labels = []

        sweep_end = False

        f_sample_count = l_sample_count = 0

        while max(f_sample_count, l_sample_count) < num_samples:
            if self.next_seq_idx == len(self.sequences):
                sweep_end = True
                self.next_seq_idx = 0

            seq_id = self.sequences[self.next_seq_idx]

            # Based on the worker rank, determine whether to add this
            # sequence to the batch: if it doesn't belong to this worker,
            # skip it, advancing the index so that the loop makes progress.
            # In practice, this should be based on a more efficient
            # mechanism: e.g. based on the location of the worker and
            # the data location.
            if (seq_id % number_of_workers) != worker_rank:
                self.next_seq_idx += 1
                continue

            feature_data = self.data[seq_id]['features']
            label_data = self.data[seq_id]['labels']

            if (features or labels) and max(
                    f_sample_count + len(feature_data),
                    l_sample_count + len(label_data)) > num_samples:
                break

            f_sample_count += len(feature_data)
            features.append(feature_data)

            l_sample_count += len(label_data)
            labels.append(label_data)

            self.next_seq_idx += 1

        return features, f_sample_count, labels, l_sample_count, sweep_end

    def stream_infos(self):
        """
        Override the stream_infos method of the base UserMinibatchSource class
        to provide stream meta information.
        """
        return [self.fsi, self.lsi]

    def next_minibatch(self, num_samples, number_of_workers, worker_rank, device):
        """
        Override the next_minibatch method of the base UserMinibatchSource class
        to provide minibatch data.
        """
        (features, feature_sample_count,
         labels, label_sample_count, sweep_end) = self._prepare_nextbatch(
            num_samples, number_of_workers, worker_rank)

        feature_data = C.Value.one_hot(batch=features, num_classes=self.f_dim, device=device)
        label_data = C.Value(batch=np.asarray(labels, dtype=np.float32), device=device)

        num_seq = len(features)

        res = {
            self.fsi: MinibatchData(feature_data, num_seq, feature_sample_count, sweep_end),
            self.lsi: MinibatchData(label_data, num_seq, label_sample_count, sweep_end)
        }
        return res

    def get_checkpoint_state(self):
        return {'next_seq_idx': self.next_seq_idx}

    def restore_from_checkpoint(self, state):
        self.next_seq_idx = state['next_seq_idx']
```
Note that in this example, for simplicity, we load the whole data set into memory. In practice, the minibatch source should rely on the data source state (e.g. the mapping between the requested next batch and its logical/physical location in the data storage) to load the data when it is requested, or to pre-load it right before.
The implemented minibatch source can then be used wherever a MinibatchSource instance is accepted. For example:
```python
input_dim = 1000
num_output_classes = 5

# instantiating the user minibatch source
mbs = MyMultiWorkerDataSource(input_dim, num_output_classes)
feature = C.sequence.input_variable(shape=(input_dim,))
label = C.input_variable(shape=(num_output_classes,))

# setting up the model
rnn = C.layers.Recurrence(C.layers.LSTM(20), go_backwards=False)(feature)
end = C.sequence.last(rnn)
z = C.layers.Dense(num_output_classes)(end)
loss = C.cross_entropy_with_softmax(z, label)
errs = C.classification_error(z, label)
local_learner = C.sgd(z.parameters,
                      C.learning_parameter_schedule_per_sample(0.5))
dist_learner = C.distributed.data_parallel_distributed_learner(local_learner)

# and train
trainer = C.Trainer(z, (loss, errs),
                    [dist_learner],
                    [C.logging.ProgressPrinter(tag='Training', num_epochs=10)])

input_map = {
    feature: mbs.fsi,
    label: mbs.lsi
}

session = C.training_session(
    trainer=trainer,
    mb_source=mbs,
    model_inputs_to_streams=input_map,
    mb_size=7,
    max_samples=80,
    progress_frequency=20
)
session.train()

# finalize the distributed learning
C.distributed.Communicator.finalize()
```
In certain simplified scenarios, we might not want to implement a minibatch source with full functionality.
If data-parallel learning is not required, we can omit the logic that distributes data to workers, i.e. ignore the number_of_workers and worker_rank arguments when overriding the next_minibatch() method and behave as if number_of_workers = 1 and worker_rank = 0.
If checkpoint restoration is not required, we can omit implementing the two checkpoint-related methods: get_checkpoint_state() and restore_from_checkpoint().