Lokasi ngalangkungan proxy:   [ UP ]  
[Ngawartoskeun bug]   [Panyetelan cookie]                
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions src/datajoint/adapters/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,29 @@ def find_downstream_schemas_sql(self, schemas_list: str) -> str:
raise NotImplementedError
...

def find_upstream_schemas_sql(self, schemas_list: str) -> str:
"""
Generate query to find schemas that the given schemas reference via FK.

Used to discover unloaded schemas that the loaded ones depend on
(the upstream / ancestor direction). Symmetric to
:meth:`find_downstream_schemas_sql`.

Parameters
----------
schemas_list : str
Comma-separated, quoted schema names for an IN clause.

Returns
-------
str
SQL query returning rows with a single column ``schema_name``
containing distinct schema names that are referenced by the
given schemas.
"""
raise NotImplementedError
...

@abstractmethod
def get_constraint_info_sql(self, constraint_name: str, schema_name: str, table_name: str) -> str:
"""
Expand Down
10 changes: 10 additions & 0 deletions src/datajoint/adapters/mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,16 @@ def find_downstream_schemas_sql(self, schemas_list: str) -> str:
f"AND table_schema NOT IN ({schemas_list})"
)

def find_upstream_schemas_sql(self, schemas_list: str) -> str:
"""Find schemas that the given schemas reference via FK."""
return (
f"SELECT DISTINCT referenced_table_schema as schema_name "
f"FROM information_schema.key_column_usage "
f"WHERE table_schema IN ({schemas_list}) "
f"AND referenced_table_schema IS NOT NULL "
f"AND referenced_table_schema NOT IN ({schemas_list})"
)

def get_constraint_info_sql(self, constraint_name: str, schema_name: str, table_name: str) -> str:
"""Query to get FK constraint details from information_schema."""
return (
Expand Down
14 changes: 14 additions & 0 deletions src/datajoint/adapters/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,20 @@ def find_downstream_schemas_sql(self, schemas_list: str) -> str:
f"AND ns1.nspname NOT IN ({schemas_list})"
)

def find_upstream_schemas_sql(self, schemas_list: str) -> str:
"""Find schemas that the given schemas reference via FK."""
return (
f"SELECT DISTINCT ns2.nspname as schema_name "
f"FROM pg_constraint c "
f"JOIN pg_class cl1 ON c.conrelid = cl1.oid "
f"JOIN pg_namespace ns1 ON cl1.relnamespace = ns1.oid "
f"JOIN pg_class cl2 ON c.confrelid = cl2.oid "
f"JOIN pg_namespace ns2 ON cl2.relnamespace = ns2.oid "
f"WHERE c.contype = 'f' "
f"AND ns1.nspname IN ({schemas_list}) "
f"AND ns2.nspname NOT IN ({schemas_list})"
)

def get_constraint_info_sql(self, constraint_name: str, schema_name: str, table_name: str) -> str:
"""
Query to get FK constraint details from information_schema.
Expand Down
29 changes: 29 additions & 0 deletions src/datajoint/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,35 @@ def load_all_downstream(self) -> None:

self.load(force=True, schema_names=known_schemas)

def load_all_upstream(self) -> None:
"""
Load dependencies including all upstream schemas referenced via FK chains.

Iteratively discovers schemas that the currently loaded schemas
reference, expanding the dependency graph until no new schemas
are found. This ensures that upstream restriction propagation
(``Diagram.trace()``) reaches all ancestor tables, including
those in schemas the user has not explicitly activated.

Called automatically by ``Diagram.trace()``. Symmetric to
:meth:`load_all_downstream`.
"""
adapter = self._conn.adapter
known_schemas = set(self._conn.schemas)
if not known_schemas:
self.load()
return

while True:
schemas_list = ", ".join(adapter.quote_string(s) for s in known_schemas)
result = self._conn.query(adapter.find_upstream_schemas_sql(schemas_list))
new_schemas = {row[0] for row in result} - known_schemas
if not new_schemas:
break
known_schemas |= new_schemas

self.load(force=True, schema_names=known_schemas)

def topo_sort(self) -> list[str]:
"""
Return table names in topological order.
Expand Down
252 changes: 249 additions & 3 deletions src/datajoint/diagram.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,245 @@ def cascade(cls, table_expr, part_integrity="enforce"):
result._expanded_nodes &= keep
return result

@classmethod
def trace(cls, table_expr):
"""
Create an upstream-trace diagram for a (restricted) table expression.

The upstream mirror of :meth:`cascade`. Walks the FK graph upward
from the seed, propagating the restriction to every ancestor with
OR convergence — an ancestor entity is included if reachable through
*any* FK path from the seed.

Reuses the upward propagation rules
(``_apply_propagation_rule_upward``) defined alongside the cascade
engine, applied here in a generalized form (any child → any parent,
not just Part → Master).

Parameters
----------
table_expr : QueryExpression
A (possibly restricted) table expression.
(e.g., ``Spectrum & key``).

Returns
-------
Diagram
New Diagram restricted to the seed and its ancestors, with
per-ancestor restrictions accumulated through the FK graph.
Use ``diagram[T]`` to obtain a pre-restricted
``QueryExpression`` for an ancestor, or ``diagram.counts()``
to preview row counts per ancestor.

Examples
--------
>>> trace = dj.Diagram.trace(Spectrum & {"recording_id": 5})
>>> trace[Session].fetch1("session_date")
>>> trace.counts() # entity counts per ancestor
>>> trace["schema.Session"] # FreeTable, when class isn't in scope

See Also
--------
:meth:`cascade` — the downstream mirror.
"""
conn = table_expr.connection
conn.dependencies.load_all_upstream()
node = table_expr.full_table_name

result = cls.__new__(cls)
nx.DiGraph.__init__(result, conn.dependencies)
result._connection = conn
result.context = {}
result._cascade_restrictions = {} # trace uses cascade-shape storage (OR semantics)
result._restrict_conditions = {}
result._restriction_attrs = {}
result._mode = "trace"

# Include seed + all ancestors
ancestors = set(nx.ancestors(result, node)) | {node}
result.nodes_to_show = ancestors
result._expanded_nodes = set(ancestors)

# Seed restriction
restriction = AndList(table_expr.restriction)
result._cascade_restrictions[node] = [restriction] if restriction else []
result._restriction_attrs[node] = set(table_expr.restriction_attributes)

# Propagate upstream
result._propagate_restrictions_upstream(node)

# Trim graph to trace subgraph: only restricted tables (seed + ancestors)
# plus alias nodes connecting them.
keep = set(result._cascade_restrictions)
for alias in (n for n in result.nodes() if n.isdigit()):
if set(result.predecessors(alias)) & keep and set(result.successors(alias)) & keep:
keep.add(alias)
result.remove_nodes_from(set(result.nodes()) - keep)
result.nodes_to_show &= keep
result._expanded_nodes &= keep
return result

def _propagate_restrictions_upstream(self, start_node):
"""
Propagate the seed's restriction upstream through the FK graph.

Symmetric to :meth:`_propagate_restrictions` but walks ``in_edges``
instead of ``out_edges`` and applies the upward rules
(``_apply_propagation_rule_upward``) at each real edge. Multiple
passes until no new ancestor is restricted; termination is
guaranteed because the dependency graph is a DAG.
"""
sorted_nodes = topo_sort(self)
# Only propagate through ancestors of start_node
allowed_nodes = {start_node} | set(nx.ancestors(self, start_node))
propagated_edges = set()

restrictions = self._cascade_restrictions

any_new = True
while any_new:
any_new = False

# Walk in reverse topological order so children are processed
# before their parents — when we reach a parent, its restriction
# accumulates from all of its (already-processed) children.
for node in reversed(sorted_nodes):
if node not in restrictions or node not in allowed_nodes:
continue

child_ft = self._restricted_table(node)
child_attrs = self._restriction_attrs.get(node, set())

for parent, _, edge_props in self.in_edges(node, data=True):
edge_key = (parent, node)
if edge_key in propagated_edges:
continue
propagated_edges.add(edge_key)

if parent not in allowed_nodes:
continue

if isinstance(parent, str) and parent.isdigit():
# Alias node — find the real parent on the far side.
# The alias has its own in_edges; the props on both
# half-edges are identical, so we can use either.
for real_parent, _, real_edge_props in self.in_edges(parent, data=True):
real_edge_key = (real_parent, parent, node)
if real_edge_key in propagated_edges:
continue
propagated_edges.add(real_edge_key)
if real_parent not in allowed_nodes:
continue
attr_map = real_edge_props.get("attr_map", {})
aliased = real_edge_props.get("aliased", False)
was_new = real_parent not in restrictions
self._apply_propagation_rule_upward(
child_ft,
child_attrs,
real_parent,
attr_map,
aliased,
"cascade", # OR semantics for trace
restrictions,
)
if was_new and real_parent in restrictions:
any_new = True
else:
attr_map = edge_props.get("attr_map", {})
aliased = edge_props.get("aliased", False)
was_new = parent not in restrictions
self._apply_propagation_rule_upward(
child_ft,
child_attrs,
parent,
attr_map,
aliased,
"cascade",
restrictions,
)
if was_new and parent in restrictions:
any_new = True

def __getitem__(self, key):
"""
Return a pre-restricted query expression (or FreeTable) for an
ancestor table in this trace.

Only meaningful for trace diagrams (constructed via
:meth:`Diagram.trace`). For ordinary diagrams, defers to
:class:`networkx.DiGraph`'s adjacency-dict lookup.

Parameters
----------
key : type or str
A Table subclass (e.g. ``Session``) — returns a pre-restricted
``QueryExpression``. Or a string giving the table's class name
or fully-qualified SQL name — returns a pre-restricted
``FreeTable``.

Returns
-------
QueryExpression or FreeTable
The ancestor's table restricted to rows reachable via FK from
the seed of this trace.

Raises
------
DataJointError
If the requested table is not in the trace's subgraph (i.e.
not an ancestor of the seed, and not the seed itself).

Examples
--------
>>> trace = dj.Diagram.trace(Spectrum & key)
>>> trace[Session].fetch1("session_date") # class index
>>> trace["my_schema.Session"].to_dicts() # string index → FreeTable
"""
# Non-trace diagrams: defer to networkx adjacency lookup so existing
# `diagram[node_name]` patterns (used in diagram algebra, ERD tests)
# keep working.
if getattr(self, "_mode", None) != "trace":
return super().__getitem__(key)

from .table import Table

# Resolve `key` to a full table name
if isinstance(key, type) and issubclass(key, Table):
full_name = key.full_table_name
elif isinstance(key, Table):
full_name = key.full_table_name
elif isinstance(key, str):
# Accept either a class name (resolve via context) or a full SQL name
if "`" in key or '"' in key:
full_name = key
else:
# Class name — search graph nodes for a matching tail
candidates = [
n
for n in self.nodes()
if not (isinstance(n, str) and n.isdigit()) and n.lower().rstrip('`"').endswith(key.lower())
]
if not candidates:
raise DataJointError(f"Table {key!r} is not in this trace's subgraph " f"(not an ancestor of the seed).")
if len(candidates) > 1:
raise DataJointError(
f"Ambiguous table reference {key!r}: matches " f"{', '.join(candidates)}. Use a fully-qualified name."
)
full_name = candidates[0]
else:
raise DataJointError(f"trace[...] expects a Table class, Table instance, or string; " f"got {type(key).__name__}.")

if full_name not in self._cascade_restrictions:
raise DataJointError(f"Table {full_name} is not in this trace's subgraph " f"(not an ancestor of the seed).")

# For class-typed key, return a restricted class instance; for string,
# return a FreeTable.
if isinstance(key, (type, Table)):
ft = self._restricted_table(full_name)
return ft
else:
return self._restricted_table(full_name)

def _restricted_table(self, node):
"""
Return a FreeTable for ``node`` with this diagram's restrictions applied.
Expand Down Expand Up @@ -624,7 +863,9 @@ def _apply_propagation_rule_upward(self, child_ft, child_attrs, parent_node, att
``child.proj(**{parent: child for child, parent in attr_map.items()})``
— reverses the renaming so the result has parent's column names.
3. Non-aliased AND child restriction attrs ⊄ parent PK:
``child.proj()`` — project child to parent's PK columns.
``child.proj(*attr_map.keys())`` — project child onto its FK columns
(which, being non-aliased, share names with parent's PK columns) so
the subsequent restriction on the parent joins on the right columns.
"""
parent_pk = self.nodes[parent_node].get("primary_key", set())

Expand All @@ -648,8 +889,13 @@ def _apply_propagation_rule_upward(self, child_ft, child_attrs, parent_node, att
restrictions.setdefault(parent_node, AndList()).append(parent_item)
parent_attrs = set(attr_map.values()) # parent's PK column names
else:
# Backward Rule 3: project child to parent PK
parent_item = child_ft.proj()
# Backward Rule 3: project child to its FK columns (which by name
# match parent's PK columns in the non-aliased case). For primary
# FKs (attr_map.keys() ⊆ child_pk) this is a no-op since
# ``proj()`` already returns the PK. For non-primary FKs this
# explicitly carries the FK columns into the projection so the
# subsequent restriction on the parent joins on the right columns.
parent_item = child_ft.proj(*attr_map.keys())
if mode == "cascade":
restrictions.setdefault(parent_node, []).append(parent_item)
else:
Expand Down
Loading
Loading