import python_jsonschema_objects.util as util
import python_jsonschema_objects.validators as validators
import python_jsonschema_objects.pattern_properties as pattern_properties
from python_jsonschema_objects.literals import LiteralValue
import copy
import collections
import itertools
import six
import sys
import logging
import python_jsonschema_objects.wrapper_types
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
# Long is no longer a thing in python3.x
if sys.version_info > (3,):
long = int
[docs]class ProtocolBase(collections.abc.MutableMapping):
"""An instance of a class generated from the provided
schema. All properties will be validated according to
the definitions provided. However, whether or not all required
properties have been provide will *not* be validated.
Args:
**props: Properties with which to populate the class object
Returns:
The class object populated with values
Raises:
validators.ValidationError: If any of the provided properties
do not pass validation
"""
__propinfo__ = {}
__required__ = set()
__has_default__ = set()
__object_attr_list__ = set(["_properties", "_extended_properties"])
[docs] def as_dict(self):
"""Return a dictionary containing the current values
of the object.
Returns:
(dict): The object represented as a dictionary
"""
out = {}
for prop in self:
propval = getattr(self, prop)
if hasattr(propval, "for_json"):
out[prop] = propval.for_json()
elif isinstance(propval, list):
out[prop] = [getattr(x, "for_json", lambda: x)() for x in propval]
elif isinstance(propval, (ProtocolBase, LiteralValue)):
out[prop] = propval.as_dict()
elif propval is not None:
out[prop] = propval
return out
def for_json(self):
return self.as_dict()
def __eq__(self, other):
if not isinstance(other, ProtocolBase):
return False
return self.as_dict() == other.as_dict()
def __str__(self):
inverter = dict((v, k) for k, v in six.iteritems(self.__prop_names__))
props = sorted(
[
"%s" % (inverter.get(k, k),)
for k, v in itertools.chain(
six.iteritems(self._properties),
six.iteritems(self._extended_properties),
)
]
)
return "<%s attributes: %s>" % (self.__class__.__name__, ", ".join(props))
def __repr__(self):
inverter = dict((v, k) for k, v in six.iteritems(self.__prop_names__))
props = sorted(
[
"%s=%s" % (inverter.get(k, k), repr(v))
for k, v in itertools.chain(
six.iteritems(self._properties),
six.iteritems(self._extended_properties),
)
]
)
return "<%s %s>" % (self.__class__.__name__, " ".join(props))
[docs] @classmethod
def from_json(cls, jsonmsg):
"""Create an object directly from a JSON string.
Applies general validation after creating the
object to check whether all required fields are
present.
Args:
jsonmsg (str): An object encoded as a JSON string
Returns:
An object of the generated type
Raises:
ValidationError: if `jsonmsg` does not match the schema
`cls` was generated from
"""
import json
msg = json.loads(jsonmsg)
obj = cls(**msg)
obj.validate()
return obj
def __deepcopy__(self, memo):
return self.__class__(**self.as_dict())
def __new__(cls, **props):
"""Overridden to support oneOf, where we need to
instantiate a different class depending on what
value we've seen"""
if getattr(cls, "__validation__", None) is None:
new = super(ProtocolBase, cls).__new__
if new is object.__new__:
return new(cls)
return new(cls, **props)
valid_types = cls.__validation__.get("type", None)
if valid_types is None or not isinstance(valid_types, list):
new = super(ProtocolBase, cls).__new__
if new is object.__new__:
return new(cls)
return new(cls, **props)
obj = None
validation_errors = []
for klass in valid_types:
logger.debug(
util.lazy_format("Attempting to instantiate {0} as {1}", cls, klass)
)
try:
obj = klass(**props)
obj.validate()
except validators.ValidationError as e:
validation_errors.append((klass, e))
else:
break
else: # We got nothing
raise validators.ValidationError(
"Unable to instantiate any valid types: \n"
"".join("{0}: {1}\n".format(k, e) for k, e in validation_errors)
)
return obj
def __init__(self, **props):
self._extended_properties = dict()
self._properties = dict(
zip(
self.__prop_names__.values(),
[None for x in six.moves.xrange(len(self.__prop_names__))],
)
)
# To support defaults, we have to actually execute the constructors
# but only for the ones that have defaults set.
for name in self.__has_default__:
if name not in props:
default_value = copy.deepcopy(self.__propinfo__[name]["default"])
logger.debug(
util.lazy_format("Initializing '{0}' to '{1}'", name, default_value)
)
setattr(self, name, default_value)
for prop in props:
try:
logger.debug(
util.lazy_format(
"Setting value for '{0}' to {1}", prop, props[prop]
)
)
if props[prop] is not None:
setattr(self, prop, props[prop])
except validators.ValidationError as e:
import sys
raise six.reraise(
type(e),
type(e)(
str(e)
+ " \nwhile setting '{0}' in {1}".format(
prop, self.__class__.__name__
)
),
sys.exc_info()[2],
)
if getattr(self, "__strict__", None):
self.validate()
def __setattr__(self, name, val):
name = str(name)
if name in self.__object_attr_list__:
object.__setattr__(self, name, val)
elif name in self.__propinfo__:
# If its in __propinfo__, then it actually has a property defined.
# The property does special validation, so we actually need to
# run its setter. We get it from the class definition and call
# it directly. XXX Heinous.
prop = getattr(self.__class__, self.__prop_names__[name])
prop.__set__(self, val)
else:
# This is an additional property of some kind
try:
val = self.__extensible__.instantiate(name, val)
except Exception as e:
raise validators.ValidationError(
"Attempted to set unknown property '{0}': {1} ".format(name, e)
)
self._extended_properties[name] = val
""" Implement collections.MutableMapping methods """
def __iter__(self):
import itertools
return itertools.chain(
six.iterkeys(self._extended_properties), six.iterkeys(self._properties)
)
def __len__(self):
return len(self._extended_properties) + len(self._properties)
def __getitem__(self, key):
try:
return getattr(self, key)
except AttributeError:
raise KeyError(key)
def __getattr__(self, name):
name = str(name)
if name in self.__prop_names__:
raise AttributeError(name)
if name in self._extended_properties:
return self._extended_properties[name]
raise AttributeError(
"{0} is not a valid property of {1}".format(name, self.__class__.__name__)
)
def __setitem__(self, key, val):
key = str(key)
return setattr(self, key, val)
def __delitem__(self, key):
key = str(key)
return delattr(self, key)
def __delattr__(self, name):
name = str(name)
if name in self._extended_properties:
del self._extended_properties[name]
return
if name in self.__prop_names__:
prop = getattr(self.__class__, self.__prop_names__[name])
prop.__delete__(self)
return
return object.__delattr__(self, name)
@classmethod
def propinfo(cls, propname):
if propname not in cls.__propinfo__:
return {}
return cls.__propinfo__[propname]
def serialize(self, **opts):
self.validate()
enc = util.ProtocolJSONEncoder(**opts)
return enc.encode(self)
[docs] def validate(self):
"""Applies all defined validation to the current
state of the object, and raises an error if
they are not all met.
Raises:
ValidationError: if validations do not pass
"""
missing = self.missing_property_names()
if len(missing) > 0:
raise validators.ValidationError(
"'{0}' are required attributes for {1}".format(
missing, self.__class__.__name__
)
)
for prop, val in six.iteritems(self._properties):
if val is None:
continue
if isinstance(val, ProtocolBase):
val.validate()
elif getattr(val, "isLiteralClass", None) is True:
val.validate()
elif isinstance(val, list):
for subval in val:
subval.validate()
else:
# This object is of the wrong type, but just try setting it
# The property setter will enforce its correctness
# and handily coerce its type at the same time
setattr(self, prop, val)
return True
[docs] def missing_property_names(self):
"""
Returns a list of properties which are required and missing.
Properties are excluded from this list if they are allowed to be null.
:return: list of missing properties.
"""
propname = lambda x: self.__prop_names__[x]
missing = []
for x in self.__required__:
# Allow the null type
propinfo = self.propinfo(propname(x))
null_type = False
if "type" in propinfo:
type_info = propinfo["type"]
null_type = (
type_info == "null"
or isinstance(type_info, (list, tuple))
and "null" in type_info
)
elif "oneOf" in propinfo:
for o in propinfo["oneOf"]:
type_info = o.get("type")
if (
type_info
and type_info == "null"
or isinstance(type_info, (list, tuple))
and "null" in type_info
):
null_type = True
break
if (propname(x) not in self._properties and null_type) or (
self._properties[propname(x)] is None and not null_type
):
missing.append(x)
return missing
class TypeRef(object):
def __init__(self, ref_uri, resolved):
self._resolved = resolved
self._ref_uri = ref_uri
self._class = None
self.__doc__ = "Reference to {}".format(ref_uri)
@property
def ref_class(self):
if self._class is None:
self._class = self._resolved.get(self._ref_uri)
return self._class
def __call__(self, *args, **kwargs):
cls = self.ref_class
return cls(*args, **kwargs)
def __str__(self):
return self.__doc__
def __repr__(self):
return "<{}>".format(self.__doc__)
class TypeProxy(object):
slots = ("__title__", "_types")
def __init__(self, types, title=None):
self.__title__ = title
self._types = types
def from_json(self, jsonmsg):
import json
msg = json.loads(jsonmsg)
obj = self(**msg)
obj.validate()
return obj
def __call__(self, *a, **kw):
validation_errors = []
valid_types = self._types
for klass in valid_types:
logger.debug(
util.lazy_format(
"Attempting to instantiate {0} as {1}", self.__class__, klass
)
)
try:
obj = klass(*a, **kw)
obj.validate()
except TypeError as e:
validation_errors.append((klass, e))
except validators.ValidationError as e:
validation_errors.append((klass, e))
else:
return obj
else: # We got nothing
raise validators.ValidationError(
"Unable to instantiate any valid types: \n"
"".join("{0}: {1}\n".format(k, e) for k, e in validation_errors)
)
class ClassBuilder(object):
def __init__(self, resolver):
self.resolver = resolver
self.resolved = {}
self.under_construction = set()
def expand_references(self, source_uri, iterable):
"""Give an iterable of jsonschema descriptors, expands any
of them that are $ref objects, and otherwise leaves them alone.
"""
pp = []
for elem in iterable:
if "$ref" in elem:
pp.append(self.resolve_type(elem["$ref"], source_uri))
else:
pp.append(elem)
return pp
def resolve_type(self, ref, source):
""" Return a resolved type for a URI, potentially constructing one if necessary"""
uri = util.resolve_ref_uri(self.resolver.resolution_scope, ref)
if uri in self.resolved:
return self.resolved[uri]
elif uri in self.under_construction:
logger.debug(
util.lazy_format(
"Using a TypeRef to avoid a cyclic reference for {0} -> {1} ",
uri,
source,
)
)
return TypeRef(uri, self.resolved)
else:
logger.debug(
util.lazy_format(
"Resolving direct reference object {0} -> {1}", source, uri
)
)
with self.resolver.resolving(ref) as resolved:
self.resolved[uri] = self.construct(uri, resolved, (ProtocolBase,))
return self.resolved[uri]
def construct(self, uri, *args, **kw):
""" Wrapper to debug things """
logger.debug(util.lazy_format("Constructing {0}", uri))
if ("override" not in kw or kw["override"] is False) and uri in self.resolved:
logger.debug(util.lazy_format("Using existing {0}", uri))
assert self.resolved[uri] is not None
return self.resolved[uri]
else:
ret = self._construct(uri, *args, **kw)
logger.debug(util.lazy_format("Constructed {0}", ret))
return ret
def _construct(self, uri, clsdata, parent=(ProtocolBase,), **kw):
if "anyOf" in clsdata:
raise NotImplementedError("anyOf is not supported as bare property")
elif "oneOf" in clsdata:
"""If this object itself has a 'oneOf' designation,
then construct a TypeProxy.
"""
klasses = self.construct_objects(clsdata["oneOf"], uri)
logger.debug(
util.lazy_format("Designating {0} as TypeProxy for {1}", uri, klasses)
)
self.resolved[uri] = TypeProxy(klasses, title=clsdata.get("title"))
return self.resolved[uri]
elif "allOf" in clsdata:
potential_parents = self.expand_references(uri, clsdata["allOf"])
parents = []
for p in potential_parents:
if isinstance(p, dict):
# This is additional constraints
clsdata.update(p)
elif util.safe_issubclass(p, ProtocolBase):
parents.append(p)
self.resolved[uri] = self._build_object(uri, clsdata, parents, **kw)
return self.resolved[uri]
elif "$ref" in clsdata:
if "type" in clsdata and util.safe_issubclass(
clsdata["type"], (ProtocolBase, LiteralValue)
):
# It's possible that this reference was already resolved, in which
# case it will have its type parameter set
logger.debug(
util.lazy_format(
"Using previously resolved type "
"(with different URI) for {0}",
uri,
)
)
self.resolved[uri] = clsdata["type"]
elif uri in self.resolved:
logger.debug(
util.lazy_format("Using previously resolved object for {0}", uri)
)
else:
ref = clsdata["$ref"]
typ = self.resolve_type(ref, uri)
self.resolved[uri] = typ
return self.resolved[uri]
elif clsdata.get("type") == "array" and "items" in clsdata:
clsdata_copy = {}
clsdata_copy.update(clsdata)
self.resolved[
uri
] = python_jsonschema_objects.wrapper_types.ArrayWrapper.create(
uri,
item_constraint=clsdata_copy.pop("items"),
classbuilder=self,
**clsdata_copy
)
return self.resolved[uri]
elif isinstance(clsdata.get("type"), list):
types = []
for i, item_detail in enumerate(clsdata["type"]):
subdata = {k: v for k, v in six.iteritems(clsdata) if k != "type"}
subdata["type"] = item_detail
types.append(self._build_literal(uri + "_%s" % i, subdata))
self.resolved[uri] = TypeProxy(types)
return self.resolved[uri]
elif (
clsdata.get("type", None) == "object"
or clsdata.get("properties", None) is not None
or clsdata.get("additionalProperties", False)
):
self.resolved[uri] = self._build_object(uri, clsdata, parent, **kw)
return self.resolved[uri]
elif clsdata.get("type") in ("integer", "number", "string", "boolean", "null"):
self.resolved[uri] = self._build_literal(uri, clsdata)
return self.resolved[uri]
elif "enum" in clsdata:
obj = self._build_literal(uri, clsdata)
self.resolved[uri] = obj
return obj
elif "type" in clsdata and util.safe_issubclass(clsdata["type"], ProtocolBase):
self.resolved[uri] = clsdata.get("type")
return self.resolved[uri]
else:
raise NotImplementedError(
"Unable to parse schema object '{0}' with "
"no type and no reference".format(clsdata)
)
def _build_literal(self, nm, clsdata):
"""@todo: Docstring for _build_literal
:nm: @todo
:clsdata: @todo
:returns: @todo
"""
cls = type(
str(nm),
tuple((LiteralValue,)),
{
"__propinfo__": {
"__literal__": clsdata,
"__title__": clsdata.get("title"),
"__default__": clsdata.get("default"),
}
},
)
return cls
def _build_object(self, nm, clsdata, parents, **kw):
logger.debug(util.lazy_format("Building object {0}", nm))
# To support circular references, we tag objects that we're
# currently building as "under construction"
self.under_construction.add(nm)
props = {}
defaults = set()
properties = {}
for p in parents:
properties = util.propmerge(properties, p.__propinfo__)
if "properties" in clsdata:
properties = util.propmerge(properties, clsdata["properties"])
name_translation = {}
for prop, detail in properties.items():
logger.debug(util.lazy_format("Handling property {0}.{1}", nm, prop))
properties[prop]["raw_name"] = prop
name_translation[prop] = prop.replace("@", "")
prop = name_translation[prop]
if detail.get("default", None) is not None:
defaults.add(prop)
if detail.get("type", None) == "object":
uri = "{0}/{1}_{2}".format(nm, prop, "<anonymous>")
self.resolved[uri] = self.construct(uri, detail, (ProtocolBase,))
props[prop] = make_property(
prop, {"type": self.resolved[uri]}, self.resolved[uri].__doc__
)
properties[prop]["type"] = self.resolved[uri]
elif "type" not in detail and "$ref" in detail:
ref = detail["$ref"]
typ = self.resolve_type(ref, ".".join([nm, prop]))
props[prop] = make_property(prop, {"type": typ}, typ.__doc__)
properties[prop]["$ref"] = ref
properties[prop]["type"] = typ
elif "oneOf" in detail:
potential = self.expand_references(nm, detail["oneOf"])
logger.debug(
util.lazy_format("Designating {0} as oneOf {1}", prop, potential)
)
desc = detail["description"] if "description" in detail else ""
props[prop] = make_property(prop, {"type": potential}, desc)
elif "type" in detail and detail["type"] == "array":
if "items" in detail and isinstance(detail["items"], dict):
if "$ref" in detail["items"]:
typ = self.resolve_type(detail["items"]["$ref"], nm)
constraints = copy.copy(detail)
constraints["strict"] = kw.get("strict")
propdata = {
"type": "array",
"validator": python_jsonschema_objects.wrapper_types.ArrayWrapper.create(
nm, item_constraint=typ, **constraints
),
}
else:
uri = "{0}/{1}_{2}".format(nm, prop, "<anonymous_field>")
try:
if "oneOf" in detail["items"]:
typ = TypeProxy(
self.construct_objects(
detail["items"]["oneOf"], uri
)
)
else:
typ = self.construct(uri, detail["items"])
constraints = copy.copy(detail)
constraints["strict"] = kw.get("strict")
propdata = {
"type": "array",
"validator": python_jsonschema_objects.wrapper_types.ArrayWrapper.create(
uri, item_constraint=typ, **constraints
),
}
except NotImplementedError:
typ = detail["items"]
constraints = copy.copy(detail)
constraints["strict"] = kw.get("strict")
propdata = {
"type": "array",
"validator": python_jsonschema_objects.wrapper_types.ArrayWrapper.create(
uri, item_constraint=typ, **constraints
),
}
props[prop] = make_property(prop, propdata, typ.__doc__)
elif "items" in detail:
typs = []
for i, elem in enumerate(detail["items"]):
uri = "{0}/{1}/<anonymous_{2}>".format(nm, prop, i)
typ = self.construct(uri, elem)
typs.append(typ)
props[prop] = make_property(prop, {"type": typs})
else:
desc = detail["description"] if "description" in detail else ""
uri = "{0}/{1}".format(nm, prop)
typ = self.construct(uri, detail)
props[prop] = make_property(prop, {"type": typ}, desc)
props["__extensible__"] = pattern_properties.ExtensibleValidator(
nm, clsdata, self
)
props["__prop_names__"] = name_translation
props["__propinfo__"] = properties
required = set.union(*[p.__required__ for p in parents])
if "required" in clsdata:
for prop in clsdata["required"]:
required.add(prop)
invalid_requires = [req for req in required if req not in props["__propinfo__"]]
if len(invalid_requires) > 0:
raise validators.ValidationError(
"Schema Definition Error: {0} schema requires "
"'{1}', but properties are not defined".format(nm, invalid_requires)
)
props["__required__"] = required
props["__has_default__"] = defaults
if required and kw.get("strict"):
props["__strict__"] = True
props["__title__"] = clsdata.get("title")
cls = type(str(nm.split("/")[-1]), tuple(parents), props)
self.under_construction.remove(nm)
return cls
def construct_objects(self, oneOfList, uri):
return [
self.construct(uri + "_%s" % i, item_detail)
if "$ref" not in item_detail
else self.resolve_type(item_detail["$ref"], uri + "_%s" % i)
for i, item_detail in enumerate(oneOfList)
]
def make_property(prop, info, desc=""):
from . import descriptors
prop = descriptors.AttributeDescriptor(prop, info, desc)
return prop