diff --git a/dapytains/tei/citeStructure.py b/dapytains/tei/citeStructure.py
index f45751c..2ce98aa 100644
--- a/dapytains/tei/citeStructure.py
+++ b/dapytains/tei/citeStructure.py
@@ -33,6 +33,11 @@ class CitableStructure:
delim: str = ""
children: List["CitableStructure"] = field(default_factory=list)
metadata: List["CiteData"] = field(default_factory=list)
+ match: str = ""
+ # True when this unit's matched element is self-closing (a "milestone", e.g. <cb/>),
+ # meaning any nested citeStructure children are siblings bounded by the next milestone
+ # of the same kind, not actual descendants.
+ milestone: bool = False
def get(self, ref: str):
if self.use != "position()":
@@ -101,6 +106,11 @@ def __init__(self, root: saxonlib.PyXdmNode, processor: saxonlib.PySaxonProcesso
self.root = root
self.processor: saxonlib.PySaxonProcessor = processor
self.xpath_matcher: Dict[str, str] = {}
+ self.structure_by_key: Dict[str, CitableStructure] = {}
+ # `root` is the <refsDecl> element, not the document root: milestone-mode helpers need
+ # the actual document node since they evaluate relative ("./...") match expressions
+ # globally (bounded by document order), not relative to <refsDecl>.
+ self.doc_root: saxonlib.PyXdmNode = get_xpath_proc(self.root, processor=processor).evaluate_single("/")
self.regex_pattern, cite_structure = self.build_regex_and_xpath(
get_xpath_proc(self.root, processor=processor).evaluate_single("./citeStructure[1]")
)
@@ -165,6 +175,13 @@ def build_regex_and_xpath(
cite_structure.xpath = f"{match}/{use}"
cite_structure.xpath_match = f"{match}[{use}]"
+ cite_structure.match = match
+ self.structure_by_key[accumulated_units] = cite_structure
+
+ if children_cite_struct:
+ first_match = get_xpath_proc(self.doc_root, processor=self.processor).evaluate_single(f"({match})[1]")
+ if first_match is not None and not len(first_match.children):
+ cite_structure.milestone = True
child_regexes = []
parsed_children_cite_structure = []
@@ -190,16 +207,120 @@ def build_regex_and_xpath(
return current_regex, cite_structure
- def generate_xpath(self, reference):
+ def _milestone_boundary(
+ self,
+ start_node: saxonlib.PyXdmNode,
+ parent_match: str
+ ) -> Optional[saxonlib.PyXdmNode]:
+ """ Find the next node matching `parent_match` after `start_node`, i.e. the next
+ milestone of the same kind (e.g. the next <cb/> after the current one). """
+ xpath_proc = get_xpath_proc(self.doc_root, processor=self.processor)
+ xpath_proc.declare_variable("__start")
+ xpath_proc.set_parameter("__start", start_node)
+ return xpath_proc.evaluate_single(f"({parent_match})[. >> $__start][1]")
+
+ def _milestone_window(
+ self,
+ start_node: saxonlib.PyXdmNode,
+ boundary_node: Optional[saxonlib.PyXdmNode],
+ child_xpath: str
+ ):
+ """ Evaluate `child_xpath` against the whole document and restrict the result to nodes
+ occurring after `start_node` and (if given) before `boundary_node`, in document order. """
+ xpath_proc = get_xpath_proc(self.doc_root, processor=self.processor)
+ xpath_proc.declare_variable("__start")
+ xpath_proc.set_parameter("__start", start_node)
+ if boundary_node is not None:
+ xpath_proc.declare_variable("__boundary")
+ xpath_proc.set_parameter("__boundary", boundary_node)
+ return xpath_proc.evaluate(f"({child_xpath})[. >> $__start][not(. >> $__boundary)]")
+ return xpath_proc.evaluate(f"({child_xpath})[. >> $__start]")
+
+ def _absolute_path(self, node: saxonlib.PyXdmNode) -> str:
+ """ Turn a concrete node into its real, DOM-accurate absolute positional XPath
+ (e.g. /TEI[1]/text[1]/body[1]/div[1]/ab[1]/lb[5]), so it can flow through the rest of
+ the pipeline (document.py's reconstruct_doc) exactly like any other absolute XPath. """
+ xpath_proc = get_xpath_proc(node, processor=self.processor)
+ return str(xpath_proc.evaluate_single(
+ "string-join(for $n in (ancestor-or-self::*) "
+ "return concat('/', name($n), '[', 1 + count($n/preceding-sibling::*[name() = name($n)]), ']'), '')"
+ ))
+
+ def _parse_reference(self, reference: str) -> List[tuple]:
match = re.match(self.regex_pattern, reference)
if not match:
raise ValueError(f"Reference '{reference}' does not match the expected format.")
+ return [(k, v) for k, v in match.groupdict().items() if v]
+
+ def is_milestone_nested(self, reference: str) -> bool:
+ """ True if `reference`'s deepest unit is nested (directly or transitively) under a
+ milestone-mode parent (e.g. a <lb/> under a <cb/>), in which case any tag/attribute
+ based xpath fragment derived from it is not safe to reuse as a sibling-boundary match
+ (the same attribute value can occur in other milestones, e.g. line "1" of every column). """
+ groups = self._parse_reference(reference)
+ return any(self.structure_by_key[key].milestone for key, _ in groups[:-1])
+
+    def _resolve_groups(self, groups: List[tuple]) -> Optional[saxonlib.PyXdmNode]:
+        """ Resolve an ordered list of (key, value) reference groups to its concrete node.
+
+        `groups` may be partial (e.g. groups[:-1]). The chain is walked level by level so that
+        ancestor disambiguation (e.g. "which page's column 2") is preserved at every step.
+
+        :param groups: Ordered (key, value) pairs, as returned by `_parse_reference`
+        :return: The matching node, or None when `groups` is empty or when any level of the
+            chain cannot be resolved
+        """
+        concrete_node: Optional[saxonlib.PyXdmNode] = None
+        prev_structure: Optional[CitableStructure] = None
+        for index, (key, value) in enumerate(groups):
+            formatted = self.xpath_matcher[key].format(**{key: value})
+            if index == 0:
+                # Only the top-level unit is looked up against the whole document.
+                xpath_proc = get_xpath_proc(self.doc_root, processor=self.processor)
+                concrete_node = xpath_proc.evaluate_single(f"({formatted})[1]")
+            elif prev_structure.milestone:
+                # The parent is a self-closing milestone: its "children" are the nodes found
+                # between it and the next milestone of the same kind, not its descendants.
+                boundary = self._milestone_boundary(concrete_node, prev_structure.match)
+                concrete_node = (self._milestone_window(concrete_node, boundary, formatted) or [None])[0]
+            else:
+                local_proc = get_xpath_proc(concrete_node, processor=self.processor)
+                concrete_node = local_proc.evaluate_single(f"./{formatted}")
+            if concrete_node is None:
+                # An unresolved level must end the walk: carrying None forward would make the
+                # next level look like a top-level unit and match an unrelated node elsewhere
+                # in the document.
+                return None
+            prev_structure = self.structure_by_key[key]
+        return concrete_node
+
+ def resolve_node(self, reference: str) -> Optional[saxonlib.PyXdmNode]:
+ """ Resolve `reference` to its concrete node. """
+ return self._resolve_groups(self._parse_reference(reference))
- match = {k:v for k, v in match.groupdict().items() if v}
- xpath = "/".join([self.xpath_matcher[key].format(**{key: value}) for key, value in match.items()])
- # This is a VERY dirty trick in case we have // down the road
- xpath = xpath.replace("///", "//")
- return xpath
+    def generate_xpath(self, reference):
+        """ Build the XPath that selects the node identified by `reference`.
+
+        :param reference: Reference string matching this structure's regex pattern
+        :return: The joined per-level XPath when no ancestor unit is a milestone, otherwise
+            the absolute positional XPath of the resolved node
+        :raises ValueError: If `reference` does not match the expected format, or if it sits
+            under a milestone ancestor and cannot be resolved to a node
+        """
+        groups = self._parse_reference(reference)
+
+        # Fast path: untouched, original behavior when no ancestor in the chain is a milestone.
+        if not any(self.structure_by_key[key].milestone for key, _ in groups[:-1]):
+            xpath = "/".join([self.xpath_matcher[key].format(**{key: value}) for key, value in groups])
+            # This is a VERY dirty trick in case we have // down the road
+            return xpath.replace("///", "//")
+
+        # Milestone-aware path: resolve to a concrete node step by step, since milestone parents
+        # (self-closing elements like <cb/>) cannot be joined with their children via "/", then
+        # turn it into its real, DOM-accurate absolute positional XPath so it flows through the
+        # rest of the pipeline (document.py's reconstruct_doc) like any other absolute XPath.
+        node = self._resolve_groups(groups)
+        if node is None:
+            # Well-formed reference that points at nothing: fail with a clear error instead of
+            # handing None to the XPath helper.
+            raise ValueError(f"Reference '{reference}' could not be resolved to a node.")
+        return self._absolute_path(node)
+
+ def get_next_milestone_boundary(self, reference: str) -> Optional[saxonlib.PyXdmNode]:
+ """ If `reference`'s deepest unit is nested directly under a milestone-mode parent
+ (e.g. a <lb/> under a <cb/>), return the concrete node of the next occurrence of that
+ parent unit (e.g. the next <cb/>). Used as an upper bound for passage extraction when
+ there is no next sibling within the current milestone (e.g. last line of a column),
+ so content does not bleed into the next milestone's content (e.g. the next column).
+ Returns None if not applicable (no milestone parent, or no next occurrence). """
+ groups = self._parse_reference(reference)
+ if len(groups) < 2:
+ return None
+
+ parent_key, _ = groups[-2]
+ parent_structure = self.structure_by_key[parent_key]
+ if not parent_structure.milestone:
+ return None
+
+ # Resolve the parent through the full ancestor chain (groups[:-1]), not just its own
+ # value in isolation: e.g. "column 2" is ambiguous on its own when nested under "page",
+ # since column @n values repeat across pages.
+ start_node = self._resolve_groups(groups[:-1])
+ return self._milestone_boundary(start_node, parent_structure.match)
def _dispatch(
self,
@@ -209,19 +330,23 @@ def _dispatch(
unit: CitableUnit,
level: int):
# target = self.generate_xpath(child.ref)
+ target_root = xpath_processor.evaluate_single(child_xpath)
+ milestone_match = structure.match if structure.milestone else None
if len(structure.children) == 1:
self.find_refs(
- root=xpath_processor.evaluate_single(child_xpath),
+ root=target_root,
structure=structure.children[0],
unit=unit,
- level=level
+ level=level,
+ milestone_match=milestone_match
)
else:
self.find_refs_from_branches(
- root=xpath_processor.evaluate_single(child_xpath),
+ root=target_root,
structure=structure.children,
unit=unit,
- level=level
+ level=level,
+ milestone_match=milestone_match
)
def find_refs(
@@ -229,15 +354,25 @@ def find_refs(
root: saxonlib.PyXdmNode,
structure: CitableStructure = None,
unit: Optional[CitableUnit] = None,
- level: int = 1
+ level: int = 1,
+ milestone_match: Optional[str] = None
) -> List[CitableUnit]:
xpath_proc = get_xpath_proc(elem=root, processor=self.processor)
prefix = (unit.ref + structure.delim) if unit else ""
units = []
- xpath_prefix = "./" if unit else ""
- # .evaluate returns None instead of an empty list...
- for value in (xpath_proc.evaluate(f"{xpath_prefix}{structure.xpath}") or []):
+ if milestone_match is not None:
+ # `root` is a self-closing milestone node (e.g. <cb/>): its "children" are
+ # whatever matches structure.xpath between it and the next node matching
+ # milestone_match (the next milestone of the same kind), not its descendants.
+ boundary = self._milestone_boundary(root, milestone_match)
+ values = self._milestone_window(root, boundary, structure.xpath) or []
+ else:
+ xpath_prefix = "./" if unit else ""
+ # .evaluate returns None instead of an empty list...
+ values = xpath_proc.evaluate(f"{xpath_prefix}{structure.xpath}") or []
+
+ for value in values:
child = CitableUnit(
citeType=structure.citeType,
ref=f"{prefix}{value.string_value}",
@@ -275,16 +410,22 @@ def find_refs_from_branches(
root: saxonlib.PyXdmNode,
structure: List[CitableStructure],
unit: Optional[CitableUnit] = None,
- level: int = 1
+ level: int = 1,
+ milestone_match: Optional[str] = None
) -> List[CitableUnit]:
xpath_proc = get_xpath_proc(elem=root, processor=self.processor)
prefix = (unit.ref) if unit else "" # ToDo: Reinject delim
units = []
xpath_prefix = "./" if unit else ""
+ boundary = self._milestone_boundary(root, milestone_match) if milestone_match is not None else None
+
unsorted = []
for s in structure:
- results = xpath_proc.evaluate(f"{xpath_prefix}{s.xpath}")
+ if milestone_match is not None:
+ results = self._milestone_window(root, boundary, s.xpath)
+ else:
+ results = xpath_proc.evaluate(f"{xpath_prefix}{s.xpath}")
if results is not None:
unsorted.extend(
[
diff --git a/dapytains/tei/document.py b/dapytains/tei/document.py
index 8a260cd..026e814 100644
--- a/dapytains/tei/document.py
+++ b/dapytains/tei/document.py
@@ -281,7 +281,7 @@ def reverse_ancestor(xpaths: List[str]) -> str:
def _treat_siblings(
context_node: saxonlib.PyXdmNode,
last_node: ElementBase,
- xpath: str,
+ xpath: Union[str, saxonlib.PyXdmNode],
processor: saxonlib.PySaxonProcessor,
ancestor_list: Optional[List[str]] = None
) -> Optional[ElementBase]:
@@ -289,10 +289,25 @@ def _treat_siblings(
:param context_node: Node against which xPath are run
:param last_node: Node on which data is created
- :param xpath: xPath of the sibling
+ :param xpath: xPath of the sibling, or a concrete node acting as an exclusive upper bound
+ (used for milestone-mode column/line boundaries, where a tag/attribute based xpath
+ fragment would not reliably identify the right occurrence)
:param prefix: Ancestor path for the sibling at this point
"""
xproc = get_xpath_proc(context_node, processor=processor)
+
+ if isinstance(xpath, saxonlib.PyXdmNode):
+ xproc.declare_variable("__boundary")
+ xproc.set_parameter("__boundary", xpath)
+ next_nodes = xpath_eval(xproc, "./following-sibling::node()[. << $__boundary]")
+ for node in next_nodes:
+ if node.node_kind_str == "text":
+ if not last_node.tail:
+ last_node.tail = unescape(_get_text(node, ".", processor=processor))
+ else:
+ last_node = copy_node(node, include_children=True, parent=last_node.getparent(), processor=processor)
+ return last_node
+
loc_xpath = "node()" if xpath == COPY_UNTIL_END else xpath
if ancestor_list:
loc_xpath += f"{reverse_ancestor(ancestor_list[::-1])}"
@@ -433,7 +448,7 @@ def reconstruct_doc(
start_siblings=start_siblings,
end_siblings=end_siblings, processor=processor
)
- if start_siblings:
+ if start_siblings is not None:
_treat_siblings(context_node=result_start, xpath=start_siblings, last_node=copied_node,
ancestor_list=ancestor_start, processor=processor)
return copied_node
@@ -511,7 +526,7 @@ def reconstruct_doc(
)
# If we have a queue, we run the queue
if queue_start:
- if end_siblings and not start_siblings:
+ if end_siblings is not None and start_siblings is None:
# We have an end_siblings elsewhere, what we want is to cover what we find below, and we take everything
# but the next level !
start_siblings = "node()"
@@ -525,16 +540,26 @@ def reconstruct_doc(
processor=processor
)
- # When we don't have similar node, we loop on siblings until we get to the expected element
- # For this reason, we need to change matching xpath (ie. ./div[position()=1]) into compatible
- # suffixes with preceding-sibling or following-sibling.
- # We do that for start and end
- sib_current_start = clean_xpath_for_following(current_start, start_is_traversing)
- sib_current_end = clean_xpath_for_following(current_end, end_is_traversing)
-
- # We look for siblings between start and end matches
- for sibling in xpath_eval(xpath_proc, f"./node()[preceding-sibling::{sib_current_start} and following-sibling::{sib_current_end}]"):
- copy_node(sibling, include_children=True, parent=new_tree, processor=processor)
+ # We look for siblings between start and end matches. When both ends are already
+ # resolved to concrete nodes, bind them as parameters and compare by document order,
+ # rather than re-deriving a tag/position-based xpath fragment: a position() predicate
+ # (e.g. "lb[2]") is not safe to reuse as a relative preceding/following-sibling step,
+ # since its meaning ("2nd node of that name") is re-evaluated per candidate context,
+ # not globally.
+ if result_start is not None and result_end is not None:
+ xpath_proc.declare_variable("__range_start")
+ xpath_proc.set_parameter("__range_start", result_start)
+ xpath_proc.declare_variable("__range_end")
+ xpath_proc.set_parameter("__range_end", result_end)
+ for sibling in xpath_eval(xpath_proc, "./node()[. >> $__range_start][. << $__range_end]"):
+ copy_node(sibling, include_children=True, parent=new_tree, processor=processor)
+ else:
+ # For this reason, we need to change matching xpath (ie. ./div[position()=1]) into
+ # compatible suffixes with preceding-sibling or following-sibling.
+ sib_current_start = clean_xpath_for_following(current_start, start_is_traversing)
+ sib_current_end = clean_xpath_for_following(current_end, end_is_traversing)
+ for sibling in xpath_eval(xpath_proc, f"./node()[preceding-sibling::{sib_current_start} and following-sibling::{sib_current_end}]"):
+ copy_node(sibling, include_children=True, parent=new_tree, processor=processor)
# Here we reached the end, logically.
node = copy_node(node=result_end, include_children=len(queue_end) == 0, parent=new_tree, processor=processor)
@@ -553,7 +578,7 @@ def reconstruct_doc(
copy_until=not xpath_proc.effective_boolean_value(f"head(./element()[1]) is head({preview})"),
processor=processor
)
- if end_siblings:
+ if end_siblings is not None:
_treat_siblings(context_node=result_end, xpath=end_siblings, last_node=node, ancestor_list=ancestor_end,
processor=processor)
return new_tree
@@ -657,8 +682,13 @@ def get_passage(
next_ref = self.get_next(tree, end)
if next_ref:
next_ref = next_ref.ref
- next_ref_xpath = normalize_xpath(xpath_split(self.citeStructure[tree].generate_xpath(next_ref)))[-1]
- end_sibling = next_ref_xpath.strip("/")
+ if self.citeStructure[tree].is_milestone_nested(next_ref):
+ end_sibling = self.citeStructure[tree].resolve_node(next_ref)
+ else:
+ next_ref_xpath = normalize_xpath(xpath_split(self.citeStructure[tree].generate_xpath(next_ref)))[-1]
+ end_sibling = next_ref_xpath.strip("/")
+ elif (milestone_boundary := self.citeStructure[tree].get_next_milestone_boundary(end)) is not None:
+ end_sibling = milestone_boundary
else:
end_sibling = COPY_UNTIL_END
else:
@@ -667,8 +697,13 @@ def get_passage(
next_ref = self.get_next(tree, start)
if next_ref:
next_ref = next_ref.ref
- next_ref_xpath = normalize_xpath(xpath_split(self.citeStructure[tree].generate_xpath(next_ref)))[-1]
- start_sibling = next_ref_xpath.strip("/")
+ if self.citeStructure[tree].is_milestone_nested(next_ref):
+ start_sibling = self.citeStructure[tree].resolve_node(next_ref)
+ else:
+ next_ref_xpath = normalize_xpath(xpath_split(self.citeStructure[tree].generate_xpath(next_ref)))[-1]
+ start_sibling = next_ref_xpath.strip("/")
+ elif (milestone_boundary := self.citeStructure[tree].get_next_milestone_boundary(start)) is not None:
+ start_sibling = milestone_boundary
else:
start_sibling = COPY_UNTIL_END
@@ -756,6 +791,6 @@ def _find(haystack, needle) -> Optional[Tuple[int, CitableUnit, List[CitableUnit
return c
return None
current_idx, current_unit, siblings = _find(refs, unit)
- if current_idx < len(refs)-1:
+ if current_idx < len(siblings)-1:
return siblings[current_idx+1]
return None
diff --git a/tests/tei/cb_lb_milestones.xml b/tests/tei/cb_lb_milestones.xml
new file mode 100644
index 0000000..c31c282
--- /dev/null
+++ b/tests/tei/cb_lb_milestones.xml
@@ -0,0 +1,45 @@
+
+
+
+
+ Sample Latin Inscription
+
+
+
Unit test example
+
+
+
Fictitious inscription for testing.
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ IMP CAESARI
+ DIVI F AVGVSTO
+ PONTIFICI MAXIMO
+ TRIB POTESTATE X
+
+
+ COS XIII P P
+ SENATVS POPVLVSQVE
+ ROMANVS
+ D D
+
+
+
\n'
+ ' \n'
+ ' \n'
+ ''
+ )
+
+ # Last line of the last column of page 1 must not bleed into page 2's content, but may
+ # include the upcoming milestone marker itself
+ assert tostring(doc.get_passage("1.2.2"), encoding=str) == (
+ '\n'
+ ' \n'
+ '
\n'
+ ' \n\n'
+ ' delta\n\n'
+ ' \n'
+ ' \n'
+ '
\n'
+ ' \n'
+ ' \n'
+ ''
+ )
+
+ # A range crossing the page boundary should include both the <pb/> and <cb/> milestones
+ assert tostring(doc.get_passage("1.2.2", "2.1.1"), encoding=str) == (
+ '\n'
+ ' \n'
+ '