Source code for hed.tools.bids.bids_util

import os
import json
from hed.tools.util.io_util import get_full_extension
import hed.schema.hed_schema_io as hed_schema_io


def get_schema_from_description(root_path):
    """Load the HED schema named in a dataset's dataset_description.json.

    Parameters:
        root_path (str): Directory that contains dataset_description.json.

    Returns:
        The value returned by hed_schema_io.load_schema_version for the "HEDVersion"
        entry of the description file, or None if the file cannot be read or parsed,
        or if loading the schema raises an exception.

    Notes:
        - If the description has no "HEDVersion" key, None is passed to load_schema_version.
        - This is a best-effort lookup: every failure is reported as None rather than raised.
    """
    try:
        description_path = os.path.abspath(os.path.join(root_path, "dataset_description.json"))
        # JSON text is UTF-8; do not rely on the platform's default encoding
        # (this also matches how get_merged_sidecar reads sidecar files).
        with open(description_path, "r", encoding="utf-8") as fp:
            dataset_description = json.load(fp)
        version = dataset_description.get("HEDVersion", None)
        return hed_schema_io.load_schema_version(version)
    except Exception:
        # Deliberate best-effort behavior: callers receive None on any failure.
        return None
def group_by_suffix(file_list):
    """Group file paths by the last underscore-delimited piece of their names.

    Parameters:
        file_list (list): List of file paths.

    Returns:
        dict: Dictionary with suffixes as keys and lists of file paths as values.

    Notes:
        - The suffix is taken from the base name of the first value returned by
          get_full_extension (presumably the path with its extension removed).
        - A name without an underscore is grouped under its entire base name.
    """
    grouped = {}
    for path in file_list:
        stem, _ = get_full_extension(path)
        # The last element is the suffix when an underscore exists, otherwise the whole name.
        key = os.path.basename(stem).rsplit("_", 1)[-1]
        if key not in grouped:
            grouped[key] = []
        grouped[key].append(path)
    return grouped
def parse_bids_filename(file_path):
    """Break a file name into its BIDS-style parts.

    Parameters:
        file_path (str): Path to be parsed (surrounding whitespace is stripped first).

    Returns:
        dict: Dictionary with keys 'basename', 'suffix', 'prefix', 'ext', 'bad', and 'entities'.

    Notes:
        - The last underscore-delimited piece becomes the suffix unless it contains exactly
          one hyphen, in which case it is handled as an entity.
        - A leading piece without a hyphen is recorded as the prefix.
        - Pieces that update_entity rejects are collected in 'bad'.
    """
    stem, extension = get_full_extension(file_path.strip())
    base = os.path.basename(stem)
    name_dict = {"basename": base, "suffix": None, "prefix": None, "ext": extension, "bad": [], "entities": {}}
    if not base:
        return name_dict

    head, separator, tail = base.rpartition("_")

    if not separator:
        # No underscore at all: the whole name is a suffix, a single entity, or malformed.
        hyphen_count = tail.count("-")
        if hyphen_count == 0:
            name_dict["suffix"] = tail
        elif hyphen_count == 1:
            update_entity(name_dict, tail)
        else:
            name_dict["bad"].append(tail)
        return name_dict

    # The trailing piece is processed before the leading pieces.
    if tail.count("-") == 1:
        # Looks like an entity pair (e.g., "task-motor") rather than a suffix.
        update_entity(name_dict, tail)
    else:
        name_dict["suffix"] = tail

    # A first piece without a hyphen is a prefix, not an entity.
    pieces = head.split("_")
    if "-" not in pieces[0]:
        name_dict["prefix"] = pieces.pop(0)

    for piece in pieces:
        update_entity(name_dict, piece)
    return name_dict
def update_entity(name_dict, entity):
    """Record one entity piece in the parsed-name dictionary.

    Parameters:
        name_dict (dict): Dictionary holding an 'entities' dict and a 'bad' list; modified in place.
        entity (str): Entity piece to be added (expected form "key-value").

    Notes:
        - A piece is accepted only if it has exactly one hyphen with non-empty text on both sides.
        - Any other piece is appended to the 'bad' list unchanged.
    """
    key, _, value = entity.partition("-")
    # A non-empty value implies a hyphen was found; a second hyphen would appear inside the value.
    well_formed = bool(key) and bool(value) and "-" not in value
    if not well_formed:
        name_dict["bad"].append(entity)
        return
    name_dict["entities"][key] = value
def get_merged_sidecar(root_path, tsv_file):
    """Merge the JSON sidecars that apply to a file into a single dictionary.

    Parameters:
        root_path (str): Directory at which the upward search for sidecars stops.
        tsv_file (str): Path of the file whose sidecars are to be merged.

    Returns:
        dict: Top-level keys and values gathered from every sidecar yielded by walk_back.

    Notes:
        - Sidecars are handled in the order walk_back yields them, and a key that is
          already present is never overwritten, so the first sidecar to define a key wins.
    """
    # Collect every path up front so walk_back finishes (or raises) before any file is opened.
    sidecar_paths = list(walk_back(root_path, tsv_file))

    merged = {}
    for path in sidecar_paths:
        with open(path, "r", encoding="utf-8") as handle:
            contents = json.load(handle)
        for key, value in contents.items():
            # Keep the value from the earlier (closer) sidecar if the key already exists.
            merged.setdefault(key, value)
    return merged
def walk_back(root_path, file_path):
    """Yield matching sidecar paths, walking from the file's directory up toward the root.

    Parameters:
        root_path (str): Directory at which the upward walk stops.
        file_path (str): Path of the file for which sidecars are sought.

    Yields:
        str: The single candidate found in each visited directory that has exactly one.

    Raises:
        Exception: If a visited directory holds more than one candidate; the argument is a
            dictionary whose "code" is "MULTIPLE_INHERITABLE_FILES".
    """
    file_path = os.path.abspath(file_path)
    root_path = os.path.abspath(root_path)  # Normalized so it can be compared with directory paths
    # The target name is parsed once and reused for every directory.
    tsv_file_dict = parse_bids_filename(file_path)

    current_dir = os.path.dirname(file_path)
    while current_dir and len(current_dir) >= len(root_path):
        matches = get_candidates(current_dir, tsv_file_dict)
        if len(matches) > 1:
            raise Exception(
                {
                    "code": "MULTIPLE_INHERITABLE_FILES",
                    "location": matches[0],
                    "affects": file_path,
                    "issueMessage": f"Candidate files: {matches}",
                }
            )
        if matches:
            yield matches[0]

        # The root directory is the last one examined.
        if current_dir == root_path:
            break
        parent_dir = os.path.dirname(current_dir)
        # dirname of a filesystem root is itself; stop rather than loop forever.
        if parent_dir == current_dir:
            break
        current_dir = parent_dir
def get_candidates(source_dir, tsv_file_dict):
    """List the files in a directory that qualify as sidecars for a parsed file name.

    Parameters:
        source_dir (str): Directory whose entries are examined (not recursive).
        tsv_file_dict (dict): Parsed name of the target file, as produced by parse_bids_filename.

    Returns:
        list: Resolved (os.path.realpath) paths of regular files whose parsed names have no
            'bad' pieces and satisfy matches_criteria.
    """
    matches = []
    for entry in os.listdir(source_dir):
        resolved = os.path.realpath(os.path.join(source_dir, entry))
        if os.path.isfile(resolved):
            parsed = parse_bids_filename(resolved)
            # Names with malformed pieces are never considered.
            if parsed and not parsed["bad"] and matches_criteria(parsed, tsv_file_dict):
                matches.append(resolved)
    return matches
def matches_criteria(json_file_dict, tsv_file_dict):
    """Decide whether a parsed JSON file name qualifies as a sidecar for a parsed target name.

    Parameters:
        json_file_dict (dict): Parsed name of the candidate file ('ext', 'suffix', 'entities').
        tsv_file_dict (dict): Parsed name of the target file ('suffix', 'entities').

    Returns:
        bool: True only if the candidate's extension is ".json" (case-insensitive), its suffix
            equals the target's suffix (or the target has no suffix), and every entity of the
            candidate has the same value in the target.
    """
    is_json = ".json" == json_file_dict["ext"].lower()

    candidate_suffix = json_file_dict["suffix"]
    target_suffix = tsv_file_dict["suffix"]
    suffix_ok = (candidate_suffix == target_suffix) or not target_suffix

    candidate_entities = json_file_dict["entities"]
    target_entities = tsv_file_dict["entities"]
    # The candidate may carry fewer entities than the target, but none that disagree.
    entities_ok = all(value == target_entities.get(name) for name, value in candidate_entities.items())

    return all((is_json, suffix_ok, entities_ok))