Back to Pytorch

Example IterDataPipe

torch/utils/data/dataframes_pipes.ipynb

2.11.05.0 KB
Original Source

[RFC] How DataFrames (DF) and DataPipes (DP) work together

python
from importlib import reload
import torch
reload(torch)
from torch.utils.data import IterDataPipe
python
# Example IterDataPipe
class ExampleIterPipe(IterDataPipe):
    """Minimal source DataPipe yielding integers 0 .. range-1 (or the items of an iterable).

    Args:
        range: either an int upper bound (exclusive) or an iterable of items
            to yield. The name shadows the builtin but is kept for
            backward compatibility with existing callers.
    """
    def __init__(self, range = 20) -> None:
        self.range = range
    def __iter__(self):
        # Bug fix: callers pass ints (e.g. ``range = 10``), and
        # ``yield from self.range`` raises TypeError on an int.
        # Feed ints through the builtin range(); pass iterables through.
        if isinstance(self.range, int):
            yield from range(self.range)
        else:
            yield from self.range

def get_dataframes_pipe(range = 10, dataframe_size = 7):
    """Return a DataFrames pipe of (i, i % 3) rows packed into DFs with columns 'i' and 'j'."""
    source = ExampleIterPipe(range = range)
    paired = source.map(lambda value: (value, value % 3))
    return paired._to_dataframes_pipe(columns = ['i','j'], dataframe_size = dataframe_size)

def get_regular_pipe(range = 10):
    """Return a plain DataPipe of (i, i % 3) tuples over the given range."""
    source = ExampleIterPipe(range = range)
    return source.map(lambda value: (value, value % 3))

It doesn't matter how a DF is composed internally; an iterator over a DF Pipe gives single rows to the user. This is similar to a regular DataPipe.

python
# Row-level iteration looks the same for both pipe flavors.
print('DataFrames Pipe')
df_pipe = get_dataframes_pipe()
for row in df_pipe:
    print(row)

print('Regular DataPipe')
plain_pipe = get_regular_pipe()
for row in plain_pipe:
    print(row)

You can iterate over raw DF using raw_iterator

python
# raw_iterator() exposes the underlying DataFrames instead of single rows.
df_pipe = get_dataframes_pipe()
for frame in df_pipe.raw_iterator():
    print(frame)

Operations over a DF Pipe are captured

python
# Column arithmetic on the pipe is recorded symbolically, not executed eagerly.
pipe = get_dataframes_pipe(dataframe_size = 3)
pipe['y'] = pipe.i * 100 + pipe.j - 2.7
print(pipe.ops_str())

Captured operations are executed on `__next__` calls of the constructed DataPipe

python
# The captured column expression is applied lazily while iterating raw DFs.
pipe = get_dataframes_pipe(dataframe_size = 3)
pipe['y'] = pipe.i * 100 + pipe.j - 2.7
for frame in pipe.raw_iterator():
    print(frame)

Shuffle of a DataFramePipe affects rows in an individual manner

python
# Shuffling a DF pipe reorders individual rows, visible in both views.
df_pipe = get_dataframes_pipe(dataframe_size = 3).shuffle()
print('Raw DataFrames iterator')
for frame in df_pipe.raw_iterator():
    print(frame)

print('Regular DataFrames iterator')
for row in df_pipe:
    print(row)


# The same shuffle applied to a regular DataPipe for comparison.
plain_pipe = get_regular_pipe().shuffle()
print('Regular iterator')
for row in plain_pipe:
    print(row)

You can continue mixing DF and DP operations

python
# DF column ops and DataPipe ops (shuffle) can be freely interleaved.
pipe = get_dataframes_pipe(dataframe_size = 3)
pipe['y'] = pipe.i * 100 + pipe.j - 2.7
pipe = pipe.shuffle()
pipe = pipe - 17
pipe['y'] = pipe.y * 10000
for frame in pipe.raw_iterator():
    print(frame)

Batching combines everything into a list; it is possible to nest lists. A list may contain any number of DataFrames, as long as the total number of rows equals the batch size.

python
# Batch a DF pipe; each batch groups rows regardless of DF boundaries.
batched_df = get_dataframes_pipe(dataframe_size = 3).shuffle().batch(2)
print("Iterate over DataFrame batches")
for v in batched_df:
    print(v)

# Batching a regular DataPipe produces the analogous list batches.
batched_plain = get_regular_pipe().shuffle().batch(2)
print("Iterate over regular batches")
for item in batched_plain:
    print(item)

Some details about internal storage of batched DataFrames and how they are iterated

python
# Inspect a batch object: its type, string form, row-level iteration,
# and the developer-facing raw (per-DataFrame) iteration.
batched = get_dataframes_pipe(dataframe_size = 3).shuffle().batch(2)
for batch in batched:
    print("Type: ", type(batch))
    print("As string: ", batch)
    print("Iterated regularly:")
    print('-- batch start --')
    for element in batch:
        print(element)
    print('-- batch end --')
    print("Iterated in inner format (for developers):")
    print('-- df batch start --')
    for element in batch.raw_iterator():
        print(element)
    print('-- df batch end --')

concat should work only on DFs with the same schema; this code should produce an error

python
# TODO!
# dp0 = get_dataframes_pipe(range = 8, dataframe_size = 4)
# dp = get_dataframes_pipe(range = 6, dataframe_size = 3)
# dp['y'] = dp.i * 100 + dp.j - 2.7
# dp = dp.concat(dp0)
# for i,v in enumerate(dp.raw_iterator()):
#     print(v)

unbatch of a list with DataFrames works similarly to regular unbatch. Note: DataFrame sizes might change

python
# Nested batch/unbatch round-trip; DF sizes may be regrouped afterwards.
pipe = get_dataframes_pipe(range = 18, dataframe_size = 3)
pipe['y'] = pipe.i * 100 + pipe.j - 2.7
pipe = pipe.batch(5).batch(3).batch(1).unbatch(unbatch_level = 3)

# Known issue: unbatching does not detect the DF type here.
pipe['z'] = pipe.y - 100

for frame in pipe.raw_iterator():
    print(frame)

map is applied to individual rows; the nesting_level argument is used to penetrate batching

python
# map on a DF pipe applies per row; nesting_level = 1 reaches inside batches.
df_pipe = get_dataframes_pipe(range = 10, dataframe_size = 3)
df_pipe = df_pipe.map(lambda x: x + 1111)
df_pipe = df_pipe.batch(5).map(lambda x: x * 1000, nesting_level = 1)
print("Iterate over DataFrame batches")
for batch in df_pipe:
    print(batch)

# The classic DataPipe equivalent, operating on tuple rows.
plain_pipe = get_regular_pipe(range = 10)
plain_pipe = plain_pipe.map(lambda x: (x[0] + 1111, x[1]))
plain_pipe = plain_pipe.batch(5).map(lambda x: (x[0] * 1000, x[1]), nesting_level = 1)
print("Iterate over regular batches")
for batch in plain_pipe:
    print(batch)


filter is applied to individual rows; the nesting_level argument is used to penetrate batching

python
# filter on a DF pipe tests each row; nesting_level = 1 reaches inside batches.
df_pipe = get_dataframes_pipe(range = 30, dataframe_size = 3)
df_pipe = df_pipe.filter(lambda row: row.i > 5)
df_pipe = df_pipe.batch(5).filter(lambda row: row.i < 13, nesting_level = 1)
print("Iterate over DataFrame batches")
for batch in df_pipe:
    print(batch)

# The classic DataPipe equivalent, operating on tuple rows.
plain_pipe = get_regular_pipe(range = 30)
plain_pipe = plain_pipe.filter(lambda row: row[0] > 5)
plain_pipe = plain_pipe.batch(5).filter(lambda row: row[0] < 13, nesting_level = 1)
print("Iterate over regular batches")
for batch in plain_pipe:
    print(batch)