Source code for portal_web.models.base

# For copyright and license terms, see COPYRIGHT.rst (top level of repository)
# Repository: https://github.com/C3S/portal_web

import logging
from functools import wraps

from psycopg2._psycopg import InterfaceError

from pyramid import threadlocal

from trytond.transaction import Transaction
from trytond.config import config
from trytond.pool import Pool

log = logging.getLogger(__name__)


[docs] class Tdb(): """ Base Class for model wrappers and communication handling using trytond. Includes functions to - initialize the tryton database - wrap database communication with transaction handling Includes helper functions for getting the pool, context and models. Classattributes: _db (str): Name of database. _configfile (str): Tryton config file. _retry (int): Number of retries in transactions. _user (int): Default id of tryton backend user for transactions. _company (int): Default company id. __name__ (str): Name of the tryton model to be initialized. """ wraps = 0 # debug information # --- DB ------------------------------------------------------------------ _db = None _configfile = None _retry = None _user = None _company = None
[docs] @classmethod def init(cls): """ Initializes a Tryton database. Configuration is done by assigning the desired values to the following class attributes prior to the call: - _db - _configfile - _company - _user Updates the tryton config and reads out the configured number of retries before initialization. Note: This function is expected to be called only once. Examples: >>> Tdb._db = 'db' >>> Tdb._configfile = '/path/to/configfile' >>> Tdb._company = 1 >>> Tdb._user = 'user' >>> Tdb.init() """ config.update_etc(str(cls._configfile)) if cls.is_open(): return cls._retry = config.getint('database', 'retry') pool = Pool(str(cls._db)) with Transaction().start(str(cls._db), int(cls._user), readonly=True): pool.init()
[docs] @staticmethod def is_open(): transaction = Transaction() if not transaction.connection: return False cursor = transaction.connection.cursor() if cursor: return True return False
[docs] def transaction(readonly=None, user=None, context=None): """ Decorater function to wrap database communication with transactions. The wrapping function handles: - start and stop of transactions - caching in case of multithreading environments - commit and rollback of cursors on error - retries in case of an operational error of Tryton - chaining of multiple decorated functions within one transaction 2DO: Chaining multiple calls to the database could result in different cursors. This works in principle (commented out) but still has a problem with the combination of different rw/ro calls. Args: readonly (bool): Type of transaction. If None and kwargs contains a request object, then the transaction will be readonly except for PUT, POST, DELETE and PATCH request methods. If None and kwargs contains no request object, then the transaction will be readonly. user (int): Tryton backend user id for transaction. If None, then the default user will be used for transaction context (dict): Context for transaction. If None, then the context of transaction will be empty. Raises: DatabaseOperationalError: if Tryton or the database has a problem. Note: This work is based on the `flask_tryton`_ package by Cedric Krier <ced@b2ck.com> licensed under GPLv3 (see `flask_tryton.py`) .. _flask_tryton: https://pypi.python.org/pypi/flask_tryton """ _tdbglog = "/shared/tmp/logs/transaction.log" def _tdbg(func, mode, string=None, levelchange=0): settings = threadlocal.get_current_registry().settings if not settings or 'debug.tdb.transactions' not in settings: return if settings['debug.tdb.transactions'] == 'true': import os import inspect stack = inspect.stack() _, filename, line_number, function, lines, _ = stack[2] functions = [] for i, framerecord in enumerate(stack): f = framerecord[3] if f in ['<lambda>', '_call_view']: break if f in ['_tdbg', 'wrapper']: continue functions.append(f) functions.reverse() with open(_tdbglog, "a") as f: lvl = Tdb.wraps if mode == "WRAP" else Tdb.wraps - 1 f.write("\t"*Tdb.wraps + "%s %s" % (mode, func.__name__)) f.write(" %s" % lvl) if mode == "WRAP": f.write(" | %s:%s():%s %s\n" % ( os.path.basename(filename), function, line_number, lines[0].strip())) f.write( "\t"*(Tdb.wraps+1)+"- connection: %s, %s\n" % ( Tdb.is_open() and "open" or "closed", readonly and "read" or "write")) f.write("\t"*(Tdb.wraps+1)+"- calls: %s" % ( " -> ".join(functions))) if string: f.write(" | " + string) f.write("\n") Tdb.wraps += levelchange if not Tdb.wraps: f.write("\n") os.chmod(_tdbglog, 775) def decorator(func): @wraps(func) def wrapper(*args, **kwargs): _db = Tdb._db _user = user or 0 _retry = Tdb._retry or 0 _readonly = readonly from trytond.backend import DatabaseOperationalError if 'request' in kwargs: _readonly = not ( kwargs['request'].method in ('PUT', 'POST', 'DELETE', 'PATCH')) _tdbg(func, "WRAP", None, 1) for count in range(_retry, 0, -1): if not Tdb.is_open(): _tdbg(func, "CONNECT") with Transaction().start(_db, 0): pool = Pool(Tdb._db) User = pool.get('res.user') _context = User.get_preferences(context_only=True) _context.update(context or {}) Transaction().start( _db, _user, readonly=_readonly, context=_context, close=False) transaction = Transaction().new_transaction( readonly=_readonly) try: _tdbg(func, "CALL", "Try %s, Transaction %s" % (_retry + 1 - count, id(transaction))) result = func(*args, **kwargs) if not _readonly: _tdbg(func, "COMMIT", "Try %s, Transaction %s" % (_retry + 1 - count, id(transaction))) if func.__name__ != '_context_found_writable': transaction.commit() transaction.stop() except DatabaseOperationalError: if transaction: transaction.rollback() if not count or _readonly: raise continue except InterfaceError: if transaction: transaction.rollback() if not count: raise continue except Exception: if transaction: transaction.rollback() raise _tdbg(func, "RETURN", None, -1) return result return wrapper return decorator
[docs] @classmethod def pool(cls): """ Gets the Tryton pool object. Returns: obj: Pool. """ pool = Pool(str(cls._db)) return pool
[docs] @classmethod def context(cls): """ Gets the Transaction context. Returns: dict: Context. """ context = Transaction().context return context
# --- Model --------------------------------------------------------------- __name__ = None
[docs] @classmethod def get(cls, model=None): """ Gets a Tryton model object. If no Tryton model descriptor is passed, the class variable __name__ is assumed to be set to it's default Tryton model descriptor, which then will be used to get the Tryton model. Args: model (str): Tryton model descriptor or None. Returns: obj: Tryton model. """ pool = cls.pool() if model: return pool.get(str(model)) if not cls.__dict__['__name__']: raise KeyError("__name__ is missing") return pool.get(str(cls.__dict__['__name__']))
transaction = staticmethod(transaction) # --- Methods -------------------------------------------------------------
[docs] @classmethod def escape(cls, string, wrap=False): string = string.replace('_', '\\_') string = string.replace('%', '\\_') if wrap: string = "%" + string + "%" return string
[docs] @classmethod def escape_domain(cls, domain, wrap=True): for index, statement in enumerate(domain): if isinstance(statement, list): cls.escape_operands(statement) if isinstance(statement, str): continue if statement[1] in ['like', 'ilike', 'not like', 'not ilike']: statement_list = list(statement) statement_list[2].replace('_', '\\_') statement_list[2].replace('%', '\\_') if wrap: statement_list[2] = "%" + statement_list[2] + "%" domain[index] = tuple(statement_list) return domain
[docs] @classmethod def create(cls, vlist): """ Generic creation method to use in model wrappers. Args: vlist (list): list of dicts with attributes to create records. For example:: [ { 'code': ..., 'uuid': ..., 'name': ... }, { ... } ] Raises: KeyError: if required field is missing Returns: list: created devices None: if no object was created """ log.debug('create database object:\n{}'.format(vlist)) result = cls.get().create(vlist) return result or None
[docs] class MixinSearchById(object): """ Modelwrapper mixin for models that can be searched by id """
[docs] @classmethod def search_by_id(cls, id): """ Searches a model by id Args: id (int): model.id .. note:: This is a search for the internal database id. Returns: obj: model None: if no match is found """ result = cls.get().search([('id', '=', int(id))]) if not result: return None return result[0]
[docs] class MixinSearchByCode(object): """ Modelwrapper mixin for models that can be searched by its code """
[docs] @classmethod def search_by_code(cls, code): """ Searches an object by its code. Args: code (str): Code of the object. Returns: obj: Checksum. None: If no match is found. """ if code is None: return None result = cls.get().search([('code', '=', code)]) return result[0] if result else None
[docs] class MixinSearchByName(object): """ Modelwrapper mixin for models that can be searched by its name field """
[docs] @classmethod def search_by_name(cls, name): """ Searches an object by name Args: name (string): object.name Returns: obj: db object None: if no match is found """ result = cls.get().search([('name', '=', name)]) return result[0] or None
[docs] class MixinSearchByUuid(object): """ Modelwrapper mixin for models that can be searched by its UUID field """
[docs] @classmethod def search_by_uuid(cls, uuid): """ Searches an object by uuid Args: uuid (string): object.uuid Returns: obj: db object None: if no match is found """ result = cls.get().search([('uuid', '=', uuid)]) return result[0] or None
[docs] class MixinSearchByOid(object): """ Modelwrapper mixin for models that can be searched by its oid field. The oid is exposed via the public api as an identifier for an object. """
[docs] @classmethod def search_by_oid(cls, oid): """ Searches an object by oid Args: oid (string): object.oid Returns: obj: db object None: if no match is found """ result = cls.get().search([('oid', '=', oid)]) return result[0] or None
[docs] @classmethod def search_by_oids(cls, oids): """ Searches objects by oids Args: oids (list): [object.oid, ...] Returns: list: [db object, ...] """ result = cls.get().search([('oid', 'in', oids)]) return result
[docs] class MixinSearchAll(object): """ Modelwrapper mixin for models that can return all records of a table """
[docs] @classmethod def search_all(cls): """ Fetches all records Returns: list of records None: if table is empty """ return cls.get().search([])
[docs] class MixinWebuser(object): """ Modelwrapper mixin for models that need to filter by webusers acl restrictions, for example if the webuser is allowed to view or edit items. """
[docs] @classmethod def current_viewable(cls, request): """ Searches objects, which the current web_user is allowed to view. Args: request (pyramid.request.Request): Current request. Returns: list: viewable objects of web_user None: if no match is found """ return cls.search_viewable_by_web_user(request.web_user.id)
[docs] @classmethod def current_editable(cls, request): """ Searches objects, which the current web_user is allowed to edit. Args: request (pyramid.request.Request): Current request. Returns: list: editable objects of web_user None: if no match is found """ return cls.search_editable_by_web_user(request.web_user.id)
[docs] @classmethod def search_viewable_by_web_user(cls, web_user_id, active=True): """ Searches objects, which the web_user is allowed to view. The view permission is expected to have the form 'view_<modelname>'. Args: web_user_id (int): web.user.id Returns: list: viewable objects of web_user, empty if none were found """ return cls.get().search([ ('acl.web_user', '=', web_user_id), ('acl.roles.permissions.code', '=', 'view_' + cls.__dict__['__name__']) ])
[docs] @classmethod def search_editable_by_web_user(cls, web_user_id, active=True): """ Searches objects, which the web_user is allowed to edit. The view permission is expected to have the form 'edit_<modelname>'. Args: web_user_id (int): web.user.id Returns: list: viewable objects of web_user, empty if none were found """ return cls.get().search([ ('acl.web_user', '=', web_user_id), ('acl.roles.permissions.code', '=', 'edit_' + cls.__dict__['__name__']) ])