Source code for portal_web.views.forms.base

# For copyright and license terms, see COPYRIGHT.rst (top level of repository)
# Repository: https://github.com/C3S/portal_web

import os
import shutil
import glob
import tempfile
# from tempfile import NamedTemporaryFile
import time

from abc import ABCMeta, abstractmethod
import logging

from pyramid.httpexceptions import HTTPFound
import colander
import deform

from ...resources import ResourceBase
from ...services import benchmark

log = logging.getLogger(__name__)


class FormController(object, metaclass=ABCMeta):
    """
    Abstract base class for form handling.

    Subclasses implement `controller()` with the actual form logic;
    `process()` binds the current context and request and then runs it.
    Only the attributes listed in `__getstate__` survive serialization;
    form, context, request, response and validation state are transient.
    """

    __stage__ = None

    def __init__(self, name=None, stage=None, persistent=False,
                 appstruct=None, context=None, request=None, response=None):
        """
        Args:
            name: form name; defaults to the class name.
            stage: initial stage; defaults to the class level `__stage__`.
            persistent: flag telling whether the form is stored in session.
            appstruct: initial appstruct; defaults to an empty dict.
            context: current context object (may be bound later).
            request: current request object (may be bound later).
            response: initial response; defaults to an empty dict.
        """
        self._name = name or self.__class__.__name__
        self._form = None
        self._data = {}  # aggregates several appstructs, dep. on form design
        self.persistent = persistent  # store in session?
        self.stage = stage or self.__stage__
        self.appstruct = appstruct or {}
        self.context = context
        self.request = request
        self.response = response or {}
        self.validationfailure = None

    def __getstate__(self):
        """ Return the serializable subset of the controller state. """
        return {
            '__stage__': self.__stage__,
            '_name': self._name,
            '_data': self._data,
            'persistent': self.persistent,
            'stage': self.stage,
            'appstruct': self.appstruct
        }

    def __setstate__(self, state):
        """ Restore serialized state and reset all transient attributes. """
        self.__stage__ = state['__stage__']
        self._name = state['_name']
        self._form = None
        self._data = state['_data']
        self.persistent = state['persistent']
        self.stage = state['stage']
        self.appstruct = state['appstruct']
        self.context = None
        self.request = None
        self.response = {}
        # not part of the serialized state; without this reset valid()
        # raises AttributeError on a restored controller
        self.validationfailure = None

    @property
    def name(self):
        """ Name of the form (also used as key of the response dict). """
        return self._name

    @property
    def form(self):
        """ The form object currently attached to this controller. """
        return self._form

    @form.setter
    def form(self, form):
        self._form = form
        # replace the generic form id 'deform' by the controller name, so
        # submitted() is able to tell several forms apart
        if self._form.formid == 'deform':
            self._form.formid = self.name

    @property
    def data(self):
        """
        session persistent dict for aggregated data of all appstructs
        (stages)

        needs to be filled manually
        """
        return self._data

    def render(self, appstruct=None, form=None):
        """
        Render a form into `self.response` under the key `self.name`.

        Args:
            appstruct: values passed to `form.render()`; defaults to an
                empty dict.
            form: form to render; defaults to `self.form`.
        """
        # a fresh dict per call instead of a shared mutable default
        if appstruct is None:
            appstruct = {}
        if form is None:
            form = self.form
        with benchmark(self.request, name='forms.render', uid=self.name,
                       scale=1000):
            self.response = {self.name: form.render(appstruct=appstruct)}

    def process(self, context, request):
        """
        Bind context and request, then run the form logic.

        Returns:
            whatever `controller()` returns.
        """
        self.context = context
        self.request = request
        return self.controller()

    @abstractmethod
    def controller(self):
        """
        overwrite this with your form logic!

        (if form is submitted and ... then ...)
        """
        pass  # pragma: no cover

    # --- Conditions ----------------------------------------------------------

    def submitted(self, button=None, form=None):
        """
        check to see if your form was submitted

        Args:
            button: if given, the form only counts as submitted if this key
                is part of the request data.
            form: form id to check for; defaults to the id of the attached
                form.

        Returns:
            True if the request data (POST, else GET) carries a matching
            `__formid__` (and the button, if given), False otherwise.
        """
        data = self.request.POST or self.request.GET
        if not data:
            return False
        form = form or self._form.formid
        if '__formid__' in data and data['__formid__'] == form:
            if button:
                return button in data
            return True
        return False

    def valid(self):
        """ check if form did validate on last validation """
        return self.validationfailure is None

    # --- Actions -------------------------------------------------------------

    def redirect(self, resource='', *args, **kwargs):
        """
        return user to other place by str or resource, removes form from
        session

        Args:
            resource: a str (resolved relative to the current context), a
                `ResourceBase` instance or a `ResourceBase` subclass (which
                is instantiated with the current request).
            *args: additional elements passed to `request.resource_path()`.
            **kwargs: passed to `HTTPFound`.

        Raises:
            TypeError: if `resource` is of none of the supported types.
        """
        if isinstance(resource, str):
            path = self.request.resource_path(self.context, resource, *args)
        elif isinstance(resource, ResourceBase):
            path = self.request.resource_path(resource, *args)
        elif isinstance(resource, type) and issubclass(resource, ResourceBase):
            path = self.request.resource_path(resource(self.request), *args)
        else:
            # formerly surfaced as an UnboundLocalError on `path`
            raise TypeError(
                'unsupported redirect resource: %r' % (resource,))
        assert path
        self.response = HTTPFound(path, **kwargs)
        self.remove()

    def validate(self, data=False):
        """
        Validate the request data with the attached form.

        On success `self.appstruct` holds the validated data, on failure
        `self.validationfailure` holds the raised `deform.ValidationFailure`.
        In both cases the rendered form is stored in `self.response`.

        Args:
            data: if true, the items of `self.data` are appended to the
                request data before validation.

        Returns:
            True if the form validated, False otherwise.
        """
        self.appstruct, self.validationfailure = None, None
        try:
            # materialize the items: depending on the mapping type, items()
            # yields a dict view (no `+=`) or a one-shot iterator (always
            # truthy, so the GET fallback would never be reached)
            _data = (list(self.request.POST.items())
                     or list(self.request.GET.items()))
            if data:
                _data += list(self.data.items())
            self.appstruct = self.form.validate(_data)
            self.response = {self.name: self.form.render()}
            return True
        except deform.ValidationFailure as e:
            self.validationfailure = e
            self.response = {self.name: self.validationfailure.render()}
            return False

    def clean(self):
        """ reset the form """
        self._form = None
        self._data = {}
        self.appstruct = None
        self.context = None
        self.stage = self.__stage__
        self.validationfailure = None

    def remove(self):
        """ no longer persist form in session """
        # redirect() always ends up here, so tolerate a session that has no
        # 'forms' entry
        forms = self.request.session.get('forms')
        if forms and self.name in forms:
            del forms[self.name]
class MemoryTmpStore(dict):
    """
    Plain in-memory temporary store: a dict extended by `preview_url()`.
    """

    def preview_url(self, name):
        """ No preview is available; returns None for every name. """
        return None
class TmpFile(object):
    """
    Named temporary file, which is serialized by its path only.

    The content of `source` is copied into a `tempfile.NamedTemporaryFile`.
    Unknown attributes are delegated to the wrapped file object. The
    serialized state is the file path; restoring reopens that path, if it
    still exists.
    """

    def __init__(self, source, *args, **kwargs):
        """
        Args:
            source: readable file-like object to copy the content from.
            *args: passed to `tempfile.NamedTemporaryFile`.
            **kwargs: passed to `tempfile.NamedTemporaryFile`.
        """
        self.file = tempfile.NamedTemporaryFile(*args, **kwargs)
        shutil.copyfileobj(source, self.file)
        # only the path is serialized, so the content has to be on disk
        # before anybody reopens the file by name
        self.file.flush()

    def __getstate__(self):
        """ Return the path of the wrapped file, or None if unavailable. """
        return getattr(self, 'name', None)

    def __setstate__(self, name):
        """ Reopen the file at `name`, if it still exists. """
        self.file = None
        if name and os.path.isfile(name):
            # NOTE(review): reopened in text mode, while NamedTemporaryFile
            # defaults to binary mode -- confirm this is intended
            self.file = open(name, 'r')

    def __getattr__(self, attr):
        # only called for attributes not found on the instance itself. If
        # 'file' is missing (pickle skips __setstate__ for a None state),
        # looking it up here again would recurse endlessly
        if attr == 'file':
            raise AttributeError(attr)
        return getattr(self.file, attr)

    def __iter__(self):
        return iter(self.file)

    def delete(self):
        """ Remove the file from disk, if there is one. """
        path = getattr(self, 'name', None)
        if path and os.path.isfile(path):
            os.unlink(path)
class FileTmpStore(dict):
    """
    Dict based temporary store, which keeps uploaded files as temporary
    files below `path`.

    On item assignment the file object in `cstruct['fp']` is copied into a
    `TmpFile` inside `path`. On creation of the store, outdated temporary
    files are removed.
    """

    def __init__(self, path=None, timeout=0):
        """
        Args:
            path: directory for the temporary files.
            timeout: age in seconds after which temporary files are
                removed; a false value disables the cleanup.
        """
        super(FileTmpStore, self).__init__()
        self.path = path
        self.timeout = timeout
        self.files = {}
        self.clear_tmpfiles()

    def clear_tmpfiles(self):
        """
        Remove files matching `tmp*` in `path`, whose modification time is
        older than `timeout` seconds. Does nothing without timeout or path.
        """
        if not self.timeout or not self.path:
            return
        deadline = time.time() - self.timeout
        for tmpfile in glob.glob(os.path.join(self.path, 'tmp*')):
            try:
                if os.stat(tmpfile).st_mtime < deadline:
                    os.unlink(tmpfile)
            except FileNotFoundError:
                # file vanished between listing and removal, e.g. cleaned
                # up by another store at the same time
                continue

    def __setitem__(self, name, cstruct):
        # replace the file object by a temporary file in the store
        # directory, which is not deleted on close (delete=False)
        tmpfile = TmpFile(source=cstruct['fp'], dir=self.path, delete=False)
        cstruct['fp'].close()
        cstruct['fp'] = tmpfile
        super(FileTmpStore, self).__setitem__(name, cstruct)

    def preview_url(self, name):
        """ No preview is available; returns None for every name. """
        return None
@colander.deferred
def deferred_file_upload_widget(node, kw):
    """
    Deferred file upload widget, backed by a `FileTmpStore` in the session.

    The store directory is `file_tmp_store` below the setting
    `session.data_dir` (default `/tmp`) and is created if missing. The
    timeout of the store is read from the setting `session.file_expires`
    (default 0). The store is created once per session under the key
    `file_upload`.

    Args:
        node: schema node the widget is bound to (unused).
        kw: bind keywords; needs to contain the current `request`.

    Returns:
        a `deform.widget.FileUploadWidget` using the session store.
    """
    request = kw.get('request')
    session = request.session
    settings = request.registry.settings
    basepath = settings.get('session.data_dir', '/tmp')
    path = os.path.join(basepath, 'file_tmp_store')
    timeout = int(settings.get('session.file_expires', 0))
    # create in one step: a separate isdir() check races with concurrent
    # requests creating the same directory
    os.makedirs(path, exist_ok=True)
    if 'file_upload' not in session:
        session['file_upload'] = FileTmpStore(path=path, timeout=timeout)
    return deform.widget.FileUploadWidget(session['file_upload'])