import numbers
import warnings
import tlz as toolz
from .. import base, utils
from ..blockwise import blockwise as core_blockwise
from ..delayed import unpack_collections
from ..highlevelgraph import HighLevelGraph
def blockwise(
    func,
    out_ind,
    *args,
    name=None,
    token=None,
    dtype=None,
    adjust_chunks=None,
    new_axes=None,
    align_arrays=True,
    concatenate=None,
    meta=None,
    **kwargs,
):
    """Tensor operation: Generalized inner and outer products

    A broad class of blocked algorithms and patterns can be specified with a
    concise multi-index notation.  The ``blockwise`` function applies an
    in-memory function across multiple blocks of multiple inputs in a variety
    of ways.  Many dask.array operations are special cases of blockwise
    including elementwise, broadcasting, reductions, tensordot, and transpose.

    Parameters
    ----------
    func : callable
        Function to apply to individual tuples of blocks
    out_ind : iterable
        Block pattern of the output, something like 'ijk' or (1, 2, 3)
    *args : sequence of Array, index pairs
        Sequence like (x, 'ij', y, 'jk', z, 'i')
    name : str, optional
        Full name of the output collection key.  When given it is used as-is;
        otherwise a name is generated from ``token`` (or ``func``'s name) plus
        a token of the arguments.
    token : str, optional
        Prefix to use for the generated output name when ``name`` is not given.
    dtype : np.dtype
        Datatype of resulting array.
    adjust_chunks : dict
        Dictionary mapping index to function to be applied to chunk sizes
    new_axes : dict, keyword only
        New indexes and their dimension lengths
    align_arrays : bool, keyword only
        If True (default) rechunk the inputs so that matching indices have
        matching chunks; if False, take the chunking of the input with the
        most blocks along each index.
    concatenate : bool, keyword only
        If true concatenate arrays along dummy indices, else provide lists
    meta : array-like, optional
        Empty array of the output type; computed via ``compute_meta`` when
        not provided.
    **kwargs : dict
        Extra keyword arguments to pass to function

    Examples
    --------
    2D embarrassingly parallel operation from two arrays, x, and y.

    >>> import operator, numpy as np, dask.array as da
    >>> x = da.from_array([[1, 2],
    ...                    [3, 4]], chunks=(1, 2))
    >>> y = da.from_array([[10, 20],
    ...                    [0, 0]])
    >>> z = blockwise(operator.add, 'ij', x, 'ij', y, 'ij', dtype='f8')
    >>> z.compute()
    array([[11, 22],
           [ 3,  4]])

    Outer product multiplying a by b, two 1-d vectors

    >>> a = da.from_array([0, 1, 2], chunks=1)
    >>> b = da.from_array([10, 50, 100], chunks=1)
    >>> z = blockwise(np.outer, 'ij', a, 'i', b, 'j', dtype='f8')
    >>> z.compute()
    array([[  0,   0,   0],
           [ 10,  50, 100],
           [ 20, 100, 200]])

    z = x.T

    >>> z = blockwise(np.transpose, 'ji', x, 'ij', dtype=x.dtype)
    >>> z.compute()
    array([[1, 3],
           [2, 4]])

    The transpose case above is illustrative because it does transposition
    both on each in-memory block by calling ``np.transpose`` and on the order
    of the blocks themselves, by switching the order of the index ``ij -> ji``.

    We can compose these same patterns with more variables and more complex
    in-memory functions

    z = X + Y.T

    >>> z = blockwise(lambda x, y: x + y.T, 'ij', x, 'ij', y, 'ji', dtype='f8')
    >>> z.compute()
    array([[11,  2],
           [23,  4]])

    Any index, like ``i`` missing from the output index is interpreted as a
    contraction (note that this differs from Einstein convention; repeated
    indices do not imply contraction.)  In the case of a contraction the
    passed function should expect an iterable of blocks on any array that
    holds that index.  To receive arrays concatenated along contracted
    dimensions instead pass ``concatenate=True``.

    Inner product multiplying a by b, two 1-d vectors

    >>> def sequence_dot(a_blocks, b_blocks):
    ...     result = 0
    ...     for a, b in zip(a_blocks, b_blocks):
    ...         result += a.dot(b)
    ...     return result

    >>> z = blockwise(sequence_dot, '', a, 'i', b, 'i', dtype='f8')
    >>> z.compute()
    250

    Add new single-chunk dimensions with the ``new_axes=`` keyword, including
    the length of the new dimension.  New dimensions will always be in a single
    chunk.

    >>> def f(a):
    ...     return a[:, None] * np.ones((1, 5))

    >>> z = blockwise(f, 'az', a, 'a', new_axes={'z': 5}, dtype=a.dtype)

    New dimensions can also be multi-chunk by specifying a tuple of chunk
    sizes.  This has limited utility as is (because the chunks are all the
    same), but the resulting graph can be modified to achieve more useful
    results (see ``da.map_blocks``).

    >>> z = blockwise(f, 'az', a, 'a', new_axes={'z': (5, 5)}, dtype=x.dtype)
    >>> z.chunks
    ((1, 1, 1), (5, 5))

    If the applied function changes the size of each chunk you can specify
    this with a ``adjust_chunks={...}`` dictionary holding a function for each
    index that modifies the dimension size in that index.

    >>> def double(x):
    ...     return np.concatenate([x, x])

    >>> y = blockwise(double, 'ij', x, 'ij',
    ...               adjust_chunks={'i': lambda n: 2 * n}, dtype=x.dtype)
    >>> y.chunks
    ((2, 2), (2,))

    Include literals by indexing with None

    >>> z = blockwise(operator.add, 'ij', x, 'ij', 1234, None, dtype=x.dtype)
    >>> z.compute()
    array([[1235, 1236],
           [1237, 1238]])
    """
    out = name
    new_axes = new_axes or {}

    # Input Validation
    if len(set(out_ind)) != len(out_ind):
        raise ValueError(
            "Repeated elements not allowed in output index",
            [k for k, v in toolz.frequencies(out_ind).items() if v > 1],
        )
    # Every output index must come from some input index or from new_axes
    # (args[1::2] are the index entries; a None index marks a literal).
    new = (
        set(out_ind)
        - {a for arg in args[1::2] if arg is not None for a in arg}
        - set(new_axes or ())
    )
    if new:
        raise ValueError("Unknown dimension", new)

    from .core import normalize_arg, unify_chunks

    if align_arrays:
        chunkss, arrays = unify_chunks(*args)
    else:
        arginds = [(a, i) for (a, i) in toolz.partition(2, args) if i is not None]
        chunkss = {}
        # For each dimension, use the input chunking that has the most blocks;
        # this will ensure that broadcasting works as expected, and in
        # particular the number of blocks should be correct if the inputs are
        # consistent.
        for arg, ind in arginds:
            for c, i in zip(arg.chunks, ind):
                if i not in chunkss or len(c) > len(chunkss[i]):
                    chunkss[i] = c
        arrays = args[::2]

    for k, v in new_axes.items():
        if not isinstance(v, tuple):
            v = (v,)
        chunkss[k] = v

    arginds = zip(arrays, args[1::2])
    numblocks = {}

    dependencies = []
    arrays = []

    # Normalize arguments: arrays are replaced by their key name (and
    # recorded in numblocks/arrays); literals (ind is None) are unpacked for
    # any embedded dask collections they may contain.
    argindsstr = []

    for arg, ind in arginds:
        if ind is None:
            arg = normalize_arg(arg)
            arg, collections = unpack_collections(arg)
            dependencies.extend(collections)
        else:
            if (
                hasattr(arg, "ndim")
                and hasattr(ind, "__len__")
                and arg.ndim != len(ind)
            ):
                raise ValueError(
                    "Index string %s does not match array dimension %d"
                    % (ind, arg.ndim)
                )
            numblocks[arg.name] = arg.numblocks
            arrays.append(arg)
            arg = arg.name
        argindsstr.extend((arg, ind))

    # Normalize keyword arguments
    kwargs2 = {}
    for k, v in kwargs.items():
        v = normalize_arg(v)
        v, collections = unpack_collections(v)
        dependencies.extend(collections)
        kwargs2[k] = v

    # Finish up the name: deterministic token of the call so identical calls
    # produce identical graph keys.
    if not out:
        out = "{}-{}".format(
            token or utils.funcname(func).strip("_"),
            base.tokenize(func, out_ind, argindsstr, dtype, **kwargs),
        )

    graph = core_blockwise(
        func,
        out,
        out_ind,
        *argindsstr,
        numblocks=numblocks,
        dependencies=dependencies,
        new_axes=new_axes,
        concatenate=concatenate,
        **kwargs2,
    )
    graph = HighLevelGraph.from_collections(
        out, graph, dependencies=arrays + dependencies
    )

    chunks = [chunkss[i] for i in out_ind]
    if adjust_chunks:
        for i, ind in enumerate(out_ind):
            if ind in adjust_chunks:
                if callable(adjust_chunks[ind]):
                    chunks[i] = tuple(map(adjust_chunks[ind], chunks[i]))
                elif isinstance(adjust_chunks[ind], numbers.Integral):
                    chunks[i] = tuple(adjust_chunks[ind] for _ in chunks[i])
                elif isinstance(adjust_chunks[ind], (tuple, list)):
                    if len(adjust_chunks[ind]) != len(chunks[i]):
                        raise ValueError(
                            f"Dimension {i} has {len(chunks[i])} blocks, adjust_chunks "
                            f"specified with {len(adjust_chunks[ind])} blocks"
                        )
                    chunks[i] = tuple(adjust_chunks[ind])
                else:
                    raise NotImplementedError(
                        "adjust_chunks values must be callable, int, or tuple"
                    )
    chunks = tuple(chunks)

    if meta is None:
        from .utils import compute_meta

        meta = compute_meta(func, dtype, *args[::2], **kwargs)
    return new_da_object(graph, out, chunks, meta=meta, dtype=dtype)
def atop(*args, **kwargs):
    """Deprecated alias for :func:`blockwise`.

    Forwards all arguments unchanged.  ``stacklevel=2`` makes the warning
    point at the caller's code rather than at this shim.
    """
    warnings.warn("The da.atop function has moved to da.blockwise", stacklevel=2)
    return blockwise(*args, **kwargs)
from .core import new_da_object