#!/usr/bin/env python
# For copyright and license terms, see COPYRIGHT.rst (top level of repository)
# Repository: https://github.com/C3S/collecting_society_worker
"""
The one and only C3S repertoire processing utility
"""
# --- Imports ---
import os
import time
import fcntl
import socket
import subprocess
import configparser
import json
import datetime
import re
import hashlib
import click
import requests
from pydub import AudioSegment
import taglib
from proteus import config, Model
try:
import trytonAccess
except Exception:
from . import trytonAccess
# import fileTools
# --- some constants ---
# 2DO: _preview_default
_preview_format = 'ogg'
_preview_quality = '0'
_preview_samplerate = '16000'
_preview_fadein = 1000
_preview_fadeout = 1000
_preview_segment_duration = 8000
_preview_segment_crossfade = 2000
_preview_segment_interval = 54000
_excerpt_format = 'wav'
_excerpt_quality = '0'
_excerpt_samplerate = '11025'
_excerpt_fadein = 0
_excerpt_fadeout = 0
_excerpt_segment_duration = 60000
# --- some initialization ---
[docs]
def expand_envvars(section):
return dict(
(k, os.path.expandvars(v)) for k, v in section.items()
)
# read config from .ini
CONFIGURATION = configparser.ConfigParser()
CONFIGURATION.read(
os.path.join(
os.path.dirname(os.path.abspath(__file__)),
'..', 'config.ini'))
# debugging
try:
DEBUGGING_CONFIG = expand_envvars(dict(CONFIGURATION.items('debugging')))
# winpdb
if int(DEBUGGING_CONFIG['debugger_winpdb']):
import rpdb2
rpdb2.start_embedded_debugger("supersecret", fAllowRemote=True)
# debugpy
if int(DEBUGGING_CONFIG['debugger_debugpy']):
import debugpy # unconditional import breaks test coverage
debugpy.listen(("0.0.0.0", 52002))
except configparser.NoSectionError:
pass
try:
PROTEUS_CONFIG = expand_envvars(dict(CONFIGURATION.items('proteus')))
except configparser.NoSectionError:
print(
"Error: Please run repro.py "
"from the collecting_society_worker main folder.")
exit()
FILEHANDLING_CONFIG = expand_envvars(dict(CONFIGURATION.items('filehandling')))
ECHOPRINT_CONFIG = expand_envvars(dict(CONFIGURATION.items('echoprint')))
ECHOPRINT_SCHEMA = ECHOPRINT_CONFIG['schema']
assert ECHOPRINT_SCHEMA
ECHOPRINT_HOSTNAME = ECHOPRINT_CONFIG['hostname']
assert ECHOPRINT_HOSTNAME
ECHOPRINT_PORT = ECHOPRINT_CONFIG['port']
if not ECHOPRINT_PORT:
ECHOPRINT_PORT = 80
ECHOPRINT_URL = (ECHOPRINT_SCHEMA + "://" + ECHOPRINT_HOSTNAME + ":" +
ECHOPRINT_PORT)
HOSTNAME = socket.gethostname()
STORAGE_BASE_PATH = FILEHANDLING_CONFIG['storage_base_path']
# --- Processing stage functions for single audiofiles ---
[docs]
def preview_audiofile(srcdir, destdir, filename):
"""
Creates a low-quality preview audio snippet for newly uploaded files.
"""
# make sure previews and excerpts paths exist
content_base_path = FILEHANDLING_CONFIG['content_base_path']
if ensure_path_exists(content_base_path) is None:
print("ERROR: '" + content_base_path +
"' couldn't be created as content base path.")
return
# create directories in absolute paths if needed
previews_path = os.path.join(
FILEHANDLING_CONFIG['previews_path'],
filename[0],
filename[1])
excerpts_path = os.path.join(
FILEHANDLING_CONFIG['excerpts_path'],
filename[0],
filename[1])
if ensure_path_exists(
os.path.join(content_base_path, previews_path)
) is None:
print("ERROR: '" +
os.path.join(content_base_path, previews_path) +
"' couldn't be created for previews.")
return
if ensure_path_exists(
os.path.join(content_base_path, excerpts_path)
) is None:
print(
"ERROR: '" +
os.path.join(content_base_path, excerpts_path) +
"' couldn't be created for excerpts.")
return
# create paths with filenames
filepath = os.path.join(srcdir, filename)
previews_filepath_relative = os.path.join(previews_path, filename)
excerpts_filepath_relative = os.path.join(excerpts_path, filename)
previews_filepath = os.path.join(
content_base_path,
previews_filepath_relative)
excerpts_filepath = os.path.join(
content_base_path,
excerpts_filepath_relative)
# find content in database from filename
try:
matching_content = trytonAccess.get_content_by_filename(filename)
except Exception:
print("ERROR: Database seems to be under rebuild. Trying again later.")
exit()
if matching_content is None:
print(
"ERROR: Couldn't find content entry for '" +
filename +
"' in database.")
reject_file(
filepath,
'missing_database_record',
"File name: " +
filepath)
return
if matching_content.category == 'audio':
# create preview
audio = AudioSegment.from_file(filepath)
result = create_preview(audio, previews_filepath)
if not result:
print("ERROR: '" + filename + "' couldn't be previewed.")
return
# create excerpt
result = create_excerpt(audio, excerpts_filepath)
if not result:
print(
"ERROR: No excerpt could be cut out of '" +
filename + "'.")
return
# do a first test query on the EchoPrint server (2nd one after ingest)
score = 0
track_id_from_test_query = None
similiar_artist = ''
similiar_track = ''
# create fingerprint from audio file
# using echoprint-codegen and relate to the score
print('-' * 80)
print("test query with excerpt file " + excerpts_filepath)
try:
proc = subprocess.Popen(["echoprint-codegen", excerpts_filepath],
stdout=subprocess.PIPE)
except OSError:
print(
"Error: Unable to find echoprint-codegen executable in exe "
"path."
)
return
json_meta_fp = str(proc.communicate()[0], "utf-8")
fpcode_pos = json_meta_fp.find('"code":')
if fpcode_pos > 0 and len(json_meta_fp) > 80:
print(
"Got from codegen:" +
json_meta_fp[:fpcode_pos+40] +
"....." + json_meta_fp[-40:])
meta_fp = json.loads(json_meta_fp)
try:
query_request = requests.get(
ECHOPRINT_URL + "/query?fp_code=" +
meta_fp[0]['code'],
verify=False, # TODO: remove when cert. is updated
)
except Exception as e:
raise e
print(
"ERROR: '" + excerpts_filepath_relative +
"' couldn't be test-queried on the EchoPrint server.")
return
print
print(
"Server response: ",
query_request.status_code,
query_request.reason)
print
print(
"Body: " +
(query_request.text[:500] +
'...' + query_request.text[-1500:]
if len(query_request.text) > 2000 else query_request.text)
)
if query_request.status_code != 200:
print(
"ERROR: '" +
srcdir +
"' cloudn't be test-queried on the EchoPrint server.")
else:
qresult = json.loads(query_request.text)
score = qresult['score']
if qresult['match']:
track_id_from_test_query = (
qresult['track_id'][:8] +
'-' +
qresult['track_id'][8:12] +
'-' +
qresult['track_id'][12:16] +
'-' +
qresult['track_id'][16:]
)
# TODO: get these two from the creation table:
if 'artist' in qresult.keys():
similiar_artist = qresult['artist']
if 'track' in qresult.keys():
similiar_track = qresult['track']
else:
print("Got from codegen:" + json_meta_fp)
# check and update content processing status,
# save some pydub metadata to database
if matching_content.processing_state != 'uploaded':
print(
"WARNING: File '" +
filename +
"' in the uploaded folder had status '" +
matching_content.processing_state +
"'.")
matching_content.processing_state = 'previewed'
matching_content.processing_hostname = HOSTNAME
matching_content.path = filepath.replace(
STORAGE_BASE_PATH +
os.sep, '') # relative path
matching_content.preview_path = previews_filepath
matching_content.length = int(audio.duration_seconds)
matching_content.channels = int(audio.channels)
matching_content.sample_rate = int(audio.frame_rate)
matching_content.sample_width = int(audio.sample_width * 8)
matching_content.pre_ingest_excerpt_score = score
if track_id_from_test_query:
most_similar_content = trytonAccess.get_content_by_filename(
track_id_from_test_query)
if most_similar_content is None:
print(
"ERROR: Couldn't find content entry " +
"of most similar content for '" +
filename +
"' in database. EchoPrint server seems " +
"out of sync with database."
)
else:
matching_content.most_similiar_content = most_similar_content
matching_content.most_similiar_artist = similiar_artist
matching_content.most_similiar_track = similiar_track
# read metadata from file
try: # to append the extension from the original filename temporarily,
# because taglib
# isn't smart enough to process files without the proper extension
filepath_plus_extension = filepath + os.path.splitext(
matching_content.name)[1]
os.rename(filepath, filepath_plus_extension)
try:
song = taglib.File(filepath_plus_extension)
if song.tags:
if "ARTIST" in song.tags and song.tags["ARTIST"][0]:
matching_content.metadata_artist = song.tags[
"ARTIST"][0]
if "TITLE" in song.tags and song.tags["TITLE"][0]:
matching_content.metadata_title = song.tags["TITLE"][
0]
if "ALBUM" in song.tags and song.tags["ALBUM"][0]:
matching_content.metadata_release = song.tags["ALBUM"][
0]
if "TDOR" in song.tags and song.tags["TDOR"][0]:
matching_content.metadata_release_date = song.tags[
"TDOR"][0]
if (
"TRACKNUMBER" in song.tags and
song.tags["TRACKNUMBER"][0]
):
matching_content.metadata_track_number = song.tags[
"TRACKNUMBER"][0]
except OSError:
print(
"WARNING: taglib couldn't extract any metadata from file '"
+ filepath_plus_extension +
"' "
)
try:
os.rename(filepath_plus_extension, filepath)
except IOError:
print(
"ERROR: File '" +
filepath_plus_extension +
"' in the uploaded folder that was temporarily " +
"renamed to have a file extension " +
"to retrieve metadata from it couldn't be renamed back")
except IOError as e:
print(e)
print(
"WARNING: File '" +
filename +
"' in the uploaded folder couldn't be renamed " +
"temproarily to retrieve metadata from it")
# song.tags
matching_content.save()
# check it the audio format is much too bad even for 8bit enthusiasts
reason_details = ''
if audio.frame_rate < 11025:
reason_details = (
'Invalid frame rate of ' +
str(int(audio.frame_rate)) +
' Hz')
if audio.sample_width < 1:
# less than one byte? is this even possible? :-p
reason_details = (
'Invalid sample rate of ' +
str(int(audio.sample_width * 8)) +
' bits')
if reason_details != '':
reject_file(filepath, 'format_error', reason_details)
return
# move file to previewed directory
if move_file(filepath, destdir + os.sep + filename) is False:
print(
"ERROR: '" +
filename +
"' couldn't be moved to '" +
destdir + "'.")
return
[docs]
def get_segments(audio):
"""
Yields the segments back to the caller one by one.
"""
_total = len(audio)
_segment = _preview_segment_duration
_interval = _preview_segment_interval
if _segment >= _total:
yield audio
else:
start = 0
end = _segment
while end < _total:
yield audio[start:end]
start = end + _interval + 1
end = start + _segment
[docs]
def create_preview(audio, preview_path):
"""
mix a cool audio preview file with low quality
"""
# convert to mono
mono = audio.set_channels(1)
# mix segments
preview = None
for segment in get_segments(mono):
preview = segment if not preview else preview.append(
segment, crossfade=_preview_segment_crossfade
)
# fade in/out
preview = preview.fade_in(_preview_fadein).fade_out(_preview_fadeout)
# export
ok_return = True
try:
preview.export(
preview_path,
format=_preview_format,
parameters=[
"-aq", _preview_quality,
"-ar", _preview_samplerate
]
)
except Exception:
ok_return = False
return ok_return and os.path.isfile(preview_path)
[docs]
def create_excerpt(audio, excerpt_path):
"""
well, as the functin name says...
"""
# convert to mono
# mono =
audio.set_channels(1)
# this was for experimenting with different code lengths:
# for exlen in range(1000, 61000, 1000):
# # cut out one minute from the middle of the file
# if len(audio) > exlen:
# excerpt_center = len(audio) / 2
# excerpt_start = excerpt_center - exlen/2
# excerpt_end = excerpt_center + exlen/2
# audio2 = audio[excerpt_start:excerpt_end]
#
# # export
# ok_return = True
# try:
# audio2.export(
# excerpt_path+"-"+str(int(exlen/1000))+".wav",
# format=_excerpt_format,
# parameters=[
# "-aq", _excerpt_quality,
# "-ar", _excerpt_samplerate
# ]
# )
# except:
# ok_return = False
# cut out one minute from the middle of the file
if len(audio) > 60000:
excerpt_center = len(audio) / 2
excerpt_start = excerpt_center - 30000
excerpt_end = excerpt_center + 30000
audio = audio[excerpt_start:excerpt_end]
# export
ok_return = True
try:
audio.export(
excerpt_path,
format=_excerpt_format,
parameters=[
"-aq", _excerpt_quality,
"-ar", _excerpt_samplerate
]
)
except Exception:
ok_return = False
return ok_return and os.path.isfile(excerpt_path)
[docs]
def checksum_audiofile(srcdir, destdir, filename):
"""
Hashing a single file.
"""
filepath = os.path.join(srcdir, filename)
statinfo = os.stat(filepath)
filelength = statinfo.st_size
bufsize = 65536
sha256 = hashlib.sha256()
with open(filepath, 'rb') as filetohash:
while True:
data = filetohash.read(bufsize)
if not data:
break
sha256.update(data)
print(
"SHA256 of file {0}: {1}".format(
filename,
sha256.hexdigest())
)
# TO DO:
# check for duplicate checksums in database
# and possibly issue a 'checksum_collision'
# find content in database from filename
matching_content = trytonAccess.get_content_by_filename(filename)
if matching_content is None:
print(
"ERROR: Orphaned file " +
filename +
" (no DB entry) -- please clean up!")
return # shouldn't happen
# write checksum to file '<UUID>.checksum'
checksumfile = open(srcdir + os.sep + filename + '.checksum', 'w+')
checksumfile.write(sha256.__class__.__name__ + ':' + sha256.hexdigest())
checksumfile.close()
# move file to checksummed directory
if move_file(filepath, destdir + os.sep + filename) is False:
print(
"ERROR: '" +
filename +
"' couldn't be moved to '" +
destdir +
"'."
)
return
# check and update content processing status
if (matching_content.processing_state != 'previewed' and
matching_content.category == 'audio'):
print(
"WARNING: File '" +
filename +
"' in the previewed folder had status '" +
matching_content.processing_state + "'.")
if (matching_content.processing_state != 'previewed' and
matching_content.processing_state != 'uploaded' and
matching_content.category == 'sheet'):
print(
"WARNING: File '" +
filename +
"' in the previewed folder had status '" +
matching_content.processing_state + "'.")
matching_content.processing_state = 'checksummed'
matching_content.processing_hostname = HOSTNAME
matching_content.path = filepath.replace(
STORAGE_BASE_PATH + os.sep,
'') # relative path
matching_content.save()
# save sha256 to database
matching_checksums = [
x for x in matching_content.checksums if (
x.begin == 0 and
x.end == filelength
)
]
if len(matching_checksums) == 0:
# create a checksum
Checksum = Model.get('checksum')
checksum_to_use = Checksum()
matching_content.checksums.append(checksum_to_use)
elif len(matching_checksums) > 1: # shouldn't happen
print(
"WARNING: More than one whole file checksum entry " +
"in the database for '" +
filename +
"'. Please clean up the mess! Using the first one.")
else:
checksum_to_use = matching_checksums[0] # just one found: use it!
checksum_to_use.code = sha256.hexdigest()
checksum_to_use.timestamp = datetime.datetime.now()
checksum_to_use.algorithm = sha256.__class__.__name__
checksum_to_use.begin = 0
checksum_to_use.end = filelength
checksum_to_use.save()
[docs]
def fingerprint_audiofile(srcdir, destdir, filename):
"""
Audiofingerprint a single file.
Comes along with metadata lookup and store it on the EchoPrint server.
"""
filepath = os.path.join(srcdir, filename)
# get track_id and metadata from the creation via content table
matching_content = trytonAccess.get_content_by_filename(filename)
if matching_content is None:
return
if matching_content.category == 'audio':
# already metadata present in creation?
# then put it on the EchoPrint server rightaway
artist = ''
title = ''
release = ''
matching_creation = trytonAccess.get_creation_by_content(
matching_content)
if matching_creation:
artist = matching_creation.artist.name
title = matching_creation.title
if matching_creation.releases:
release = matching_creation.releases[0].title
if artist == '':
artist = 'DummyFiFaFu'
if title == '':
title = 'DummyFiFaFu'
if release == '':
release = 'DummyFiFaFu'
# create fringerprint from audio file using echoprint-codegen
print('-' * 80)
print("processing file " + filepath)
proc = subprocess.Popen(
["echoprint-codegen", filepath],
stdout=subprocess.PIPE)
json_meta_fp = proc.communicate()[0]
fpcode_pos = json_meta_fp.find('"code":')
if fpcode_pos > 0 and len(json_meta_fp) > 80:
print(
"Got from codegen:" +
json_meta_fp[:fpcode_pos+40] +
"....." + json_meta_fp[-40:]
)
else:
print(
"Got from codegen:" +
json_meta_fp)
reject_file(
filepath,
'no_fingerprint',
"Got from codegen:" +
json_meta_fp)
return
meta_fp = json.loads(json_meta_fp)
# TO DO: More sanity checks with possible no_fingerprint rejection
# save fingerprint to echoprint server
data = {
'track_id': filename.replace('-', ''),
# '-' reserved for fp segment
'token': ECHOPRINT_CONFIG['token'],
'fp_code': meta_fp[0]['code'].encode('utf8'),
'artist': artist,
'release': release,
'track': title,
'length': int(matching_content.length),
'codever': str(meta_fp[0]['metadata']['version']),
}
json_data = json.dumps(data)
fpcode_pos = json_data.find('"fp":')
if ECHOPRINT_CONFIG['token']:
json_data_pwhidden = json_data[-200:].replace(
ECHOPRINT_CONFIG['token'], 9*'*')
else:
json_data_pwhidden = json_data
if fpcode_pos > 0 and len(json_data_pwhidden) > 250:
print(
"Ingesting to server: " +
json_data_pwhidden[:fpcode_pos+40] +
"....." +
json_data_pwhidden[-200:]
)
else:
print("Ingesting to server: " + json_data_pwhidden)
print()
try:
ingest_request = requests.post(
ECHOPRINT_URL + "/ingest",
data,
verify=False, # TO DO: remove when certificate is updated
)
except Exception as e:
reject_file(
filepath,
'no_fingerprint',
( # TODO think about a better message...
"Could not be sent to EchoPrint server " +
"response code (server offline?). Note: %s" % e
)
)
print(
"ERROR: '" +
srcdir +
"' cloudn't be ingested into the EchoPrint server."
)
return
print()
print(
"Server response:",
ingest_request.status_code,
ingest_request.reason
)
print()
if (len(ingest_request.text) > 2000):
print(
"Body: " +
ingest_request.text[:500] +
'...' +
ingest_request.text[-1500:]
)
else:
print(
"Body: " +
ingest_request.text
)
if ingest_request.status_code != 200:
reject_file(
filepath,
'no_fingerprint',
(
"Could not be sent to EchoPrint server response code " +
str(ingest_request.status_code) +
': ' +
ingest_request.reason
)
)
print(
"ERROR: '" +
srcdir +
"' cloudn't be ingested into the EchoPrint server. " +
"File rejected.")
return
# do a 2nd test query on the EchoPrint server
# (1st was before ingest, during preview)
score = 0
track_id_from_test_query = ''
# similiar_artist = ''
# similiar_track = ''
# make sure previews and excerpts paths exist
content_base_path = FILEHANDLING_CONFIG['content_base_path']
if ensure_path_exists(content_base_path) is None:
print(
"ERROR: '" +
content_base_path +
"' couldn't be created as content base path."
)
return
excerpts_path = FILEHANDLING_CONFIG['excerpts_path']
if (
ensure_path_exists(excerpts_path) is None
):
print(
"ERROR: '" +
excerpts_path +
"' couldn't be created for excerpts."
)
return
# create excerpt paths with filenames
excerpts_filepath_relative = os.path.join(excerpts_path, filename)
excerpts_filepath = os.path.join(
content_base_path,
excerpts_filepath_relative
)
# create fringerprint from audio file
# using echoprint-codegen and relate to the score
print('-' * 80)
print("test query with excerpt file " + excerpts_filepath)
proc = subprocess.Popen(
["echoprint-codegen", excerpts_filepath],
stdout=subprocess.PIPE)
json_meta_fp = proc.communicate()[0]
fpcode_pos = json_meta_fp.find('"code":')
if fpcode_pos > 0 and len(json_meta_fp) > 80:
print(
"Got from codegen:" +
json_meta_fp[:fpcode_pos+40] +
"....." + json_meta_fp[-40:])
meta_fp = json.loads(json_meta_fp)
try:
query_request = requests.get(
ECHOPRINT_URL + "/query?fp_code=" +
meta_fp[0]['code'].encode('utf8'),
verify=False, # TO DO: remove when cert. is updated
)
except Exception:
print(
"ERROR: '" +
excerpts_filepath_relative +
"' cloudn't be test-queried on the EchoPrint server.")
return
print()
print(
"Server response: ",
query_request.status_code,
query_request.reason)
print()
print(
"Body: " +
(query_request.text[:500] + '...' + query_request.text[-1500:]
if len(query_request.text) > 2000 else query_request.text)
)
if query_request.status_code != 200:
print(
"ERROR: '" +
srcdir +
"' cloudn't be test-queried on the EchoPrint server.")
else:
qresult = json.loads(query_request.text)
score = qresult['score']
if qresult['match']:
track_id_from_test_query = (
qresult['track_id'][:8] +
'-' +
qresult['track_id'][8:12] +
'-' +
qresult['track_id'][12:16] +
'-' +
qresult['track_id'][16:])
# similiar_artist = qresult['artist']
# similiar_track = qresult['track']
else:
print("Got from codegen:" + json_meta_fp)
# check and update content processing state
if matching_content.processing_state != 'checksummed':
print(
"WARNING: File '" +
filename +
"' in the checksummed folder had status '" +
matching_content.processing_state +
"'.")
matching_content.processing_state = 'fingerprinted'
matching_content.processing_hostname = HOSTNAME
matching_content.path = filepath.replace(
STORAGE_BASE_PATH +
os.sep, '') # relative path
matching_content.post_ingest_excerpt_score = score
if track_id_from_test_query:
most_similar_content = trytonAccess.get_content_by_filename(
track_id_from_test_query)
if most_similar_content is None:
print(
"ERROR: Couldn't find content entry " +
"of most similar content for '" +
filename +
"' in database. EchoPrint server seems " +
"out of sync with database.")
reject_file(
filepath,
'missing_database_record',
"File name: " +
filepath)
else:
if (
track_id_from_test_query
==
matching_content.most_similiar_content.uuid
):
matching_content.pre_ingest_excerpt_score = 0
matching_content.save()
# TO DO: user access control
FingerprintLog = Model.get('content.fingerprintlog')
new_logentry = FingerprintLog()
user = Model.get('res.user')
matching_users = user.find(['login', '=', 'admin'])
if not matching_users:
return
new_logentry.user = matching_users[0]
new_logentry.content = matching_content
new_logentry.timestamp = datetime.datetime.now()
new_logentry.fingerprinting_algorithm = 'EchoPrint'
if fpcode_pos > 0 and len(json_meta_fp) > 80:
new_logentry.fingerprinting_version = str(
meta_fp[0]['metadata']['version'])
else:
new_logentry.fingerprinting_version = (
"unknown (check if echoprint access token was set properly!)")
new_logentry.entity_origin = 'direct'
Party = Model.get('party.party')
party = Party.find(['name', '=', 'C3S SCE'])[0]
new_logentry.entity_creator = party
new_logentry.save()
# TO DO: check newly ingested fingerprint using the excerpt
# Response codes:
# 0 = NOT_ENOUGH_CODE,
# 1 = CANNOT_DECODE,
# 2 = SINGLE_BAD_MATCH,
# 3 = SINGLE_GOOD_MATCH,
# 4 = NO_RESULTS,
# 5 = MULTIPLE_GOOD_MATCH_HISTOGRAM_INCREASED,
# 6 = MULTIPLE_GOOD_MATCH_HISTOGRAM_DECREASED,
# 7 = MULTIPLE_BAD_HISTOGRAM_MATCH,
# 8 = MULTIPLE_GOOD_MATCH
# move file to fingerprinted directory
if move_file(filepath, destdir + os.sep + filename) is False:
print(
"ERROR: '" +
filepath +
"' couldn't be moved to '" +
destdir +
os.sep +
filename +
"'.")
return
[docs]
def drop_audiofile(srcdir, destdir, filename):
"""
Does nothing but setting the processing step from fingerprinted to dropped.
"""
filepath = os.path.join(srcdir, filename)
# find content in database from filename
matching_content = trytonAccess.get_content_by_filename(filename)
if matching_content is None:
print(
"ERROR: Orphaned file " +
filename +
" (no DB entry) -- rejecting!")
reject_file(
filepath,
'missing_database_record',
"There was no content database record for " +
filename)
return # shouldn't normally happen
# move file to checksummed directory
if move_file(filepath, destdir + os.sep + filename) is False:
print(
"ERROR: '" +
filename +
"' couldn't be moved to '" +
destdir +
"'.")
return
# overwrite file content with its filename
# (so it generates different hash values)
if FILEHANDLING_CONFIG['disembody_dropped_files'] == 'yes':
overwritefile = open(destdir + os.sep + filename, 'w+')
if (overwritefile is not None):
overwritefile.write(filename)
overwritefile.close()
# check and update content processing status
if (matching_content.processing_state != 'fingerprinted' and
matching_content.category == 'audio'):
print(
"WARNING: File '" +
filename +
"' in the fingerprinted folder had status '" +
matching_content.processing_state + "'.")
if (matching_content.processing_state != 'fingerprinted' and
matching_content.processing_state != 'checksummed' and
matching_content.category == 'sheet'):
print(
"WARNING: File '" +
filename +
"' in the fingerprinted folder had status '" +
matching_content.processing_state + "'.")
matching_content.processing_state = 'dropped'
matching_content.processing_hostname = HOSTNAME
matching_content.path = filepath.replace(
STORAGE_BASE_PATH +
os.sep, '') # relative path
matching_content.save()
# --- Helper functions ---
[docs]
def directory_walker(processing_step_func, args):
"""
Walks through the specified directory tree.
Applies the specified repertoire processing step for each file.
Example: directory_walker(fingerprint_audiofile, (sourcedir, destdir))
"""
startpath = args[0]
destdir = args[1]
processing_message = "Processing " + startpath
processing_did_some_work = False
if ensure_path_exists(destdir) is None:
print(
"ERROR: '" +
destdir +
"' couldn't be created.")
return
uuid4rule = (
'^[0-9A-F]{8}-[0-9A-F]{4}-[4][0-9A-F]{3}'
'-[89AB][0-9A-F]{3}-[0-9A-F]{12}$')
uuid4hex = re.compile(uuid4rule, re.I)
for root, _, files in os.walk(startpath):
level = root.replace(startpath, '').count(os.sep)
if level == 1:
for audiofile in files:
if uuid4hex.match(audiofile) is not None:
# only uuid4 compilant filenames
destsubdir = root.replace(args[0], destdir)
if ensure_path_exists(destsubdir) is None:
# ensure user subfolder exists
print(
"ERROR: '" +
destdir +
"' couldn't be created.")
continue
# lock file
try:
audiofilepath = os.path.join(root, audiofile)
lockfilename = audiofilepath + '.lock'
lockfile = open(lockfilename, 'w+')
fcntl.flock(lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
# after successful locking,
# make sure the audiofile is still there ...
if os.path.isfile(audiofilepath):
# ... and process file
print(processing_message)
processing_did_some_work = True
processing_step_func(root, destsubdir, audiofile)
# unlock file
fcntl.flock(lockfile, fcntl.LOCK_UN)
lockfile.close()
# try:
os.remove(lockfilename)
# except OSError:
# pass
except IOError:
print("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")
print("LOCKED: " + audiofile)
# pass
finally:
if lockfile:
lockfile.close()
if processing_did_some_work:
print("Finished processing " + startpath)
[docs]
def move_file(source, target):
"""
Moves a file from one path to another.
"""
# check file
if not os.path.isfile(source):
return False
if os.path.isfile(target):
return False
# move file
try:
# shutil.copyfile(source, target)
# shutil.copyfile(source + ".checksums", target + ".checksums")
# os.remove(source)
# os.remove(source + ".checksums")
# print "Moving files " + source + "* to " + target + "*."
if os.path.isfile(source + ".checksums"):
os.rename(source + ".checksums", target + ".checksums")
if os.path.isfile(source + ".checksum"):
os.rename(source + ".checksum", target + ".checksum")
os.rename(source, target)
# suppose we only have one mounted filesystem
except IOError as e:
print(e)
return os.path.isfile(target) and not os.path.isfile(source)
[docs]
def ensure_path_exists(path):
"""
If path doesn't exist, create directory.
"""
if not os.path.exists(path):
try:
os.makedirs(path)
except IOError:
pass
return os.path.exists(path)
[docs]
def reject_file(source, reason, reason_details):
"""
Moves a file to the 'rejected' folder
and writes the reject reason to the database.
"""
# check file
if not os.path.isfile(source):
return False
storage_base_path = FILEHANDLING_CONFIG['storage_base_path']
if ensure_path_exists(storage_base_path) is None:
print(
"ERROR: '" +
storage_base_path +
"' couldn't be created as content base path.")
return
rejected_path = FILEHANDLING_CONFIG['rejected_path']
if ensure_path_exists(rejected_path) is None:
print(
"ERROR: '" +
rejected_path +
"' couldn't be created for rejected files.")
return
filename = os.sep.join(
source.rsplit(os.sep, 2)[-2:])
# get user-id/filename from source path
rejected_filepath_relative = os.path.join(
rejected_path, filename)
rejected_filepath = os.path.join(
storage_base_path,
rejected_filepath_relative)
move_file(source, rejected_filepath)
# TODO: cleanup possible preview and excerpt files
# find content in database from filename
slash_pos = filename.find(os.sep)
if slash_pos > 0:
matching_content = trytonAccess.get_content_by_filename(
filename[slash_pos+1:])
else:
matching_content = trytonAccess.get_content_by_filename(filename)
if matching_content is None:
# print(
# "ERROR: Couldn't find content entry for '" +
# rejected_filepath_relative +
# "' in database. But no problem: " +
# "I'm about to reject the file anyway...")
pass
return
# check and update content processing status,
# save some pydub metadata to database
matching_content.processing_state = 'rejected'
matching_content.processing_hostname = HOSTNAME
matching_content.path = rejected_filepath_relative
matching_content.rejection_reason = reason
matching_content.rejection_reason_details = reason_details
matching_content.save()
# --- Click Commands ---
@click.group()
def repro():
"""
Repertoire Processor command line tool.
"""
@repro.command('preview')
# @click.pass_context
def preview():
"""
Get files from uploaded_path and creates a low quality audio snippet of it.
"""
directory_walker(
preview_audiofile,
(
os.path.join(STORAGE_BASE_PATH,
FILEHANDLING_CONFIG['uploaded_path']),
os.path.join(STORAGE_BASE_PATH,
FILEHANDLING_CONFIG['previewed_path'])
)
)
@repro.command('checksum')
# @click.pass_context
def checksum():
"""
Get files from previewed_path and hash them.
"""
directory_walker(
checksum_audiofile, (
os.path.join(STORAGE_BASE_PATH,
FILEHANDLING_CONFIG['previewed_path']),
os.path.join(STORAGE_BASE_PATH,
FILEHANDLING_CONFIG['checksummed_path'])
)
)
@repro.command('fingerprint')
# @click.pass_context
def fingerprint():
"""
Get files from checksummed_path and fingerprint them.
"""
directory_walker(
fingerprint_audiofile, (
os.path.join(STORAGE_BASE_PATH,
FILEHANDLING_CONFIG['checksummed_path']),
os.path.join(STORAGE_BASE_PATH,
FILEHANDLING_CONFIG['fingerprinted_path'])))
@repro.command('drop')
# @click.pass_context
def drop():
"""
Get files from fingerprinted_path and 'drop' them for archiving.
This command does nothing with the files
and is rather a mere abstraction
in case there will at some point
be another processing step after fingerprint.
"""
directory_walker(
drop_audiofile, (
os.path.join(STORAGE_BASE_PATH,
FILEHANDLING_CONFIG['fingerprinted_path']),
os.path.join(STORAGE_BASE_PATH,
FILEHANDLING_CONFIG['dropped_path'])))
@repro.command('delete')
# @click.pass_context
def delete():
"""
delete a fingerprint from the echoprint server
"""
# utilizations = Model.get('creation.utilisation.imp')
# result = utilizations.find(['title', "=", "999,999"])
# if not result:
# sys.exit()
# code = result.fingerprint
url = ECHOPRINT_URL + "/delete"
var_id = "track_id"
track = "TRWIZHB123E858D912"
query_request = requests.post(url, data={var_id: track}, verify=False)
print("status code: " + str(query_request.status_code) + " -- " +
query_request.reason)
@repro.command('all')
@click.pass_context
def all(ctx):
"""
Apply all processing steps.
Looks for new files to be previed after each processing step.
"""
ctx.invoke(preview)
ctx.invoke(checksum)
ctx.invoke(preview)
ctx.invoke(fingerprint)
ctx.invoke(drop)
ctx.invoke(preview)
@repro.command('loop')
@click.pass_context
def loop(ctx):
"""
Apply all processing steps in an endless loop.
"""
while True:
ctx.invoke(all)
time_to_wait_between_cycles = 10
print(
'Waiting for ' +
str(time_to_wait_between_cycles) +
'seconds...')
time.sleep(time_to_wait_between_cycles)
print('entering new processing cycle')
[docs]
def connect_db():
"""
get access to database
"""
max_tries = 3
tries = 0
# As XMLRPC has no disconnect method (should be stateless), but leaves
# a connection to postgres open, the postgres backend has to
# disconnect them via pg_terminate_backend in order to be able to drop
# the database (testing).
# The proteus config on the other hand seems to cache some connection
# data, as the first connection results in an unauthorized error, so
# a second try has to be added.
while tries < max_tries:
SCHEMA = "https"
if os.environ.get('ENVIRONMENT') in ['development', 'testing']:
SCHEMA = "http"
try:
config.set_xmlrpc(
SCHEMA + "://" +
PROTEUS_CONFIG['user'] + ":" +
PROTEUS_CONFIG['password'] +
"@" + PROTEUS_CONFIG['host'] +
":" + PROTEUS_CONFIG['port'] +
"/" + PROTEUS_CONFIG['database'] + "/"
)
break
except Exception as e:
tries += 1
if tries == max_tries:
exit(
"Database connection could not be established "
"(yet), skipping file processing ... %s" % e)
time.sleep(1)
if __name__ == '__main__':
# ensure storage path
if ensure_path_exists(STORAGE_BASE_PATH) is None:
print(
"ERROR: '" +
STORAGE_BASE_PATH +
"' couldn't be created as storage base path.")
exit()
# connect db
connect_db()
# execute repro
repro()