import linecache
import re
import uuid
from dataclasses import is_dataclass
from threading import local
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Mapping,
Optional,
Type,
TypeVar,
)
import attr
from attr import NOTHING, resolve_types
from ._compat import (
adapted_fields,
get_args,
get_origin,
is_annotated,
is_bare,
is_generic,
)
from .generics import deep_copy_with
if TYPE_CHECKING: # pragma: no cover
from cattr.converters import Converter
@attr.s(slots=True, frozen=True)
class AttributeOverride:
    """Customization for un/structuring a single attrs attribute.

    ``None`` for ``omit_if_default``/``rename`` means "no override" — the
    generating function's own defaults apply instead.
    """

    omit_if_default: Optional[bool] = attr.ib(default=None)
    rename: Optional[str] = attr.ib(default=None)
    omit: bool = attr.ib(default=False)  # Omit the field completely.
def override(omit_if_default=None, rename=None, omit: bool = False):
    """Create an :class:`AttributeOverride` for one attribute.

    :param omit_if_default: tri-state; ``None`` defers to the generator's
        ``omit_if_default`` argument.
    :param rename: the key to use in the unstructured/structured dict.
    :param omit: if true, skip the attribute entirely.
    """
    return AttributeOverride(
        omit_if_default=omit_if_default, rename=rename, omit=omit
    )
# Sentinel override meaning "no customization requested for this attribute".
_neutral = AttributeOverride()
# Thread-local set of classes currently being generated; used to detect
# recursive class graphs in make_dict_unstructure_fn.
_already_generating = local()
T = TypeVar("T")
def make_dict_unstructure_fn(
    cl,
    converter,
    omit_if_default: bool = False,
    _cattrs_use_linecache: bool = True,
    **kwargs,
):
    """Generate a specialized dict unstructuring function for an attrs class.

    :param cl: the attrs class (possibly a generic alias) to unstructure.
    :param converter: the converter whose dispatch machinery resolves a
        handler for each attribute's type.
    :param omit_if_default: if true, attributes equal to their default are
        left out of the result (per-attribute overrides take precedence).
    :param _cattrs_use_linecache: register the generated source with
        ``linecache`` so tracebacks can show the generated code.
    :param kwargs: per-attribute :class:`AttributeOverride` instances, keyed
        by attribute name.
    :raises RecursionError: if ``cl`` is already being generated on this
        thread (a circular class graph).
    """
    origin = get_origin(cl)
    attrs = adapted_fields(origin or cl)  # type: ignore

    if any(isinstance(a.type, str) for a in attrs):
        # PEP 563 annotations - need to be resolved.
        resolve_types(cl)

    # For generic classes, build a TypeVar-name -> concrete-type mapping so
    # attribute annotations can be made concrete below.
    mapping = {}
    if is_generic(cl):
        mapping = _generate_mapping(cl, mapping)
        for base in getattr(origin, "__orig_bases__", ()):
            if is_generic(base) and not str(base).startswith("typing.Generic"):
                mapping = _generate_mapping(base, mapping)
                break
        cl = origin

    cl_name = cl.__name__
    fn_name = "unstructure_" + cl_name
    globs = {}
    lines = []
    post_lines = []

    # We keep track of what we're generating to help with recursive
    # class graphs.
    try:
        working_set = _already_generating.working_set
    except AttributeError:
        working_set = set()
        _already_generating.working_set = working_set
    if cl in working_set:
        raise RecursionError()
    else:
        working_set.add(cl)

    try:
        lines.append(f"def {fn_name}(instance):")
        lines.append("    res = {")
        for a in attrs:
            attr_name = a.name
            override = kwargs.pop(attr_name, _neutral)
            if override.omit:
                continue
            kn = attr_name if override.rename is None else override.rename
            d = a.default

            # For each attribute, we try resolving the type here and now.
            # If a type is manually overwritten, this function should be
            # regenerated.
            handler = None
            if a.type is not None:
                t = a.type
                if isinstance(t, TypeVar):
                    if t.__name__ in mapping:
                        t = mapping[t.__name__]
                    else:
                        # Unbound TypeVar - fall back to generic dispatch.
                        handler = converter.unstructure
                elif is_generic(t) and not is_bare(t) and not is_annotated(t):
                    t = deep_copy_with(t, mapping)

                if handler is None:
                    try:
                        handler = converter._unstructure_func.dispatch(t)
                    except RecursionError:
                        # There's a circular reference somewhere down the line
                        handler = converter.unstructure
            else:
                handler = converter.unstructure

            # Identity handlers need no call in the generated source at all.
            is_identity = handler == converter._unstructure_identity

            if not is_identity:
                unstruct_handler_name = f"unstructure_{attr_name}"
                globs[unstruct_handler_name] = handler
                invoke = f"{unstruct_handler_name}(instance.{attr_name})"
            else:
                invoke = f"instance.{attr_name}"

            if d is not attr.NOTHING and (
                (omit_if_default and override.omit_if_default is not False)
                or override.omit_if_default
            ):
                # Emit a post-check: only include the key when the current
                # value differs from the default (or factory-produced value).
                def_name = f"__cattr_def_{attr_name}"

                if isinstance(d, attr.Factory):
                    globs[def_name] = d.factory
                    if d.takes_self:
                        post_lines.append(
                            f"    if instance.{attr_name} != {def_name}(instance):"
                        )
                    else:
                        post_lines.append(
                            f"    if instance.{attr_name} != {def_name}():"
                        )
                    post_lines.append(f"        res['{kn}'] = {invoke}")
                else:
                    globs[def_name] = d
                    post_lines.append(
                        f"    if instance.{attr_name} != {def_name}:"
                    )
                    post_lines.append(f"        res['{kn}'] = {invoke}")
            else:
                # No default or no override.
                lines.append(f"        '{kn}': {invoke},")
        lines.append("    }")

        total_lines = lines + post_lines + ["    return res"]

        script = "\n".join(total_lines)
        fname = _generate_unique_filename(
            cl, "unstructure", reserve=_cattrs_use_linecache
        )

        eval(compile(script, fname, "exec"), globs)

        fn = globs[fn_name]
        if _cattrs_use_linecache:
            linecache.cache[fname] = len(script), None, total_lines, fname
    finally:
        working_set.remove(cl)

    return fn
def _generate_mapping(
cl: Type, old_mapping: Dict[str, type]
) -> Dict[str, type]:
mapping = {}
for p, t in zip(get_origin(cl).__parameters__, get_args(cl)):
if isinstance(t, TypeVar):
continue
mapping[p.__name__] = t
if not mapping:
return old_mapping
return mapping
def make_dict_structure_fn(
    cl: Type[T],
    converter: "Converter",
    _cattrs_forbid_extra_keys: bool = False,
    _cattrs_use_linecache: bool = True,
    _cattrs_prefer_attrib_converters: bool = False,
    **kwargs,
) -> Callable[[Mapping[str, Any]], T]:
    """Generate a specialized dict structuring function for an attrs class.

    :param cl: the attrs (or dataclass) class to structure into.
    :param converter: the converter whose dispatch machinery resolves a
        handler per attribute type.
    :param _cattrs_forbid_extra_keys: if true, the generated function raises
        when the input mapping has keys not matching any attribute.
    :param _cattrs_use_linecache: register the generated source with
        ``linecache`` for readable tracebacks.
    :param _cattrs_prefer_attrib_converters: if true, let attrs-level
        ``converter=`` callables do the work instead of cattrs handlers.
    :param kwargs: per-attribute :class:`AttributeOverride` instances, keyed
        by attribute name.
    """
    mapping = {}
    if is_generic(cl):
        base = get_origin(cl)
        mapping = _generate_mapping(cl, mapping)
        cl = base

    for base in getattr(cl, "__orig_bases__", ()):
        if is_generic(base) and not str(base).startswith("typing.Generic"):
            mapping = _generate_mapping(base, mapping)
            break

    if isinstance(cl, TypeVar):
        cl = mapping.get(cl.__name__, cl)

    cl_name = cl.__name__
    fn_name = "structure_" + cl_name

    # We have generic parameters and need to generate a unique name for the function
    for p in getattr(cl, "__parameters__", ()):
        # This is nasty, I am not sure how best to handle `typing.List[str]` or `TClass[int, int]` as a parameter type here
        name_base = mapping[p.__name__]
        name = getattr(name_base, "__name__", None) or str(name_base)
        # Sanitize characters that cannot appear in an identifier.
        name = re.sub(r"[\[\.\] ,]", "_", name)
        fn_name += f"_{name}"

    globs = {"__c_s": converter.structure, "__cl": cl}
    lines = []
    post_lines = []

    attrs = adapted_fields(cl)
    is_dc = is_dataclass(cl)

    if any(isinstance(a.type, str) for a in attrs):
        # PEP 563 annotations - need to be resolved.
        resolve_types(cl)

    lines.append(f"def {fn_name}(o, *_):")
    lines.append("    res = {")
    allowed_fields = set()
    for a in attrs:
        an = a.name
        override = kwargs.pop(an, _neutral)
        if override.omit:
            continue
        t = a.type
        if isinstance(t, TypeVar):
            t = mapping.get(t.__name__, t)
        elif is_generic(t) and not is_bare(t) and not is_annotated(t):
            t = deep_copy_with(t, mapping)

        # For each attribute, we try resolving the type here and now.
        # If a type is manually overwritten, this function should be
        # regenerated.
        if a.converter is not None and _cattrs_prefer_attrib_converters:
            # The attrs-level converter will handle the value; pass it raw.
            handler = None
        elif (
            a.converter is not None
            and not _cattrs_prefer_attrib_converters
            and t is not None
        ):
            handler = converter._structure_func.dispatch(t)
            if handler == converter._structure_error:
                # No cattrs handler; fall back to the attrs converter.
                handler = None
        elif t is not None:
            handler = converter._structure_func.dispatch(t)
        else:
            handler = converter.structure

        struct_handler_name = f"structure_{an}"
        globs[struct_handler_name] = handler

        # attrs strips a leading underscore for the __init__ parameter name;
        # dataclasses do not.
        ian = an if (is_dc or an[0] != "_") else an[1:]
        kn = an if override.rename is None else override.rename
        allowed_fields.add(kn)
        globs[f"type_{an}"] = t

        if a.default is NOTHING:
            # Mandatory attribute: a missing key raises KeyError.
            if handler:
                lines.append(
                    f"        '{ian}': {struct_handler_name}(o['{kn}'], type_{an}),"
                )
            else:
                lines.append(f"        '{ian}': o['{kn}'],")
        else:
            # Defaulted attribute: only structure it when the key is present.
            post_lines.append(f"    if '{kn}' in o:")
            if handler:
                post_lines.append(
                    f"        res['{ian}'] = {struct_handler_name}(o['{kn}'], type_{an})"
                )
            else:
                post_lines.append(f"        res['{ian}'] = o['{kn}']")
    lines.append("    }")
    if _cattrs_forbid_extra_keys:
        globs["__c_a"] = allowed_fields
        # NOTE: the original had a missing comma here, implicitly
        # concatenating the message line with the closing paren; the comma
        # is restored (generated code is equivalent, just better formatted).
        post_lines += [
            "    unknown_fields = set(o.keys()) - __c_a",
            "    if unknown_fields:",
            "        raise Exception(",
            f"            'Extra fields in constructor for {cl_name}: ' + ', '.join(unknown_fields)",
            "        )",
        ]

    total_lines = lines + post_lines + ["    return __cl(**res)"]

    fname = _generate_unique_filename(
        cl, "structure", reserve=_cattrs_use_linecache
    )
    script = "\n".join(total_lines)
    eval(compile(script, fname, "exec"), globs)
    if _cattrs_use_linecache:
        linecache.cache[fname] = len(script), None, total_lines, fname

    return globs[fn_name]
def make_iterable_unstructure_fn(cl: Any, converter, unstructure_to=None):
    """Generate a specialized unstructure function for an iterable.

    :param cl: the (possibly parameterized) iterable type.
    :param converter: used to dispatch a handler for the element type.
    :param unstructure_to: the callable used to build the result; defaults
        to ``cl`` itself.
    """
    handler = converter.unstructure

    fn_name = "unstructure_iterable"

    # Let's try fishing out the type args.
    if getattr(cl, "__args__", None) is not None:
        type_arg = get_args(cl)[0]
        # We don't know how to handle the TypeVar on this level,
        # so we skip doing the dispatch here.
        if not isinstance(type_arg, TypeVar):
            handler = converter._unstructure_func.dispatch(type_arg)

    globs = {"__cattr_seq_cl": unstructure_to or cl, "__cattr_u": handler}
    lines = []

    lines.append(f"def {fn_name}(iterable):")
    lines.append("    res = __cattr_seq_cl(__cattr_u(i) for i in iterable)")

    total_lines = lines + ["    return res"]

    eval(compile("\n".join(total_lines), "", "exec"), globs)

    fn = globs[fn_name]
    return fn
def make_hetero_tuple_unstructure_fn(cl: Any, converter, unstructure_to=None):
    """Generate a specialized unstructure function for a heterogenous tuple.

    Each position gets its own pre-dispatched handler; identity handlers are
    elided from the generated source entirely.

    :param cl: the parameterized tuple type, e.g. ``Tuple[int, str]``.
    :param converter: used to dispatch a handler per position.
    :param unstructure_to: the callable used to build the result; when it is
        ``tuple`` itself, a plain tuple display is generated.
    """
    fn_name = "unstructure_tuple"

    type_args = get_args(cl)

    # We can do the dispatch here and now.
    handlers = [
        converter._unstructure_func.dispatch(type_arg)
        for type_arg in type_args
    ]

    globs = {f"__cattr_u_{i}": h for i, h in enumerate(handlers)}
    if unstructure_to is not tuple:
        globs["__cattr_seq_cl"] = unstructure_to or cl
    lines = []

    lines.append(f"def {fn_name}(tup):")
    if unstructure_to is not tuple:
        lines.append("    res = __cattr_seq_cl((")
    else:
        lines.append("    res = (")
    for i in range(len(handlers)):
        if handlers[i] == converter._unstructure_identity:
            # Identity handler - emit the element access directly.
            lines.append(f"        tup[{i}],")
        else:
            lines.append(f"        __cattr_u_{i}(tup[{i}]),")
    if unstructure_to is not tuple:
        lines.append("    ))")
    else:
        lines.append("    )")

    total_lines = lines + ["    return res"]

    eval(compile("\n".join(total_lines), "", "exec"), globs)

    fn = globs[fn_name]
    return fn
def make_mapping_unstructure_fn(
    cl: Any, converter, unstructure_to=None, key_handler=None
):
    """Generate a specialized unstructure function for a mapping.

    :param cl: the (possibly parameterized) mapping type.
    :param converter: used to dispatch key and value handlers.
    :param unstructure_to: the callable used to build the result; defaults
        to ``cl`` itself.
    :param key_handler: an explicit handler for keys, overriding dispatch.
    """
    kh = key_handler or converter.unstructure
    val_handler = converter.unstructure

    fn_name = "unstructure_mapping"

    # Let's try fishing out the type args.
    if getattr(cl, "__args__", None) is not None:
        args = get_args(cl)
        if len(args) == 2:
            key_arg, val_arg = args
        else:
            # Probably a Counter. BUGFIX: dispatch on the single type arg
            # itself, not on the whole args tuple (mirrors the unpacking in
            # make_mapping_structure_fn).
            key_arg, val_arg = args[0], Any
        # We can do the dispatch here and now.
        kh = key_handler or converter._unstructure_func.dispatch(key_arg)
        if kh == converter._unstructure_identity:
            # Identity handlers are elided from the generated source.
            kh = None

        val_handler = converter._unstructure_func.dispatch(val_arg)
        if val_handler == converter._unstructure_identity:
            val_handler = None

    globs = {
        "__cattr_mapping_cl": unstructure_to or cl,
        "__cattr_k_u": kh,
        "__cattr_v_u": val_handler,
    }

    k_u = "__cattr_k_u(k)" if kh is not None else "k"
    v_u = "__cattr_v_u(v)" if val_handler is not None else "v"

    lines = []

    lines.append(f"def {fn_name}(mapping):")
    lines.append(
        f"    res = __cattr_mapping_cl(({k_u}, {v_u}) for k, v in mapping.items())"
    )

    total_lines = lines + ["    return res"]

    eval(compile("\n".join(total_lines), "", "exec"), globs)

    fn = globs[fn_name]
    return fn
def make_mapping_structure_fn(
    cl: Any, converter, structure_to=dict, key_type=NOTHING, val_type=NOTHING
):
    """Generate a specialized structure function for a mapping.

    (The original docstring said "unstructure" — copy-paste error.)

    :param cl: the (possibly parameterized) mapping type being structured.
    :param converter: used to dispatch key and value handlers.
    :param structure_to: the callable used to build the final result.
    :param key_type: explicit key type, overriding the one fished out of
        ``cl``'s type args.
    :param val_type: explicit value type, likewise.
    """
    fn_name = "structure_mapping"

    globs = {"__cattr_mapping_cl": structure_to}

    lines = []
    lines.append(f"def {fn_name}(mapping, _):")

    # Let's try fishing out the type args.
    if not is_bare(cl):
        args = get_args(cl)
        if len(args) == 2:
            key_arg_cand, val_arg_cand = args
            if key_type is NOTHING:
                key_type = key_arg_cand
            if val_type is NOTHING:
                val_type = val_arg_cand
        else:
            # One type arg: decide which slot it fills based on what the
            # caller pinned explicitly.
            if key_type is not NOTHING and val_type is NOTHING:
                (val_type,) = args
            elif key_type is NOTHING and val_type is not NOTHING:
                (key_type,) = args
            else:
                # Probably a Counter
                (key_type,) = args
                val_type = Any

        is_bare_dict = val_type is Any and key_type is Any
        if not is_bare_dict:
            # We can do the dispatch here and now.
            key_handler = converter._structure_func.dispatch(key_type)
            if key_handler == converter._structure_call:
                # "Call the type directly" handlers take a single argument.
                key_handler = key_type

            val_handler = converter._structure_func.dispatch(val_type)
            if val_handler == converter._structure_call:
                val_handler = val_type

            globs["__cattr_k_t"] = key_type
            globs["__cattr_v_t"] = val_type
            globs["__cattr_k_s"] = key_handler
            globs["__cattr_v_s"] = val_handler
            k_s = (
                "__cattr_k_s(k, __cattr_k_t)"
                if key_handler != key_type
                else "__cattr_k_s(k)"
            )
            v_s = (
                "__cattr_v_s(v, __cattr_v_t)"
                if val_handler != val_type
                else "__cattr_v_s(v)"
            )
    else:
        is_bare_dict = True

    if is_bare_dict:
        # No args, it's a bare dict.
        lines.append("    res = dict(mapping)")
    else:
        lines.append(f"    res = {{{k_s}: {v_s} for k, v in mapping.items()}}")
    if structure_to is not dict:
        lines.append("    res = __cattr_mapping_cl(res)")

    total_lines = lines + ["    return res"]

    eval(compile("\n".join(total_lines), "", "exec"), globs)

    fn = globs[fn_name]
    return fn
def _generate_unique_filename(cls, func_name, reserve=True):
"""
Create a "filename" suitable for a function being generated.
"""
unique_id = uuid.uuid4()
extra = ""
count = 1
while True:
unique_filename = "<cattrs generated {0} {1}.{2}{3}>".format(
func_name,
cls.__module__,
getattr(cls, "__qualname__", cls.__name__),
extra,
)
if not reserve:
return unique_filename
# To handle concurrency we essentially "reserve" our spot in
# the linecache with a dummy line. The caller can then
# set this value correctly.
cache_line = (1, None, (str(unique_id),), unique_filename)
if (
linecache.cache.setdefault(unique_filename, cache_line)
== cache_line
):
return unique_filename
# Looks like this spot is taken. Try again.
count += 1
extra = "-{0}".format(count)