Source code for collecting_society_worker.trytonAccess

#!/usr/bin/env python
# For copyright and license terms, see COPYRIGHT.rst (top level of repository)
# Repository: https://github.com/C3S/collecting_society_worker

"""
The one and only C3S proteus tools for accessing data in C3S tryton db
"""


from proteus import config, Model


[docs] def connect(pconfig): """ get access to tryton database for processing state updates use http only on local test instance (and uncomment [ssl] entries in server side trytond.conf for using http) """ config.set_xmlrpc( "http://" + pconfig["user"] + ":" + pconfig["password"] + "@" + pconfig["host"] + ":" + pconfig["port"] + "/" + pconfig["database"] )
[docs] def get_content_by_filename(filename): """ Get a content by filename/uuid. """ if not filename: return Content = Model.get('content') matching_contents = Content.find(['uuid', "=", filename]) if len(matching_contents) == 0: print("ERROR: Wasn't able to find content entry in the database " "for '" + filename + "'.") return None if len(matching_contents) > 1: # unlikely with uuids, but we are # supersticious... print("WARNING: More than one content entry in the database for '" + filename + "'. Using the first one.") return matching_contents[0]
[docs] def get_creation_by_content(content): """ Get a creation by content(.id). """ Creation = Model.get('creation') matching_creations = Creation.find(['id', "=", content.id]) if len(matching_creations) == 0: print("ERROR: Wasn't able to find creation entry in the database " "with id '" + str(content.id) + "' for file '" + content.uuid + "'.") return None if len(matching_creations) > 1: print("WARNING: More than one content entry in the database for '" + content.uuid + "'. Using the first one.") return matching_creations[0]
[docs] def insert_content_by_filename(filename, user, pstate): """ insert an example content by filename/uuid. """ Content = Model.get('content') new_content = Content() new_content.id < 0 new_content.uuid = filename new_content.user = user new_content.processing_state = pstate new_content.save()
[docs] def update_content_pstate(filename, pstate): """ update content's state by filename/uuid. """ upd_content = get_content_by_filename(filename) upd_content.processing_state = pstate upd_content.save()
[docs] def get_or_insert_web_user(email): """ Finds a web user by email; if she doesn't exist, she will be created and returned """ WebUser = Model.get('web.user') matching_user = WebUser.find(['email', "=", email]) if len(matching_user) == 0: new_user = WebUser() new_user.email = email new_user.save() return new_user return matching_user[0]
[docs] def delete_web_user(email): """ deletes webuser by email .. caution:: Not yet functional! """ # TODO: delete # WebUser = Model.get('web.user') # matching_user = WebUser.find(['email', "=", email]) pass
[docs] def delete_content(filename): """ deletes a content (file) record of an uploaded file (.wav, .pdf) .. caution:: Not yet functional! """ # TODO: delete # Content = Model.get('content') # matching_content = Content.find(['uuid', "=", filename]) pass
[docs] def get_obj_state(filename): """ returns the state of objects inherited from the state mixin """ matching_content = get_content_by_filename(filename) return matching_content.processing_state
[docs] def set_content_unknown(filename): """ sets a content record's processing_state to unknown """ matching_content = get_content_by_filename(filename) matching_content.processing_state = "unknown" matching_content.save()