diff --git a/docs/src/design/semantic-matching-spec.md b/docs/src/design/semantic-matching-spec.md index 7963eeeff..dd4a3773d 100644 --- a/docs/src/design/semantic-matching-spec.md +++ b/docs/src/design/semantic-matching-spec.md @@ -164,6 +164,227 @@ A.join(B, semantic_check=False) # Explicit bypass The error message directs users to the explicit `.join()` method. +## Primary Key Rules in Relational Operators + +In DataJoint, the result of each query operator produces a valid **entity set** with a well-defined **entity type** and **primary key**. This section specifies how the primary key is determined for each relational operator. + +### General Principle + +The primary key of a query result identifies unique entities in that result. For most operators, the primary key is preserved from the left operand. For joins, the primary key depends on the functional dependencies between the operands. + +### Notation + +In the examples below, `*` marks primary key attributes: +- `A(x*, y*, z)` means A has primary key `{x, y}` and secondary attribute `z` +- `A → B` means "A determines B" (defined below) + +### Rules by Operator + +| Operator | Primary Key Rule | +|----------|------------------| +| `A & B` (restriction) | PK(A) — preserved from left operand | +| `A - B` (anti-restriction) | PK(A) — preserved from left operand | +| `A.proj(...)` (projection) | PK(A) — preserved from left operand | +| `A.aggr(B, ...)` (aggregation) | PK(A) — preserved from left operand | +| `A * B` (join) | Depends on functional dependencies (see below) | + +### Join Primary Key Rule + +The join operator requires special handling because it combines two entity sets. The primary key of `A * B` depends on the **functional dependency relationship** between the operands. + +#### Definitions + +**A determines B** (written `A → B`): Every attribute in PK(B) is either already in PK(A) or is a secondary attribute in A. + +``` +A → B iff ∀b ∈ PK(B): b ∈ PK(A) OR b ∈ secondary(A) +``` + +Intuitively, `A → B` means that knowing A's primary key is sufficient to determine B's primary key through functional dependencies. + +**B determines A** (written `B → A`): Every attribute in PK(A) is either already in PK(B) or is a secondary attribute in B. + +``` +B → A iff ∀a ∈ PK(A): a ∈ PK(B) OR a ∈ secondary(B) +``` + +#### Join Primary Key Algorithm + +For `A * B`: + +| Condition | PK(A * B) | Attribute Order | +|-----------|-----------|-----------------| +| A → B | PK(A) | A's attributes first | +| B → A (and not A → B) | PK(B) | B's attributes first | +| Neither | PK(A) ∪ PK(B) | PK(A) first, then PK(B) − PK(A) | + +When both `A → B` and `B → A` hold, the left operand takes precedence (use PK(A)). + +#### Examples + +**Example 1: B → A** +``` +A: x*, y* +B: x*, z*, y (y is secondary in B, so z → y) +``` +- A → B? PK(B) = {x, z}. Is z in PK(A) or secondary in A? No (z not in A). **No.** +- B → A? PK(A) = {x, y}. Is y in PK(B) or secondary in B? Yes (secondary). **Yes.** +- Result: **PK(A * B) = {x, z}** with B's attributes first. + +**Example 2: Both directions (bijection-like)** +``` +A: x*, y*, z (z is secondary in A) +B: y*, z*, x (x is secondary in B) +``` +- A → B? PK(B) = {y, z}. Is z in PK(A) or secondary in A? Yes (secondary). **Yes.** +- B → A? PK(A) = {x, y}. Is x in PK(B) or secondary in B? Yes (secondary). **Yes.** +- Both hold, prefer left operand: **PK(A * B) = {x, y}** with A's attributes first. + +**Example 3: Neither direction** +``` +A: x*, y* +B: z*, x (x is secondary in B) +``` +- A → B? PK(B) = {z}. Is z in PK(A) or secondary in A? No. **No.** +- B → A? PK(A) = {x, y}. Is y in PK(B) or secondary in B? No (y not in B). **No.** +- Result: **PK(A * B) = {x, y, z}** (union) with A's attributes first. + +**Example 4: A → B (subordinate relationship)** +``` +Session: session_id* +Trial: session_id*, trial_num* (references Session) +``` +- A → B? PK(Trial) = {session_id, trial_num}. Is trial_num in PK(Session) or secondary? No. **No.** +- B → A? PK(Session) = {session_id}. Is session_id in PK(Trial)? Yes. **Yes.** +- Result: **PK(Session * Trial) = {session_id, trial_num}** with Trial's attributes first. + +### Design Tradeoff: Predictability vs. Minimality + +The join primary key rule prioritizes **predictability** over **minimality**. In some cases, the resulting primary key may not be minimal (i.e., it may contain functionally redundant attributes). + +**Example of non-minimal result:** +``` +A: x*, y* +B: z*, x (x is secondary in B, so z → x) +``` + +The mathematically minimal primary key for `A * B` would be `{y, z}` because: +- `z → x` (from B's structure) +- `{y, z} → {x, y, z}` (z gives us x, and we have y) + +However, `{y, z}` is problematic: +- It is **not the primary key of either operand** (A has `{x, y}`, B has `{z}`) +- It is **not the union** of the primary keys +- It represents a **novel entity type** that doesn't correspond to A, B, or their natural pairing + +This creates confusion: what kind of entity does `{y, z}` identify? + +**The simplified rule produces `{x, y, z}`** (the union), which: +- Is immediately recognizable as "one A entity paired with one B entity" +- Contains A's full primary key and B's full primary key +- May have redundancy (`x` is determined by `z`) but is semantically clear + +**Rationale:** Users can always project away redundant attributes if they need the minimal key. But starting with a predictable, interpretable primary key reduces confusion and errors. + +### Attribute Ordering + +The primary key attributes always appear **first** in the result's attribute list, followed by secondary attributes. When `B → A` (and not `A → B`), the join is conceptually reordered as `B * A` to maintain this invariant: + +- If PK = PK(A): A's attributes appear first +- If PK = PK(B): B's attributes appear first +- If PK = PK(A) ∪ PK(B): PK(A) attributes first, then PK(B) − PK(A), then secondaries + +### Non-Commutativity + +With these rules, join is **not commutative** in terms of: +1. **Primary key selection**: `A * B` may have a different PK than `B * A` when one direction determines but not the other +2. **Attribute ordering**: The left operand's attributes appear first (unless B → A) + +The **result set** (the actual rows returned) remains the same regardless of order, but the **schema** (primary key and attribute order) may differ. + +### Left Join Constraint + +For left joins (`A.join(B, left=True)`), the functional dependency **A → B is required**. + +**Why this constraint exists:** + +In a left join, all rows from A are retained even if there's no matching row in B. For unmatched rows, B's attributes are NULL. This creates a problem for primary key validity: + +| Scenario | PK by inner join rule | Left join problem | +|----------|----------------------|-------------------| +| A → B | PK(A) | ✅ Safe — A's attrs always present | +| B → A | PK(B) | ❌ B's PK attrs could be NULL | +| Neither | PK(A) ∪ PK(B) | ❌ B's PK attrs could be NULL | + +**Example of invalid left join:** +``` +A: x*, y* PK(A) = {x, y} +B: x*, z*, y PK(B) = {x, z}, y is secondary + +Inner join: PK = {x, z} (B → A rule) +Left join attempt: FAILS because z could be NULL for unmatched A rows +``` + +**Valid left join example:** +``` +Session: session_id*, date +Trial: session_id*, trial_num*, stimulus (references Session) + +Session.join(Trial, left=True) # OK: Session → Trial +# PK = {session_id}, all sessions retained even without trials +``` + +**Error message:** +``` +DataJointError: Left join requires the left operand to determine the right operand (A → B). +The following attributes from the right operand's primary key are not determined by +the left operand: ['z']. Use an inner join or restructure the query. +``` + +### Bypassing the Left Join Constraint + +For special cases where the user takes responsibility for handling the potentially invalid primary key, the constraint can be bypassed using `allow_invalid_primary_key=True`: + +```python +# Normally blocked - B does not determine A +A.join(B, left=True) # Error: A → B not satisfied + +# Bypass the constraint - user takes responsibility +A.join(B, left=True, allow_invalid_primary_key=True) # Allowed, PK = PK(A) ∪ PK(B) +``` + +When bypassed, the resulting primary key is the union of both operands' primary keys (PK(A) ∪ PK(B)). The user must ensure that subsequent operations (such as `GROUP BY` or projection) establish a valid primary key. + +This mechanism is used internally by aggregation (`aggr`) with `keep_all_rows=True`, which resets the primary key via the `GROUP BY` clause. + +### Aggregation Exception + +`A.aggr(B, keep_all_rows=True)` uses a left join internally but has the **opposite requirement**: **B → A** (the group expression B must have all of A's primary key attributes). + +This apparent contradiction is resolved by the `GROUP BY` clause: + +1. Aggregation requires B → A so that B can be grouped by A's primary key +2. The intermediate left join `A LEFT JOIN B` would have an invalid PK under the normal left join rules +3. Aggregation internally allows the invalid PK, producing PK(A) ∪ PK(B) +4. The `GROUP BY PK(A)` clause then **resets** the primary key to PK(A) +5. The final result has PK(A), which consists entirely of non-NULL values from A + +Note: The semantic check (homologous namesake validation) is still performed for aggregation's internal join. Only the primary key validity constraint is bypassed. + +**Example:** +``` +Session: session_id*, date +Trial: session_id*, trial_num*, response_time (references Session) + +# Aggregation with keep_all_rows=True +Session.aggr(Trial, keep_all_rows=True, avg_rt='avg(response_time)') + +# Internally: Session LEFT JOIN Trial (with invalid PK allowed) +# Intermediate PK would be {session_id} ∪ {session_id, trial_num} = {session_id, trial_num} +# But GROUP BY session_id resets PK to {session_id} +# Result: All sessions, with avg_rt=NULL for sessions without trials +``` + ## Universal Set `dj.U` `dj.U()` or `dj.U('attr1', 'attr2', ...)` represents the universal set of all possible values and lineages. @@ -537,6 +758,14 @@ Use .proj() to rename one of the attributes or .join(semantic_check=False) in a - `A.aggr(B)` raises error when PK attributes have different lineage - `dj.U('a', 'b').aggr(B)` works when B has `a` and `b` attributes +6. **Join primary key determination**: + - `A * B` where `A → B`: result has PK(A) + - `A * B` where `B → A` (not `A → B`): result has PK(B), B's attributes first + - `A * B` where both `A → B` and `B → A`: result has PK(A) (left preference) + - `A * B` where neither direction: result has PK(A) ∪ PK(B) + - Verify attribute ordering matches primary key source + - Verify non-commutativity: `A * B` vs `B * A` may differ in PK and order + ### Integration Tests 1. **Schema migration**: Existing schema gets `~lineage` table populated correctly diff --git a/src/datajoint/expression.py b/src/datajoint/expression.py index 62359be94..ccaade5b5 100644 --- a/src/datajoint/expression.py +++ b/src/datajoint/expression.py @@ -282,7 +282,7 @@ def __matmul__(self, other): "The @ operator has been removed in DataJoint 2.0. " "Use .join(other, semantic_check=False) for permissive joins." ) - def join(self, other, semantic_check=True, left=False): + def join(self, other, semantic_check=True, left=False, allow_invalid_primary_key=False): """ Create the joined QueryExpression. @@ -293,10 +293,14 @@ def join(self, other, semantic_check=True, left=False): :param semantic_check: If True (default), raise error on non-homologous namesakes. If False, bypass semantic check (use for legacy compatibility). :param left: If True, perform a left join retaining all rows from self. + :param allow_invalid_primary_key: If True, bypass the left join A → B constraint. + The resulting PK will be PK(A) ∪ PK(B), which may contain NULLs for unmatched rows. + Use when you will reset the PK afterward (e.g., via GROUP BY in aggregation). Examples: a * b is short for a.join(b) a.join(b, semantic_check=False) for permissive joins + a.join(b, left=True, allow_invalid_primary_key=True) for left join with invalid PK """ # U joins are deprecated - raise error directing to use & instead if isinstance(other, U): @@ -336,10 +340,12 @@ def join(self, other, semantic_check=True, left=False): result._connection = self.connection result._support = self.support + other.support result._left = self._left + [left] + other._left - result._heading = self.heading.join(other.heading) + result._heading = self.heading.join(other.heading, left=left, allow_invalid_primary_key=allow_invalid_primary_key) result._restriction = AndList(self.restriction) result._restriction.append(other.restriction) - result._original_heading = self.original_heading.join(other.original_heading) + result._original_heading = self.original_heading.join( + other.original_heading, left=left, allow_invalid_primary_key=allow_invalid_primary_key + ) assert len(result.support) == len(result._left) + 1 return result @@ -683,7 +689,8 @@ def create(cls, arg, group, keep_all_rows=False): if keep_all_rows and len(group.support) > 1 or group.heading.new_attributes: group = group.make_subquery() # subquery if left joining a join - join = arg.join(group, left=keep_all_rows) # reuse the join logic + # Allow invalid PK for left join (aggregation resets PK via GROUP BY afterward) + join = arg.join(group, left=keep_all_rows, allow_invalid_primary_key=True) result = cls() result._connection = join.connection result._heading = join.heading.set_primary_key(arg.primary_key) # use left operand's primary key diff --git a/src/datajoint/heading.py b/src/datajoint/heading.py index dc305db71..1185794d6 100644 --- a/src/datajoint/heading.py +++ b/src/datajoint/heading.py @@ -468,18 +468,134 @@ def select(self, select_list, rename_map=None, compute_map=None): ) return Heading(chain(copy_attrs, compute_attrs)) - def join(self, other): + def join(self, other, left=False, allow_invalid_primary_key=False): """ Join two headings into a new one. + + The primary key of the result depends on the functional dependency relationship: + - A → B (self determines other): PK = PK(self), self's attributes first + - B → A (other determines self) and not A → B: PK = PK(other), other's attributes first + - Neither: PK = PK(self) ∪ PK(other), self's attributes first + + A → B holds iff every attribute in PK(B) is either in PK(A) or secondary in A. + B → A holds iff every attribute in PK(A) is either in PK(B) or secondary in B. + + For left joins (left=True), A → B is required by default. Otherwise, the result + would not have a valid primary key because: + - Unmatched rows from A have NULL values for B's attributes + - If B → A or Neither, the PK would include B's attributes, which could be NULL + - Only when A → B does PK(A) uniquely identify all result rows + + When allow_invalid_primary_key=True for left joins where A → B doesn't hold, + the constraint is bypassed and PK = PK(A) ∪ PK(B) is used. This is useful for + aggregation, where the GROUP BY clause resets the primary key afterward. + It assumes that self and other are headings that share no common dependent attributes. + + :param other: The other heading to join with + :param left: If True, this is a left join (requires A → B unless allow_invalid_primary_key) + :param allow_invalid_primary_key: If True, bypass left join A → B validation (PK becomes union) + :raises DataJointError: If left=True and A does not determine B (unless allow_invalid_primary_key) """ - return Heading( - [self.attributes[name].todict() for name in self.primary_key] - + [other.attributes[name].todict() for name in other.primary_key if name not in self.primary_key] - + [self.attributes[name].todict() for name in self.secondary_attributes if name not in other.primary_key] - + [other.attributes[name].todict() for name in other.secondary_attributes if name not in self.primary_key] + from .errors import DataJointError + + # Check functional dependencies + self_determines_other = all( + name in self.primary_key or name in self.secondary_attributes for name in other.primary_key + ) + other_determines_self = all( + name in other.primary_key or name in other.secondary_attributes for name in self.primary_key ) + # For left joins, require A → B unless allow_invalid_primary_key=True + if left and not self_determines_other: + if not allow_invalid_primary_key: + missing = [ + name + for name in other.primary_key + if name not in self.primary_key and name not in self.secondary_attributes + ] + raise DataJointError( + f"Left join requires the left operand to determine the right operand (A → B). " + f"The following attributes from the right operand's primary key are not " + f"determined by the left operand: {missing}. " + f"Use an inner join or restructure the query." + ) + else: + # Bypass: use union of PKs (will be reset by caller, e.g., aggregation) + other_determines_self = False # Force the "Neither" case + + seen = set() + result_attrs = [] + + if left or self_determines_other: + # A → B: use PK(A), A's attributes first + # 1. All of A's PK attrs (as PK) + for name in self.primary_key: + result_attrs.append(dict(self.attributes[name].todict(), in_key=True)) + seen.add(name) + # 2. B's PK attrs not already included (as secondary, determined by A's PK) + for name in other.primary_key: + if name not in seen: + result_attrs.append(dict(other.attributes[name].todict(), in_key=False)) + seen.add(name) + # 3. A's secondary attrs not already included + for name in self.secondary_attributes: + if name not in seen: + result_attrs.append(dict(self.attributes[name].todict(), in_key=False)) + seen.add(name) + # 4. B's secondary attrs not already included + for name in other.secondary_attributes: + if name not in seen: + result_attrs.append(dict(other.attributes[name].todict(), in_key=False)) + seen.add(name) + + elif other_determines_self: + # B → A (and not A → B): use PK(B), B's attributes first + # 1. All of B's PK attrs (as PK) + for name in other.primary_key: + result_attrs.append(dict(other.attributes[name].todict(), in_key=True)) + seen.add(name) + # 2. A's PK attrs not already included (as secondary, determined by B's PK) + for name in self.primary_key: + if name not in seen: + result_attrs.append(dict(self.attributes[name].todict(), in_key=False)) + seen.add(name) + # 3. B's secondary attrs not already included + for name in other.secondary_attributes: + if name not in seen: + result_attrs.append(dict(other.attributes[name].todict(), in_key=False)) + seen.add(name) + # 4. A's secondary attrs not already included + for name in self.secondary_attributes: + if name not in seen: + result_attrs.append(dict(self.attributes[name].todict(), in_key=False)) + seen.add(name) + + else: + # Neither: use PK(A) ∪ PK(B), A's attributes first + # 1. All of A's PK attrs (as PK) + for name in self.primary_key: + result_attrs.append(dict(self.attributes[name].todict(), in_key=True)) + seen.add(name) + # 2. B's PK attrs not already included (as PK) + for name in other.primary_key: + if name not in seen: + result_attrs.append(dict(other.attributes[name].todict(), in_key=True)) + seen.add(name) + # 3. A's secondary attrs not already included + for name in self.secondary_attributes: + if name not in seen: + result_attrs.append(dict(self.attributes[name].todict(), in_key=False)) + seen.add(name) + # 4. B's secondary attrs not already included + for name in other.secondary_attributes: + if name not in seen: + result_attrs.append(dict(other.attributes[name].todict(), in_key=False)) + seen.add(name) + + return Heading(result_attrs) + def set_primary_key(self, primary_key): """ Create a new heading with the specified primary key. diff --git a/tests/test_semantic_matching.py b/tests/test_semantic_matching.py index b7278b063..3892a3a44 100644 --- a/tests/test_semantic_matching.py +++ b/tests/test_semantic_matching.py @@ -332,3 +332,485 @@ def test_computed_attrs_have_no_lineage(self, schema_lineage): computed = Student.proj(doubled="enrollment_year * 2") assert computed.heading["doubled"].lineage is None + + +@pytest.fixture +def schema_pk_rules(connection): + """ + Create a schema with tables for testing join primary key rules. + + These tables are designed to test the functional dependency rules: + - A → B: every attr in PK(B) is either in PK(A) or secondary in A + - B → A: every attr in PK(A) is either in PK(B) or secondary in B + """ + schema = dj.Schema("test_pk_rules", connection=connection, create_schema=True) + + # Base tables for testing various scenarios + @schema + class TableX(dj.Manual): + """Table with single PK attribute x.""" + + definition = """ + x : int + --- + x_data : int + """ + + @schema + class TableXY(dj.Manual): + """Table with composite PK (x, y).""" + + definition = """ + x : int + y : int + --- + xy_data : int + """ + + @schema + class TableXZ(dj.Manual): + """Table with composite PK (x, z).""" + + definition = """ + x : int + z : int + --- + xz_data : int + """ + + @schema + class TableZ(dj.Manual): + """Table with single PK z and secondary x.""" + + definition = """ + z : int + --- + x : int + z_data : int + """ + + @schema + class TableXZwithY(dj.Manual): + """Table with PK (x, z) and secondary y.""" + + definition = """ + x : int + z : int + --- + y : int + xzy_data : int + """ + + @schema + class TableYZwithX(dj.Manual): + """Table with PK (y, z) and secondary x.""" + + definition = """ + y : int + z : int + --- + x : int + yzx_data : int + """ + + @schema + class TableXYwithZ(dj.Manual): + """Table with PK (x, y) and secondary z.""" + + definition = """ + x : int + y : int + --- + z : int + xyz_data : int + """ + + # Insert test data + TableX.insert([{"x": 1, "x_data": 10}, {"x": 2, "x_data": 20}], skip_duplicates=True) + TableXY.insert( + [ + {"x": 1, "y": 1, "xy_data": 11}, + {"x": 1, "y": 2, "xy_data": 12}, + {"x": 2, "y": 1, "xy_data": 21}, + ], + skip_duplicates=True, + ) + TableXZ.insert( + [ + {"x": 1, "z": 1, "xz_data": 11}, + {"x": 1, "z": 2, "xz_data": 12}, + {"x": 2, "z": 1, "xz_data": 21}, + ], + skip_duplicates=True, + ) + TableZ.insert( + [ + {"z": 1, "x": 1, "z_data": 10}, + {"z": 2, "x": 1, "z_data": 20}, + {"z": 3, "x": 2, "z_data": 30}, + ], + skip_duplicates=True, + ) + TableXZwithY.insert( + [ + {"x": 1, "z": 1, "y": 1, "xzy_data": 111}, + {"x": 1, "z": 2, "y": 2, "xzy_data": 122}, + {"x": 2, "z": 1, "y": 1, "xzy_data": 211}, + ], + skip_duplicates=True, + ) + TableYZwithX.insert( + [ + {"y": 1, "z": 1, "x": 1, "yzx_data": 111}, + {"y": 1, "z": 2, "x": 2, "yzx_data": 122}, + {"y": 2, "z": 1, "x": 1, "yzx_data": 211}, + ], + skip_duplicates=True, + ) + TableXYwithZ.insert( + [ + {"x": 1, "y": 1, "z": 1, "xyz_data": 111}, + {"x": 1, "y": 2, "z": 2, "xyz_data": 122}, + {"x": 2, "y": 1, "z": 1, "xyz_data": 211}, + ], + skip_duplicates=True, + ) + + yield { + "schema": schema, + "TableX": TableX, + "TableXY": TableXY, + "TableXZ": TableXZ, + "TableZ": TableZ, + "TableXZwithY": TableXZwithY, + "TableYZwithX": TableYZwithX, + "TableXYwithZ": TableXYwithZ, + } + + schema.drop(force=True) + + +class TestJoinPrimaryKeyRules: + """ + Test the join primary key determination rules. + + The rules are: + - A → B: PK(A * B) = PK(A), A's attributes first + - B → A (not A → B): PK(A * B) = PK(B), B's attributes first + - Both A → B and B → A: PK(A * B) = PK(A) (left preference) + - Neither: PK(A * B) = PK(A) ∪ PK(B) + """ + + def test_b_determines_a(self, schema_pk_rules): + """ + Test case: B → A (y is secondary in B, so PK(B) determines y). + + A: x*, y* PK(A) = {x, y} + B: x*, z*, y PK(B) = {x, z}, y is secondary + + A → B? z not in PK(A) and z not secondary in A → No + B → A? y secondary in B → Yes + + Result: PK = {x, z}, B's attributes first + """ + TableXY = schema_pk_rules["TableXY"] + TableXZwithY = schema_pk_rules["TableXZwithY"] + + result = TableXY * TableXZwithY + + # PK should be {x, z} (PK of B) + assert set(result.primary_key) == {"x", "z"} + # B's attributes should come first (x, z are both in B's PK) + assert result.heading.names[0] in {"x", "z"} + assert result.heading.names[1] in {"x", "z"} + + def test_both_directions_bijection_like(self, schema_pk_rules): + """ + Test case: Both A → B and B → A (bijection-like). + + A: x*, y*, z PK(A) = {x, y}, z is secondary + B: y*, z*, x PK(B) = {y, z}, x is secondary + + A → B? z secondary in A → Yes + B → A? x secondary in B → Yes + + Both hold, prefer left: PK = {x, y}, A's attributes first + """ + TableXYwithZ = schema_pk_rules["TableXYwithZ"] + TableYZwithX = schema_pk_rules["TableYZwithX"] + + result = TableXYwithZ * TableYZwithX + + # PK should be {x, y} (PK of A, left preference) + assert set(result.primary_key) == {"x", "y"} + # A's PK attributes should come first + assert result.heading.names[0] in {"x", "y"} + assert result.heading.names[1] in {"x", "y"} + + def test_neither_direction(self, schema_pk_rules): + """ + Test case: Neither A → B nor B → A. + + A: x*, y* PK(A) = {x, y} + B: z*, x PK(B) = {z}, x is secondary + + A → B? z not in PK(A) and z not secondary in A → No + B → A? y not in PK(B) and y not secondary in B → No + + Result: PK = {x, y, z} (union), A's attributes first + """ + TableXY = schema_pk_rules["TableXY"] + TableZ = schema_pk_rules["TableZ"] + + result = TableXY * TableZ + + # PK should be {x, y, z} (union) + assert set(result.primary_key) == {"x", "y", "z"} + # A's PK attributes should come first + pk_names = result.primary_key + assert pk_names[0] in {"x", "y"} + assert pk_names[1] in {"x", "y"} + assert pk_names[2] == "z" + + def test_a_determines_b_simple(self, schema_pk_rules): + """ + Test case: A → B (simple subordinate relationship). + + A: x* PK(A) = {x} + B: x*, y* PK(B) = {x, y} + + A → B? x in PK(A), y not in PK(A), y not secondary in A → No + B → A? x in PK(B) → Yes + + Result: PK = {x, y} (PK of B), B's attributes first + """ + TableX = schema_pk_rules["TableX"] + TableXY = schema_pk_rules["TableXY"] + + result = TableX * TableXY + + # B → A holds (x is in PK(B)), A → B doesn't (y not in A) + # Result: PK = PK(B) = {x, y} + assert set(result.primary_key) == {"x", "y"} + + def test_non_commutativity_pk_selection(self, schema_pk_rules): + """ + Test that A * B may have different PK than B * A. + """ + TableXY = schema_pk_rules["TableXY"] + TableXZwithY = schema_pk_rules["TableXZwithY"] + + result_ab = TableXY * TableXZwithY + result_ba = TableXZwithY * TableXY + + # For A * B: B → A, so PK = {x, z} + assert set(result_ab.primary_key) == {"x", "z"} + + # For B * A: A is now the "other", and A → B doesn't hold, + # B → A still means the new A (old B) determines new B (old A) + # Actually, let's recalculate: + # New A = TableXZwithY: PK = {x, z}, y is secondary + # New B = TableXY: PK = {x, y} + # New A → New B? y secondary in new A → Yes + # So PK = PK(new A) = {x, z} + assert set(result_ba.primary_key) == {"x", "z"} + + # In this case, both have the same PK but potentially different attribute order + + def test_non_commutativity_attribute_order(self, schema_pk_rules): + """ + Test that attribute order depends on which operand provides the PK. + """ + TableXY = schema_pk_rules["TableXY"] + TableXZwithY = schema_pk_rules["TableXZwithY"] + + result_ab = TableXY * TableXZwithY # B → A, B's attrs first + result_ba = TableXZwithY * TableXY # A → B, A's attrs first + + # In result_ab, B (TableXZwithY) provides PK, so its attrs come first + # In result_ba, A (TableXZwithY) provides PK, so its attrs come first + # Both should have TableXZwithY's attributes first + ab_names = result_ab.heading.names + ba_names = result_ba.heading.names + + # The first attributes should be from the PK-providing table + # Both cases have TableXZwithY providing the PK + assert ab_names[0] in {"x", "z"} + assert ba_names[0] in {"x", "z"} + + def test_join_preserves_all_attributes(self, schema_pk_rules): + """ + Test that all attributes from both tables are included in the result. + """ + TableXY = schema_pk_rules["TableXY"] + TableXZwithY = schema_pk_rules["TableXZwithY"] + + result = TableXY * TableXZwithY + + # All unique attributes should be present + all_expected = {"x", "y", "z", "xy_data", "xzy_data"} + assert set(result.heading.names) == all_expected + + def test_pk_attributes_come_first(self, schema_pk_rules): + """ + Test that primary key attributes always come first in the heading. + """ + TableXY = schema_pk_rules["TableXY"] + TableZ = schema_pk_rules["TableZ"] + + result = TableXY * TableZ + + # PK = {x, y, z} + pk = set(result.primary_key) + names = result.heading.names + + # All PK attributes should come before any secondary attributes + pk_indices = [names.index(attr) for attr in pk] + secondary_indices = [names.index(attr) for attr in names if attr not in pk] + + if secondary_indices: # If there are secondary attributes + assert max(pk_indices) < min(secondary_indices) + + +class TestLeftJoinConstraint: + """ + Test that left joins require A → B (left operand determines right operand). + + For left joins, B's attributes could be NULL for unmatched rows, so the PK + must be PK(A) only. This is only valid when A → B. + """ + + def test_left_join_valid_when_a_determines_b(self, schema_pk_rules): + """ + Left join should work when A → B. + + A: x*, y*, z PK(A) = {x, y}, z is secondary + B: y*, z*, x PK(B) = {y, z}, x is secondary + + A → B? z secondary in A → Yes + Left join is valid, PK = {x, y} + """ + TableXYwithZ = schema_pk_rules["TableXYwithZ"] + TableYZwithX = schema_pk_rules["TableYZwithX"] + + # This should work - A → B holds + result = TableXYwithZ().join(TableYZwithX(), left=True) + + # PK should be PK(A) = {x, y} + assert set(result.primary_key) == {"x", "y"} + + def test_left_join_fails_when_b_determines_a_only(self, schema_pk_rules): + """ + Left join should fail when only B → A (not A → B). + + A: x*, y* PK(A) = {x, y} + B: x*, z*, y PK(B) = {x, z}, y is secondary + + A → B? z not in PK(A) and z not secondary in A → No + B → A? y secondary in B → Yes + + Left join is invalid because z would need to be in PK but could be NULL. + """ + TableXY = schema_pk_rules["TableXY"] + TableXZwithY = schema_pk_rules["TableXZwithY"] + + # This should fail - A → B does not hold + with pytest.raises(DataJointError) as exc_info: + TableXY().join(TableXZwithY(), left=True) + + assert "Left join requires" in str(exc_info.value) + assert "A → B" in str(exc_info.value) or "determine" in str(exc_info.value) + + def test_left_join_fails_when_neither_direction(self, schema_pk_rules): + """ + Left join should fail when neither A → B nor B → A. + + A: x*, y* PK(A) = {x, y} + B: z*, x PK(B) = {z}, x is secondary + + A → B? z not in A → No + B → A? y not in B → No + + Left join is invalid. + """ + TableXY = schema_pk_rules["TableXY"] + TableZ = schema_pk_rules["TableZ"] + + # This should fail - A → B does not hold + with pytest.raises(DataJointError) as exc_info: + TableXY().join(TableZ(), left=True) + + assert "Left join requires" in str(exc_info.value) + + def test_inner_join_still_works_when_b_determines_a(self, schema_pk_rules): + """ + Inner join should still work normally when B → A (even though left join fails). + """ + TableXY = schema_pk_rules["TableXY"] + TableXZwithY = schema_pk_rules["TableXZwithY"] + + # Inner join should work - B → A applies + result = TableXY * TableXZwithY + + # PK should be {x, z} (B's PK) + assert set(result.primary_key) == {"x", "z"} + + +class TestAggregationWithKeepAllRows: + """ + Test that aggregation with keep_all_rows=True works correctly. + + Aggregation uses a left join internally but has the opposite requirement (B → A) + compared to direct left joins (which require A → B). This is valid because the + GROUP BY clause resets the PK to PK(A). + """ + + def test_aggregation_keep_all_rows_works_with_b_determines_a(self, schema_pk_rules): + """ + Aggregation with keep_all_rows=True should work when B → A. + + A: x* PK(A) = {x} + B: x*, y* PK(B) = {x, y} + + B → A? x in PK(B) → Yes (aggregation requirement met) + + The internal left join would normally fail (B → A, not A → B), but + aggregation bypasses this because GROUP BY resets PK to {x}. + """ + TableX = schema_pk_rules["TableX"] + TableXY = schema_pk_rules["TableXY"] + + # This should work - aggregation with keep_all_rows=True + result = TableX.aggr(TableXY, keep_all_rows=True, count="count(*)") + + # PK should be PK(A) = {x} (reset by GROUP BY) + assert set(result.primary_key) == {"x"} + + def test_aggregation_keep_all_rows_produces_correct_pk(self, schema_pk_rules): + """ + Aggregation result should always have PK(A), regardless of functional dependencies. + """ + TableXY = schema_pk_rules["TableXY"] + TableXZwithY = schema_pk_rules["TableXZwithY"] + + # TableXY (A): PK = {x, y} + # TableXZwithY (B): PK = {x, z}, y is secondary + # B → A (y secondary in B), so left join would use PK(B) = {x, z} + # But aggregation resets to PK(A) = {x, y} + result = TableXY.aggr(TableXZwithY, keep_all_rows=True, count="count(*)") + + # PK should be PK(A) = {x, y} + assert set(result.primary_key) == {"x", "y"} + + def test_aggregation_without_keep_all_rows_also_works(self, schema_pk_rules): + """ + Normal aggregation (keep_all_rows=False) should continue to work. + """ + TableX = schema_pk_rules["TableX"] + TableXY = schema_pk_rules["TableXY"] + + # Normal aggregation (inner join behavior) + result = TableX.aggr(TableXY, count="count(*)") + + # PK should be PK(A) = {x} + assert set(result.primary_key) == {"x"}