Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 157 additions & 16 deletions dapytains/tei/citeStructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ class CitableStructure:
delim: str = ""
children: List["CitableStructure"] = field(default_factory=list)
metadata: List["CiteData"] = field(default_factory=list)
match: str = ""
# True when this unit's matched element is self-closing (a "milestone", e.g. <cb/>),
# meaning any nested citeStructure children are siblings bounded by the next milestone
# of the same kind, not actual descendants.
milestone: bool = False

def get(self, ref: str):
if self.use != "position()":
Expand Down Expand Up @@ -101,6 +106,11 @@ def __init__(self, root: saxonlib.PyXdmNode, processor: saxonlib.PySaxonProcesso
self.root = root
self.processor: saxonlib.PySaxonProcessor = processor
self.xpath_matcher: Dict[str, str] = {}
self.structure_by_key: Dict[str, CitableStructure] = {}
# `root` is the <refsDecl> element, not the document root: milestone-mode helpers need
# the actual document node since they evaluate relative ("./...") match expressions
# globally (bounded by document order), not relative to <refsDecl>.
self.doc_root: saxonlib.PyXdmNode = get_xpath_proc(self.root, processor=processor).evaluate_single("/")
self.regex_pattern, cite_structure = self.build_regex_and_xpath(
get_xpath_proc(self.root, processor=processor).evaluate_single("./citeStructure[1]")
)
Expand Down Expand Up @@ -165,6 +175,13 @@ def build_regex_and_xpath(

cite_structure.xpath = f"{match}/{use}"
cite_structure.xpath_match = f"{match}[{use}]"
cite_structure.match = match
self.structure_by_key[accumulated_units] = cite_structure

if children_cite_struct:
first_match = get_xpath_proc(self.doc_root, processor=self.processor).evaluate_single(f"({match})[1]")
if first_match is not None and not len(first_match.children):
cite_structure.milestone = True

child_regexes = []
parsed_children_cite_structure = []
Expand All @@ -190,16 +207,120 @@ def build_regex_and_xpath(

return current_regex, cite_structure

def generate_xpath(self, reference):
def _milestone_boundary(
        self,
        start_node: saxonlib.PyXdmNode,
        parent_match: str
) -> Optional[saxonlib.PyXdmNode]:
    """ Locate the closing boundary of a milestone: the first node matching `parent_match`
    that comes after `start_node` in document order (e.g. the <cb/> following the current
    <cb/>).

    :param start_node: Node after which the search starts
    :param parent_match: XPath match expression of the milestone unit, evaluated from the
        document node
    :return: Whatever `evaluate_single` yields for the lookup expression
    """
    # `>>` keeps only nodes following $__start in document order; `[1]` picks the nearest.
    following_expr = f"({parent_match})[. >> $__start][1]"
    proc = get_xpath_proc(self.doc_root, processor=self.processor)
    proc.declare_variable("__start")
    proc.set_parameter("__start", start_node)
    return proc.evaluate_single(following_expr)

def _milestone_window(
        self,
        start_node: saxonlib.PyXdmNode,
        boundary_node: Optional[saxonlib.PyXdmNode],
        child_xpath: str
):
    """ Evaluate `child_xpath` against the whole document and restrict the result to nodes
    occurring strictly after `start_node` and (if given) strictly before `boundary_node`,
    in document order.

    :param start_node: Lower bound (exclusive) of the window, typically the milestone itself
    :param boundary_node: Upper bound (exclusive) of the window, typically the next milestone
        of the same kind; None leaves the window open up to the end of the document
    :param child_xpath: XPath selecting the candidate nodes, evaluated from the document node
    :return: The raw result of `evaluate`; callers in this module treat None as "no result"
    """
    xpath_proc = get_xpath_proc(self.doc_root, processor=self.processor)
    xpath_proc.declare_variable("__start")
    xpath_proc.set_parameter("__start", start_node)
    window = f"({child_xpath})[. >> $__start]"
    if boundary_node is not None:
        xpath_proc.declare_variable("__boundary")
        xpath_proc.set_parameter("__boundary", boundary_node)
        # `. << $__boundary` (strictly precedes) rather than `not(. >> $__boundary)`: the
        # latter also keeps the boundary node itself whenever `child_xpath` matches it,
        # which would attach the next milestone to the current one.
        window += "[. << $__boundary]"
    return xpath_proc.evaluate(window)

def _absolute_path(self, node: saxonlib.PyXdmNode) -> str:
    """ Build the absolute positional XPath of a concrete node, one `/name[index]` step per
    ancestor-or-self element (e.g. /TEI[1]/text[1]/body[1]/div[1]/ab[1]/lb[5]), so the
    result can be handed to the rest of the pipeline like any other absolute XPath.

    :param node: Node to describe
    :return: The positional XPath as a Python string
    """
    # Each step's index is 1 + the number of preceding siblings sharing the same name().
    positional_expr = (
        "string-join(for $n in (ancestor-or-self::*) "
        "return concat('/', name($n), '[', 1 + count($n/preceding-sibling::*[name() = name($n)]), ']'), '')"
    )
    node_proc = get_xpath_proc(node, processor=self.processor)
    return str(node_proc.evaluate_single(positional_expr))

def _parse_reference(self, reference: str) -> List[tuple]:
match = re.match(self.regex_pattern, reference)
if not match:
raise ValueError(f"Reference '{reference}' does not match the expected format.")
return [(k, v) for k, v in match.groupdict().items() if v]

def is_milestone_nested(self, reference: str) -> bool:
    """ Tell whether the deepest unit of `reference` sits (directly or transitively) under a
    milestone-mode parent (e.g. a <lb/> under a <cb/>).

    When it does, a tag/attribute based xpath fragment derived from the reference is not
    safe to reuse as a sibling-boundary match, since the same attribute value can occur in
    other milestones (e.g. line "1" of every column).

    :param reference: Reference string to inspect
    :return: True if any unit above the deepest one is flagged as a milestone
    """
    *ancestors, _deepest = self._parse_reference(reference) or [None]
    for key, _value in ancestors:
        if self.structure_by_key[key].milestone:
            return True
    return False

def _resolve_groups(self, groups: List[tuple]) -> Optional[saxonlib.PyXdmNode]:
""" Resolve a (possibly partial, e.g. groups[:-1]) ordered list of (key, value) ref
groups to its concrete node, walking the chain level by level so that ancestor
disambiguation (e.g. "which page's column 2") is preserved at every step. """
xpath_proc = get_xpath_proc(self.doc_root, processor=self.processor)
concrete_node = None
prev_structure: Optional[CitableStructure] = None
for key, value in groups:
formatted = self.xpath_matcher[key].format(**{key: value})
structure = self.structure_by_key[key]
if concrete_node is None:
concrete_node = xpath_proc.evaluate_single(f"({formatted})[1]")
elif prev_structure.milestone:
boundary = self._milestone_boundary(concrete_node, prev_structure.match)
concrete_node = (self._milestone_window(concrete_node, boundary, formatted) or [None])[0]
else:
local_proc = get_xpath_proc(concrete_node, processor=self.processor)
concrete_node = local_proc.evaluate_single(f"./{formatted}")
prev_structure = structure
return concrete_node

def resolve_node(self, reference: str) -> Optional[saxonlib.PyXdmNode]:
    """ Parse `reference` into its ref groups and resolve them to the concrete node.

    :param reference: Reference string to resolve
    :return: The result of `_resolve_groups` for the parsed reference
    """
    groups = self._parse_reference(reference)
    return self._resolve_groups(groups)

match = {k:v for k, v in match.groupdict().items() if v}
xpath = "/".join([self.xpath_matcher[key].format(**{key: value}) for key, value in match.items()])
# This is a VERY dirty trick in case we have // down the road
xpath = xpath.replace("///", "//")
return xpath
def generate_xpath(self, reference):
    """ Turn `reference` into an XPath selecting the corresponding node.

    :param reference: Reference string to convert
    :return: XPath expression as a string
    :raises ValueError: If `reference` does not match the expected format, or if it sits
        under a milestone unit and cannot be resolved to a node of the document
    """
    groups = self._parse_reference(reference)

    # Fast path: untouched, original behavior when no ancestor in the chain is a milestone.
    if not any(self.structure_by_key[key].milestone for key, _ in groups[:-1]):
        xpath = "/".join([self.xpath_matcher[key].format(**{key: value}) for key, value in groups])
        # This is a VERY dirty trick in case we have // down the road
        return xpath.replace("///", "//")

    # Milestone-aware path: resolve to a concrete node step by step, since milestone parents
    # (self-closing elements like <cb/>) cannot be joined with their children via "/", then
    # turn it into its real, DOM-accurate absolute positional XPath so it flows through the
    # rest of the pipeline (document.py's reconstruct_doc) like any other absolute XPath.
    node = self.resolve_node(reference)
    if node is None:
        # Without a concrete node there is no positional path to build; fail explicitly
        # instead of handing None to the XPath processor.
        raise ValueError(f"Reference '{reference}' could not be resolved to a node.")
    return self._absolute_path(node)

def get_next_milestone_boundary(self, reference: str) -> Optional[saxonlib.PyXdmNode]:
    """ If `reference`'s deepest unit is nested directly under a milestone-mode parent
    (e.g. a <lb/> under a <cb/>), return the concrete node of the next occurrence of that
    parent unit (e.g. the next <cb/>). Used as an upper bound for passage extraction when
    there is no next sibling within the current milestone (e.g. last line of a column),
    so content does not bleed into the next milestone's content (e.g. the next column).

    :param reference: Reference string whose parent milestone should be bounded
    :return: The next node of the parent milestone kind, or None if not applicable (fewer
        than two units, no milestone parent, parent not found in the document, or no next
        occurrence)
    :raises ValueError: If `reference` does not match the expected format
    """
    groups = self._parse_reference(reference)
    if len(groups) < 2:
        return None

    parent_key, _ = groups[-2]
    parent_structure = self.structure_by_key[parent_key]
    if not parent_structure.milestone:
        return None

    # Resolve the parent through the full ancestor chain (groups[:-1]), not just its own
    # value in isolation: e.g. "column 2" is ambiguous on its own when nested under "page",
    # since column @n values repeat across pages.
    start_node = self._resolve_groups(groups[:-1])
    if start_node is None:
        # The parent itself does not exist in the document: there is nothing to bound.
        return None
    return self._milestone_boundary(start_node, parent_structure.match)

def _dispatch(
self,
Expand All @@ -209,35 +330,49 @@ def _dispatch(
unit: CitableUnit,
level: int):
# target = self.generate_xpath(child.ref)
target_root = xpath_processor.evaluate_single(child_xpath)
milestone_match = structure.match if structure.milestone else None
if len(structure.children) == 1:
self.find_refs(
root=xpath_processor.evaluate_single(child_xpath),
root=target_root,
structure=structure.children[0],
unit=unit,
level=level
level=level,
milestone_match=milestone_match
)
else:
self.find_refs_from_branches(
root=xpath_processor.evaluate_single(child_xpath),
root=target_root,
structure=structure.children,
unit=unit,
level=level
level=level,
milestone_match=milestone_match
)

def find_refs(
self,
root: saxonlib.PyXdmNode,
structure: CitableStructure = None,
unit: Optional[CitableUnit] = None,
level: int = 1
level: int = 1,
milestone_match: Optional[str] = None
) -> List[CitableUnit]:
xpath_proc = get_xpath_proc(elem=root, processor=self.processor)
prefix = (unit.ref + structure.delim) if unit else ""
units = []
xpath_prefix = "./" if unit else ""

# .evaluate returns None instead of an empty list...
for value in (xpath_proc.evaluate(f"{xpath_prefix}{structure.xpath}") or []):
if milestone_match is not None:
# `root` is a self-closing milestone node (e.g. <cb/>): its "children" are
# whatever matches structure.xpath between it and the next node matching
# milestone_match (the next milestone of the same kind), not its descendants.
boundary = self._milestone_boundary(root, milestone_match)
values = self._milestone_window(root, boundary, structure.xpath) or []
else:
xpath_prefix = "./" if unit else ""
# .evaluate returns None instead of an empty list...
values = xpath_proc.evaluate(f"{xpath_prefix}{structure.xpath}") or []

for value in values:
child = CitableUnit(
citeType=structure.citeType,
ref=f"{prefix}{value.string_value}",
Expand Down Expand Up @@ -275,16 +410,22 @@ def find_refs_from_branches(
root: saxonlib.PyXdmNode,
structure: List[CitableStructure],
unit: Optional[CitableUnit] = None,
level: int = 1
level: int = 1,
milestone_match: Optional[str] = None
) -> List[CitableUnit]:
xpath_proc = get_xpath_proc(elem=root, processor=self.processor)
prefix = (unit.ref) if unit else "" # ToDo: Reinject delim
units = []
xpath_prefix = "./" if unit else ""

boundary = self._milestone_boundary(root, milestone_match) if milestone_match is not None else None

unsorted = []
for s in structure:
results = xpath_proc.evaluate(f"{xpath_prefix}{s.xpath}")
if milestone_match is not None:
results = self._milestone_window(root, boundary, s.xpath)
else:
results = xpath_proc.evaluate(f"{xpath_prefix}{s.xpath}")
if results is not None:
unsorted.extend(
[
Expand Down
Loading
Loading