# Source code for hed.schema.schema_comparer

"""Functions supporting comparison of schemas."""

import pandas as pd
from collections import defaultdict

from hed.schema.hed_schema import HedKey
from hed.schema.hed_schema_constants import HedSectionKey
from hed.schema.schema_io.df_constants import (
    EXTERNAL_ANNOTATION_KEY,
    PREFIXES_KEY,
    SOURCES_KEY,
    UNIQUE_EXTRAS_KEYS,
    in_library as _in_library,
)


class SchemaComparer:
    """Class for comparing HED schemas and generating change logs.

    A comparer is built from two schemas (conventionally the older one first)
    and offers three levels of output:

    - ``compare_schemas``: raw dictionaries of matching / missing / unequal entries.
    - ``gather_schema_changes``: a structured changelog grouped by section.
    - ``compare_differences`` / ``pretty_print_change_dict``: formatted text reports.
    """

    # Class-level constants
    MISC_SECTION = "misc"
    HED_ID_SECTION = "HedId changes"
    EXTRAS_SECTION = "Extras changes"
    SOURCES = SOURCES_KEY
    PREFIXES = PREFIXES_KEY
    ANNOTATION_PROPERTY_EXTERNAL = EXTERNAL_ANNOTATION_KEY

    # Singular display names; the key order here also fixes the section order
    # of the dictionary returned by gather_schema_changes.
    SECTION_ENTRY_NAMES = {
        HedSectionKey.Tags: "Tag",
        HedSectionKey.Units: "Unit",
        HedSectionKey.UnitClasses: "Unit Class",
        HedSectionKey.ValueClasses: "Value Class",
        HedSectionKey.UnitModifiers: "Unit Modifier",
        HedSectionKey.Properties: "Property",
        HedSectionKey.Attributes: "Attribute",
        MISC_SECTION: "Misc Metadata",
        HED_ID_SECTION: "Modified Hed Ids",
        SOURCES: "Sources",
        PREFIXES: "Prefixes",
        ANNOTATION_PROPERTY_EXTERNAL: "AnnotationPropertyExternal",
    }

    # Plural display names used as headings in the printed reports.
    SECTION_ENTRY_NAMES_PLURAL = {
        HedSectionKey.Tags: "Tags",
        HedSectionKey.Units: "Units",
        HedSectionKey.UnitClasses: "Unit Classes",
        HedSectionKey.ValueClasses: "Value Classes",
        HedSectionKey.UnitModifiers: "Unit Modifiers",
        HedSectionKey.Properties: "Properties",
        HedSectionKey.Attributes: "Attributes",
        MISC_SECTION: "Misc Metadata",
        HED_ID_SECTION: "Modified Hed Ids",
        EXTRAS_SECTION: "Extras",
        SOURCES: "Sources",
        PREFIXES: "Prefixes",
        ANNOTATION_PROPERTY_EXTERNAL: "AnnotationPropertyExternal",
    }

    def __init__(self, schema1, schema2):
        """Initialize the SchemaComparer with two schemas.

        Parameters:
            schema1 (HedSchema): The first schema to compare (typically older version).
            schema2 (HedSchema): The second schema to compare (typically newer version).
        """
        self.schema1 = schema1
        self.schema2 = schema2

    def find_matching_tags(self, sections=(HedSectionKey.Tags,), return_string=True):
        """Find tags with matching names in both schemas.

        This method identifies all entries that exist in both schemas with the same name,
        regardless of whether their attributes differ.

        Parameters:
            sections (tuple): Tuple of HedSectionKey values indicating which sections to compare.
                Default is (HedSectionKey.Tags,).
            return_string (bool): If True, return formatted string. If False, return dictionary.
                Default is True.

        Returns:
            str or dict: If return_string is True, returns formatted string listing matching tags.
                If False, returns dictionary mapping section keys to dictionaries of matching
                tag entries.
        """
        matches, _, _, unequal_entries = self.compare_schemas(sections=sections)
        # Build the name summary before merging so that no name is listed twice.
        header_summary = self._get_tag_name_summary((matches, unequal_entries))

        # Combine the two dictionaries: "same name" covers both identical and unequal entries.
        for section_key, section_dict in matches.items():
            section_dict.update(unequal_entries[section_key])

        if return_string:
            return "Nodes with matching names:\n" + self._pretty_print_header(header_summary)
        return matches

    def compare_schemas(self, attribute_filter=HedKey.InLibrary, sections=(HedSectionKey.Tags,)):
        """Compare two schemas section by section, categorizing entries by match status.

        This is the core comparison method that categorizes all schema entries into four groups:
        matches (identical entries), entries only in schema1, entries only in schema2,
        and entries with the same name but different attributes.

        Parameters:
            attribute_filter (HedKey or None): If provided, only entries with this attribute
                are compared. Set to None to compare all entries. Default is HedKey.InLibrary.
            sections (tuple or None): Tuple of HedSectionKey values to compare. If None, compares
                all sections including miscellaneous metadata. Default is (HedSectionKey.Tags,).

        Returns:
            tuple: Four dictionaries (matches, not_in_schema1, not_in_schema2, unequal_entries):
                - matches: Entries identical in both schemas
                - not_in_schema1: Entries only in schema2
                - not_in_schema2: Entries only in schema1
                - unequal_entries: Entries with same name but different attributes
        """
        matches, not_in_schema1, not_in_schema2, unequal_entries = {}, {}, {}, {}

        # Handle miscellaneous sections (header attributes, prologue, epilogue).
        if sections is None or self.MISC_SECTION in sections:
            misc_changes = {}
            header1 = self.schema1.get_save_header_attributes()
            header2 = self.schema2.get_save_header_attributes()
            if header1 != header2:
                misc_changes["header_attributes"] = (str(header1), str(header2))
            if self.schema1.prologue != self.schema2.prologue:
                misc_changes["prologue"] = (self.schema1.prologue, self.schema2.prologue)
            if self.schema1.epilogue != self.schema2.epilogue:
                misc_changes["epilogue"] = (self.schema1.epilogue, self.schema2.epilogue)
            unequal_entries[self.MISC_SECTION] = misc_changes

        # Compare sections
        for section_key in HedSectionKey:
            if sections is not None and section_key not in sections:
                continue

            # Tags are keyed by their short name; every other section by its plain name.
            name_attribute = "short_tag_name" if section_key == HedSectionKey.Tags else "name"
            dict1 = self._entries_by_name(self.schema1[section_key], name_attribute, attribute_filter)
            dict2 = self._entries_by_name(self.schema2[section_key], name_attribute, attribute_filter)

            not_in_schema2[section_key] = {key: entry for key, entry in dict1.items() if key not in dict2}
            not_in_schema1[section_key] = {key: entry for key, entry in dict2.items() if key not in dict1}

            same, different = {}, {}
            for key, entry1 in dict1.items():
                if key not in dict2:
                    continue
                entry2 = dict2[key]
                # Entry equality may be costly, so evaluate it once per shared name.
                if entry1 == entry2:
                    same[key] = (entry1, entry2)
                else:
                    different[key] = (entry1, entry2)
            matches[section_key] = same
            unequal_entries[section_key] = different

        return matches, not_in_schema1, not_in_schema2, unequal_entries

    @staticmethod
    def _entries_by_name(section, name_attribute, attribute_filter):
        """Map entry names to entries for one schema section, honoring the attribute filter.

        Parameters:
            section: Schema section object exposing ``all_entries``.
            name_attribute (str): Name of the entry attribute to use as the dictionary key.
            attribute_filter (HedKey or None): If truthy, keep only entries having this attribute.

        Returns:
            dict: Dictionary mapping entry names to entries.
        """
        return {
            getattr(entry, name_attribute): entry
            for entry in section.all_entries
            if not attribute_filter or entry.has_attribute(attribute_filter)
        }

    def gather_schema_changes(self, attribute_filter=None):
        """Generate a structured changelog by comparing the two schemas.

        This method performs a comprehensive comparison and produces a categorized change
        dictionary suitable for version control and documentation. Changes are classified by
        severity (Major, Minor, Patch, Unknown) and organized by schema section.

        Parameters:
            attribute_filter (HedKey or None): If provided, only entries with this attribute
                are compared. Set to None to compare all entries. Default is None.

        Returns:
            dict: Dictionary mapping section keys to lists of change dictionaries.
                Each change dictionary contains 'change_type', 'change' (description),
                and 'tag' (affected entry).
        """
        _, not_in_1, not_in_2, unequal_entries = self.compare_schemas(attribute_filter=attribute_filter, sections=None)
        change_dict = defaultdict(list)
        self._add_removed_items(change_dict, not_in_2)
        self._add_added_items(change_dict, not_in_1)
        self._add_unequal_entries(change_dict, unequal_entries)
        self._add_extras_changes(change_dict)
        self._sort_changes_by_severity(change_dict)
        # Emit sections in the canonical SECTION_ENTRY_NAMES order; empty sections are omitted.
        return {key: change_dict[key] for key in self.SECTION_ENTRY_NAMES if key in change_dict}

    def pretty_print_change_dict(self, change_dict, title="Schema changes", use_markdown=True):
        """Format a change dictionary into a human-readable string.

        Converts the structured change dictionary from gather_schema_changes into a
        formatted text report suitable for display or documentation.

        Parameters:
            change_dict (dict): Dictionary of changes as returned by gather_schema_changes.
            title (str): Title for the change report. Default is "Schema changes".
            use_markdown (bool): If True, use markdown formatting (bold headers, bullet points).
                If False, use plain text with tabs. Default is True.

        Returns:
            str: Formatted string representation of the changes (empty if there are none).
        """
        if not change_dict:
            return ""

        line_prefix = " - " if use_markdown else "\t"
        line_endings = "**" if use_markdown else ""
        final_strings = [title, ""]  # title followed by a blank line
        for section_key, section_dict in change_dict.items():
            name = self.SECTION_ENTRY_NAMES_PLURAL.get(section_key, section_key)
            final_strings.append(f"{line_endings}{name}:{line_endings}")
            for item in section_dict:
                change, tag, change_type = item["change"], item["tag"], item["change_type"]
                final_strings.append(f"{line_prefix}{tag} ({change_type}): {change}")
            final_strings.append("")  # blank line after each section
        return "\n".join(final_strings)

    def compare_differences(self, attribute_filter=None, title=""):
        """Compare two schemas and return a formatted report of all differences.

        This is a convenience method that combines gather_schema_changes and
        pretty_print_change_dict to produce a complete, human-readable comparison report.

        Parameters:
            attribute_filter (HedKey or None): If provided, only entries with this attribute
                are compared. Set to None to compare all entries. Default is None.
            title (str): Custom title for the report. If empty, generates title from schema names.
                Default is empty string.

        Returns:
            str: Formatted markdown string describing all differences between the schemas.
        """
        changelog = self.gather_schema_changes(attribute_filter=attribute_filter)
        if not title:
            title = f"Differences between {self.schema1.name} and {self.schema2.name}"
        return self.pretty_print_change_dict(changelog, title=title)

    # Private helper methods

    def _pretty_print_header(self, summary_dict):
        """Format a summary dictionary of tag names by section into a string.

        Parameters:
            summary_dict (dict): Dictionary mapping section keys to lists of tag names.

        Returns:
            str: Formatted string with section headers and comma-separated tag names.
                Sections without names are skipped; sections are separated by a blank line.
        """
        section_blocks = [
            f"{self.SECTION_ENTRY_NAMES_PLURAL[section_key]}: {', '.join(sorted(tag_names))}\n"
            for section_key, tag_names in summary_dict.items()
            if tag_names
        ]
        return "\n".join(section_blocks)

    @staticmethod
    def _get_tag_name_summary(tag_dicts):
        """Combine multiple tag dictionaries into a unified summary organized by section.

        Parameters:
            tag_dicts (tuple or list): Collection of dictionaries mapping section keys to tag entries.

        Returns:
            dict: Dictionary mapping section keys to lists of all tag names from all input dictionaries.
        """
        out_dict = {section_key: [] for section_key in HedSectionKey}
        for tag_dict in tag_dicts:
            for section_key, section in tag_dict.items():
                out_dict[section_key].extend(section.keys())
        return out_dict

    def _add_removed_items(self, change_dict, not_in_2):
        """Add entries for items removed from schema2 to the change dictionary.

        Removing a tag is recorded as "Major"; removals in other sections as "Unknown".

        Parameters:
            change_dict (defaultdict): Change dictionary to append to.
            not_in_2 (dict): Dictionary of entries present in schema1 but not in schema2.
        """
        for section_key, section in not_in_2.items():
            if not section:
                continue
            type_name = self.SECTION_ENTRY_NAMES_PLURAL[section_key]
            change_type = "Major" if section_key == HedSectionKey.Tags else "Unknown"
            for tag in section:
                change_dict[section_key].append(
                    {"change_type": change_type, "change": f"Tag {tag} deleted from {type_name}", "tag": tag}
                )

    @staticmethod
    def _add_added_items(change_dict, not_in_1):
        """Add entries for items added to schema2 to the change dictionary.

        Parameters:
            change_dict (defaultdict): Change dictionary to append to.
            not_in_1 (dict): Dictionary of entries present in schema2 but not in schema1.
        """
        for section_key, section in not_in_1.items():
            for tag in section:
                change_dict[section_key].append({"change_type": "Minor", "change": f"Item {tag} added", "tag": tag})

    def _add_unequal_entries(self, change_dict, unequal_entries):
        """Add entries for items with differing attributes to the change dictionary.

        Handles entries that exist in both schemas but have different attributes,
        descriptions, or other properties. Routes to specialized handlers for
        different section types.

        Parameters:
            change_dict (defaultdict): Change dictionary to append to.
            unequal_entries (dict): Dictionary mapping section keys to tuples of (entry1, entry2)
                for entries with the same name but different attributes.
        """
        for section_key, changes in unequal_entries.items():
            if section_key == self.MISC_SECTION:
                self._add_misc_section_changes(change_dict, section_key, changes)
                continue

            for tag, (entry1, entry2) in changes.items():
                if section_key == HedSectionKey.UnitClasses:
                    self._add_unit_classes_changes(change_dict, section_key, entry1, entry2)
                elif section_key == HedSectionKey.Tags:
                    self._add_tag_changes(change_dict, section_key, entry1, entry2)
                self._check_other_attributes(change_dict, section_key, entry1, entry2)
                if entry1.description != entry2.description:
                    change_dict[section_key].append(
                        {"change_type": "Patch", "change": f"Description of {tag} modified", "tag": tag}
                    )

    @staticmethod
    def _add_misc_section_changes(change_dict, section_key, changes):
        """Add changes for miscellaneous metadata sections to the change dictionary.

        Handles changes to schema metadata like prologue, epilogue, and header attributes.
        All of these are recorded as "Patch" changes. Prologue/epilogue texts are not
        echoed in the description; other fields show the old and new value.

        Parameters:
            change_dict (defaultdict): Change dictionary to append to.
            section_key (str): The section identifier (typically MISC_SECTION).
            changes (dict): Dictionary mapping metadata field names to (old_value, new_value) tuples.
        """
        for misc_section, (value1, value2) in changes.items():
            if "prologue" in misc_section or "epilogue" in misc_section:
                change_desc = f"{misc_section} changed"
            else:
                change_desc = f"{misc_section} changed from {value1} to {value2}"
            change_dict[section_key].append({"change_type": "Patch", "change": change_desc, "tag": misc_section})

    @staticmethod
    def _add_unit_classes_changes(change_dict, section_key, entry1, entry2):
        """Add changes in unit class definitions to the change dictionary.

        Compares the units contained in two unit class entries and records additions/removals.

        Parameters:
            change_dict (defaultdict): Change dictionary to append to.
            section_key (HedSectionKey): The section identifier (should be HedSectionKey.UnitClasses).
            entry1 (HedSchemaEntry): Unit class entry from schema1.
            entry2 (HedSchemaEntry): Unit class entry from schema2 with the same name.
        """
        for unit in entry1.units:
            if unit not in entry2.units:
                change_dict[section_key].append(
                    {"change_type": "Major", "change": f"Unit {unit} removed from {entry1.name}", "tag": entry1.name}
                )
        for unit in entry2.units:
            if unit not in entry1.units:
                change_dict[section_key].append(
                    {"change_type": "Patch", "change": f"Unit {unit} added to {entry2.name}", "tag": entry1.name}
                )

    def _add_tag_changes(self, change_dict, section_key, entry1, entry2):
        """Add changes in tag definitions to the change dictionary.

        Compares unit classes, value classes, position in hierarchy, and suggested/related tags
        for two tag entries with the same name.

        Parameters:
            change_dict (defaultdict): Change dictionary to append to.
            section_key (HedSectionKey): The section identifier (should be HedSectionKey.Tags).
            entry1 (HedTagEntry): Tag entry from schema1.
            entry2 (HedTagEntry): Tag entry from schema2 with the same short name.
        """
        tag_name = entry1.short_tag_name
        section_changes = change_dict[section_key]

        for unit_class in entry1.unit_classes:
            if unit_class not in entry2.unit_classes:
                section_changes.append(
                    {
                        "change_type": "Major",
                        "change": f"Unit class {unit_class} removed from {entry1.short_tag_name}",
                        "tag": tag_name,
                    }
                )
        for unit_class in entry2.unit_classes:
            if unit_class not in entry1.unit_classes:
                section_changes.append(
                    {
                        "change_type": "Patch",
                        "change": f"Unit class {unit_class} added to {entry2.short_tag_name}",
                        "tag": tag_name,
                    }
                )
        for value_class in entry1.value_classes:
            if value_class not in entry2.value_classes:
                section_changes.append(
                    {
                        "change_type": "Unknown",
                        "change": f"Value class {value_class} removed from {entry1.short_tag_name}",
                        "tag": tag_name,
                    }
                )
        for value_class in entry2.value_classes:
            if value_class not in entry1.value_classes:
                section_changes.append(
                    {
                        "change_type": "Minor",
                        "change": f"Value class {value_class} added to {entry2.short_tag_name}",
                        "tag": tag_name,
                    }
                )

        # Same short name but a different long name means the tag moved in the hierarchy.
        if entry1.long_tag_name != entry2.long_tag_name:
            section_changes.append(
                {
                    "change_type": "Minor",
                    "change": f"Tag {entry1.short_tag_name} moved in schema from {entry1.long_tag_name} to {entry2.long_tag_name}",
                    "tag": tag_name,
                }
            )

        self._add_suggested_tag_changes(change_dict, entry1, entry2, HedKey.SuggestedTag, "Suggested tag")
        self._add_suggested_tag_changes(change_dict, entry1, entry2, HedKey.RelatedTag, "Related tag")

    @staticmethod
    def _add_suggested_tag_changes(change_dict, entry1, entry2, attribute, label):
        """Add changes for suggested or related tag attributes to the change dictionary.

        Compares multi-value tag attributes (like suggestedTag or relatedTag) between two entries.
        The comma-separated values are sorted before comparison, so a pure reordering
        is not reported as a change.

        Parameters:
            change_dict (defaultdict): Change dictionary to append to.
            entry1 (HedTagEntry): Tag entry from schema1.
            entry2 (HedTagEntry): Tag entry from schema2 with the same short name.
            attribute (HedKey): The attribute to compare (e.g., HedKey.SuggestedTag).
            label (str): Human-readable label for the attribute (e.g., "Suggested tag").
        """
        related_tag1 = ", ".join(sorted(entry1.inherited_attributes.get(attribute, "").split(",")))
        related_tag2 = ", ".join(sorted(entry2.inherited_attributes.get(attribute, "").split(",")))
        if related_tag1 == related_tag2:
            return

        # A missing attribute normalizes to the empty string; show it as "empty" in the report.
        related_tag1 = related_tag1 or "empty"
        related_tag2 = related_tag2 or "empty"
        change_dict[HedSectionKey.Tags].append(
            {
                "change_type": "Patch",
                "change": f"{label} changed on {entry1.short_tag_name} from {related_tag1} to {related_tag2}",
                "tag": entry1.short_tag_name,
            }
        )

    def _check_other_attributes(self, change_dict, section_key, entry1, entry2):
        """Compare general attributes not handled by specialized methods.

        Checks all attributes except those already handled by specialized comparison methods
        (suggestedTag, relatedTag, unitClass, valueClass). Distinguishes between directly
        set attributes and inherited attributes for tags. HedId changes are filed under
        HED_ID_SECTION rather than the entry's own section.

        Parameters:
            change_dict (defaultdict): Change dictionary to append to.
            section_key (HedSectionKey): The section identifier.
            entry1 (HedSchemaEntry): Entry from schema1.
            entry2 (HedSchemaEntry): Entry from schema2 with the same name.
        """
        is_tag_section = section_key == HedSectionKey.Tags
        already_checked_attributes = [HedKey.RelatedTag, HedKey.SuggestedTag, HedKey.ValueClass, HedKey.UnitClass]

        unique_keys = set(entry1.attributes.keys()).union(entry2.attributes.keys())
        if is_tag_section:
            unique_inherited_keys = set(entry1.inherited_attributes.keys()).union(entry2.inherited_attributes.keys())
        else:
            unique_inherited_keys = unique_keys
        all_unique_keys = unique_keys.union(unique_inherited_keys).difference(already_checked_attributes)

        entry_name = entry1.short_tag_name if is_tag_section else entry1.name

        # Iterate in sorted order: plain set iteration over strings varies between
        # interpreter runs and would make the changelog order unstable.
        for key in sorted(all_unique_keys, key=str):
            if is_tag_section:
                value1 = entry1.inherited_attributes.get(key)
                value2 = entry2.inherited_attributes.get(key)
            else:
                value1 = entry1.attributes.get(key)
                value2 = entry2.attributes.get(key)
            if value1 == value2:
                continue

            # An attribute seen only through inheritance is a "Minor" change.
            if key in unique_inherited_keys and key not in unique_keys:
                change_type = "Minor"
                start_text = f"Inherited attribute {key} "
            else:
                change_type = "Patch"
                start_text = f"Attribute {key} "

            if value1 is True and value2 is None:
                end_text = "removed"
            elif value1 is None and value2 is True:
                end_text = "added"
            else:
                end_text = f"modified from {value1} to {value2}"

            use_section_key = self.HED_ID_SECTION if key == HedKey.HedID else section_key
            change_dict[use_section_key].append(
                {
                    "change_type": change_type,
                    "change": f"{start_text}{end_text}",
                    "tag": entry_name,
                    "section": section_key,
                }
            )

    def _add_extras_changes(self, change_dict):
        """Compare extras dataframes between schemas and record differences.

        Extras include Sources, Prefixes, and AnnotationPropertyExternal sections stored
        as pandas DataFrames. Compares row-by-row using key columns to identify additions,
        removals, and modifications. Column names are compared case-insensitively and the
        in-library column is ignored.

        Parameters:
            change_dict (defaultdict): Change dictionary to append to.
        """
        extras1 = getattr(self.schema1, "extras", {}) or {}
        extras2 = getattr(self.schema2, "extras", {}) or {}

        for key in sorted(set(extras1).union(extras2), key=str):
            df1 = extras1.get(key)
            df2 = extras2.get(key)
            if df1 is None and df2 is None:
                continue
            if df1 is None:
                change_dict[key].append(
                    {"change_type": "Minor", "change": f"Entire {key} section missing in first schema", "tag": key}
                )
                continue
            if df2 is None:
                change_dict[key].append(
                    {"change_type": "Minor", "change": f"Entire {key} section missing in second schema", "tag": key}
                )
                continue

            # Work on lower-cased copies so the schemas' own dataframes are not modified.
            df1 = df1.copy()
            df2 = df2.copy()
            df1.columns = [c.lower() for c in df1.columns]
            df2.columns = [c.lower() for c in df2.columns]

            compare_cols = sorted(c for c in set(df1.columns) & set(df2.columns) if c != _in_library)
            if not compare_cols:
                continue
            # Without configured key columns, the whole shared row identifies itself.
            key_cols = UNIQUE_EXTRAS_KEYS.get(key) or compare_cols

            for diff in self._compare_dataframes(df1[compare_cols], df2[compare_cols], key_cols):
                row_key, cols, msg = diff["row"], diff["cols"], diff["message"]
                if msg == "Row missing in first schema":
                    change_type, change = "Minor", f"Row {row_key} missing in first schema"
                elif msg == "Row missing in second schema":
                    change_type, change = "Minor", f"Row {row_key} missing in second schema"
                elif msg == "Duplicate keys found":
                    change_type, change = "Unknown", f"Duplicate key {row_key} found in one or both schemas"
                elif msg == "Column values differ":
                    col_str = ", ".join(cols) if cols else ""
                    change_type, change = "Patch", f"Row {row_key} columns differ: {col_str}"
                else:
                    continue
                change_dict[key].append({"change_type": change_type, "change": change, "tag": str(row_key)})

    @staticmethod
    def _values_differ(value1, value2):
        """Return True if two dataframe cells differ, treating two missing values as equal.

        A bare ``!=`` reports NaN as different from NaN (and ``pd.NA`` cannot be
        used in a boolean context), which would flag rows whose cells are empty
        in both schemas.

        Parameters:
            value1: Cell value from the first dataframe.
            value2: Cell value from the second dataframe.

        Returns:
            bool: True if the values should be reported as different.
        """
        missing1 = pd.api.types.is_scalar(value1) and pd.isna(value1)
        missing2 = pd.api.types.is_scalar(value2) and pd.isna(value2)
        if missing1 or missing2:
            return not (missing1 and missing2)
        return bool(value1 != value2)

    @staticmethod
    def _compare_dataframes(df1, df2, key_cols):
        """Compare two dataframes row-by-row using key columns.

        Identifies rows that exist only in one dataframe, duplicate keys, and rows with
        differing column values. Results follow the row order of df1, followed by rows
        that appear only in df2, so the output is stable between runs.

        Parameters:
            df1 (pd.DataFrame): First dataframe to compare.
            df2 (pd.DataFrame): Second dataframe to compare.
            key_cols (list): List of column names to use as unique identifiers for rows.

        Returns:
            list: List of difference dictionaries, each containing 'row' (key value),
                'cols' (list of differing columns), and 'message' (description of difference).
        """
        results = []
        df1_indexed = df1.set_index(key_cols)
        df2_indexed = df2.set_index(key_cols)
        value_cols = [col for col in df1.columns if col not in key_cols]

        # dict.fromkeys de-duplicates while keeping first-seen order (a set would not).
        all_keys = dict.fromkeys([*df1_indexed.index, *df2_indexed.index])
        for key in all_keys:
            if key not in df1_indexed.index:
                results.append({"row": key, "cols": None, "message": "Row missing in first schema"})
                continue
            if key not in df2_indexed.index:
                results.append({"row": key, "cols": None, "message": "Row missing in second schema"})
                continue

            row1 = df1_indexed.loc[key]
            row2 = df2_indexed.loc[key]
            # .loc returns a DataFrame instead of a Series when the key is not unique.
            if isinstance(row1, pd.DataFrame) or isinstance(row2, pd.DataFrame):
                results.append({"row": key, "cols": None, "message": "Duplicate keys found"})
                continue

            unequal_cols = [col for col in value_cols if SchemaComparer._values_differ(row1[col], row2[col])]
            if unequal_cols:
                results.append({"row": key, "cols": unequal_cols, "message": "Column values differ"})
        return results

    @staticmethod
    def _sort_changes_by_severity(changes_dict):
        """Sort the changelist by severity.

        Each section's list is sorted in place: Major, Minor, Patch, then Unknown
        (unrecognized change types sort with Unknown). The sort is stable, so changes
        of equal severity keep their insertion order.

        Parameters:
            changes_dict (dict): Dictionary mapping section keys to lists of change dicts.
        """
        order = {"Major": 1, "Minor": 2, "Patch": 3, "Unknown": 4}
        for section in changes_dict.values():
            section.sort(key=lambda x: order.get(x["change_type"], order["Unknown"]))