torch/utils/data/dataframes_pipes.ipynb
from importlib import reload
import torch
reload(torch)
from torch.utils.data import IterDataPipe
# Example IterDataPipe
class ExampleIterPipe(IterDataPipe):
    """Minimal source pipe for the demos below.

    ``range`` may be an int (yields ``0 .. range-1``) or any iterable
    (yielded as-is). The parameter deliberately shadows the builtin
    ``range`` so the existing call sites (``range = 10``) keep working.
    """

    def __init__(self, range=20) -> None:
        # Stored as-is; interpretation happens at iteration time.
        self.range = range

    def __iter__(self):
        # Bug fix: the original did `yield from self.range`, which raises
        # TypeError for the int arguments every caller passes (and for the
        # default 20). Wrap ints in the builtin range(); the shadowing in
        # __init__ does not reach this scope.
        if isinstance(self.range, int):
            yield from range(self.range)
        else:
            yield from self.range
def get_dataframes_pipe(range = 10, dataframe_size = 7):
    """Build the example pipeline and convert it into a DataFrames pipe.

    Each element becomes a (i, i % 3) pair stored in DataFrames of
    ``dataframe_size`` rows with columns 'i' and 'j'.
    """
    source_dp = ExampleIterPipe(range = range)
    tuple_dp = source_dp.map(lambda i: (i, i % 3))
    return tuple_dp._to_dataframes_pipe(columns = ['i','j'], dataframe_size = dataframe_size)
def get_regular_pipe(range = 10):
    """Build the same (i, i % 3) pipeline, but leave it as a plain DataPipe."""
    source_dp = ExampleIterPipe(range = range)
    return source_dp.map(lambda i: (i, i % 3))
It doesn't matter how the DataFrame pipe is composed internally: iterating over it yields single rows to the user, just like a regular DataPipe.
# Iterating a DataFrames pipe yields one row at a time, exactly like a
# regular DataPipe does — the internal DataFrame chunking is invisible here.
print('DataFrames Pipe')
dp = get_dataframes_pipe()
for row in dp:
    print(row)

print('Regular DataPipe')
dp = get_regular_pipe()
for row in dp:
    print(row)
You can iterate over the raw DataFrames using `raw_iterator`.
# raw_iterator() exposes the underlying DataFrame chunks instead of rows.
dp = get_dataframes_pipe()
for frame in dp.raw_iterator():
    print(frame)
Operations over a DF pipe are captured (recorded rather than executed immediately).
# Column assignments and arithmetic on a DataFrames pipe are not executed
# eagerly — they are recorded ("captured") on the pipe.
dp = get_dataframes_pipe(dataframe_size = 3)
dp['y'] = dp.i * 100 + dp.j - 2.7
# ops_str() renders the list of captured, not-yet-executed operations.
print(dp.ops_str())
Captured operations are executed on `__next__` calls of the constructed DataPipe.
# The captured operations run lazily, on each __next__ of the pipe.
dp = get_dataframes_pipe(dataframe_size = 3)
dp['y'] = dp.i * 100 + dp.j - 2.7
for frame in dp.raw_iterator():
    print(frame)
Shuffle of a DataFrame pipe affects rows individually, independent of which DataFrame they belong to.
# Shuffling a DataFrames pipe permutes individual rows, not whole frames.
dp = get_dataframes_pipe(dataframe_size = 3)
dp = dp.shuffle()
print('Raw DataFrames iterator')
for frame in dp.raw_iterator():
    print(frame)
print('Regular DataFrames iterator')
for row in dp:
    print(row)

# The same shuffle applied to a regular DataPipe, for comparison.
dp = get_regular_pipe()
dp = dp.shuffle()
print('Regular iterator')
for row in dp:
    print(row)
You can continue mixing DF and DP operations
# Captured DataFrame ops and regular DataPipe ops can be freely interleaved:
# column math, shuffle, scalar arithmetic on the whole pipe, more column math.
dp = get_dataframes_pipe(dataframe_size = 3)
dp['y'] = dp.i * 100 + dp.j - 2.7
dp = dp.shuffle()
dp = dp - 17
dp['y'] = dp.y * 10000
for frame in dp.raw_iterator():
    print(frame)
Batching combines everything into a list; lists can be nested. A list may contain any number of DataFrames, as long as the total number of rows equals the batch size.
# batch() groups rows into lists; a batch may span several DataFrames as
# long as the total row count equals the batch size.
dp = get_dataframes_pipe(dataframe_size = 3)
dp = dp.shuffle()
dp = dp.batch(2)
print("Iterate over DataFrame batches")
for batch in dp:
    print(batch)

# The same batching applied to a regular DataPipe, for comparison.
dp = get_regular_pipe()
dp = dp.shuffle()
dp = dp.batch(2)
print("Iterate over regular batches")
for batch in dp:
    print(batch)
Some details about internal storage of batched DataFrames and how they are iterated
# A peek at how batched DataFrames are stored and iterated internally.
dp = get_dataframes_pipe(dataframe_size = 3)
dp = dp.shuffle()
dp = dp.batch(2)
for batch in dp:
    print("Type: ", type(batch))
    print("As string: ", batch)
    # User-facing iteration: one row at a time.
    print("Iterated regularly:")
    print('-- batch start --')
    for row in batch:
        print(row)
    print('-- batch end --')
    # Developer-facing iteration: the raw DataFrame storage.
    print("Iterated in inner format (for developers):")
    print('-- df batch start --')
    for row in batch.raw_iterator():
        print(row)
    print('-- df batch end --')
`concat` should work only on DataFrames with the same schema; this code should produce an error.
# TODO!
# dp0 = get_dataframes_pipe(range = 8, dataframe_size = 4)
# dp = get_dataframes_pipe(range = 6, dataframe_size = 3)
# dp['y'] = dp.i * 100 + dp.j - 2.7
# dp = dp.concat(dp0)
# for i,v in enumerate(dp.raw_iterator()):
# print(v)
Unbatching a list of DataFrames works similarly to regular unbatch.
Note: DataFrame sizes might change.
# Nested batching followed by a multi-level unbatch; DataFrame sizes may
# differ from the original after the round-trip.
dp = get_dataframes_pipe(range = 18, dataframe_size = 3)
dp['y'] = dp.i * 100 + dp.j - 2.7
dp = dp.batch(5).batch(3).batch(1).unbatch(unbatch_level = 3)
# Known issue: unbatching does not detect the DF type here.
dp['z'] = dp.y - 100
for frame in dp.raw_iterator():
    print(frame)
`map` is applied to individual rows; the `nesting_level` argument is used to penetrate batching.
# map() works per row; nesting_level lets it reach inside batches.
dp = get_dataframes_pipe(range = 10, dataframe_size = 3)
dp = dp.map(lambda x: x + 1111)
dp = dp.batch(5).map(lambda x: x * 1000, nesting_level = 1)
print("Iterate over DataFrame batches")
for batch in dp:
    print(batch)

# The equivalent row-level maps on a classic DataPipe.
dp = get_regular_pipe(range = 10)
dp = dp.map(lambda x: (x[0] + 1111, x[1]))
dp = dp.batch(5).map(lambda x: (x[0] * 1000, x[1]), nesting_level = 1)
print("Iterate over regular batches")
for batch in dp:
    print(batch)
`filter` is applied to individual rows; the `nesting_level` argument is used to penetrate batching.
# filter() also works per row; nesting_level reaches inside batches.
dp = get_dataframes_pipe(range = 30, dataframe_size = 3)
dp = dp.filter(lambda x: x.i > 5)
dp = dp.batch(5).filter(lambda x: x.i < 13, nesting_level = 1)
print("Iterate over DataFrame batches")
for batch in dp:
    print(batch)

# The equivalent row-level filters on a classic DataPipe.
dp = get_regular_pipe(range = 30)
dp = dp.filter(lambda x: x[0] > 5)
dp = dp.batch(5).filter(lambda x: x[0] < 13, nesting_level = 1)
print("Iterate over regular batches")
for batch in dp:
    print(batch)