torch/utils/data/standard_pipes.ipynb
from torch.utils.data import IterDataPipe

# Example IterDataPipe yielding integers from 0 to count - 1
class ExampleIterPipe(IterDataPipe):
    def __init__(self, count: int = 20) -> None:
        self.range = range(count)

    def __iter__(self):
        yield from self.range
Function: batch
Description: Combines consecutive items from the source datapipe into batches (lists) of batch_size elements.
Alternatives:
Arguments:
batch_size: int - desired batch size
unbatch_level: int = 0 - if specified, calls unbatch(unbatch_level=unbatch_level) on the source datapipe before batching (see unbatch)
drop_last: bool = False - whether to drop the final batch if it has fewer than batch_size elements
Example:
Classic batching produces a partial final batch by default:
dp = ExampleIterPipe(10).batch(3)
for i in dp:
    print(i)
To drop incomplete batches, add the drop_last argument:
dp = ExampleIterPipe(10).batch(3, drop_last=True)
for i in dp:
    print(i)
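The batching semantics above can be sketched in plain Python (a simplified illustration only, not the actual DataPipe implementation):

```python
from typing import Iterable, Iterator, List

def batch(source: Iterable, batch_size: int, drop_last: bool = False) -> Iterator[List]:
    # Accumulate items into lists of batch_size elements.
    buf: List = []
    for item in source:
        buf.append(item)
        if len(buf) == batch_size:
            yield buf
            buf = []
    # The trailing partial batch is kept unless drop_last is set.
    if buf and not drop_last:
        yield buf

print(list(batch(range(10), 3)))                   # [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
print(list(batch(range(10), 3, drop_last=True)))   # [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
```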
Sequential calls to batch produce nested batches:
dp = ExampleIterPipe(30).batch(3).batch(2)
for i in dp:
    print(i)
It is possible to unbatch the source data before applying a new batching rule using the unbatch_level argument:
dp = ExampleIterPipe(30).batch(3).batch(2).batch(10, unbatch_level=-1)
for i in dp:
    print(i)
Function: unbatch
Description: Flattens batches from the source datapipe back into individual items.
Alternatives:
Arguments:
unbatch_level: int = 1 - number of nesting levels to flatten; -1 flattens to the lowest level
Example:
dp = ExampleIterPipe(10).batch(3).shuffle().unbatch()
for i in dp:
    print(i)
By default, unbatching is applied only to the first level of nesting; to unbatch deeper, use the unbatch_level argument:
dp = ExampleIterPipe(40).batch(2).batch(4).batch(3).unbatch(unbatch_level=2)
for i in dp:
    print(i)
Setting unbatch_level to -1 will unbatch to the lowest level:
dp = ExampleIterPipe(40).batch(2).batch(4).batch(3).unbatch(unbatch_level=-1)
for i in dp:
    print(i)
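A minimal pure-Python sketch of these unbatch semantics (an illustration, not the real implementation):

```python
def unbatch(source, unbatch_level=1):
    # Flatten unbatch_level levels of list nesting; -1 flattens completely.
    for item in source:
        if unbatch_level == 0 or not isinstance(item, list):
            yield item
        elif unbatch_level == 1:
            yield from item
        else:
            yield from unbatch(item, -1 if unbatch_level == -1 else unbatch_level - 1)

nested = [[[0, 1], [2, 3]], [[4, 5], [6, 7]]]   # e.g. produced by batch(2).batch(2)
print(list(unbatch(nested)))                    # [[0, 1], [2, 3], [4, 5], [6, 7]]
print(list(unbatch(nested, unbatch_level=2)))   # [0, 1, 2, 3, 4, 5, 6, 7]
print(list(unbatch(nested, unbatch_level=-1)))  # [0, 1, 2, 3, 4, 5, 6, 7]
```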
Function: map
Description: Applies a function to every item of the source datapipe.
Alternatives:
Arguments:
fn - function applied to each item
nesting_level: int = 0 - depth inside nested batches at which fn is applied; -1 applies it at the lowest level
Example:
dp = ExampleIterPipe(10).map(lambda x: x * 2)
for i in dp:
    print(i)
By default, map applies the function to every mini-batch as a whole:
dp = ExampleIterPipe(10).batch(3).map(lambda x: x * 2)
for i in dp:
    print(i)
To apply the function to individual items of the mini-batch, use the nesting_level argument:
dp = ExampleIterPipe(10).batch(3).batch(2).map(lambda x: x * 2, nesting_level=2)
for i in dp:
    print(i)
Setting nesting_level to -1 will apply the map function at the lowest level possible:
dp = ExampleIterPipe(10).batch(3).batch(2).batch(2).map(lambda x: x * 2, nesting_level=-1)
for i in dp:
    print(i)
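The nesting_level behaviour can be sketched in plain Python (a simplified model of the semantics, not the actual implementation):

```python
def map_nested(source, fn, nesting_level=0):
    # Apply fn at the requested nesting depth; -1 means the deepest level.
    def apply(item, level):
        if level == 0 or not isinstance(item, list):
            return fn(item)
        next_level = -1 if level == -1 else level - 1
        return [apply(sub, next_level) for sub in item]
    for item in source:
        yield apply(item, nesting_level)

batches = [[0, 1, 2], [3, 4, 5]]
# nesting_level=0 applies fn to each mini-batch as a whole (list * 2 repeats it):
print(list(map_nested(batches, lambda x: x * 2)))                   # [[0, 1, 2, 0, 1, 2], [3, 4, 5, 3, 4, 5]]
# nesting_level=1 applies fn to the individual elements:
print(list(map_nested(batches, lambda x: x * 2, nesting_level=1)))  # [[0, 2, 4], [6, 8, 10]]
```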
Function: filter
Description: Yields only the items of the source datapipe for which the filter function returns True.
Alternatives:
Arguments:
filter_fn - predicate applied to each item
nesting_level: int = 0 - depth inside nested batches at which filter_fn is applied; -1 applies it at the lowest level
drop_empty_batches: bool = True - whether batches left empty after filtering are dropped
Example:
dp = ExampleIterPipe(10).filter(lambda x: x % 2 == 0)
for i in dp:
    print(i)
Classic filter by default applies the filter function to every mini-batch as a whole:
dp = ExampleIterPipe(10)
dp = dp.batch(3).filter(lambda x: len(x) > 2)
for i in dp:
    print(i)
You can apply the filter function to individual elements by setting the nesting_level argument:
dp = ExampleIterPipe(10)
dp = dp.batch(3).filter(lambda x: x > 4, nesting_level=1)
for i in dp:
    print(i)
If a mini-batch ends up with zero elements after filtering, the default behaviour is to drop it from the output. You can override this behaviour using the drop_empty_batches argument.
dp = ExampleIterPipe(10)
dp = dp.batch(3).filter(lambda x: x > 4, nesting_level=-1, drop_empty_batches=False)
for i in dp:
    print(i)
dp = ExampleIterPipe(20)
dp = dp.batch(3).batch(2).batch(2).filter(lambda x: x < 4 or x > 9, nesting_level=-1, drop_empty_batches=False)
for i in dp:
    print(i)
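These nested-filtering semantics can be sketched in plain Python (a simplified model, not the real implementation):

```python
_DROP = object()  # sentinel marking filtered-out items

def filter_nested(source, fn, nesting_level=0, drop_empty_batches=True):
    # Apply the predicate at the requested nesting depth; -1 means the deepest level.
    def apply(item, level):
        if level == 0 or not isinstance(item, list):
            return item if fn(item) else _DROP
        next_level = -1 if level == -1 else level - 1
        kept = [r for r in (apply(sub, next_level) for sub in item) if r is not _DROP]
        return _DROP if (not kept and drop_empty_batches) else kept
    for item in source:
        result = apply(item, nesting_level)
        if result is not _DROP:
            yield result

batches = [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
print(list(filter_nested(batches, lambda x: x > 4, nesting_level=1)))
# [[5], [6, 7, 8], [9]] -- the emptied first batch is dropped
print(list(filter_nested(batches, lambda x: x > 4, nesting_level=1, drop_empty_batches=False)))
# [[], [5], [6, 7, 8], [9]] -- empty batches are kept
```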
Function: shuffle
Description: Shuffles items of the source datapipe using an internal buffer.
Alternatives:
Arguments:
unbatch_level: int = 0 - if specified, calls unbatch(unbatch_level=unbatch_level) on the source datapipe before shuffling (see unbatch)
buffer_size: int = 10000 - size of the shuffle buffer
Example:
dp = ExampleIterPipe(10).shuffle()
for i in dp:
    print(i)
shuffle operates on input mini-batches the same way as on individual items:
dp = ExampleIterPipe(10).batch(3).shuffle()
for i in dp:
    print(i)
To shuffle elements across batches, use the shuffle(unbatch_level=...) followed by batch pattern:
dp = ExampleIterPipe(10).batch(3).shuffle(unbatch_level=-1).batch(3)
for i in dp:
    print(i)
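Buffered shuffling can be sketched in plain Python (a common streaming-shuffle pattern, assumed here to approximate the DataPipe behaviour; not the actual implementation):

```python
import random

def buffered_shuffle(source, buffer_size=10000, rng=None):
    # Keep up to buffer_size items in a buffer; emit a random buffered
    # item whenever the buffer is full, then drain the rest at the end.
    rng = rng or random.Random()
    buf = []
    for item in source:
        buf.append(item)
        if len(buf) >= buffer_size:
            yield buf.pop(rng.randrange(len(buf)))
    rng.shuffle(buf)
    yield from buf

result = list(buffered_shuffle(range(10), buffer_size=4, rng=random.Random(0)))
print(sorted(result) == list(range(10)))  # the output is a permutation of the input
```

Note that a small buffer_size only shuffles within a sliding window, so elements far apart in the stream can never swap places.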
Function: collate
Description: Converts each batch from the source datapipe into a collated form (by default, torch tensors).
Alternatives:
Arguments:
Example:
dp = ExampleIterPipe(10).batch(3).collate()
for i in dp:
    print(i)
Function: groupby
Usage: dp.groupby(lambda x: x[0])
Description: Batches items by combining items with the same key into the same group.
Arguments:
group_key_fn - function computing the grouping key for each item
group_size - yield the resulting group as soon as group_size elements accumulate
guaranteed_group_size: int = None - minimum size a group must reach before it may be yielded
buffer_size - maximum number of items held in the internal buffer
drop_remaining: bool = False - drop groups smaller than guaranteed_group_size instead of raising an error
unbatch_level: int = 0 - if specified, calls unbatch(unbatch_level=unbatch_level) on the source datapipe before grouping (see unbatch)
As the datastream can be arbitrarily large, grouping is done on a best-effort basis and there is no guarantee that the same key will never appear in different groups. You can think of it as a local groupby, where locality is a single DataPipe process/thread.
dp = ExampleIterPipe(10).shuffle().groupby(lambda x: x % 3)
for i in dp:
    print(i)
By default, the group key function is applied to the entire input (the mini-batch):
dp = ExampleIterPipe(10).batch(3).groupby(lambda x: len(x))
for i in dp:
    print(i)
It is possible to unnest items from the mini-batches using the unbatch_level argument:
dp = ExampleIterPipe(10).batch(3).groupby(lambda x: x % 3, unbatch_level=1)
for i in dp:
    print(i)
When the internal buffer (defined by buffer_size) overflows, groupby will yield the biggest group available:
dp = ExampleIterPipe(15).shuffle().groupby(lambda x: x % 3, buffer_size=5)
for i in dp:
    print(i)
groupby produces group_size-sized batches as soon as they fill up:
dp = ExampleIterPipe(18).shuffle().groupby(lambda x: x % 3, group_size=3)
for i in dp:
    print(i)
Remaining groups must contain at least guaranteed_group_size elements.
dp = ExampleIterPipe(15).shuffle().groupby(lambda x: x % 3, group_size=3, guaranteed_group_size=2)
for i in dp:
    print(i)
Without group_size defined, the function will try to accumulate at least guaranteed_group_size elements before yielding the resulting group:
dp = ExampleIterPipe(15).shuffle().groupby(lambda x: x % 3, guaranteed_group_size=2)
for i in dp:
    print(i)
This behaviour becomes noticeable when the data is bigger than the buffer and some groups get evicted before gathering all of their potential items:
dp = ExampleIterPipe(15).groupby(lambda x: x % 3, guaranteed_group_size=2, buffer_size=6)
for i in dp:
    print(i)
With randomness involved you might end up with incomplete groups (so the next example is expected to fail in most cases):
dp = ExampleIterPipe(15).shuffle().groupby(lambda x: x % 3, guaranteed_group_size=2, buffer_size=6)
for i in dp:
    print(i)
To avoid this error and drop incomplete groups, use the drop_remaining argument:
dp = ExampleIterPipe(15).shuffle().groupby(lambda x: x % 3, guaranteed_group_size=2, buffer_size=6, drop_remaining=True)
for i in dp:
    print(i)
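The "local groupby" idea can be sketched in plain Python (a simplified model with group_size and buffer eviction; not the actual implementation, and guaranteed_group_size/drop_remaining are omitted):

```python
def local_groupby(source, group_key_fn, group_size=None, buffer_size=10000):
    # Groups are flushed when they reach group_size, evicted (biggest
    # first) when the buffer overflows, and drained at end of stream.
    buffer = {}       # key -> accumulated items
    buffered = 0
    for item in source:
        key = group_key_fn(item)
        buffer.setdefault(key, []).append(item)
        buffered += 1
        if group_size is not None and len(buffer[key]) == group_size:
            yield buffer.pop(key)
            buffered -= group_size
        elif buffered >= buffer_size:
            biggest = max(buffer, key=lambda k: len(buffer[k]))
            evicted = buffer.pop(biggest)
            buffered -= len(evicted)
            yield evicted
    yield from buffer.values()

print(list(local_groupby(range(9), lambda x: x % 3)))
# [[0, 3, 6], [1, 4, 7], [2, 5, 8]]
print(list(local_groupby(range(9), lambda x: x % 3, group_size=2)))
# [[0, 3], [1, 4], [2, 5], [6], [7], [8]] -- note the leftover singleton groups
```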
Function: zip
Description: Aggregates elements from multiple datapipes into tuples, similar to Python's built-in zip.
Alternatives:
Arguments:
Example:
_dp = ExampleIterPipe(5).shuffle()
dp = ExampleIterPipe(5).zip(_dp)
for i in dp:
    print(i)
Function: fork
Description: Creates n copies of the source datapipe, each yielding the same sequence of elements.
Alternatives:
Arguments:
Example:
dp = ExampleIterPipe(2)
dp1, dp2, dp3 = dp.fork(3)
for i in dp1 + dp2 + dp3:
    print(i)
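fork is conceptually close to itertools.tee: every copy observes the same elements. A rough sketch:

```python
import itertools

# Each forked copy sees the full element sequence of the source.
copies = itertools.tee(range(2), 3)
results = [list(c) for c in copies]
print(results)  # [[0, 1], [0, 1], [0, 1]]
```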
Function: demux
Description: Splits the source datapipe into n datapipes using a classifier function that maps each item to an output index.
Alternatives:
Arguments:
Example:
dp = ExampleIterPipe(10)
dp1, dp2, dp3 = dp.demux(3, lambda x: x % 3)
for i in dp2:
    print(i)
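The routing logic can be sketched eagerly in plain Python (the real demux is lazy and buffers pending items; this is only an illustration of which item goes where):

```python
def demux_eager(source, num_instances, classifier_fn):
    # Partition the source into num_instances buckets by classifier index.
    buckets = [[] for _ in range(num_instances)]
    for item in source:
        buckets[classifier_fn(item)].append(item)
    return buckets

parts = demux_eager(range(10), 3, lambda x: x % 3)
print(parts[1])  # [1, 4, 7]
```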
Function: mux
Description: Multiplexes several datapipes into one, yielding one element from each input in turn.
Alternatives:
Arguments:
Example:
dp1 = ExampleIterPipe(3)
dp2 = ExampleIterPipe(3).map(lambda x: x * 10)
dp3 = ExampleIterPipe(3).map(lambda x: x * 100)
dp = dp1.mux(dp2, dp3)
for i in dp:
    print(i)
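Round-robin multiplexing can be sketched in plain Python (the end-of-stream behaviour for unequal-length inputs is an assumption here; with equal lengths, as above, it does not matter):

```python
def mux(*sources):
    # Round-robin: yield one item from each source in turn.
    iterators = [iter(s) for s in sources]
    while iterators:
        alive = []
        for it in iterators:
            try:
                yield next(it)
            except StopIteration:
                continue
            alive.append(it)
        iterators = alive

result = list(mux(range(3), (x * 10 for x in range(3)), (x * 100 for x in range(3))))
print(result)  # [0, 0, 0, 1, 10, 100, 2, 20, 200]
```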
Function: concat
Description: Returns a DataPipe with the elements of the first datapipe followed by the elements of the subsequent datapipes.
Alternatives:
`dp = dp.concat(dp2, dp3)`
`dp = dp.concat(*datapipes_list)`
Example:
dp = ExampleIterPipe(4)
dp2 = ExampleIterPipe(3)
dp = dp.concat(dp2)
for i in dp:
    print(i)
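concat is conceptually itertools.chain: first pipe's elements, then the next's, in order. A minimal sketch:

```python
import itertools

# Elements of the first iterable, then the second, preserving order.
result = list(itertools.chain(range(4), range(3)))
print(result)  # [0, 1, 2, 3, 0, 1, 2]
```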