Source code for hed.tools.bids.bids_util

import os
import json
from hed.tools.util.io_util import get_full_extension
import hed.schema.hed_schema_io as hed_schema_io


def get_schema_from_description(root_path):
    """Load the HED schema named in a dataset's dataset_description.json.

    Parameters:
        root_path (str): Directory expected to contain dataset_description.json.

    Returns:
        The value returned by hed_schema_io.load_schema_version for the "HEDVersion" entry of the
        description file, or None if the file cannot be read or parsed or the schema fails to load.

    Notes:
        - This is deliberately best-effort: any exception results in None rather than propagating.
        - If "HEDVersion" is absent, None is passed to load_schema_version.
    """
    try:
        description_path = os.path.abspath(os.path.join(root_path, "dataset_description.json"))
        # JSON text is UTF-8; do not depend on the platform's locale encoding.
        with open(description_path, "r", encoding="utf-8") as fp:
            dataset_description = json.load(fp)
        version = dataset_description.get("HEDVersion", None)
        return hed_schema_io.load_schema_version(version)
    except Exception:
        return None
def group_by_suffix(file_list):
    """Group file paths by the last underscore-separated piece of their base names.

    Parameters:
        file_list (list): List of file paths.

    Returns:
        dict: Maps each suffix to the list of paths carrying it, in input order.

    Notes:
        - The extension is removed with get_full_extension before the suffix is taken.
        - A base name without an underscore is used whole as its own suffix.
    """
    groups = {}
    for path in file_list:
        stem, _ = get_full_extension(path)
        # rsplit with maxsplit=1 leaves the suffix (or the whole name) as the last piece.
        suffix = os.path.basename(stem).rsplit('_', 1)[-1]
        groups.setdefault(suffix, []).append(path)
    return groups
def parse_bids_filename(file_path):
    """Break a file name into its BIDS-style pieces.

    Parameters:
        file_path (str): Path to be parsed (surrounding whitespace is stripped).

    Returns:
        dict: Dictionary with keys:
            - 'basename': file name with directory and extension removed.
            - 'suffix': trailing piece after the last underscore when it is not a name-value pair, else None.
            - 'prefix': leading underscore-separated piece that has no hyphen, else None.
            - 'ext': extension as returned by get_full_extension.
            - 'bad': list of pieces that could not be read as 'name-value'.
            - 'entities': dictionary of entity name-value pairs.

    Notes:
        - Pieces are separated by underscores; an entity is a piece with exactly one hyphen
          and non-empty text on both sides.
    """
    stem, extension = get_full_extension(file_path.strip())
    base = os.path.basename(stem)
    parsed = {"basename": base, "suffix": None, "prefix": None, "ext": extension, "bad": [], "entities": {}}
    if not base:
        return parsed

    head, underscore, tail = base.rpartition('_')

    if not underscore:
        # No underscore at all: the whole name is a suffix, a lone entity, or unparseable.
        hyphen_count = tail.count('-')
        if hyphen_count == 0:
            parsed["suffix"] = tail
        elif hyphen_count == 1:
            update_entity(parsed, tail)
        else:
            parsed["bad"].append(tail)
        return parsed

    # The last piece is the suffix unless it has the shape of a name-value pair.
    if tail.count('-') == 1:
        update_entity(parsed, tail)
    else:
        parsed["suffix"] = tail

    # A hyphen-free first piece is treated as a prefix rather than an entity.
    pieces = head.split('_')
    if '-' not in pieces[0]:
        parsed["prefix"] = pieces.pop(0)

    for piece in pieces:
        update_entity(parsed, piece)
    return parsed
def update_entity(name_dict, entity):
    """Record one 'name-value' piece in a parsed-filename dictionary.

    Parameters:
        name_dict (dict): Dictionary holding an "entities" dict and a "bad" list; modified in place.
        entity (str): Candidate entity piece such as 'task-rest'.

    Notes:
        - A piece is accepted only if it has exactly one hyphen with non-empty text on both sides.
        - Rejected pieces are appended to name_dict["bad"] unchanged.
        - An accepted piece overwrites any earlier value stored under the same entity name.
    """
    key, _, value = entity.partition('-')
    # An empty value covers both "no hyphen" and "nothing after the hyphen";
    # a hyphen inside value means the piece had more than one hyphen.
    if not key or not value or '-' in value:
        name_dict["bad"].append(entity)
        return
    name_dict["entities"][key] = value
def get_merged_sidecar(root_path, tsv_file):
    """Merge the JSON sidecars that apply to a tsv file into one dictionary.

    Parameters:
        root_path (str): Root directory passed through to walk_back.
        tsv_file (str): Path of the tsv file whose sidecars are merged.

    Returns:
        dict: Top-level merge of all sidecars found by walk_back (empty if none were found).

    Notes:
        - Sidecars are applied in the reverse of the order walk_back yields them, so the
          sidecar yielded first (the one nearest the tsv file) overrides the others.
        - The merge is shallow: a top-level key is replaced as a whole.
    """
    merged = {}
    for sidecar_path in reversed(list(walk_back(root_path, tsv_file))):
        with open(sidecar_path, 'r', encoding='utf-8') as handle:
            merged.update(json.load(handle))
    return merged
def walk_back(root_path, file_path):
    """Yield the JSON sidecars that apply to a file, walking from its directory up toward the root.

    Parameters:
        root_path (str): Root directory of the dataset; the walk stops before this directory.
        file_path (str): Path of the file whose sidecars are sought.

    Yields:
        str: Path of the single matching sidecar of each searched directory, nearest directory first.

    Raises:
        Exception: If one directory contains more than one matching sidecar. The exception argument
            is a dictionary with "code", "location", "affects" and "issueMessage" entries.

    Notes:
        - The directory equal to root_path is itself never searched.
          NOTE(review): confirm that root-level sidecars are meant to be handled elsewhere.
    """
    file_path = os.path.abspath(file_path)
    root_path = os.path.abspath(root_path)  # Normalize root_path for cross-platform support
    source_dir = os.path.dirname(file_path)
    if not source_dir or source_dir == root_path:
        return  # The file sits directly in the root, so there is no directory to search.

    # get_candidates hands its second argument to matches_criteria, which indexes it by
    # "suffix" and "entities"; it therefore needs the parsed file name, not the path string.
    tsv_file_dict = parse_bids_filename(file_path)

    while True:
        candidates = get_candidates(source_dir, tsv_file_dict)
        if len(candidates) > 1:
            raise Exception({
                "code": "MULTIPLE_INHERITABLE_FILES",
                "location": candidates[0],
                "affects": file_path,
                "issueMessage": f"Candidate files: {candidates}",
            })
        if candidates:
            yield candidates[0]

        # Stop at the dataset root or at the filesystem root (where dirname is a fixed point).
        parent_dir = os.path.dirname(source_dir)
        if not parent_dir or parent_dir == source_dir or parent_dir == root_path:
            break
        source_dir = parent_dir
def get_candidates(source_dir, tsv_file_dict):
    """List the files directly inside a directory that qualify as sidecars for a parsed file name.

    Parameters:
        source_dir (str): Directory whose entries are examined (not recursive).
        tsv_file_dict (dict): Parsed file name of the target file; it is handed to matches_criteria,
            which reads its "suffix" and "entities" entries.

    Returns:
        list: Resolved (realpath) paths of the matching files, in os.listdir order.

    Notes:
        - Entries that are not regular files, or whose names parse with "bad" pieces, are ignored.
    """
    matches = []
    for entry in os.listdir(source_dir):
        resolved = os.path.realpath(os.path.join(source_dir, entry))
        if os.path.isfile(resolved):
            parsed = parse_bids_filename(resolved)
            if parsed and not parsed["bad"] and matches_criteria(parsed, tsv_file_dict):
                matches.append(resolved)
    return matches
def matches_criteria(json_file_dict, tsv_file_dict):
    """Decide whether a parsed JSON file name is a sidecar that applies to a parsed data file name.

    Parameters:
        json_file_dict (dict): Parsed candidate file name with "ext", "suffix" and "entities" entries.
        tsv_file_dict (dict): Parsed data file name with "suffix" and "entities" entries.

    Returns:
        bool: True if the candidate has a .json extension (case-insensitive), its suffix equals the
            data file's suffix (or the data file has no suffix), and every entity of the candidate
            appears in the data file with the same value.

    Notes:
        - The entity test runs over the candidate's entities, so a less specific sidecar
          (e.g. 'task-rest_events.json') applies to 'sub-01_task-rest_events.tsv', while a sidecar
          carrying an entity the data file lacks does not apply.
    """
    if json_file_dict["ext"].lower() != ".json":
        return False

    tsv_suffix = tsv_file_dict["suffix"]
    if tsv_suffix and json_file_dict["suffix"] != tsv_suffix:
        return False

    tsv_entities = tsv_file_dict["entities"]
    return all(tsv_entities.get(name) == value for name, value in json_file_dict["entities"].items())