"""Main command-line program for running the remodeling tools."""
import os
import io
import json
import argparse
import logging
from hed.errors.exceptions import HedFileError
from hed.models.sidecar import Sidecar
from remodeler.remodeler_validator import RemodelerValidator
from remodeler.dispatcher import Dispatcher
from remodeler.backup_manager import BackupManager
from hed.tools.util import io_util
from hed.tools.bids import bids_util
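# Example invocation (hypothetical dataset and remodel-file paths, for illustration
# only; the module is assumed to be installed as the run_remodel command-line script):
#
#     run_remodel /data/ds002790 /data/models/summarize_columns_rmdl.json \
#         -b -x derivatives stimuli -t stopsignal
#
# All flags used above (-b, -x, -t) are defined in get_parser() below.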


def get_parser():
    """Create a parser for the run_remodel command-line arguments.

    Returns:
        argparse.ArgumentParser: A parser for parsing the command line arguments.
    """
    parser = argparse.ArgumentParser(description="Converts event files based on a json file specifying operations.")
    parser.add_argument("data_dir", help="Full path of dataset root directory.")
    parser.add_argument("model_path", help="Full path of the file with remodeling instructions.")
    parser.add_argument(
        "-bd", "--backup_dir", default="", dest="backup_dir", help="Directory for the backup that is being created"
    )
    parser.add_argument(
        "-bn",
        "--backup-name",
        default=BackupManager.DEFAULT_BACKUP_NAME,
        dest="backup_name",
        help="Name of the default backup for remodeling",
    )
    parser.add_argument(
        "-b",
        "--bids-format",
        action="store_true",
        dest="use_bids",
        help="If present, the dataset is in BIDS format with sidecars. HED analysis is available.",
    )
    parser.add_argument(
        "-fs",
        "--file-suffix",
        dest="suffixes",
        nargs="*",
        default=["events"],
        help="Optional list of suffixes (without underscore) of the tsv files to process. "
        "If -fs is given with no values, all possible suffixes are used, as with the single argument '*'.",
    )
    parser.add_argument(
        "-i",
        "--individual-summaries",
        dest="individual_summaries",
        default="separate",
        choices=["separate", "consolidated", "none"],
        help="Controls individual file summaries ('none', 'separate', 'consolidated')",
    )
    parser.add_argument(
        "-j", "--json-sidecar", dest="json_sidecar", nargs="?", help="Optional path to JSON sidecar with HED information"
    )
    parser.add_argument("-ld", "--log_dir", dest="log_dir", default="", help="Directory for storing log entries for errors.")
    parser.add_argument(
        "-nb",
        "--no-backup",
        action="store_true",
        dest="no_backup",
        help="If present, the operations are run directly on the files with no backup.",
    )
    parser.add_argument(
        "-ns",
        "--no-summaries",
        action="store_true",
        dest="no_summaries",
        help="If present, the summaries are not saved, but rather discarded.",
    )
    parser.add_argument(
        "-nu",
        "--no-update",
        action="store_true",
        dest="no_update",
        help="If present, the files are not saved, but rather discarded.",
    )
    parser.add_argument(
        "-hv",
        "--hed-versions",
        dest="hed_versions",
        nargs="*",
        default=[],
        help="Optional list of HED schema versions used for annotation, include prefixes.",
    )
    parser.add_argument(
        "-s",
        "--save-formats",
        nargs="*",
        default=[".json", ".txt"],
        dest="save_formats",
        help="Format for saving any summaries, if any. If no summaries are to be written, use the -ns option.",
    )
    parser.add_argument(
        "-t",
        "--task-names",
        dest="task_names",
        nargs="*",
        default=[],
        help="The names of the tasks to process. If an empty list is given, all tasks are lumped together. "
        "If * is given, tasks are found and reported individually.",
    )
    parser.add_argument(
        "-v", "--verbose", action="store_true", help="If present, output informative messages as computation progresses."
    )
    parser.add_argument(
        "-w",
        "--work-dir",
        default="",
        dest="work_dir",
        help="If given, the path of the directory for saving summaries; otherwise derivatives/remodel is used.",
    )
    parser.add_argument(
        "-x",
        "--exclude-dirs",
        nargs="*",
        default=[],
        dest="exclude_dirs",
        help="Directory names to exclude from the search for files.",
    )
    return parser
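# A minimal sketch of using the parser programmatically (hypothetical paths):
#
#     parser = get_parser()
#     args = parser.parse_args(["/data/ds002790", "/data/models/my_rmdl.json", "-nb", "-v"])
#     print(args.suffixes)   # -> ['events'] (the default suffix list)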


def handle_backup(args):
    """Restore the backup if applicable.

    Parameters:
        args (argparse.Namespace): Parsed command-line arguments.

    Returns:
        Union[str, None]: Backup name if a backup was restored, otherwise None.
    """
    if args.no_backup:
        backup_name = None
    else:
        backup_man = BackupManager(args.data_dir)
        if not backup_man.get_backup(args.backup_name):
            raise HedFileError(
                "BackupDoesNotExist",
                f"Backup {args.backup_name} does not exist. Please run run_remodel_backup first.",
                "",
            )
        backup_man.restore_backup(args.backup_name, args.task_names, verbose=args.verbose)
        backup_name = args.backup_name
    return backup_name
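# Hedged sketch of typical use after argument parsing (hypothetical paths):
#
#     args, operations = parse_arguments(["/data/ds002790", "/data/models/my_rmdl.json"])
#     backup_name = handle_backup(args)   # restores the named backup and returns its name
#     # If -nb was given on the command line, handle_backup(args) simply returns None.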


def parse_arguments(arg_list=None):
    """Parse the command line arguments or arg_list if given.

    Parameters:
        arg_list (list or None): List of command-line arguments, or None to parse sys.argv.

    Returns:
        Tuple[argparse.Namespace, list]:
            - Parsed argument namespace.
            - A list of parsed operations (each operation is a dictionary).

    Raises:
        ValueError: If the operations were unable to be correctly parsed.
    """
    parser = get_parser()
    args = parser.parse_args(arg_list)
    if "*" in args.suffixes:
        args.suffixes = None
    args.data_dir = os.path.realpath(args.data_dir)
    args.exclude_dirs = args.exclude_dirs + ["remodel"]
    args.model_path = os.path.realpath(args.model_path)
    if args.verbose:
        print(f"Data directory: {args.data_dir}\nModel path: {args.model_path}")
    with open(args.model_path, "r") as fp:
        operations = json.load(fp)
    validator = RemodelerValidator()
    errors = validator.validate(operations)
    if errors:
        raise ValueError("UnableToFullyParseOperations", f"Fatal operation error, cannot continue:\n{errors}")
    return args, operations
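# Sketch of programmatic use (hypothetical paths; the model file is assumed to hold
# a JSON list of operation dictionaries, which RemodelerValidator then checks):
#
#     arg_list = ["/data/ds002790", "/data/models/my_rmdl.json", "-x", "derivatives"]
#     args, operations = parse_arguments(arg_list)
#     print(f"{len(operations)} operations passed validation")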


def parse_tasks(files, task_args):
    """Parse the tasks argument to get a task list.

    Parameters:
        files (list): List of full paths of files.
        task_args (str or list): The argument values for the task parameter.

    Returns:
        dict: A dictionary mapping task names to their corresponding lists of file paths.
    """
    if not task_args:
        return {"": files}
    task_dict = io_util.get_task_dict(files)
    if task_args == "*" or (isinstance(task_args, list) and task_args[0] == "*"):
        return task_dict
    task_dict = {key: task_dict[key] for key in task_args if key in task_dict}
    return task_dict
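# Illustrative sketch (assumes io_util.get_task_dict groups files by their BIDS
# task- entity; the file names below are hypothetical):
#
#     files = ["/data/sub-01/sub-01_task-rest_events.tsv",
#              "/data/sub-01/sub-01_task-stop_events.tsv"]
#     parse_tasks(files, [])        # -> {"": [both files]}
#     parse_tasks(files, ["*"])     # -> {"rest": [...], "stop": [...]}
#     parse_tasks(files, ["rest"])  # -> {"rest": [...]}; unknown task names are dropped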


def run_ops(dispatch, args, tabular_files):
    """Run the remodeler on files of a specified form in a directory tree.

    Parameters:
        dispatch (Dispatcher): Controls the application of the operations and backup.
        args (argparse.Namespace): Parsed command-line arguments and their values.
        tabular_files (list): List of files to include in this run.
    """
    if args.verbose:
        print(f"Found {len(tabular_files)} files to process")
    if args.json_sidecar:
        base_sidecar = Sidecar(args.json_sidecar, name=args.json_sidecar)
    else:
        base_sidecar = None
    for file_path in tabular_files:
        if not base_sidecar and args.use_bids:
            sidecar = get_sidecar(args.data_dir, file_path)
        else:
            sidecar = base_sidecar
        if args.verbose:
            print(f"Tabular file {file_path} sidecar {str(sidecar)}")
        df = dispatch.run_operations(file_path, verbose=args.verbose, sidecar=sidecar)
        if not args.no_update:
            df.to_csv(file_path, sep="\t", index=False, header=True)
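# Hedged sketch of a single run_ops call, mirroring how main() uses it below:
#
#     dispatch = Dispatcher(operations, data_root=args.data_dir,
#                           backup_name=backup_name, hed_versions=args.hed_versions)
#     run_ops(dispatch, args, tsv_files)   # rewrites each .tsv in place unless -nu was given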


def get_sidecar(data_dir, tsv_path):
    """Get the sidecar for a file if it exists.

    Parameters:
        data_dir (str): Full path of the data directory.
        tsv_path (str): Full path of the file.

    Returns:
        Union[Sidecar, None]: The Sidecar if it exists, otherwise None.
    """
    merged_dict = bids_util.get_merged_sidecar(data_dir, tsv_path)
    if not merged_dict:
        return None
    name = "merged_" + io_util.get_basename(tsv_path)[0] + ".json"
    return Sidecar(files=io.StringIO(json.dumps(merged_dict)), name=name)
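# Illustrative sketch (assumes bids_util.get_merged_sidecar merges the JSON sidecars
# that apply to the .tsv file under BIDS inheritance; paths are hypothetical):
#
#     sidecar = get_sidecar("/data/ds002790",
#                           "/data/ds002790/sub-01/func/sub-01_task-rest_events.tsv")
#     # Returns a Sidecar built from the merged dictionary, or None if nothing applies.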


def main(arg_list=None):
    """The command-line program.

    Parameters:
        arg_list (list or None): Called with value None when called from the command line.
            Otherwise, called with the command-line parameters as an argument list.

    Raises:
        HedFileError:
            - if the data root directory does not exist.
            - if the specified backup does not exist.
    """
    args, operations = parse_arguments(arg_list)
    if args.log_dir:
        os.makedirs(args.log_dir, exist_ok=True)
        timestamp = "_" + io_util.get_timestamp()
    else:
        timestamp = ""
    try:
        if not os.path.isdir(args.data_dir):
            raise HedFileError("DataDirectoryDoesNotExist", f"The root data directory {args.data_dir} does not exist", "")
        backup_name = handle_backup(args)
        save_dir = None
        if args.work_dir:
            save_dir = os.path.realpath(os.path.join(args.work_dir, Dispatcher.REMODELING_SUMMARY_PATH))
        tsv_files = io_util.get_file_list(
            args.data_dir, name_suffix=args.suffixes, extensions=[".tsv"], exclude_dirs=args.exclude_dirs
        )
        task_dict = parse_tasks(tsv_files, args.task_names)
        for task, files in task_dict.items():
            dispatch = Dispatcher(operations, data_root=args.data_dir, backup_name=backup_name, hed_versions=args.hed_versions)
            run_ops(dispatch, args, files)
            if not args.no_summaries:
                dispatch.save_summaries(
                    args.save_formats, individual_summaries=args.individual_summaries, summary_dir=save_dir, task_name=task
                )
    except Exception:
        if args.log_dir:
            log_name = io_util.get_alphanumeric_path(os.path.realpath(args.data_dir)) + timestamp + ".txt"
            logging.basicConfig(filename=os.path.join(args.log_dir, log_name), level=logging.ERROR)
            logging.exception(f"{args.data_dir}: {args.model_path}")
        raise
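# main() can also be called programmatically with an argument list
# (hypothetical dataset and model paths shown for illustration):
#
#     main(["/data/ds002790", "/data/models/my_rmdl.json", "-b", "-i", "consolidated"])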


if __name__ == "__main__":
    main()