Source code for python_jsonschema_objects.classbuilder

import python_jsonschema_objects.util as util
import python_jsonschema_objects.validators as validators

import collections
import itertools
import six
import sys

import logging
logger = logging.getLogger(__name__)

# Long is no longer a thing in python3.x
if sys.version_info > (3,):
  long = int

[docs]class ProtocolBase(collections.MutableMapping): """ An instance of a class generated from the provided schema. All properties will be validated according to the definitions provided. However, whether or not all required properties have been provide will *not* be validated. Args: **props: Properties with which to populate the class object Returns: The class object populated with values Raises: validators.ValidationError: If any of the provided properties do not pass validation """ __propinfo__ = {} __required__ = set() __SCHEMA_TYPES__ = { 'array': list, 'boolean': bool, 'integer': int, 'number': (float, int, long), 'null': None, 'string': six.string_types, 'object': dict }
[docs] def as_dict(self): """ Return a dictionary containing the current values of the object. Returns: (dict): The object represented as a dictionary """ out = {} for prop in self: propval = getattr(self, prop) if isinstance(propval, list): out[prop] = [getattr(x, 'as_dict', lambda :x)() for x in propval] elif isinstance(propval, (ProtocolBase, LiteralValue)): out[prop] = propval.as_dict() elif propval is not None: out[prop] = propval return out
def for_json(self): return self.as_dict() def __eq__(self, other): return self.as_dict() == other.as_dict() def __str__(self): inverter = dict((v, k) for k,v in six.iteritems(self.__prop_names__)) props = ["%s" % (inverter.get(k, k),) for k, v in itertools.chain(six.iteritems(self._properties), six.iteritems(self._extended_properties))] return "<%s attributes: %s>" % (self.__class__.__name__, ", ".join(props)) def __repr__(self): inverter = dict((v, k) for k,v in six.iteritems(self.__prop_names__)) props = ["%s=%s" % (inverter.get(k, k), str(v)) for k, v in itertools.chain(six.iteritems(self._properties), six.iteritems(self._extended_properties))] return "<%s %s>" % ( self.__class__.__name__, " ".join(props) ) @classmethod
[docs] def from_json(cls, jsonmsg): """ Create an object directly from a JSON string. Applies general validation after creating the object to check whether all required fields are present. Args: jsonmsg (str): An object encoded as a JSON string Returns: An object of the generated type Raises: ValidationError: if `jsonmsg` does not match the schema `cls` was generated from """ import json msg = json.loads(jsonmsg) obj = cls(**msg) obj.validate() return obj
def __new__(cls, **props): """ Overridden to support oneOf, where we need to instantiate a different class depending on what value we've seen """ if getattr(cls, '__validation__', None) is None: new = super(ProtocolBase, cls).__new__ if new is object.__new__: return new(cls) return new(cls, **props) valid_types = cls.__validation__.get('type', None) if valid_types is None or not isinstance(valid_types, list): new = super(ProtocolBase, cls).__new__ if new is object.__new__: return new(cls) return new(cls, **props) obj = None validation_errors = [] for klass in valid_types: logger.debug("Attempting to instantiate {0} as {1}".format( cls, klass)) try: obj = klass(**props) except validators.ValidationError as e: validation_errors.append((klass, e)) else: break else: # We got nothing raise validators.ValidationError( "Unable to instantiate any valid types: \n" "\n".join("{0}: {1}".format(k, e) for k, e in validation_errors) ) return obj def __init__(self, **props): self._extended_properties = dict() self._properties = dict(zip(self.__prop_names__.values(), [None for x in six.moves.xrange(len(self.__prop_names__))])) for prop in props: try: logger.debug("Setting value for %s' to %s", prop, props[prop]) setattr(self, prop, props[prop]) except validators.ValidationError as e: import sys raise six.reraise(type(e), type(e)(str(e) + " \nwhile setting '{0}' in {1}".format( prop, self.__class__.__name__)), sys.exc_info()[2]) #if len(props) > 0: # self.validate() def __setattr__(self, name, val): if name.startswith("_"): object.__setattr__(self, name, val) elif name in self.__propinfo__: # If its in __propinfo__, then it actually has a property defined. # The property does special validation, so we actually need to # run its setter. We get it from the class definition and call # it directly. XXX Heinous. prop = self.__class__.__dict__[self.__prop_names__[name]] prop.fset(self, val) else: # This is an additional property of some kind typ = getattr(self, '__extensible__', None) if typ is False: raise validators.ValidationError( "Attempted to set unknown property '{0}', " "but 'additionalProperties' is false.".format(name)) if typ is True: # There is no type defined, so just make it a basic literal # Pick the type based on the type of the values valtype = [k for k, t in six.iteritems(self.__SCHEMA_TYPES__) if t is not None and isinstance(val, t)] valtype = valtype[0] val = MakeLiteral(name, valtype, val) elif isinstance(typ, type) and getattr(typ, 'isLiteralClass', None) is True: val = typ(val) elif isinstance(typ, type) and util.safe_issubclass(typ, ProtocolBase): val = typ(**util.coerce_for_expansion(val)) self._extended_properties[name] = val """ Implement collections.MutableMapping methods """ def __iter__(self): import itertools return itertools.chain(six.iterkeys(self._extended_properties), six.iterkeys(self._properties)) def __len__(self): return len(self._extended_properties) + len(self._properties) def __getitem__(self, key): return getattr(self, key) def __setitem__(self, key, val): return setattr(self,key, val) def __delitem__(self, key): return delattr(self, key) def __getattr__(self, name): if name not in self._extended_properties: raise AttributeError("{0} is not a valid property of {1}".format( name, self.__class__.__name__)) return self._extended_properties[name] @classmethod def propinfo(cls, propname): if propname not in cls.__propinfo__: return {} return cls.__propinfo__[propname] def serialize(self): self.validate() enc = util.ProtocolJSONEncoder() return enc.encode(self)
[docs] def validate(self): """ Applies all defined validation to the current state of the object, and raises an error if they are not all met. Raises: ValidationError: if validations do not pass """ propname = lambda x: self.__prop_names__[x] missing = [x for x in self.__required__ if propname(x) not in self._properties or self._properties[propname(x)] is None] if len(missing) > 0: raise validators.ValidationError( "'{0}' are required attributes for {1}" .format(missing, self.__class__.__name__)) for prop, val in six.iteritems(self._properties): if val is None: continue if isinstance(val, ProtocolBase): val.validate() elif getattr(val, 'isLiteralClass', None) is True: val.validate() elif isinstance(val, list): for subval in val: subval.validate() else: # This object is of the wrong type, but just try setting it # The property setter will enforce its correctness # and handily coerce its type at the same time setattr(self, prop, val) return True
def MakeLiteral(name, typ, value, **properties): properties.update({'type': typ}) klass = type(str(name), tuple((LiteralValue,)), { '__propinfo__': { '__literal__': properties} }) return klass(value) class LiteralValue(object): """Docstring for LiteralValue """ isLiteralClass = True def __init__(self, value, typ=None): """@todo: to be defined :value: @todo """ self._value = value self.validate() def as_dict(self): return self.for_json() def for_json(self): return self._value @classmethod def propinfo(cls, propname): if propname not in cls.__propinfo__: return {} return cls.__propinfo__[propname] def serialize(self): self.validate() enc = util.ProtocolJSONEncoder() return enc.encode(self) def __repr__(self): return "<%s %s>" % ( self.__class__.__name__, str(self._value) ) def validate(self): info = self.propinfo('__literal__') # this duplicates logic in validators.ArrayValidator.check_items; unify it. for param, paramval in sorted(six.iteritems(info), key=lambda x: x[0].lower() != 'type'): validator = validators.registry(param) if validator is not None: validator(paramval, self._value, info) def __cmp__(self, other): if isinstance(other, six.integer_types): return cmp(int(self), other) elif isinstance(other, six.string_types): return cmp(str(self), other) elif isinstance(other, float): return cmp(float(self), other) else: return cmp(id(self), id(other)) def __int__(self): return int(self._value) def __float__(self): return float(self._value) def __str__(self): return str(self._value) class ClassBuilder(object): def __init__(self, resolver): self.resolver = resolver self.resolved = {} def resolve_classes(self, iterable): pp = [] for elem in iterable: if '$ref' in elem: ref = elem['$ref'] uri = util.resolve_ref_uri(self.resolver.resolution_scope, ref) if uri in self.resolved: pp.append(self.resolved[uri]) else: with self.resolver.resolving(ref) as resolved: self.resolved[uri] = self.construct( uri, resolved, (ProtocolBase,)) pp.append(self.resolved[uri]) else: pp.append(elem) return pp def construct(self, uri, *args, **kw): """ Wrapper to debug things """ logger.debug("Constructing {0}".format(uri)) ret = self._construct(uri, *args, **kw) logger.debug("Constructed {0}".format(ret)) return ret def _construct(self, uri, clsdata, parent=(ProtocolBase,)): if 'anyOf' in clsdata: raise NotImplementedError( "anyOf is not supported as bare property") elif 'allOf' in clsdata: potential_parents = self.resolve_classes(clsdata['allOf']) parents = [] for p in potential_parents: if isinstance(p, dict): # This is additional constraints clsdata.update(p) elif util.safe_issubclass(p, ProtocolBase): parents.append(p) self.resolved[uri] = self._build_object( uri, clsdata, parents) return self.resolved[uri] elif '$ref' in clsdata: if 'type' in clsdata and util.safe_issubclass( clsdata['type'], (ProtocolBase, LiteralValue)): # It's possible that this reference was already resolved, in which # case it will have its type parameter set logger.debug("Using previously resolved type " "(with different URI) for %s", uri) self.resolved[uri] = clsdata['type'] elif uri in self.resolved: logger.debug("Using previously resolved object for %s", uri) else: logger.debug("Resolving object for %s", uri) with self.resolver.resolving(uri) as resolved: self.resolved[uri] = None # Set incase there is a circular reference in schema definition self.resolved[uri] = self._build_object( uri, resolved, parent) return self.resolved[uri] elif clsdata.get('type') == 'array' and 'items' in clsdata: self.resolved[uri] = self._build_object( uri, clsdata, parent) elif (clsdata.get('type', None) == 'object' or clsdata.get('properties', None) is not None or clsdata.get('additionalProperties', False)): self.resolved[uri] = self._build_object( uri, clsdata, parent) return self.resolved[uri] elif clsdata.get('type') in ('integer', 'number', 'string', 'boolean', 'null'): self.resolved[uri] = self._build_literal( uri, clsdata) return self.resolved[uri] elif 'enum' in clsdata: obj = self._build_literal(uri, clsdata) self.resolved[uri] = obj return obj elif 'type' in clsdata and util.safe_issubclass(clsdata['type'], ProtocolBase): self.resolved[uri] = clsdata.get('type') return self.resolved[uri] else: raise NotImplementedError( "Unable to parse schema object '{0}' with " "no type and no reference".format(clsdata)) def _build_literal(self, nm, clsdata): """@todo: Docstring for _build_literal :nm: @todo :clsdata: @todo :returns: @todo """ cls = type(str(nm), tuple((LiteralValue,)), { '__propinfo__': { '__literal__': clsdata} }) return cls def _build_object(self, nm, clsdata, parents): logger.debug("Building object {0}".format(nm)) props = {} properties = {} for p in parents: properties = util.propmerge(properties, p.__propinfo__) if 'properties' in clsdata: properties = util.propmerge(properties, clsdata['properties']) name_translation = {} for prop, detail in properties.items(): properties[prop]['raw_name'] = prop name_translation[prop] = prop.replace('@', '') prop = name_translation[prop] if detail.get('type', None) == 'object': uri = "{0}/{1}_{2}".format(nm, prop, "<anonymous>") self.resolved[uri] = self.construct( uri, detail, (ProtocolBase,)) props[prop] = make_property(prop, {'type': self.resolved[uri]}, self.resolved[uri].__doc__) properties[prop]['type'] = self.resolved[uri] elif 'type' not in detail and '$ref' in detail: ref = detail['$ref'] uri = util.resolve_ref_uri(self.resolver.resolution_scope, ref) if uri not in self.resolved: with self.resolver.resolving(ref) as resolved: self.resolved[uri] = self.construct( uri, resolved, (ProtocolBase,)) props[prop] = make_property(prop, {'type': self.resolved[uri]}, self.resolved[uri].__doc__) properties[prop]['$ref'] = uri properties[prop]['type'] = self.resolved[uri] elif 'oneOf' in detail: potential = self.resolve_classes(detail['oneOf']) logger.debug("Designating {0} as oneOf {1}".format(prop, potential)) desc = detail[ 'description'] if 'description' in detail else "" props[prop] = make_property(prop, {'type': potential}, desc ) elif 'type' in detail and detail['type'] == 'array': if 'items' in detail and isinstance(detail['items'], dict): if '$ref' in detail['items']: uri = util.resolve_ref_uri( self.resolver.resolution_scope, detail['items']['$ref']) typ = self.construct(uri, detail['items']) propdata = { 'type': 'array', 'validator': validators.ArrayValidator.create( uri, item_constraint=typ)} else: uri = "{0}/{1}_{2}".format(nm, prop, "<anonymous_field>") try: typ = self.construct(uri, detail['items']) propdata = {'type': 'array', 'validator': validators.ArrayValidator.create(uri, item_constraint=typ, addl_constraints=detail)} except NotImplementedError: typ = detail['items'] propdata = {'type': 'array', 'validator': validators.ArrayValidator.create(uri, item_constraint=typ, addl_constraints=detail)} props[prop] = make_property(prop, propdata, typ.__doc__) elif 'items' in detail: typs = [] for i, elem in enumerate(detail['items']): uri = "{0}/{1}/<anonymous_{2}>".format(nm, prop, i) typ = self.construct(uri, detail['items']) typs.append(typ) props[prop] = make_property(prop, {'type': 'tuple', 'items': typ}, typ.__doc__) else: desc = detail[ 'description'] if 'description' in detail else "" uri = "{0}/{1}".format(nm, prop) typ = self.construct(uri, detail) props[prop] = make_property(prop, {'type': typ}, desc) """ If this object itself has a 'oneOf' designation, then make the validation 'type' the list of potential objects. """ if 'oneOf' in clsdata: klasses = self.resolve_classes(clsdata['oneOf']) # Need a validation to check that it meets one of them props['__validation__'] = {'type': klasses} props['__extensible__'] = True if 'additionalProperties' in clsdata: addlProp = clsdata['additionalProperties'] if addlProp is False: props['__extensible__'] = False elif addlProp is True: props['__extensible__'] = True else: if '$ref' in addlProp: refs = self.resolve_classes([addlProp]) else: uri = "{0}/{1}_{2}".format(nm, "<additionalProperties>", "<anonymous>") self.resolved[uri] = self.construct( uri, addlProp, (ProtocolBase,)) refs = [self.resolved[uri]] props['__extensible__'] = refs[0] props['__prop_names__'] = name_translation props['__propinfo__'] = properties required = set.union(*[p.__required__ for p in parents]) if 'required' in clsdata: for prop in clsdata['required']: required.add(prop) invalid_requires = [req for req in required if req not in props['__propinfo__']] if len(invalid_requires) > 0: raise validators.ValidationError("Schema Definition Error: {0} schema requires " "'{1}', but properties are not defined" .format(nm, invalid_requires)) props['__required__'] = required cls = type(str(nm.split('/')[-1]), tuple(parents), props) return cls def make_property(prop, info, desc=""): def getprop(self): try: return self._properties[prop] except KeyError: raise AttributeError("No such attribute") def setprop(self, val): if isinstance(info['type'], (list, tuple)): ok = False errors = [] type_checks = [] for typ in info['type']: if not isinstance(typ, dict): type_checks.append(typ) continue typ = ProtocolBase.__SCHEMA_TYPES__[typ['type']] if typ == None: typ = type(None) if isinstance(typ, (list, tuple)): type_checks.extend(typ) else: type_checks.append(typ) for typ in type_checks: if isinstance(val, typ): ok = True break elif hasattr(typ, 'isLiteralClass'): try: validator = typ(val) except Exception as e: errors.append( "Failed to coerce to '{0}': {1}".format(typ, e)) pass else: validator.validate() ok = True break elif util.safe_issubclass(typ, ProtocolBase): # force conversion- thus the val rather than validator assignment try: val = typ(**util.coerce_for_expansion(val)) except Exception as e: errors.append( "Failed to coerce to '{0}': {1}".format(typ, e)) pass else: val.validate() ok = True break if not ok: errstr = "\n".join(errors) raise validators.ValidationError( "Object must be one of {0}: \n{1}".format(info['type'], errstr)) elif info['type'] == 'array': instance = info['validator'](val) val = instance.validate() elif getattr(info['type'], 'isLiteralClass', False) is True: if not isinstance(val, info['type']): validator = info['type'](val) validator.validate() elif util.safe_issubclass(info['type'], ProtocolBase): if not isinstance(val, info['type']): val = info['type'](**util.coerce_for_expansion(val)) val.validate() else: raise TypeError("Unknown object type: '{0}'".format(info['type'])) self._properties[prop] = val def delprop(self): if prop in self.__required__: raise AttributeError("'%s' is required" % prop) else: del self._properties[prop] return property(getprop, setprop, delprop, desc)