"""Utilities for assembly and conversion of HED strings to different forms."""
import re
import math
from collections import defaultdict
from functools import partial
import pandas as pd
from hed.models.hed_string import HedString
from hed.models.model_constants import DefTagNames
from hed.models.definition_dict import DefinitionDict
[docs]
def shrink_defs(df, hed_schema, columns=None):
    """Shrink (in place) any def-expand tags found in the specified columns in the dataframe.

    Parameters:
        df (pd.Dataframe or pd.Series): The dataframe or series to modify.
        hed_schema (HedSchema or None): The schema to use to identify defs.
        columns (list or None): The columns to modify on the dataframe. If None, all columns are used.

    Notes:
        Missing values (None/NaN) are treated as not containing a def-expand tag and are left untouched.
    """
    if isinstance(df, pd.Series):
        # na=False keeps the mask strictly boolean; a NaN in the mask would make boolean indexing raise.
        mask = df.str.contains("Def-expand/", case=False, na=False)
        if mask.any():
            df[mask] = df[mask].apply(partial(_shrink_defs, hed_schema=hed_schema))
        return

    if columns is None:
        columns = df.columns

    for column in columns:
        mask = df[column].str.contains("Def-expand/", case=False, na=False)
        # Nothing to shrink in this column, so skip the apply/assignment round trip.
        if not mask.any():
            continue
        df.loc[mask, column] = df.loc[mask, column].apply(partial(_shrink_defs, hed_schema=hed_schema))
[docs]
def expand_defs(df, hed_schema, def_dict, columns=None):
    """Expand (in place) any def tags found in the dataframe.

    Parameters:
        df (pd.Dataframe or pd.Series): The dataframe or series to modify.
        hed_schema (HedSchema or None): The schema to use to identify defs.
        def_dict (DefinitionDict): The definitions to expand.
        columns (list or None): The columns to modify on the dataframe. If None, all columns are used.

    Notes:
        Missing values (None/NaN) are treated as not containing a def tag and are left untouched.
    """
    if isinstance(df, pd.Series):
        # na=False keeps the mask strictly boolean; a NaN in the mask would make boolean indexing raise.
        mask = df.str.contains("Def/", case=False, na=False)
        if mask.any():
            df[mask] = df[mask].apply(partial(_expand_defs, hed_schema=hed_schema, def_dict=def_dict))
        return

    if columns is None:
        columns = df.columns

    for column in columns:
        mask = df[column].str.contains("Def/", case=False, na=False)
        # Nothing to expand in this column, so skip the apply/assignment round trip.
        if not mask.any():
            continue
        df.loc[mask, column] = df.loc[mask, column].apply(
            partial(_expand_defs, hed_schema=hed_schema, def_dict=def_dict)
        )
def _convert_to_form(hed_string, hed_schema, tag_form):
    """Parse hed_string with hed_schema and return str() of its get_as_form(tag_form) result."""
    parsed = HedString(hed_string, hed_schema)
    converted = parsed.get_as_form(tag_form)
    return str(converted)
def _shrink_defs(hed_string, hed_schema):
    """Parse hed_string with hed_schema and return str() of its shrink_defs() result."""
    parsed = HedString(hed_string, hed_schema)
    shrunk = parsed.shrink_defs()
    return str(shrunk)
def _expand_defs(hed_string, hed_schema, def_dict):
    """Parse hed_string with hed_schema and def_dict and return str() of its expand_defs() result."""
    parsed = HedString(hed_string, hed_schema, def_dict)
    expanded = parsed.expand_defs()
    return str(expanded)
[docs]
def process_def_expands(
    hed_strings, hed_schema, known_defs=None, ambiguous_defs=None
) -> tuple["DefinitionDict", dict, dict]:
    """Collect def-expand tags from the strings and check them against the known definitions.

    Parameters:
        hed_strings (list or pd.Series): A list of HED strings to process.
        hed_schema (HedSchema): The schema to use.
        known_defs (DefinitionDict or list or str or None):
            A DefinitionDict or anything its constructor takes. These are the known definitions going in, that must
            match perfectly.
        ambiguous_defs (dict): A dictionary containing ambiguous definitions.
            format TBD. Currently def name key: list of lists of HED tags values

    Returns:
        tuple [DefinitionDict, dict, dict]: A tuple containing the DefinitionDict, ambiguous definitions, and a
            dictionary of error lists keyed by definition name
    """
    # Imported at call time rather than module load (presumably to avoid a circular import -- confirm).
    from hed.models.def_expand_gather import DefExpandGatherer

    return DefExpandGatherer(hed_schema, known_defs, ambiguous_defs).process_def_expands(hed_strings)
[docs]
def sort_dataframe_by_onsets(df):
    """Sort a dataframe by the onset column.

    The onsets are compared as numbers (values that cannot be parsed sort last), but the
    returned dataframe keeps the original onset values. Rows sharing the same onset keep
    their original relative order.

    Parameters:
        df(pd.Dataframe): Dataframe to sort.

    Returns:
        pd.DataFrame: A sorted copy of the dataframe, or the original dataframe if it didn't have an onset column.
    """
    if "onset" not in df.columns:
        return df

    # Sorting through `key` avoids adding (and possibly clobbering) a temporary column.
    # kind="stable" keeps rows with equal onsets in their original order.
    return df.sort_values(
        by="onset",
        key=lambda onsets: pd.to_numeric(onsets, errors="coerce"),
        kind="stable",
    )
def replace_ref(text, old_value, new_value="n/a"):
    """Replace a column ref in text with new_value. If the value is n/a or empty, delete extra commas/parentheses.

    Parameters:
        text (str): The input string containing the ref enclosed in curly braces.
        old_value (str): The full tag or ref to replace. It is always matched literally.
        new_value (str): The replacement value for the ref.

    Returns:
        str: The modified string with the ref replaced or removed.
    """
    # If it's not n/a, we can just replace directly.
    if new_value != "n/a" and new_value != "":
        return text.replace(old_value, new_value)

    # Nothing to remove: skip the regex machinery entirely.
    if old_value not in text:
        return text

    def _remover(match):
        opening = match.group("p1").count("(")
        closing = match.group("p2").count(")")
        if opening > closing:
            # More starting parens than ending: keep the comma before and the unmatched "(".
            return match.group("c1") + "(" * (opening - closing)
        if closing > opening:
            # More ending parens than starting: keep the unmatched ")" and the comma after.
            return ")" * (closing - opening) + match.group("c2")
        # Balanced parens vanish with the ref. Exactly one separator may survive:
        # when there is a leading separator it is dropped and the trailing one kept;
        # otherwise the trailing separator (if any) is dropped too.
        return match.group("c2") if match.group("c1") else ""

    # This finds all surrounding commas and parentheses to a reference.
    # c1/c2 contain the comma (and possibly spaces) separating this ref from other tags.
    # p1/p2 contain the parentheses directly surrounding the tag.
    # All four groups can have spaces.
    # re.escape keeps refs containing regex metacharacters (e.g. "{2}", "{a+b}") literal.
    pattern = r"(?P<c1>[\s,]*)(?P<p1>[(\s]*)" + re.escape(old_value) + r"(?P<p2>[\s)]*)(?P<c2>[\s,]*)"
    return re.sub(pattern, _remover, text)
def _handle_curly_braces_refs(df, refs, column_names):
    """Fills in the refs in the dataframe

    You probably shouldn't call this function directly, but rather use base input.

    Parameters:
        df(pd.DataFrame): The dataframe to modify
        refs(list or pd.Series): a list of column refs to replace(without {})
        column_names(list): the columns we are interested in(should include all ref columns)

    Returns:
        pd.DataFrame: A modified copy of the dataframe with refs replaced. It contains only the
            columns of column_names that are not themselves refs; df itself is not changed.
    """
    # Drop duplicate refs (keeping order): a repeated ref would make saved_columns[ref] a DataFrame.
    unique_refs = list(dict.fromkeys(refs))

    # Split refs into those backed by a column and those that are not.
    refs_new = [ref for ref in unique_refs if ref in column_names]
    remaining_columns = [column for column in column_names if column not in refs_new]
    other_refs = [ref for ref in unique_refs if ref not in column_names]

    new_df = df.copy()
    # Replace references in the columns we are saving out.
    saved_columns = new_df[refs_new]
    for column_name in remaining_columns:
        for replacing_name in refs_new:
            ref_text = f"{{{replacing_name}}}"
            # NOTE: if the data has no n/a values, a plain str.replace per row is MUCH faster.
            # Assign a list (positional) rather than a new Series, which would be aligned on a
            # fresh 0..n-1 index and corrupt dataframes that carry a non-default index.
            new_df[column_name] = [
                replace_ref(x, ref_text, y)
                for x, y in zip(new_df[column_name], saved_columns[replacing_name], strict=False)
            ]

    # Refs that aren't columns in the dataframe are removed as n/a. This also covers the
    # special case of {HED} when the tsv file has no HED column.
    for ref in other_refs:
        ref_text = "{" + ref + "}"
        for column_name in remaining_columns:
            new_df[column_name] = [replace_ref(x, ref_text, "n/a") for x in new_df[column_name]]

    return new_df[remaining_columns]
# todo: Consider updating this to be a pure string function(or at least, only instantiating the Duration tags)
[docs]
def filter_series_by_onset(series, onsets):
    """Combine the rows of the series that share an onset.

    Parameters:
        series(pd.Series or pd.Dataframe): The series to filter. If dataframe, it filters the "HED" column.
        onsets(pd.Series): The onset column to filter by. Values are converted to numbers first;
            anything that cannot be converted becomes NaN.

    Returns:
        Union[Series, Dataframe]: the series with rows filtered together.
    """
    numeric_onsets = pd.to_numeric(onsets, errors="coerce")
    return _filter_by_index_list(series, indexed_dict=_indexed_dict_from_onsets(numeric_onsets))
def _indexed_dict_from_onsets(onsets):
    """Group row positions by onset, treating an onset within 1e-9 of the active one as the same.

    NaN onsets are skipped. The result maps each group's onset value to the list of
    positions (from enumerate) that fell into that group.
    """
    tolerance = 1e-9
    groups = defaultdict(list)
    active_onset = -1000000.0  # Sentinel used before any real onset has been seen.
    for position, value in enumerate(onsets):
        if not math.isnan(value):
            # Start a new active onset only when the value moved by more than the tolerance.
            if abs(value - active_onset) > tolerance:
                active_onset = value
            groups[active_onset].append(position)
    return groups
def _filter_by_index_list(original_data, indexed_dict):
    """Filters a series or dataframe by the indexed_dict, joining lines as indicated

    Parameters:
        original_data (pd.Series or pd.DataFrame): The data to filter. For a dataframe the "HED" column is used.
        indexed_dict (dict): Maps an onset to the list of row positions to join together.

    Returns:
        pd.Series or pd.DataFrame: For a series, a new series in which the first position of each
            group holds the comma-joined values of the group and every other position is "".
            For a dataframe, a copy whose "HED" column has been replaced the same way.

    Raises:
        TypeError: If original_data is neither a pandas Series nor a DataFrame.
    """
    if isinstance(original_data, pd.Series):
        data_series = original_data
    elif isinstance(original_data, pd.DataFrame):
        data_series = original_data["HED"]
    else:
        raise TypeError("Input must be a pandas Series or DataFrame")

    new_series = pd.Series([""] * len(data_series), dtype=data_series.dtype)
    for indices in indexed_dict.values():
        if indices:
            # All access is positional: the joined text lands on the first row of the group.
            new_series.iloc[indices[0]] = ",".join([str(data_series.iloc[i]) for i in indices])

    if isinstance(original_data, pd.Series):
        return new_series

    result_df = original_data.copy()
    # new_series was built positionally on a 0..n-1 index; give it the frame's index so the
    # column assignment cannot misalign (and yield NaN) when the frame has a non-default index.
    new_series.index = result_df.index
    result_df["HED"] = new_series
    return result_df