From db41bac4b885ba10d35c90ee1073f8d492450bbf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Thibault=20Cl=C3=A9rice?= Date: Wed, 17 Jun 2026 12:53:36 +0200 Subject: [PATCH] Support nested self-closing milestone citeStructures (e.g. cb/lb, pb/cb/lb) --- dapytains/tei/citeStructure.py | 173 +++++++++++++++++++++++++++--- dapytains/tei/document.py | 75 +++++++++---- tests/tei/cb_lb_milestones.xml | 45 ++++++++ tests/tei/pb_cb_lb_milestones.xml | 53 +++++++++ tests/test_tei.py | 137 +++++++++++++++++++++++ 5 files changed, 447 insertions(+), 36 deletions(-) create mode 100644 tests/tei/cb_lb_milestones.xml create mode 100644 tests/tei/pb_cb_lb_milestones.xml diff --git a/dapytains/tei/citeStructure.py b/dapytains/tei/citeStructure.py index f45751c..2ce98aa 100644 --- a/dapytains/tei/citeStructure.py +++ b/dapytains/tei/citeStructure.py @@ -33,6 +33,11 @@ class CitableStructure: delim: str = "" children: List["CitableStructure"] = field(default_factory=list) metadata: List["CiteData"] = field(default_factory=list) + match: str = "" + # True when this unit's matched element is self-closing (a "milestone", e.g. ), + # meaning any nested citeStructure children are siblings bounded by the next milestone + # of the same kind, not actual descendants. + milestone: bool = False def get(self, ref: str): if self.use != "position()": @@ -101,6 +106,11 @@ def __init__(self, root: saxonlib.PyXdmNode, processor: saxonlib.PySaxonProcesso self.root = root self.processor: saxonlib.PySaxonProcessor = processor self.xpath_matcher: Dict[str, str] = {} + self.structure_by_key: Dict[str, CitableStructure] = {} + # `root` is the element, not the document root: milestone-mode helpers need + # the actual document node since they evaluate relative ("./...") match expressions + # globally (bounded by document order), not relative to . + self.doc_root: saxonlib.PyXdmNode = get_xpath_proc(self.root, processor=processor).evaluate_single("/") self.regex_pattern, cite_structure = self.build_regex_and_xpath( get_xpath_proc(self.root, processor=processor).evaluate_single("./citeStructure[1]") ) @@ -165,6 +175,13 @@ def build_regex_and_xpath( cite_structure.xpath = f"{match}/{use}" cite_structure.xpath_match = f"{match}[{use}]" + cite_structure.match = match + self.structure_by_key[accumulated_units] = cite_structure + + if children_cite_struct: + first_match = get_xpath_proc(self.doc_root, processor=self.processor).evaluate_single(f"({match})[1]") + if first_match is not None and not len(first_match.children): + cite_structure.milestone = True child_regexes = [] parsed_children_cite_structure = [] @@ -190,16 +207,120 @@ def build_regex_and_xpath( return current_regex, cite_structure - def generate_xpath(self, reference): + def _milestone_boundary( + self, + start_node: saxonlib.PyXdmNode, + parent_match: str + ) -> Optional[saxonlib.PyXdmNode]: + """ Find the next node matching `parent_match` after `start_node`, i.e. the next + milestone of the same kind (e.g. the next after the current one). """ + xpath_proc = get_xpath_proc(self.doc_root, processor=self.processor) + xpath_proc.declare_variable("__start") + xpath_proc.set_parameter("__start", start_node) + return xpath_proc.evaluate_single(f"({parent_match})[. >> $__start][1]") + + def _milestone_window( + self, + start_node: saxonlib.PyXdmNode, + boundary_node: Optional[saxonlib.PyXdmNode], + child_xpath: str + ): + """ Evaluate `child_xpath` against the whole document and restrict the result to nodes + occurring after `start_node` and (if given) before `boundary_node`, in document order. """ + xpath_proc = get_xpath_proc(self.doc_root, processor=self.processor) + xpath_proc.declare_variable("__start") + xpath_proc.set_parameter("__start", start_node) + if boundary_node is not None: + xpath_proc.declare_variable("__boundary") + xpath_proc.set_parameter("__boundary", boundary_node) + return xpath_proc.evaluate(f"({child_xpath})[. >> $__start][not(. >> $__boundary)]") + return xpath_proc.evaluate(f"({child_xpath})[. >> $__start]") + + def _absolute_path(self, node: saxonlib.PyXdmNode) -> str: + """ Turn a concrete node into its real, DOM-accurate absolute positional XPath + (e.g. /TEI[1]/text[1]/body[1]/div[1]/ab[1]/lb[5]), so it can flow through the rest of + the pipeline (document.py's reconstruct_doc) exactly like any other absolute XPath. """ + xpath_proc = get_xpath_proc(node, processor=self.processor) + return str(xpath_proc.evaluate_single( + "string-join(for $n in (ancestor-or-self::*) " + "return concat('/', name($n), '[', 1 + count($n/preceding-sibling::*[name() = name($n)]), ']'), '')" + )) + + def _parse_reference(self, reference: str) -> List[tuple]: match = re.match(self.regex_pattern, reference) if not match: raise ValueError(f"Reference '{reference}' does not match the expected format.") + return [(k, v) for k, v in match.groupdict().items() if v] + + def is_milestone_nested(self, reference: str) -> bool: + """ True if `reference`'s deepest unit is nested (directly or transitively) under a + milestone-mode parent (e.g. a under a ), in which case any tag/attribute + based xpath fragment derived from it is not safe to reuse as a sibling-boundary match + (the same attribute value can occur in other milestones, e.g. line "1" of every column). """ + groups = self._parse_reference(reference) + return any(self.structure_by_key[key].milestone for key, _ in groups[:-1]) + + def _resolve_groups(self, groups: List[tuple]) -> Optional[saxonlib.PyXdmNode]: + """ Resolve a (possibly partial, e.g. groups[:-1]) ordered list of (key, value) ref + groups to its concrete node, walking the chain level by level so that ancestor + disambiguation (e.g. "which page's column 2") is preserved at every step. """ + xpath_proc = get_xpath_proc(self.doc_root, processor=self.processor) + concrete_node = None + prev_structure: Optional[CitableStructure] = None + for key, value in groups: + formatted = self.xpath_matcher[key].format(**{key: value}) + structure = self.structure_by_key[key] + if concrete_node is None: + concrete_node = xpath_proc.evaluate_single(f"({formatted})[1]") + elif prev_structure.milestone: + boundary = self._milestone_boundary(concrete_node, prev_structure.match) + concrete_node = (self._milestone_window(concrete_node, boundary, formatted) or [None])[0] + else: + local_proc = get_xpath_proc(concrete_node, processor=self.processor) + concrete_node = local_proc.evaluate_single(f"./{formatted}") + prev_structure = structure + return concrete_node + + def resolve_node(self, reference: str) -> Optional[saxonlib.PyXdmNode]: + """ Resolve `reference` to its concrete node. """ + return self._resolve_groups(self._parse_reference(reference)) - match = {k:v for k, v in match.groupdict().items() if v} - xpath = "/".join([self.xpath_matcher[key].format(**{key: value}) for key, value in match.items()]) - # This is a VERY dirty trick in case we have // down the road - xpath = xpath.replace("///", "//") - return xpath + def generate_xpath(self, reference): + groups = self._parse_reference(reference) + + # Fast path: untouched, original behavior when no ancestor in the chain is a milestone. + if not any(self.structure_by_key[key].milestone for key, _ in groups[:-1]): + xpath = "/".join([self.xpath_matcher[key].format(**{key: value}) for key, value in groups]) + # This is a VERY dirty trick in case we have // down the road + return xpath.replace("///", "//") + + # Milestone-aware path: resolve to a concrete node step by step, since milestone parents + # (self-closing elements like ) cannot be joined with their children via "/", then + # turn it into its real, DOM-accurate absolute positional XPath so it flows through the + # rest of the pipeline (document.py's reconstruct_doc) like any other absolute XPath. + return self._absolute_path(self.resolve_node(reference)) + + def get_next_milestone_boundary(self, reference: str) -> Optional[saxonlib.PyXdmNode]: + """ If `reference`'s deepest unit is nested directly under a milestone-mode parent + (e.g. a under a ), return the concrete node of the next occurrence of that + parent unit (e.g. the next ). Used as an upper bound for passage extraction when + there is no next sibling within the current milestone (e.g. last line of a column), + so content does not bleed into the next milestone's content (e.g. the next column). + Returns None if not applicable (no milestone parent, or no next occurrence). """ + groups = self._parse_reference(reference) + if len(groups) < 2: + return None + + parent_key, _ = groups[-2] + parent_structure = self.structure_by_key[parent_key] + if not parent_structure.milestone: + return None + + # Resolve the parent through the full ancestor chain (groups[:-1]), not just its own + # value in isolation: e.g. "column 2" is ambiguous on its own when nested under "page", + # since column @n values repeat across pages. + start_node = self._resolve_groups(groups[:-1]) + return self._milestone_boundary(start_node, parent_structure.match) def _dispatch( self, @@ -209,19 +330,23 @@ def _dispatch( unit: CitableUnit, level: int): # target = self.generate_xpath(child.ref) + target_root = xpath_processor.evaluate_single(child_xpath) + milestone_match = structure.match if structure.milestone else None if len(structure.children) == 1: self.find_refs( - root=xpath_processor.evaluate_single(child_xpath), + root=target_root, structure=structure.children[0], unit=unit, - level=level + level=level, + milestone_match=milestone_match ) else: self.find_refs_from_branches( - root=xpath_processor.evaluate_single(child_xpath), + root=target_root, structure=structure.children, unit=unit, - level=level + level=level, + milestone_match=milestone_match ) def find_refs( @@ -229,15 +354,25 @@ def find_refs( root: saxonlib.PyXdmNode, structure: CitableStructure = None, unit: Optional[CitableUnit] = None, - level: int = 1 + level: int = 1, + milestone_match: Optional[str] = None ) -> List[CitableUnit]: xpath_proc = get_xpath_proc(elem=root, processor=self.processor) prefix = (unit.ref + structure.delim) if unit else "" units = [] - xpath_prefix = "./" if unit else "" - # .evaluate returns None instead of an empty list... - for value in (xpath_proc.evaluate(f"{xpath_prefix}{structure.xpath}") or []): + if milestone_match is not None: + # `root` is a self-closing milestone node (e.g. ): its "children" are + # whatever matches structure.xpath between it and the next node matching + # milestone_match (the next milestone of the same kind), not its descendants. + boundary = self._milestone_boundary(root, milestone_match) + values = self._milestone_window(root, boundary, structure.xpath) or [] + else: + xpath_prefix = "./" if unit else "" + # .evaluate returns None instead of an empty list... + values = xpath_proc.evaluate(f"{xpath_prefix}{structure.xpath}") or [] + + for value in values: child = CitableUnit( citeType=structure.citeType, ref=f"{prefix}{value.string_value}", @@ -275,16 +410,22 @@ def find_refs_from_branches( root: saxonlib.PyXdmNode, structure: List[CitableStructure], unit: Optional[CitableUnit] = None, - level: int = 1 + level: int = 1, + milestone_match: Optional[str] = None ) -> List[CitableUnit]: xpath_proc = get_xpath_proc(elem=root, processor=self.processor) prefix = (unit.ref) if unit else "" # ToDo: Reinject delim units = [] xpath_prefix = "./" if unit else "" + boundary = self._milestone_boundary(root, milestone_match) if milestone_match is not None else None + unsorted = [] for s in structure: - results = xpath_proc.evaluate(f"{xpath_prefix}{s.xpath}") + if milestone_match is not None: + results = self._milestone_window(root, boundary, s.xpath) + else: + results = xpath_proc.evaluate(f"{xpath_prefix}{s.xpath}") if results is not None: unsorted.extend( [ diff --git a/dapytains/tei/document.py b/dapytains/tei/document.py index 2d24e74..be585c7 100644 --- a/dapytains/tei/document.py +++ b/dapytains/tei/document.py @@ -233,7 +233,7 @@ def reverse_ancestor(xpaths: List[str]) -> str: def _treat_siblings( context_node: saxonlib.PyXdmNode, last_node: ElementBase, - xpath: str, + xpath: Union[str, saxonlib.PyXdmNode], processor: saxonlib.PySaxonProcessor, ancestor_list: Optional[List[str]] = None ) -> Optional[ElementBase]: @@ -241,10 +241,25 @@ def _treat_siblings( :param context_node: Node against which xPath are run :param last_node: Node on which data is created - :param xpath: xPath of the sibling + :param xpath: xPath of the sibling, or a concrete node acting as an exclusive upper bound + (used for milestone-mode column/line boundaries, where a tag/attribute based xpath + fragment would not reliably identify the right occurrence) :param prefix: Ancestor path for the sibling at this point """ xproc = get_xpath_proc(context_node, processor=processor) + + if isinstance(xpath, saxonlib.PyXdmNode): + xproc.declare_variable("__boundary") + xproc.set_parameter("__boundary", xpath) + next_nodes = xpath_eval(xproc, "./following-sibling::node()[. << $__boundary]") + for node in next_nodes: + if node.node_kind_str == "text": + if not last_node.tail: + last_node.tail = unescape(_get_text(node, ".", processor=processor)) + else: + last_node = copy_node(node, include_children=True, parent=last_node.getparent(), processor=processor) + return last_node + loc_xpath = "node()" if xpath == COPY_UNTIL_END else xpath if ancestor_list: loc_xpath += f"{reverse_ancestor(ancestor_list[::-1])}" @@ -385,7 +400,7 @@ def reconstruct_doc( start_siblings=start_siblings, end_siblings=end_siblings, processor=processor ) - if start_siblings: + if start_siblings is not None: _treat_siblings(context_node=result_start, xpath=start_siblings, last_node=copied_node, ancestor_list=ancestor_start, processor=processor) return copied_node @@ -463,7 +478,7 @@ def reconstruct_doc( ) # If we have a queue, we run the queue if queue_start: - if end_siblings and not start_siblings: + if end_siblings is not None and start_siblings is None: # We have an end_siblings elsewhere, what we want is to cover what we find below, and we take everything # but the next level ! start_siblings = "node()" @@ -477,16 +492,26 @@ def reconstruct_doc( processor=processor ) - # When we don't have similar node, we loop on siblings until we get to the expected element - # For this reason, we need to change matching xpath (ie. ./div[position()=1]) into compatible - # suffixes with preceding-sibling or following-sibling. - # We do that for start and end - sib_current_start = clean_xpath_for_following(current_start, start_is_traversing) - sib_current_end = clean_xpath_for_following(current_end, end_is_traversing) - - # We look for siblings between start and end matches - for sibling in xpath_eval(xpath_proc, f"./node()[preceding-sibling::{sib_current_start} and following-sibling::{sib_current_end}]"): - copy_node(sibling, include_children=True, parent=new_tree, processor=processor) + # We look for siblings between start and end matches. When both ends are already + # resolved to concrete nodes, bind them as parameters and compare by document order, + # rather than re-deriving a tag/position-based xpath fragment: a position() predicate + # (e.g. "lb[2]") is not safe to reuse as a relative preceding/following-sibling step, + # since its meaning ("2nd node of that name") is re-evaluated per candidate context, + # not globally. + if result_start is not None and result_end is not None: + xpath_proc.declare_variable("__range_start") + xpath_proc.set_parameter("__range_start", result_start) + xpath_proc.declare_variable("__range_end") + xpath_proc.set_parameter("__range_end", result_end) + for sibling in xpath_eval(xpath_proc, "./node()[. >> $__range_start][. << $__range_end]"): + copy_node(sibling, include_children=True, parent=new_tree, processor=processor) + else: + # For this reason, we need to change matching xpath (ie. ./div[position()=1]) into + # compatible suffixes with preceding-sibling or following-sibling. + sib_current_start = clean_xpath_for_following(current_start, start_is_traversing) + sib_current_end = clean_xpath_for_following(current_end, end_is_traversing) + for sibling in xpath_eval(xpath_proc, f"./node()[preceding-sibling::{sib_current_start} and following-sibling::{sib_current_end}]"): + copy_node(sibling, include_children=True, parent=new_tree, processor=processor) # Here we reached the end, logically. node = copy_node(node=result_end, include_children=len(queue_end) == 0, parent=new_tree, processor=processor) @@ -505,7 +530,7 @@ def reconstruct_doc( copy_until=not xpath_proc.effective_boolean_value(f"head(./element()[1]) is head({preview})"), processor=processor ) - if end_siblings: + if end_siblings is not None: _treat_siblings(context_node=result_end, xpath=end_siblings, last_node=node, ancestor_list=ancestor_end, processor=processor) return new_tree @@ -591,8 +616,13 @@ def get_passage(self, ref_or_start: Optional[str], end: Optional[str] = None, tr next_ref = self.get_next(tree, end) if next_ref: next_ref = next_ref.ref - next_ref_xpath = normalize_xpath(xpath_split(self.citeStructure[tree].generate_xpath(next_ref)))[-1] - end_sibling = next_ref_xpath.strip("/") + if self.citeStructure[tree].is_milestone_nested(next_ref): + end_sibling = self.citeStructure[tree].resolve_node(next_ref) + else: + next_ref_xpath = normalize_xpath(xpath_split(self.citeStructure[tree].generate_xpath(next_ref)))[-1] + end_sibling = next_ref_xpath.strip("/") + elif (milestone_boundary := self.citeStructure[tree].get_next_milestone_boundary(end)) is not None: + end_sibling = milestone_boundary else: end_sibling = COPY_UNTIL_END else: @@ -601,8 +631,13 @@ def get_passage(self, ref_or_start: Optional[str], end: Optional[str] = None, tr next_ref = self.get_next(tree, start) if next_ref: next_ref = next_ref.ref - next_ref_xpath = normalize_xpath(xpath_split(self.citeStructure[tree].generate_xpath(next_ref)))[-1] - start_sibling = next_ref_xpath.strip("/") + if self.citeStructure[tree].is_milestone_nested(next_ref): + start_sibling = self.citeStructure[tree].resolve_node(next_ref) + else: + next_ref_xpath = normalize_xpath(xpath_split(self.citeStructure[tree].generate_xpath(next_ref)))[-1] + start_sibling = next_ref_xpath.strip("/") + elif (milestone_boundary := self.citeStructure[tree].get_next_milestone_boundary(start)) is not None: + start_sibling = milestone_boundary else: start_sibling = COPY_UNTIL_END @@ -634,6 +669,6 @@ def _find(haystack, needle) -> Optional[Tuple[int, CitableUnit, List[CitableUnit return c return None current_idx, current_unit, siblings = _find(refs, unit) - if current_idx < len(refs)-1: + if current_idx < len(siblings)-1: return siblings[current_idx+1] return None diff --git a/tests/tei/cb_lb_milestones.xml b/tests/tei/cb_lb_milestones.xml new file mode 100644 index 0000000..c31c282 --- /dev/null +++ b/tests/tei/cb_lb_milestones.xml @@ -0,0 +1,45 @@ + + + + + Sample Latin Inscription + + +

Unit test example

+
+ +

Fictitious inscription for testing.

+
+
+ + + + + + + + +
+ + + +
+ + + + IMP CAESARI + DIVI F AVGVSTO + PONTIFICI MAXIMO + TRIB POTESTATE X + + + COS XIII P P + SENATVS POPVLVSQVE + ROMANVS + D D + + +
+ +
+
diff --git a/tests/tei/pb_cb_lb_milestones.xml b/tests/tei/pb_cb_lb_milestones.xml new file mode 100644 index 0000000..43eb319 --- /dev/null +++ b/tests/tei/pb_cb_lb_milestones.xml @@ -0,0 +1,53 @@ + + + + + Sample Manuscript + + +

Unit test example

+
+ +

Fictitious manuscript for testing.

+
+
+ + + + + + + + + + +
+ + + +
+ + + + + alpha + beta + + + gamma + delta + + + + epsilon + zeta + + + eta + theta + + +
+ +
+
diff --git a/tests/test_tei.py b/tests/test_tei.py index 763e7f7..f66176f 100644 --- a/tests/test_tei.py +++ b/tests/test_tei.py @@ -267,3 +267,140 @@ def test_ref_parsing_uneven_tree(): """Test that a level that can contain data is not missed""" doc = Document(f"{local_dir}/uneven_parent_level.xml") assert _flat_refs(doc.get_reffs()) == ['Luke', 'Luke 1', 'Luke 1#1', 'Luke:1', 'Mark', 'Mark:1', 'Mark:2'] + + +def test_milestone_cb_lb(): + """Test that nested self-closing milestones (e.g. containing siblings) work""" + doc = Document(f"{local_dir}/cb_lb_milestones.xml") + + refs = doc.get_reffs() + assert [(r.ref, [c.ref for c in r.children]) for r in refs] == [ + ("1", ["1.1", "1.2", "1.3", "1.4"]), + ("2", ["2.1", "2.2", "2.3", "2.4"]), + ] + + # Same @n value ("1") in both columns must resolve to different, disambiguated lines + assert tostring(doc.get_passage("1.1"), encoding=str) == ( + '\n' + ' \n' + '
\n' + ' \n\n' + ' IMP CAESARI\n' + ' \n' + '
\n' + ' \n' + '
\n' + '
' + ) + assert tostring(doc.get_passage("2.1"), encoding=str) == ( + '\n' + ' \n' + '
\n' + ' \n\n' + ' COS XIII P P\n' + ' \n' + '
\n' + ' \n' + '
\n' + '
' + ) + + # Last line of column 1 must not bleed into column 2's content + assert tostring(doc.get_passage("1.4"), encoding=str) == ( + '\n' + ' \n' + '
\n' + ' \n\n' + ' TRIB POTESTATE X\n\n' + ' \n' + '
\n' + ' \n' + '
\n' + '
' + ) + + # A range crossing the column boundary should include the milestone itself + assert tostring(doc.get_passage("1.4", "2.1"), encoding=str) == ( + '\n' + ' \n' + '
\n' + ' \n\n' + ' TRIB POTESTATE X\n\n' + ' \n' + ' COS XIII P P\n' + ' \n' + '
\n' + ' \n' + '
\n' + '
' + ) + + +def test_milestone_pb_cb_lb(): + """Test a 3-level manuscript milestone hierarchy: page () > column () > line ()""" + doc = Document(f"{local_dir}/pb_cb_lb_milestones.xml") + + assert _flat_refs(doc.get_reffs()) == [ + "1", "1.1", "1.1.1", "1.1.2", "1.2", "1.2.1", "1.2.2", + "2", "2.1", "2.1.1", "2.1.2", "2.2", "2.2.1", "2.2.2", + ] + + # Same @n values ("1"/"2") repeat for column and line across every page; each must resolve + # to its own, disambiguated line. + assert tostring(doc.get_passage("1.1.1"), encoding=str) == ( + '\n' + ' \n' + '
\n' + ' \n\n' + ' alpha\n' + ' \n' + '
\n' + ' \n' + '
\n' + '
' + ) + assert tostring(doc.get_passage("2.2.1"), encoding=str) == ( + '\n' + ' \n' + '
\n' + ' \n\n' + ' eta\n' + ' \n' + '
\n' + ' \n' + '
\n' + '
' + ) + + # Last line of the last column of page 1 must not bleed into page 2's content, but may + # include the upcoming milestone marker itself + assert tostring(doc.get_passage("1.2.2"), encoding=str) == ( + '\n' + ' \n' + '
\n' + ' \n\n' + ' delta\n\n' + ' \n' + ' \n' + '
\n' + ' \n' + '
\n' + '
' + ) + + # A range crossing the page boundary should include both the and milestones + assert tostring(doc.get_passage("1.2.2", "2.1.1"), encoding=str) == ( + '\n' + ' \n' + '
\n' + ' \n\n' + ' delta\n\n' + ' \n' + ' \n' + ' epsilon\n' + ' \n' + '
\n' + ' \n' + '
\n' + '
' + )