"""
Utilities for handling web requests and responses in the HED web application.
"""
import io
import json
import os
import zipfile
from datetime import datetime
from urllib.parse import urlparse
from flask import Response, make_response, send_file
from hed import HedSchema, HedSchemaGroup
from hed import schema as hedschema
from hed.errors import ErrorHandler, ErrorSeverity, HedFileError
from hed.schema import load_schema_version
from werkzeug.utils import secure_filename
from hedweb.constants import base_constants as bc
from hedweb.constants import file_constants as fc
# strftime pattern for filename timestamps: year_month_day_T_hour_minute_second_microsecond.
TIME_FORMAT = "%Y_%m_%d_T_%H_%M_%S_%f"
[docs]
def convert_hed_versions(hed_info) -> list:
    """Flatten a prefix-to-versions mapping into a single list of version strings.

    Parameters:
        hed_info (dict): Maps a prefix (or None for unprefixed versions) to a list of HED version strings.

    Returns:
        list: The versions stored under the None key (unchanged), followed by every other
            version rendered as "<prefix>_<version>".
    """
    prefixed_versions = [
        prefix + "_" + version
        for prefix, versions in hed_info.items()
        if prefix is not None
        for version in versions
    ]
    return hed_info.get(None, []) + prefixed_versions
[docs]
def file_extension_is_valid(filename, accepted_extensions=None) -> bool:
    """Check whether a filename ends with one of the accepted extensions.

    Parameters:
        filename (str): The name of the file to be checked.
        accepted_extensions (list): Accepted extensions, compared against the lower-cased
            extension of filename (including the leading dot).

    Returns:
        bool: True if no accepted extensions are given or the file's extension is among them.
    """
    # An empty or missing list means there is no restriction; filename is not inspected at all.
    if not accepted_extensions:
        return True
    _, extension = os.path.splitext(filename.lower())
    return extension in accepted_extensions
[docs]
def filter_issues(issues, check_for_warnings):
    """Optionally restrict an issues list to the ERROR severity level.

    Parameters:
        issues (list): The issues to filter.
        check_for_warnings (bool): If truthy, the issues are returned untouched.

    Returns:
        list: The original issues, or the result of ErrorHandler.filter_issues_by_severity
            called with ErrorSeverity.ERROR when warnings are not being checked.
    """
    if check_for_warnings:
        return issues
    return ErrorHandler.filter_issues_by_severity(issues, ErrorSeverity.ERROR)
[docs]
def generate_download_file_from_text(results, file_header=None) -> Response:
    """Generate a streamed plain-text download response from a results dictionary.

    Parameters:
        results (dict): Results dictionary. Reads "data" (a string or a list of strings),
            the optional "output_display_name", and the bc.MSG_CATEGORY and bc.MSG entries.
        file_header (str): Optional header emitted before the download text.

    Returns:
        Response: A Response object that streams the text as an attachment.

    Raises:
        HedFileError: If results has no "data" entry or the entry is empty.
    """
    display_name = results.get("output_display_name", None)
    if display_name is None:
        display_name = "download.txt"
    download_text = results.get("data", "")
    if not download_text:
        raise HedFileError("EmptyDownloadText", "No download text given", "")

    def generate():
        if file_header:
            yield file_header
        if isinstance(download_text, list):
            # A list is streamed item by item, exactly as given.
            yield from download_text
        else:
            # A string is streamed line by line, keeping the line endings.
            yield from download_text.splitlines(True)

    return Response(
        generate(),
        # Header parameters must be separated from the value by ';'.
        content_type="text/plain; charset=utf-8",
        headers={
            "Content-Disposition": f"attachment; filename={display_name}",
            "Category": results[bc.MSG_CATEGORY],
            "Message": results[bc.MSG],
        },
    )
[docs]
def generate_download_spreadsheet(results) -> Response:
    """Build a download response for the spreadsheet held in a results dictionary.

    Parameters:
        results (dict): Results dictionary. Reads the bc.SPREADSHEET, bc.OUTPUT_DISPLAY_NAME,
            bc.MSG_CATEGORY and bc.MSG entries.

    Returns:
        Response: An Excel attachment if the spreadsheet has a loaded workbook; otherwise the
            spreadsheet's CSV text packaged by generate_download_file_from_text.
    """
    spreadsheet = results[bc.SPREADSHEET]
    if spreadsheet.loaded_workbook:
        # Serialize the workbook in memory and ship the raw bytes.
        workbook_bytes = io.BytesIO()
        spreadsheet.to_excel(workbook_bytes)
        response = make_response()
        response.data = workbook_bytes.getvalue()
        response.headers["Content-Disposition"] = (
            "attachment; filename=" + results[bc.OUTPUT_DISPLAY_NAME]
        )
        response.headers["Category"] = results[bc.MSG_CATEGORY]
        response.headers["Message"] = results[bc.MSG]
        response.mimetype = (
            "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
        )
        return response

    # No workbook is loaded, so fall back to a text download of the CSV form.
    text_results = {
        "data": spreadsheet.to_csv(),
        "output_display_name": results[bc.OUTPUT_DISPLAY_NAME],
        bc.MSG_CATEGORY: results[bc.MSG_CATEGORY],
        bc.MSG: results[bc.MSG],
    }
    return generate_download_file_from_text(text_results)
[docs]
def generate_filename(
    base_name, name_prefix=None, name_suffix=None, extension=None, append_datetime=False
) -> str:
    """Assemble a sanitized filename for an attachment.

    Parameters:
        base_name (str or None): Base name; any extension it carries is stripped before use.
        name_prefix (str or None): Text placed directly in front of the base name.
        name_suffix (str or None): Text placed directly after the base name.
        extension (str or None): Extension appended last, only if the name so far is non-empty.
        append_datetime (bool): If True, append "_" plus the current date-time to the name.

    Returns:
        str: The assembled name after being passed through secure_filename.

    Notes:
        - Prefix, base name, and suffix are concatenated with no separator, so any
          separator must already be part of the prefix or suffix.
        - The date-time uses TIME_FORMAT with its last three characters dropped.
    """
    stem = os.path.splitext(base_name)[0] if base_name else ""
    filename = "".join(part for part in (name_prefix, stem, name_suffix) if part)
    if append_datetime:
        timestamp = datetime.now().strftime(TIME_FORMAT)[:-3]
        filename = f"{filename}_{timestamp}"
    if filename and extension:
        filename += extension
    return secure_filename(filename)
[docs]
def generate_text_response(results) -> Response:
    """Generate a plain-text response carrying the results data and status headers.

    Parameters:
        results (dict): Results dictionary. Reads the optional "data" entry and the
            bc.MSG_CATEGORY and bc.MSG entries.

    Returns:
        Response: A text/plain Response whose body is the "data" entry and whose
            "Category" and "Message" headers come from results.
    """
    headers = {"Category": results[bc.MSG_CATEGORY], "Message": results[bc.MSG]}
    download_text = results.get("data", "")
    # Content-Length counts bytes on the wire, not characters, so measure the
    # UTF-8 encoded payload. Non-string payloads are left without an explicit length.
    if isinstance(download_text, str):
        payload_size = len(download_text.encode("utf-8"))
    elif isinstance(download_text, (bytes, bytearray)):
        payload_size = len(download_text)
    else:
        payload_size = 0
    if payload_size > 0:
        headers["Content-Length"] = str(payload_size)
    # Header parameters must be separated from the media type by ';'.
    return Response(
        download_text, content_type="text/plain; charset=utf-8", headers=headers
    )
[docs]
def generate_download_zip_file(results) -> Response:
    """Generate a zip-archive download response from a list of text files.

    Parameters:
        results (dict): Results dictionary. Reads bc.FILE_LIST (an iterable of dicts with
            "file_name" and "content" entries), bc.MSG, bc.MSG_CATEGORY, and either
            "zip_name" or, if that is absent, "output_display_name".

    Returns:
        Response: A Response object sending the in-memory archive as an attachment.

    Raises:
        KeyError: If neither "zip_name" nor "output_display_name" is present in results.
    """
    file_list = results[bc.FILE_LIST]
    archive = io.BytesIO()
    # The buffer is brand new, so write mode is the appropriate way to create the archive.
    with zipfile.ZipFile(archive, mode="w", compression=zipfile.ZIP_DEFLATED) as zf:
        for item in file_list:
            zf.writestr(item["file_name"], item["content"].encode("utf-8"))
    archive.seek(0)
    # Only look up the fallback name when it is needed, so that a supplied
    # "zip_name" does not also require "output_display_name" to exist.
    if "zip_name" in results:
        zip_name = results["zip_name"]
    else:
        zip_name = results["output_display_name"]
    response = send_file(archive, as_attachment=True, download_name=zip_name)
    response.headers["Message"] = results[bc.MSG]
    response.headers["Category"] = results[bc.MSG_CATEGORY]
    return response
[docs]
def get_hed_schema_from_pull_down(request) -> HedSchema:
    """Resolve the HED schema selected in a form's schema pull-down.

    Parameters:
        request (Request): A Request object whose form holds the schema version choice and
            whose files may hold an uploaded schema.

    Returns:
        HedSchema: The schema loaded by version, or parsed from the uploaded file when the
            "other" option was chosen.

    Raises:
        HedFileError: If no schema version is in the form, or "other" was chosen without an upload.
    """
    if bc.SCHEMA_VERSION not in request.form:
        raise HedFileError(
            "NoSchemaError", "Must provide a valid schema or schema version", ""
        )

    selected_version = request.form[bc.SCHEMA_VERSION]
    if selected_version != bc.OTHER_VERSION_OPTION:
        return load_schema_version(selected_version)

    if bc.SCHEMA_PATH not in request.files:
        raise HedFileError(
            "NoSchemaFile", "Must provide a valid schema for upload if other chosen", ""
        )

    uploaded = request.files[bc.SCHEMA_PATH]
    # At most fc.BYTE_LIMIT bytes of the upload are read before decoding.
    schema_text = uploaded.read(fc.BYTE_LIMIT).decode("utf-8")
    return hedschema.from_string(
        schema_text, schema_format=secure_filename(uploaded.filename)
    )
[docs]
def get_option(options, option_name, default_value) -> str:
    """Look up an option, falling back to a default.

    Parameters:
        options (dict): A dictionary of options; may be None or empty.
        option_name (str): The name of the option to retrieve.
        default_value (str): Returned when options is empty/None or lacks option_name.

    Returns:
        str: The stored option value if present, otherwise default_value.
    """
    if options and option_name in options:
        return options[option_name]
    return default_value
[docs]
def get_parsed_name(filename, is_url=False) -> tuple[str, str]:
    """Split a file path or URL into its base display name and extension.

    Parameters:
        filename (str): The file name, path, or URL to be parsed.
        is_url (bool): If True, only the path component of the URL is considered.

    Returns:
        tuple[str, str]: The base name without its extension, and the extension
            (including the leading dot, or empty if there is none).
    """
    path = urlparse(filename).path if is_url else filename
    return os.path.splitext(os.path.basename(path))
[docs]
def get_schema_versions(hed_schema) -> str:
    """Return the formatted version string of a schema or schema group.

    Parameters:
        hed_schema (HedSchema or HedSchemaGroup or None): The schema or schema group to describe.

    Returns:
        str: The value of hed_schema.get_formatted_version(), or an empty string when
            hed_schema is falsy.

    Raises:
        ValueError: If hed_schema is truthy but is neither a HedSchema nor a HedSchemaGroup.
    """
    if not hed_schema:
        return ""
    if not isinstance(hed_schema, (HedSchema, HedSchemaGroup)):
        raise ValueError(
            "InvalidHedSchemaOrHedSchemaGroup", "Expected schema or schema group"
        )
    return hed_schema.get_formatted_version()
[docs]
def handle_error(ex, hed_info=None, title=None, return_as_str=True) -> str | dict:
"""Handle an error by returning a dictionary or simple string.
Parameters:
ex (Exception): The exception raised.
hed_info (dict): A dictionary of information describing the error.
title (str): A title to be included with the message.
return_as_str (bool): If true return as string otherwise as dictionary.
Returns:
Union[str, dict]: Contains error information.
"""
if not hed_info:
hed_info = {}
if hasattr(ex, "error_type"):
error_code = ex.error_type
else:
error_code = type(ex).__name__
if not title:
title = ""
if hasattr(ex, "message"):
message = str(ex.message)
else:
message = str(ex)
hed_info["message"] = f"{title}[{error_code}: {message}]"
if return_as_str:
return json.dumps(hed_info)
else:
hed_info["error_type"] = error_code
return hed_info
[docs]
def handle_http_error(ex) -> Response:
    """Convert an exception into a plain-text error response.

    Parameters:
        ex (Exception): The exception to report.

    Returns:
        Response: The result of generate_text_response applied to the dictionary
            produced by get_exception_message for ex.
    """
    error_results = get_exception_message(ex)
    return generate_text_response(error_results)
[docs]
def get_exception_message(ex) -> dict:
    """Build an error results dictionary describing an exception.

    Parameters:
        ex (Exception): The exception to describe.

    Returns:
        dict: A dictionary with empty "data", an "error" category under bc.MSG_CATEGORY, and
            a message under bc.MSG of the form "<code>: <filename> [<message>]".
    """
    # The error code is the first available of: ex.error_type, ex.code, the class name.
    for attribute in ("error_type", "code"):
        if hasattr(ex, attribute):
            error_code = getattr(ex, attribute)
            break
    else:
        error_code = type(ex).__name__

    raw_message = ex.message if hasattr(ex, "message") else ex
    # Newlines are flattened to spaces so the message stays on one line.
    message = str(raw_message).replace("\n", " ")
    filename = str(ex.filename) if hasattr(ex, "filename") else ""

    return {
        "data": "",
        bc.MSG_CATEGORY: "error",
        bc.MSG: f"{error_code}: {filename} [{message}]",
    }
[docs]
def package_results(results) -> Response:
    """Choose and build the response appropriate for a results dictionary.

    Parameters:
        results (dict): A dictionary with the results. A list under "data" is replaced
            in place by its newline-joined text (with a trailing newline).

    Returns:
        Response: A zip download if bc.FILE_LIST is non-empty; a text download if there is
            data and the command target is not "spreadsheet"; a plain text response if there
            is data or no spreadsheet; otherwise a spreadsheet download.
    """
    data = results.get("data", None)
    if isinstance(data, list):
        data = "\n".join(data) + "\n"
        results["data"] = data

    if results.get(bc.FILE_LIST, None):
        return generate_download_zip_file(results)
    if data and results.get("command_target", None) != "spreadsheet":
        return generate_download_file_from_text(results)
    if data or not results.get("spreadsheet", None):
        return generate_text_response(results)
    return generate_download_spreadsheet(results)