Source code for plone.tiles.data

import logging
import urllib

from zope.interface import implements, implementer, Interface
from zope.component import adapts, adapter, queryUtility, getMultiAdapter
from zope.component.interfaces import ComponentLookupError

from zope.schema import getFieldsInOrder, getFields
from zope.schema.interfaces import ISequence

from zope.annotation.interfaces import IAnnotations

from plone.tiles.interfaces import ITileType
from plone.tiles.interfaces import ITile
from plone.tiles.interfaces import IPersistentTile
from plone.tiles.interfaces import ITileDataManager
from plone.tiles.interfaces import ITileDataContext
from plone.tiles.interfaces import IFieldTypeConverter

from persistent.dict import PersistentDict

try:
    import json
except:
    import simplejson as json


ANNOTATIONS_KEY_PREFIX = u'plone.tiles.data'
LOGGER = logging.getLogger('plone.tiles')


[docs]class TransientTileDataManager(object): """A data manager for transient tile data, which reads data from the request query string. """ implements(ITileDataManager) adapts(ITile) def __init__(self, tile): self.tile = tile self.tileType = queryUtility(ITileType, name=tile.__name__) # try to use a '_tiledata' parameter in the request, falling # back to the request.form object itself if not found request = self.tile.request if '_tiledata' in request.form: self.data = json.loads(request.form['_tiledata']) else: self.data = request.form
[docs] def get(self): # If we don't have a schema, just take the request if self.tileType is None or self.tileType.schema is None: return self.data.copy() # Try to decode the form data properly if we can try: return decode(self.data, self.tileType.schema, missing=True) except (ValueError, UnicodeDecodeError,): LOGGER.exception(u"Could not convert form data to schema") return self.data.copy()
[docs] def set(self, data): self.data.clear() self.data.update(data)
[docs] def delete(self): self.data.clear()
[docs]class PersistentTileDataManager(object): """A data reader for persistent tiles operating on annotatable contexts. The data is retrieved from an annotation. """ implements(ITileDataManager) adapts(IPersistentTile) def __init__(self, tile): self.tile = tile self.tileType = queryUtility(ITileType, name=tile.__name__) self.context = getMultiAdapter((tile.context, tile.request, tile), ITileDataContext) self.annotations = IAnnotations(self.context) self.key = "%s.%s" % (ANNOTATIONS_KEY_PREFIX, tile.id,)
[docs] def get(self): data = dict(self.annotations.get(self.key, {})) if self.tileType is not None and self.tileType.schema is not None: for name, field in getFields(self.tileType.schema).items(): if name not in data: data[name] = field.missing_value return data
[docs] def set(self, data): self.annotations[self.key] = PersistentDict(data)
[docs] def delete(self): if self.key in self.annotations: del self.annotations[self.key]
@implementer(ITileDataContext) @adapter(Interface, Interface, ITile)
[docs]def defaultTileDataContext(context, request, tile): return tile.context # Encoding
[docs]def encode(data, schema, ignore=()): """Given a data dictionary with key/value pairs and schema, return an encoded query string. This is similar to urllib.urlencode(), but field names will include the appropriate field type converters, e.g. an int field will be encoded as fieldname:int=123. Fields not found in the data dict will be ignored, and items in the dict not in the schema will also be ignored. Additional fields to ignore can be passed with the 'ignore' parameter. If any fields cannot be converted, a ComponentLookupError will be raised. """ encode = [] for name, field in getFieldsInOrder(schema): if name in ignore or name not in data: continue converter = IFieldTypeConverter(field, None) if converter is None: raise ComponentLookupError(u"Cannot URL encode %s of type %s" % (name, field.__class__,)) encoded_name = name if converter.token: encoded_name = "%s:%s" % (name, converter.token,) value = data[name] if value is None: continue if ISequence.providedBy(field): value_type_converter = IFieldTypeConverter(field.value_type, None) if value_type_converter is None: raise ComponentLookupError(u"Cannot URL encode value type for %s of type %s : %s" % (name, field.__class__, field.value_type.__class__,)) if value_type_converter.token: encoded_name = "%s:%s:%s" % (name, value_type_converter.token, converter.token,) for item in value: if isinstance(item, bool): item = item and '1' or '' encode.append((encoded_name, item,)) else: # The :bool converter just does bool() value, but urlencode() does # str() on the object. The result is False => 'False' => True :( if isinstance(value, bool): value = value and '1' or '' encode.append((encoded_name, value)) return urllib.urlencode(encode) # Decoding
[docs]def decode(data, schema, missing=True): """Decode a data dict according to a schema. The returned dictionary will contain only keys matching schema names, and will force type values appropriately. This function is only necessary because the encoders used by encode() are not sufficiently detailed to always return the exact type expected by a field, e.g. resulting in ascii/unicode discrepancies. If missing is True, fields that are in the schema but not in the data will be set to field.missing_value. Otherwise, they are ignored. """ decoded = {} for name, field in getFields(schema).items(): if name not in data: if missing: decoded[name] = field.missing_value continue value = data[name] if value is None: continue field_type = field._type if isinstance(field_type, (tuple, list,)): field_type = field_type[-1] if ISequence.providedBy(field): converted = [] value_type_field_type = field.value_type._type if isinstance(value_type_field_type, (tuple, list,)): value_type_field_type = value_type_field_type[-1] for item in value: if not isinstance(item, field.value_type._type): item = value_type_field_type(item) converted.append(item) value = converted if field._type is not None and not isinstance(value, field._type): value = field_type(value) decoded[name] = value return decoded