# Source code for hed.tools.bids.bids_dataset
"""The contents of a BIDS dataset."""
import os
import logging
from hed.tools.bids.bids_file_group import BidsFileGroup
from hed.tools.bids import bids_util
from hed.tools.util import io_util
# Sentinel value for default arguments (avoids mutable default bug)
_SENTINEL = object()
class BidsDataset:
    """A BIDS dataset representation primarily focused on HED evaluation.

    Attributes:
        root_path (str): Real (symlink-resolved) root path of the BIDS dataset.
        schema (HedSchema or HedSchemaGroup): The schema used for evaluation.
        file_groups (dict): Maps a file-name suffix to its BidsFileGroup.
        suffixes (list): Suffixes that actually produced a file group.
        exclude_dirs (list): Directory names excluded from traversal.
        bad_files (list): Initialized empty by the constructor; presumably
            populated elsewhere — TODO confirm against the rest of the package.
    """
def __init__(self, root_path, schema=None, suffixes=_SENTINEL, exclude_dirs=_SENTINEL):
    """Create a BidsDataset rooted at the given directory.

    Parameters:
        root_path (str): Root path of the BIDS dataset.
        schema (HedSchema or HedSchemaGroup): If given, overrides the schema
            named in the dataset description.
        suffixes (list or None): File name suffixes of items to include.
            Defaults to ['events', 'participants'] when omitted;
            None or an empty list includes all files.
        exclude_dirs (list or None): Directory names to exclude from traversal.
            Defaults to ['sourcedata', 'derivatives', 'code', 'stimuli'] when
            omitted; None or an empty list excludes nothing.
    """
    # Resolve sentinel defaults (the sentinel avoids the mutable-default pitfall
    # while still letting callers pass None/[] explicitly).
    suffixes = ["events", "participants"] if suffixes is _SENTINEL else suffixes
    exclude_dirs = ["sourcedata", "derivatives", "code", "stimuli"] if exclude_dirs is _SENTINEL else exclude_dirs
    logger = logging.getLogger("hed.bids_dataset")
    logger.debug(f"Initializing BidsDataset for path: {root_path}")
    self.root_path = os.path.realpath(root_path)
    logger.debug(f"Real root path resolved to: {self.root_path}")
    if schema:
        # A caller-supplied schema wins over whatever the dataset description names.
        self.schema = schema
        logger.debug(
            f"Using provided schema: {schema.get_schema_versions() if hasattr(schema, 'get_schema_versions') else 'custom'}"
        )
    else:
        logger.debug("Loading schema from dataset description...")
        self.schema = bids_util.get_schema_from_description(self.root_path)
        if self.schema:
            logger.info(f"Loaded schema from dataset: {self.schema.get_schema_versions()}")
        else:
            logger.warning("No valid schema found in dataset description")
    self.exclude_dirs = exclude_dirs
    self.suffixes = suffixes
    logger.debug(f"Using suffixes: {suffixes}, excluding directories: {exclude_dirs}")
    logger.info("Setting up file groups...")
    # _set_file_groups also narrows self.suffixes to the groups actually built.
    self.file_groups = self._set_file_groups()
    self.bad_files = []
    logger.info(f"BidsDataset initialized with {len(self.file_groups)} file groups: {list(self.file_groups.keys())}")
def get_file_group(self, suffix):
"""Return the file group of files with the specified suffix.
Parameters:
suffix (str): Suffix of the BidsFileGroup to be returned.
Returns:
Union[BidsFileGroup, None]: The requested tabular group.
"""
return self.file_groups.get(suffix, None)
def validate(self, check_for_warnings=False, schema=None):
"""Validate the dataset.
Parameters:
check_for_warnings (bool): If True, check for warnings.
schema (HedSchema or HedSchemaGroup or None): The schema used for validation.
Returns:
list: List of issues encountered during validation. Each issue is a dictionary.
"""
logger = logging.getLogger("hed.bids_dataset")
logger.info(f"Starting validation of {len(self.file_groups)} file groups")
logger.debug(f"Check for warnings: {check_for_warnings}")
issues = []
if schema:
this_schema = schema
logger.debug("Using provided schema for validation")
elif self.schema:
this_schema = self.schema
logger.debug(f"Using dataset schema for validation: {this_schema.get_schema_versions()}")
else:
logger.error("No valid schema available for validation")
return [
{
"code": "SCHEMA_LOAD_FAILED",
"message": "BIDS dataset_description.json has invalid HEDVersion and passed schema was invalid}",
}
]
for suffix, group in self.file_groups.items():
if group.has_hed:
logger.info(f"Validating file group: {suffix} ({len(group.datafile_dict)} files)")
group_issues = group.validate(this_schema, check_for_warnings=check_for_warnings)
logger.info(f"File group {suffix} validation completed: {len(group_issues)} issues found")
issues += group_issues
else:
logger.debug(f"Skipping file group {suffix} - no HED content")
logger.info(f"Dataset validation completed: {len(issues)} total issues found")
return issues
def get_summary(self):
"""Return an abbreviated summary of the dataset."""
summary = {
"dataset": self.root_path,
"hed_schema_versions": self.schema.get_schema_versions(),
"file_group_types": f"{str(list(self.file_groups.keys()))}",
}
return summary
def _set_file_groups(self):
    # Discover candidate .tsv/.json files under the root and bundle them into
    # BidsFileGroup objects keyed by suffix. Side effect: self.suffixes is
    # narrowed to the suffixes that actually produced a group.
    logger = logging.getLogger("hed.bids_dataset")
    logger.debug(f"Searching for files with extensions ['.tsv', '.json'] and suffixes {self.suffixes}")
    file_paths = io_util.get_file_list(
        self.root_path, extensions=[".tsv", ".json"], exclude_dirs=self.exclude_dirs, name_suffix=self.suffixes
    )
    logger.debug(f"Found {len(file_paths)} files matching criteria")
    file_dict = bids_util.group_by_suffix(file_paths)
    logger.debug(f"Files grouped by suffix: {[(suffix, len(files)) for suffix, files in file_dict.items()]}")
    file_groups = {}
    for suffix, files in file_dict.items():
        logger.debug(f"Creating file group for suffix '{suffix}' with {len(files)} files")
        group = BidsFileGroup.create_file_group(self.root_path, files, suffix)
        if not group:
            # create_file_group signals failure with a falsy return.
            logger.warning(f"Failed to create file group for suffix '{suffix}'")
            continue
        file_groups[suffix] = group
        logger.debug(f"Successfully created file group for '{suffix}'")
    self.suffixes = list(file_groups.keys())
    logger.info(f"Created {len(file_groups)} file groups: {list(file_groups.keys())}")
    return file_groups