# Source code for category_theory.par_operations
import functools
import operator
import typing
from itertools import zip_longest

import dask

from .core import Monoid
def chunkify(
    chunk_size: int,
    iterable: typing.Iterable[typing.Any],
    fillvalue: typing.Any = None,
) -> typing.Iterable[typing.Iterable[typing.Any]]:
    """Break `iterable` into consecutive chunks of `chunk_size` elements.

    When the number of elements is not an exact multiple of
    `chunk_size`, the final chunk is padded with `fillvalue`.

    Parameters
    ----------
    chunk_size : int
        Number of elements in each chunk
    iterable : typing.Iterable[typing.Any]
        The iterable that should be chunkified
    fillvalue : typing.Any, optional
        Padding used to complete the last chunk when the sizes
        do not add up, by default None

    Returns
    -------
    typing.Iterable[typing.Iterable[typing.Any]]
        An iterator of new iterables, each of size `chunk_size`.

    Example
    -------
    .. code:: python

        >> iterable = (1, 2, 3, 4, 5)
        >> chunkify(3, iterable, fillvalue=None)
        ((1, 2, 3), (4, 5, None))
    """
    # The SAME iterator repeated `chunk_size` times: each zip step then
    # pulls `chunk_size` consecutive elements from the single source.
    shared_iterator = iter(iterable)
    return zip_longest(*([shared_iterator] * chunk_size), fillvalue=fillvalue)
def fold(
    iterable: typing.Iterable[Monoid],
    cls: typing.Type,
    chunk_size: int = 1000,
) -> typing.Any:
    """Fold an iterable of Monoids together using the identity
    element as initial.

    The reduction is parallelised with dask: the iterable is split
    into chunks of `chunk_size`, each chunk is reduced in its own
    delayed task, and the per-chunk results are combined in a final
    delayed reduction.

    Parameters
    ----------
    iterable : typing.Iterable[Monoid]
        An iterable of monoids that should be squashed
    cls : typing.Type
        The type of the monoid; must provide its identity element
        via ``cls.e()``
    chunk_size : int, optional
        Number of elements reduced per dask task, by default 1000

    Returns
    -------
    typing.Any
        A ``dask.delayed`` object representing the reduction; call
        ``.compute()`` on it to obtain the actual value.
    """
    # Padding the last chunk with the identity element keeps the
    # reduction correct even though `chunkify` fills it to full size.
    partials = [
        dask.delayed(functools.reduce)(operator.add, chunk, cls.e())
        for chunk in chunkify(chunk_size, iterable, fillvalue=cls.e())
    ]
    return dask.delayed(functools.reduce)(operator.add, partials, cls.e())