from fastai.data.load import *   # assumed setup: provides DataLoader, fa_collate, fa_convert, SkipItemException
from fastcore.test import *      # assumed setup: provides test_eq, test_eq_type, test_shuffled
from nbdev.showdoc import *
import random, string, time

bs = 4
letters = list(string.ascii_lowercase)

DataLoader

fa_collate[source]

fa_collate(t)
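
fa_collate behaves like PyTorch's default_collate on basic types (tensors, arrays, numbers) but recurses into Sequences while preserving their type, which is why tuples stay tuples in the tests below. A rough sketch of the idea, not the exact library source:

from collections.abc import Sequence
from torch.utils.data.dataloader import default_collate

def fa_collate_sketch(t):
    # collate basic types directly; recurse into sequences, keeping their type
    b = t[0]
    if isinstance(b, Sequence) and not isinstance(b, str):
        return type(b)(fa_collate_sketch(list(s)) for s in zip(*t))
    return default_collate(t)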

#e.g. x is int, y is tuple
t = [(1,(2,3)),(1,(2,3))]
test_eq(fa_collate(t), default_collate(t))
test_eq(L(fa_collate(t)).map(type), [Tensor,tuple])

t = [(1,(2,(3,4))),(1,(2,(3,4)))]
test_eq(fa_collate(t), default_collate(t))
test_eq(L(fa_collate(t)).map(type), [Tensor,tuple])
test_eq(L(fa_collate(t)[1]).map(type), [Tensor,tuple])

fa_convert[source]

fa_convert(t)
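
fa_convert plays the same role for default_convert, which is used when there is no batching (bs=None): leaves are converted to tensors while sequence types are preserved. A comparable sketch, under the same caveat as above (default_convert is imported here from a private torch module; the exact path may vary by version):

from collections.abc import Sequence
from torch.utils.data._utils.collate import default_convert

def fa_convert_sketch(t):
    # convert leaves to tensors; recurse into sequences, keeping their type
    if isinstance(t, Sequence) and not isinstance(t, str):
        return type(t)(fa_convert_sketch(s) for s in t)
    return default_convert(t)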

t0 = array([1,2])
t = [t0,(t0,t0)]

test_eq(fa_convert(t), default_convert(t))
test_eq(L(fa_convert(t)).map(type), [Tensor,tuple])

class SkipItemException[source]

SkipItemException() :: Exception

Raised inside create_item to tell DataLoader to skip an item and move on to the next one.
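
For example, a hypothetical subclass that raises it from create_item (covered just below) to drop odd numbers from a stream:

class SkipOddDL(DataLoader):
    def create_item(self, s):
        # raising SkipItemException drops this item from the output
        if s % 2: raise SkipItemException()
        return s

test_eq(L(SkipOddDL(range(8))), [0,2,4,6])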

class DataLoader[source]

DataLoader(dataset, batch_size=1, shuffle=False, sampler=None, batch_sampler=None, num_workers=0, collate_fn=None, pin_memory=False, drop_last=False, timeout=0, worker_init_fn=None, multiprocessing_context=None)

Data loader. Combines a dataset and a sampler, and provides an iterable over the given dataset.

The DataLoader supports both map-style and iterable-style datasets, with single- or multi-process loading, customizable loading order, and optional automatic batching (collation) and memory pinning.

See the torch.utils.data documentation for more details.

Arguments:
    dataset (Dataset): dataset from which to load the data.
    batch_size (int, optional): how many samples per batch to load (default: 1).
    shuffle (bool, optional): set to True to have the data reshuffled at every epoch (default: False).
    sampler (Sampler, optional): defines the strategy to draw samples from the dataset. If specified, shuffle must be False.
    batch_sampler (Sampler, optional): like sampler, but returns a batch of indices at a time. Mutually exclusive with batch_size, shuffle, sampler, and drop_last.
    num_workers (int, optional): how many subprocesses to use for data loading. 0 means that the data will be loaded in the main process (default: 0).
    collate_fn (callable, optional): merges a list of samples to form a mini-batch of Tensor(s). Used when batched loading from a map-style dataset.
    pin_memory (bool, optional): if True, the data loader will copy Tensors into CUDA pinned memory before returning them. If your data elements are a custom type, or your collate_fn returns a batch that is a custom type, see the example below.
    drop_last (bool, optional): set to True to drop the last incomplete batch if the dataset size is not divisible by the batch size. If False and the size of the dataset is not divisible by the batch size, the last batch will be smaller (default: False).
    timeout (numeric, optional): if positive, the timeout value for collecting a batch from workers. Should always be non-negative (default: 0).
    worker_init_fn (callable, optional): if not None, this will be called on each worker subprocess with the worker id (an int in [0, num_workers - 1]) as input, after seeding and before data loading (default: None).

Warning: if the spawn start method is used, worker_init_fn cannot be an unpicklable object, e.g., a lambda function. See the multiprocessing best practices notes in the PyTorch documentation for more details.

Note: the len(dataloader) heuristic is based on the length of the sampler used. When dataset is an IterableDataset, len(dataset) (if implemented) is returned instead, regardless of the multi-process loading configuration, because PyTorch trusts the user's dataset code to handle multi-process loading correctly and avoid duplicate data. See Dataset Types for more details on these two types of datasets and how IterableDataset interacts with multi-process data loading.
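
As a quick check of the batch_size/drop_last semantics described above, in the style of the tests on this page (bs is the fastai-side name used in the examples throughout):

dl = DataLoader(range(10), bs=3, drop_last=True)
test_eq(L(dl).map(len), [3,3,3])  # the final incomplete batch of one item is dropped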

Override create_item and use the default infinite sampler to get a stream of unknown length (call stop() when you want to end the stream).

class RandDL(DataLoader):
    def create_item(self, s):
        r = random.random()
        return r if r<0.95 else stop()

L(RandDL())
(#6) [0.14786435518384977,0.2650330681194001,0.6431452951690536,0.09458671548019948,0.8696155162247328,0.29837767606261467]
L(RandDL(bs=4, drop_last=True)).map(len)
(#0) []
dl = RandDL(bs=4, num_workers=4, drop_last=True)
L(dl).map(len)
(#28) [4,4,4,4,4,4,4,4,4,4...]
test_eq(dl.fake_l.num_workers, 4)
with dl.fake_l.no_multiproc(): 
    test_eq(dl.fake_l.num_workers, 0)
    L(dl).map(len)
test_eq(dl.fake_l.num_workers, 4)
def _rand_item(s):
    r = random.random()
    return r if r<0.95 else stop()

L(DataLoader(create_item=_rand_item))
(#48) [0.3975703088063097,0.9300043261284903,0.21348763857582242,0.414557522602783,0.10987859373433917,0.8210019528840916,0.3265409454362096,0.37081412769534416,0.3355937609498155,0.8700538355944498...]

If you don't set bs, then dataset is assumed to provide an iterator or a __getitem__ that returns a batch.

ds1 = DataLoader(letters)
test_eq(L(ds1), letters)
test_eq(len(ds1), 26)

test_shuffled(L(DataLoader(letters, shuffle=True)), letters)

ds1 = DataLoader(letters, indexed=False)
test_eq(L(ds1), letters)
test_eq(len(ds1), 26)

t2 = L(tensor([0,1,2]),tensor([3,4,5]))
ds2 = DataLoader(t2)
test_eq_type(L(ds2), t2)

t3 = L(array([0,1,2]),array([3,4,5]))
ds3 = DataLoader(t3)
test_eq_type(L(ds3), t3.map(tensor))

ds4 = DataLoader(t3, create_batch=noop, after_iter=lambda: setattr(t3, 'f', 1))
test_eq_type(L(ds4), t3)
test_eq(t3.f, 1)

If you do set bs, then dataset is assumed to provide an iterator or a __getitem__ that returns a single item of a batch.

def twoepochs(d): return ' '.join(''.join(o) for _ in range(2) for o in d)
ds1 = DataLoader(letters, bs=4, drop_last=True, num_workers=0)
test_eq(twoepochs(ds1), 'abcd efgh ijkl mnop qrst uvwx abcd efgh ijkl mnop qrst uvwx')

ds1 = DataLoader(letters,4,num_workers=2)
test_eq(twoepochs(ds1), 'abcd efgh ijkl mnop qrst uvwx yz abcd efgh ijkl mnop qrst uvwx yz')

ds1 = DataLoader(range(12), bs=4, num_workers=3)
test_eq_type(L(ds1), L(tensor([0,1,2,3]),tensor([4,5,6,7]),tensor([8,9,10,11])))

ds1 = DataLoader([str(i) for i in range(11)], bs=4, after_iter=lambda: setattr(t3, 'f', 2))
test_eq_type(L(ds1), L(['0','1','2','3'],['4','5','6','7'],['8','9','10']))
test_eq(t3.f, 2)
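
after_iter and create_batch above are two of several overridable callbacks (wif, before_iter, after_item, before_batch, after_batch, after_iter) that can be passed as keyword arguments or overridden in a subclass. A hypothetical before_batch that reverses each batch's item list before collation:

ds1 = DataLoader(letters, bs=4, before_batch=lambda b: b[::-1], drop_last=True)
test_eq(''.join(first(ds1)), 'dcba')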

it = iter(DataLoader(map(noop,range(20)), bs=4, num_workers=1))
test_eq_type([next(it) for _ in range(3)], [tensor([0,1,2,3]),tensor([4,5,6,7]),tensor([8,9,10,11])])
class SleepyDL(list):
    def __getitem__(self,i):
        time.sleep(random.random()/50)
        return super().__getitem__(i)

t = SleepyDL(letters)

%time test_eq(DataLoader(t, num_workers=0), letters)
%time test_eq(DataLoader(t, num_workers=2), letters)
%time test_eq(DataLoader(t, num_workers=4), letters)

dl = DataLoader(t, shuffle=True, num_workers=1)
test_shuffled(L(dl), letters)
test_shuffled(L(dl), L(dl))
CPU times: user 3.83 ms, sys: 0 ns, total: 3.83 ms
Wall time: 223 ms
CPU times: user 6.58 ms, sys: 15.6 ms, total: 22.2 ms
Wall time: 183 ms
CPU times: user 13.5 ms, sys: 21.5 ms, total: 35 ms
Wall time: 126 ms
class SleepyQueue():
    "Simulate a queue with varying latency"
    def __init__(self, q): self.q=q
    def __iter__(self):
        while True:
            time.sleep(random.random()/100)
            try: yield self.q.get_nowait()
            except queues.Empty: return

q = Queue()
for o in range(30): q.put(o)
it = SleepyQueue(q)

%time test_shuffled(L(DataLoader(it, num_workers=4)), range(30))
CPU times: user 7.89 ms, sys: 26.9 ms, total: 34.8 ms
Wall time: 89.4 ms
class A(TensorBase): pass

for nw in (0,2):
    t = A(tensor([1,2]))
    dl = DataLoader([t,t,t,t,t,t,t,t], bs=4, num_workers=nw)
    b = first(dl)
    test_eq(type(b), A)

    t = (A(tensor([1,2])),)
    dl = DataLoader([t,t,t,t,t,t,t,t], bs=4, num_workers=nw)
    b = first(dl)
    test_eq(type(b[0]), A)
class A(TensorBase): pass
t = A(tensor(1,2))

tdl = DataLoader([t,t,t,t,t,t,t,t], bs=4, num_workers=2, after_batch=to_device)
b = first(tdl)
test_eq(type(b), A)

# Unknown attributes are delegated to `dataset`
test_eq(tdl.pop(), tensor(1,2))
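
Since attribute lookups fall through to the dataset, other list methods work the same way; a hypothetical quick check:

tdl2 = DataLoader(letters)
test_eq(tdl2.index('c'), 2)  # index resolves on the underlying list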