"""
Support functions for reporting validation errors.
You can scope the formatted errors with calls to push_error_context and pop_error_context.
The standard context keys are:
CUSTOM_TITLE = 'ec_title'
FILE_NAME = 'ec_filename'
SIDECAR_COLUMN_NAME = 'ec_sidecarColumnName'
SIDECAR_KEY_NAME = 'ec_sidecarKeyName'
ROW = 'ec_row'
COLUMN = 'ec_column'
LINE = "ec_line"
HED_STRING = 'ec_HedString'
SCHEMA_SECTION = 'ec_section'
SCHEMA_TAG = 'ec_schema_tag'
SCHEMA_ATTRIBUTE = 'ec_attribute'
The standard error keys are:
'code' -- the standard HED error code
'message' -- the error message
'severity' -- 1 is error and 10 is warning.
'url' -- the web page in the HED Specification corresponding to the error (explains how the error can occur).
'source_tag' -- the source tag that caused the error, if applicable.
"""
from __future__ import annotations
from functools import wraps
import xml.etree.ElementTree as ET
from collections import defaultdict
from typing import Optional
from hed.errors.error_types import ErrorContext, ErrorSeverity
from hed.errors.known_error_codes import known_error_codes
# Registry mapping each error type to its wrapper function; filled by _register_error_function.
error_functions = {}
# Controls if the default issue printing skips adding indentation for this context.
no_tab_context = {ErrorContext.HED_STRING, ErrorContext.SCHEMA_ATTRIBUTE}
# Default sort ordering for issues list: sort_issues builds its sort key from these
# context values, in this order.
default_sort_list = [
ErrorContext.CUSTOM_TITLE,
ErrorContext.FILE_NAME,
ErrorContext.TABLE_NAME,
ErrorContext.SIDECAR_COLUMN_NAME,
ErrorContext.SIDECAR_KEY_NAME,
ErrorContext.ROW,
ErrorContext.COLUMN,
ErrorContext.LINE,
ErrorContext.SCHEMA_SECTION,
ErrorContext.SCHEMA_TAG,
ErrorContext.SCHEMA_ATTRIBUTE,
]
# ErrorContext which is expected to be int based. A missing value for one of these
# becomes 0 when pushed as context and -1 when sorting; all other contexts use "".
int_sort_list = [ErrorContext.ROW]
def _register_error_function(error_type, wrapper_func):
    """Store the wrapper for an error type in the module-level registry.

    Parameters:
        error_type (str): The error type used as the registry key.
        wrapper_func (callable): The wrapped message function to register.

    Raises:
        KeyError: If a function has already been registered for error_type.
    """
    if error_type not in error_functions:
        error_functions[error_type] = wrapper_func
        return
    raise KeyError(f"{error_type} defined more than once.")
def hed_error(error_type: str, actual_code: Optional[str] = None, default_severity: int = ErrorSeverity.ERROR):
    """Decorator factory for non-tag error message functions.

    The decorated function only builds the message text. The wrapper that replaces it
    packages that text into an error dictionary and is registered under error_type.

    Parameters:
        error_type (str): A value from error_types or optionally another value.
        actual_code (str or None): The actual error to report to the outside world.
            If None, error_type is reported.
        default_severity (int): The default severity for the decorated error.

    Returns:
        callable: The decorator to apply to the message function.
    """
    reported_code = error_type if actual_code is None else actual_code

    def inner_decorator(func):
        @wraps(func)
        def wrapper(*args, severity=default_severity, **kwargs):
            """Build the error dictionary for a non-tag error.

            Parameters:
                args (args): Positional args passed through to the message function.
                severity (int): Overrides the default severity if passed.
                kwargs (**kwargs): Keyword args passed through to the message function.

            Returns:
                dict: The error with 'code', 'message' and 'severity' entries.
            """
            message = func(*args, **kwargs)
            return ErrorHandler._create_error_object(reported_code, message, severity)

        _register_error_function(error_type, wrapper_func=wrapper)
        return wrapper

    return inner_decorator
def hed_tag_error(error_type, default_severity=ErrorSeverity.ERROR, has_sub_tag=False, actual_code=None):
    """Decorator factory for tag-level error message functions.

    The wrapper that replaces the decorated function converts the offending tag to text,
    calls the message function, and packages the result into an error dictionary that
    also records the source tag. The wrapper is registered under error_type.

    Parameters:
        error_type (str): A value from error_types or optionally another value.
        default_severity (int): The default severity for the decorated error.
        has_sub_tag (bool): If True, this error message also wants a sub_tag passed down. eg "This" in "This/Is/A/Tag"
        actual_code (str): The actual error to report to the outside world. If None, error_type is reported.

    Returns:
        callable: The decorator to apply to the message function.
    """
    reported_code = error_type if actual_code is None else actual_code

    def _attribute_or_str(tag, attribute_name):
        """Return the named attribute of tag, falling back to str(tag) if it is missing."""
        try:
            return getattr(tag, attribute_name)
        except AttributeError:
            return str(tag)

    def inner_decorator(func):
        if has_sub_tag:

            @wraps(func)
            def wrapper(
                tag, index_in_tag, index_in_tag_end, *args, severity=default_severity, **kwargs
            ) -> list[dict]:
                """Build the error dictionary for a tag error that points at part of the tag.

                Parameters:
                    tag (HedTag): The HED tag object with the problem.
                    index_in_tag (int): The index into the tag with a problem(usually 0).
                    index_in_tag_end (int): The last index into the tag with a problem - usually len(tag).
                        None means the end of the tag text.
                    args (args): Any other non keyword args.
                    severity (int): Used to include warnings as well as errors.
                    kwargs (**kwargs): Any keyword args to be passed down to error message function.

                Returns:
                    dict: The error, including the index range and the source tag.
                """
                tag_text = _attribute_or_str(tag, "tag")
                if index_in_tag_end is None:
                    index_in_tag_end = len(tag_text)
                problem_sub_tag = tag_text[index_in_tag:index_in_tag_end]
                original_text = _attribute_or_str(tag, "org_tag")
                message = func(original_text, problem_sub_tag, *args, **kwargs)
                return ErrorHandler._create_error_object(
                    reported_code,
                    message,
                    severity,
                    index_in_tag=index_in_tag,
                    index_in_tag_end=index_in_tag_end,
                    source_tag=tag,
                )

        else:

            @wraps(func)
            def wrapper(tag, *args, severity=default_severity, **kwargs):
                """Build the error dictionary for a tag or group error.

                Parameters:
                    tag (HedTag or HedGroup): The HED tag object with the problem.
                    args (non keyword args): Any other non keyword args.
                    severity (ErrorSeverity): For including warnings.
                    kwargs (keyword args): Any keyword args to be passed down to error message function.

                Returns:
                    dict: The error, including the source tag.
                """
                # Imported here rather than at module level, as in the original code.
                from hed.models.hed_tag import HedTag
                from hed.models.hed_group import HedGroup

                if isinstance(tag, HedTag):
                    original_text = tag.org_tag
                elif isinstance(tag, HedGroup):
                    original_text = tag.get_original_hed_string()
                else:
                    original_text = str(tag)
                message = func(original_text, *args, **kwargs)
                return ErrorHandler._create_error_object(reported_code, message, severity, source_tag=tag)

        _register_error_function(error_type, wrapper_func=wrapper)
        return wrapper

    return inner_decorator
# Import after hed_error decorators are defined.
# NOTE(review): presumably these modules apply @hed_error / @hed_tag_error at import time,
# which is why they can only be imported once the decorators exist -- confirm.
from hed.errors import error_messages # noqa:E402
from hed.errors import schema_error_messages # noqa:E402
# Intentional to make sure tools don't think the import is unused
error_messages.mark_as_used = True
schema_error_messages.mark_as_used = True
class ErrorHandler:
    """Class to hold error context and having general error functions."""

    def __init__(self, check_for_warnings=True):
        """Create a handler with an empty context stack.

        Parameters:
            check_for_warnings (bool): If False, warnings are dropped by the filtering methods.
        """
        # Ordered stack of (context_type, context) tuples, in the order they were pushed.
        self.error_context = []
        self._check_for_warnings = check_for_warnings

    def push_error_context(self, context_type, context):
        """Push a new error context to narrow down error scope.

        Parameters:
            context_type (str): A value from ErrorContext representing the type of scope.
            context (str, int, or HedString): The main value for the context_type.

        Notes:
            The context depends on the context_type. For ErrorContext.FILE_NAME this would be the actual filename.
            A context of None is replaced by 0 for int based contexts and by "" otherwise.
        """
        if context is None:
            context = 0 if context_type in int_sort_list else ""
        self.error_context.append((context_type, context))

    def pop_error_context(self):
        """Remove the last scope from the error context.

        Notes:
            Modifies the error context of this reporter.
        """
        self.error_context.pop(-1)

    def reset_error_context(self):
        """Reset all error context information to defaults.

        Notes:
            This function is mainly for testing and should not be needed with proper usage.
        """
        self.error_context = []

    def add_context_and_filter(self, issues):
        """Filter out warnings if requested, while adding context to issues.

        Parameters:
            issues (list): A list of error dictionaries. The list and its dictionaries are modified in place.
        """
        if not self._check_for_warnings:
            # Slice assignment keeps the caller's list object while dropping the warnings.
            issues[:] = self.filter_issues_by_severity(issues, ErrorSeverity.ERROR)
        for error_object in issues:
            self._add_context_to_error(error_object, self.error_context)
            self._update_error_with_char_pos(error_object)

    def format_error_with_context(self, *args, **kwargs):
        """Format an error and attach the current error context to it.

        Parameters:
            args (args): Positional args for format_error (the error type comes first).
            kwargs (kwargs): Keyword args for format_error.

        Returns:
            list[dict]: A list containing the single error, or an empty list if it was a filtered-out warning.
        """
        error_object = ErrorHandler.format_error(*args, **kwargs)
        # The None check is kept from the original code; it only matters if the method
        # is called through the class with None in place of an instance.
        if self is not None:
            actual_error = error_object[0]
            # Filter out warning errors
            if not self._check_for_warnings and actual_error["severity"] >= ErrorSeverity.WARNING:
                return []
            self._add_context_to_error(actual_error, self.error_context)
            self._update_error_with_char_pos(actual_error)
        return error_object

    @staticmethod
    def format_error(error_type: str, *args, actual_error: Optional[str] = None, **kwargs) -> list[dict]:
        """Format an error using the function registered for the error type.

        Parameters:
            error_type (str): The type of error. Registered with @hed_error or @hed_tag_error.
            args (args): Any remaining non-keyword args, passed to the registered function.
            actual_error (str or None): Error code to actually add to report out.
            kwargs (kwargs): Keyword parameters to pass down to the error handling func.

        Returns:
            list[dict]: A list containing a single dictionary.

        Notes:
            - If no function is registered for error_type, val_error_unknown builds the error
              and error_type is used as its code.
        """
        error_func = error_functions.get(error_type)
        if error_func is None:
            error_object = ErrorHandler.val_error_unknown(*args, **kwargs)
            error_object["code"] = error_type
        else:
            error_object = error_func(*args, **kwargs)
        if actual_error:
            error_object["code"] = actual_error
        return [error_object]

    @staticmethod
    def filter_issues_by_severity(issues_list: list[dict], severity: int) -> list[dict]:
        """Gather all issues matching or below a given severity.

        Parameters:
            issues_list (list[dict]): A list of dictionaries containing the full issue list.
            severity (int): The level of issues to keep.

        Returns:
            list[dict]: A list of dictionaries containing the issue list after filtering by severity.
        """
        return [issue for issue in issues_list if issue["severity"] <= severity]

    @staticmethod
    def format_error_from_context(
        error_type: str, error_context: list, *args, actual_error: Optional[str] = None, **kwargs
    ) -> list[dict]:
        """Format an error based on the error type.

        Parameters:
            error_type (str): The type of error. Registered with @hed_error or @hed_tag_error.
            error_context (list): Contains the error context to use for this error.
            args (args): Any remaining non-keyword args.
            actual_error (str or None): Error code to actually add to report out.
            kwargs (kwargs): Keyword parameters to pass down to the error handling func.

        Returns:
            list[dict]: A list containing a single dictionary.

        Notes:
            - The error_context is a list of (context_type, context) tuples.
            - The actual_error is useful for errors that are shared like invalid character.
            - This can't filter out warnings like the other ones.
        """
        error_list = ErrorHandler.format_error(error_type, *args, actual_error=actual_error, **kwargs)
        ErrorHandler._add_context_to_error(error_list[0], error_context)
        ErrorHandler._update_error_with_char_pos(error_list[0])
        return error_list

    @staticmethod
    def _add_context_to_error(error_object: dict, error_context_to_add: list) -> dict:
        """Add relevant context such as row number or column name around an error object.

        Parameters:
            error_object (dict): Generated error containing at least a code and message entry.
            error_context_to_add (list): The (context_type, context) tuples to copy into the error.

        Returns:
            dict: The same error_object, with one entry added per context tuple.
        """
        for context_type, context in error_context_to_add:
            error_object[context_type] = context
        return error_object

    @staticmethod
    def _create_error_object(error_type, base_message, severity, **kwargs):
        """Create the basic error dictionary.

        Parameters:
            error_type (str): The code to report.
            base_message (str): The error message.
            severity (int): The severity of the error.
            kwargs (kwargs): Extra entries to add. They never override code, message or severity.

        Returns:
            dict: The new error dictionary.
        """
        error_object = {"code": error_type, "message": base_message, "severity": severity}
        for key, value in kwargs.items():
            error_object.setdefault(key, value)
        return error_object

    @staticmethod
    def _get_tag_span_to_error_object(error_object):
        """Find the span of the source tag within the HED string context of an error.

        Parameters:
            error_object (dict): The error to inspect.

        Returns:
            tuple: The result of the HED string's _get_org_span for the source tag, or (None, None)
                if the error has no HED string context, no source tag, or an int source tag.
        """
        if ErrorContext.HED_STRING not in error_object:
            return None, None
        if "source_tag" not in error_object:
            return None, None
        source_tag = error_object["source_tag"]
        if isinstance(source_tag, int):
            return None, None
        hed_string = error_object[ErrorContext.HED_STRING]
        return hed_string._get_org_span(source_tag)

    @staticmethod
    def _update_error_with_char_pos(error_object):
        """Add character positions to the error if its source tag can be located.

        When a span is found, 'char_index' and 'char_index_end' are set and the span is
        appended to the message. Otherwise the error is left unchanged.

        Parameters:
            error_object (dict): The error to update in place.
        """
        # This part is optional as you can always generate these as needed.
        start, end = ErrorHandler._get_tag_span_to_error_object(error_object)
        if start is None:
            return
        # silence warning in pycharm
        start = int(start)
        source_tag = error_object.get("source_tag", None)
        # Todo: Move this functionality somewhere more centralized.
        # If the tag has been modified from the original, don't try to use sub indexing.
        if source_tag and source_tag._tag:
            new_start, new_end = start, end
        else:
            new_start = start + error_object.get("index_in_tag", 0)
            new_end = end
            if "index_in_tag_end" in error_object:
                new_end = start + error_object["index_in_tag_end"]
        error_object["char_index"], error_object["char_index_end"] = new_start, new_end
        error_object["message"] += f" Problem spans string indexes: {new_start}, {new_end}"

    @hed_error("Unknown")
    def val_error_unknown(*args, **kwargs) -> str:
        """Default error handler if no error of this type was registered.

        Parameters:
            args (args): List of non-keyword parameters (varies).
            kwargs (kwargs): Keyword parameters (varies)

        Returns:
            str: The error message.
        """
        return f"Unknown error. Args: {str(args), str(kwargs)}"

    @staticmethod
    def filter_issues_by_count(issues, count, by_file=False) -> tuple[list[dict], dict[str, int]]:
        """Filter the issues list to only include the first count issues of each code.

        Parameters:
            issues (list): A list of dictionaries containing the full issue list.
            count (int): The number of issues to keep for each code.
            by_file (bool): If True, group by file name.

        Returns:
            tuple[list[dict], dict[str, int]]: A tuple containing:
                - A list of dictionaries representing the filtered issue list.
                - A dictionary with the codes as keys and the number of occurrences as values.
        """
        # Code counts per file name; "" collects issues that are not grouped by file.
        file_dicts = {"": {}}
        filtered_issues = []
        for issue in issues:
            seen_codes = file_dicts[""]
            if by_file and "ec_filename" in issue:
                seen_codes = file_dicts.setdefault(issue["ec_filename"], {})
            code = issue["code"]
            seen_codes[code] = seen_codes.get(code, 0) + 1
            if seen_codes[code] <= count:
                filtered_issues.append(issue)
        return filtered_issues, ErrorHandler.aggregate_code_counts(file_dicts)

    @staticmethod
    def aggregate_code_counts(file_code_dict) -> dict:
        """Aggregate the counts of codes across multiple files.

        Parameters:
            file_code_dict (dict): A dictionary where keys are filenames and values are dictionaries of code counts.

        Returns:
            dict: A dictionary with the aggregated counts of codes across all files.
        """
        total_counts = defaultdict(int)
        for file_dict in file_code_dict.values():
            for code, count in file_dict.items():
                total_counts[code] += count
        return dict(total_counts)

    @staticmethod
    def get_code_counts(issues: list[dict]) -> dict[str, int]:
        """Count the occurrences of each error code in the issues list.

        Parameters:
            issues (list[dict]): A list of dictionaries containing the issues.

        Returns:
            dict[str, int]: A dictionary with error codes as keys and their occurrence counts as values.
                Issues without a 'code' entry are counted under "UNKNOWN".
        """
        code_counts = defaultdict(int)
        for issue in issues:
            code_counts[issue.get("code", "UNKNOWN")] += 1
        return dict(code_counts)
def sort_issues(issues, reverse=False) -> list[dict]:
    """Sort a list of issues by the error context values.

    Parameters:
        issues (list): A list of dictionaries representing the issues to be sorted.
        reverse (bool, optional): If True, sorts the list in descending order. Default is False.

    Returns:
        list[dict]: The sorted list of issues.
    """

    def _context_sort_key(issue):
        # One entry per context in default_sort_list; a missing context sorts as -1
        # for int based contexts and as "" for the others.
        return tuple(issue.get(key, -1 if key in int_sort_list else "") for key in default_sort_list)

    return sorted(issues, key=_context_sort_key, reverse=reverse)
def check_for_any_errors(issues_list):
    """Return True if any issue is more severe than a warning.

    Parameters:
        issues_list (list): A list of issue dictionaries, each with a 'severity' entry.

    Returns:
        bool: True if at least one issue has a severity value below ErrorSeverity.WARNING.
    """
    return any(issue["severity"] < ErrorSeverity.WARNING for issue in issues_list)
def get_printable_issue_string(
    issues, title=None, severity=None, skip_filename=True, add_link=False, show_details=False
) -> str:
    """Return a string with issues list flatted into single string, one per line.

    Parameters:
        issues (list): Issues to print.
        title (str): Optional title that will always show up first if present(even if there are no validation issues).
        severity (int): If given, only issues whose severity value is <= this level are printed.
        skip_filename (bool): If True, don't add the filename context to the printable string.
        add_link (bool): Add a link at the end of message to the appropriate error if True
        show_details (bool): If True, show details about the issues.

    Returns:
        str: A string containing printable version of the issues or ''.
    """
    selected = issues if severity is None else ErrorHandler.filter_issues_by_severity(issues, severity)
    context_tree = _build_error_context_dict(selected, skip_filename)
    printable = _error_dict_to_string(context_tree, add_link=add_link, show_details=show_details)
    if not title:
        return printable
    return title + "\n" + printable
def get_printable_issue_string_html(issues, title=None, severity=None, skip_filename=True, show_details=False):
    """Return a string with issues list as an HTML tree.

    Parameters:
        issues (list): Issues to print.
        title (str): Optional title, inserted as an h1 element at the start of the tree.
        severity (int): If given, only issues whose severity value is <= this level are included.
        skip_filename (bool): If True, don't add the filename context to the printable string.
        show_details (bool): Accepted for symmetry with get_printable_issue_string; not used here.

    Returns:
        str: An HTML string containing the issues.
    """
    selected = issues if severity is None else ErrorHandler.filter_issues_by_severity(issues, severity)
    html_root = _create_error_tree(_build_error_context_dict(selected, skip_filename))
    if title:
        heading = ET.Element("h1")
        heading.text = title
        html_root.insert(0, heading)
    return ET.tostring(html_root, encoding="unicode")
def iter_errors(issues):
    """An iterator over issues that yields one flat dictionary per issue.

    Each yielded dictionary holds the issue's context entries (with string values)
    followed by every entry of the issue itself. A 'url' entry is added when the
    issue's code has a documentation link, and the 'source_tag' and 'ec_HedString'
    entries, if present, are converted to strings.

    Parameters:
        issues (list): A list of issue dictionaries to iterate over.

    Yields:
        dict: A flattened dictionary representing a single error.
    """
    for issue in issues:
        flat_issue = dict(_get_context_from_issue(issue, False))
        flat_issue.update(issue)
        # Add a link to the error if it's a known error code (looked up once per issue).
        error_url = create_doc_link(issue.get("code", ""))
        if error_url:
            flat_issue["url"] = error_url
        # Replace tag and string objects by their string form.
        for object_key in ("source_tag", "ec_HedString"):
            if object_key in flat_issue:
                flat_issue[object_key] = str(issue[object_key])
        yield flat_issue
def create_doc_link(error_code):
    """If error code is a known code, return a documentation url for it.

    Parameters:
        error_code(str): A HED error code.

    Returns:
        Union[str, None]: The URL if it's a valid code.
    """
    code_groups = ("hed_validation_errors", "schema_validation_errors")
    if not any(error_code in known_error_codes[group] for group in code_groups):
        return None
    # The page anchor is the code in lower case with underscores turned into dashes.
    anchor = error_code.replace("_", "-").lower()
    return f"https://hed-specification.readthedocs.io/en/latest/Appendix_B.html#{anchor}"
def _build_error_context_dict(issues, skip_filename):
    """Build the context -> error dictionary for an entire list of issues.

    Parameters:
        issues (list): The issue dictionaries to arrange by context.
        skip_filename (bool): If True, the filename context is left out.

    Returns:
        dict: A nested dictionary structure with a "children" key at each level for unrelated children.
            None if issues is empty.
    """
    context_tree = None
    for issue in issues:
        context_path = _get_context_from_issue(issue, skip_filename)
        context_tree = _add_single_error_to_dict(context_path, context_tree, issue)
    return context_tree
def _add_single_error_to_dict(items, root=None, issue_to_add=None):
    """Build a nested dictionary out of the context lists.

    Parameters:
        items (list): A list of error contexts
        root (dict, optional): An existing nested dictionary structure to update.
        issue_to_add (dict, optional): The issue to add at this level of context.

    Returns:
        dict: A nested dictionary structure with a "children" key at each level for unrelated children.
    """
    tree = {"children": []} if root is None else root
    node = tree
    for context_key in items:
        # Step into the existing level for this context, creating it on first use.
        node = node.setdefault(context_key, {"children": []})
    if issue_to_add:
        node["children"].append(issue_to_add)
    return tree
def _error_dict_to_string(print_dict, add_link=True, show_details=False, level=0):
    """Convert a nested context dictionary into an indented, printable string.

    Parameters:
        print_dict (dict or None): The nested structure built by _build_error_context_dict.
        add_link (bool): If True, add a documentation link line after each issue with a known code.
        show_details (bool): If True, add the 'details' entry of each issue that has one.
        level (int): The current nesting depth, used as the number of leading tabs.

    Returns:
        str: The printable string, or '' if print_dict is None.
    """
    if print_dict is None:
        return ""
    indent = level * "\t"
    pieces = []
    for context, value in print_dict.items():
        if context != "children":
            # A context level: print its heading, then everything nested below it.
            pieces.append(_format_single_context_string(context[0], context[1], level))
            pieces.append(_error_dict_to_string(value, add_link, show_details, level + 1))
            continue
        for child in value:
            single_issue_message = child["message"]
            pieces.append(f"{indent}{_get_error_prefix(child)}{single_issue_message}\n")
            if add_link:
                link_url = create_doc_link(child["code"])
                if link_url:
                    # The link goes on its own line, one level deeper than the issue.
                    pieces.append((level + 1) * "\t" + f" See... {link_url}\n")
            if show_details and "details" in child:
                pieces.append(_expand_details(child["details"], level + 1))
    return "".join(pieces)
def _expand_details(details, indent=0):
    """Expand the details of an error into a string.

    Parameters:
        details (iterable of str): The detail lines to expand, one output line per item.
        indent (int): The indentation level.

    Returns:
        str: The expanded details string, or '' if details is empty or None.
    """
    if not details:
        return ""
    prefix = indent * "\t"
    return "".join(prefix + line + "\n" for line in details)
def _get_context_from_issue(val_issue, skip_filename=True):
    """Extract all the context values from the given issue.

    Parameters:
        val_issue (dict): A dictionary a representing a single error.
        skip_filename (bool): If True, don't gather the filename context.

    Returns:
        list: A list of tuples containing the context_type and context for the given issue.
    """
    single_issue_context = []
    for key, value in val_issue.items():
        if skip_filename and key == ErrorContext.FILE_NAME:
            continue
        if key == ErrorContext.HED_STRING:
            # The context may already be plain text (for example after
            # replace_tag_references); only ask for the original string if the
            # value can provide one.
            get_original = getattr(value, "get_original_hed_string", None)
            if callable(get_original):
                value = get_original()
        if key.startswith("ec_"):
            single_issue_context.append((key, str(value)))
    return single_issue_context
def _get_error_prefix(single_issue):
    """Return the prefix for the error message based on severity and error code.

    Parameters:
        single_issue(dict): A single issue object. A missing severity counts as an error.

    Returns:
        str: the prefix to use.
    """
    is_error = single_issue.get("severity", ErrorSeverity.ERROR) == ErrorSeverity.ERROR
    error_code = single_issue["code"]
    return f"{error_code}: " if is_error else f"{error_code}: (Warning) "
def _format_single_context_string(context_type, context, tab_count=0):
    """Return the human-readable form of a single context tuple.

    Parameters:
        context_type (str): The context type of this entry.
        context (str or HedString): The value of this context.
        tab_count (int): Number of tabs to prefix the line with.

    Returns:
        str: A string containing the context, including tabs.

    Raises:
        KeyError: If context_type is not one of the context types listed below.
    """
    headings = {
        ErrorContext.FILE_NAME: f"\nErrors in file '{context}':",
        ErrorContext.TABLE_NAME: f"\nErrors in table '{context}':",
        ErrorContext.SIDECAR_COLUMN_NAME: f"Column '{context}':",
        ErrorContext.SIDECAR_KEY_NAME: f"Key: {context}",
        ErrorContext.ROW: f"Issues in row {context}:",
        ErrorContext.COLUMN: f"Issues in column {context}:",
        ErrorContext.CUSTOM_TITLE: context,
        ErrorContext.LINE: f"Line: {context}",
        ErrorContext.HED_STRING: f"hed string: {context}",
        ErrorContext.SCHEMA_SECTION: f"Schema Section: {context}",
        ErrorContext.SCHEMA_TAG: f"Source tag: {context}",
        ErrorContext.SCHEMA_ATTRIBUTE: f"Source Attribute: {context}",
    }
    heading = headings[context_type]
    leading_tabs = tab_count * "\t"
    return f"{leading_tabs}{heading}\n"
def _create_error_tree(error_dict, parent_element=None, add_link=True):
    """Build an HTML list tree from a nested context dictionary.

    Parameters:
        error_dict (dict or None): The nested structure built by _build_error_context_dict.
            None or an empty dict adds nothing to the tree.
        parent_element (Element or None): The element to add list items to. A new "ul" is created if None.
        add_link (bool): If True, the prefix of an issue with a known code becomes a documentation link.

    Returns:
        Element: The parent element, with one "li" per issue and per context level.
    """
    if parent_element is None:
        parent_element = ET.Element("ul")
    # There is no context dictionary when the issue list was empty.
    if not error_dict:
        return parent_element
    for context, value in error_dict.items():
        if context != "children":
            # A context level: its heading, then a nested list for everything below it.
            context_li = ET.SubElement(parent_element, "li")
            context_li.text = _format_single_context_string(context[0], context[1])
            context_ul = ET.SubElement(context_li, "ul")
            _create_error_tree(value, context_ul, add_link)
            continue
        for child in value:
            child_li = ET.SubElement(parent_element, "li")
            error_prefix = _get_error_prefix(child)
            single_issue_message = child["message"]
            link_url = create_doc_link(child["code"]) if add_link else None
            if link_url:
                # The prefix is the link text; the message follows the link.
                a_element = ET.SubElement(child_li, "a", href=link_url)
                a_element.text = error_prefix
                a_element.tail = " " + single_issue_message
            else:
                child_li.text = error_prefix + " " + single_issue_message
    return parent_element
def replace_tag_references(list_or_dict):
    """Utility function to remove any references to tags, strings, etc. from any type of nested list or dict.

    Use this if you want to save out issues to a file.

    The structure is modified in place: nested dicts and lists are processed recursively,
    bool, float and int values are kept, and every other value is replaced by its str() form.
    Passing a shallow copy (list_or_dict.copy()) protects only the outermost container;
    nested dicts and lists are still shared with the original and modified.

    Parameters:
        list_or_dict (list or dict): An arbitrarily nested list/dict structure
    """
    if isinstance(list_or_dict, dict):
        entries = list_or_dict.items()
    elif isinstance(list_or_dict, list):
        entries = enumerate(list_or_dict)
    else:
        return
    for slot, element in entries:
        if isinstance(element, (dict, list)):
            replace_tag_references(element)
        elif not isinstance(element, (bool, float, int)):
            list_or_dict[slot] = str(element)