|
| 1 | +"""Cross-language topological layout for Cytoscape elements. |
| 2 | +
|
| 3 | +Mirror of the TypeScript port at |
| 4 | +``galaxy-tool-util-ts/packages/schema/src/workflow/cytoscape-layout.ts``. |
| 5 | +
|
| 6 | +Both implementations MUST produce byte-identical (x, y) coordinates for a |
| 7 | +given input. The normative spec lives in the galaxy-tool-util-ts repo at |
| 8 | +``docs/architecture/cytoscape-layout.md``. Any change here is a breaking |
| 9 | +visual diff and must land in lockstep with that file. |
| 10 | +""" |
| 11 | + |
| 12 | +from __future__ import annotations |
| 13 | + |
| 14 | +from typing import get_args, Literal |
| 15 | + |
| 16 | +from .models import CytoscapeElements, CytoscapePosition |
| 17 | + |
| 18 | +COL_STRIDE = 220 |
| 19 | +ROW_STRIDE = 100 |
| 20 | + |
| 21 | +LayoutName = Literal[ |
| 22 | + "preset", |
| 23 | + "topological", |
| 24 | + "dagre", |
| 25 | + "breadthfirst", |
| 26 | + "grid", |
| 27 | + "cose", |
| 28 | + "random", |
| 29 | +] |
| 30 | + |
| 31 | +LAYOUT_NAMES: tuple[str, ...] = get_args(LayoutName) |
| 32 | + |
| 33 | + |
| 34 | +def is_layout_name(value: str) -> bool: |
| 35 | + return value in LAYOUT_NAMES |
| 36 | + |
| 37 | + |
| 38 | +def bakes_coordinates(layout: str) -> bool: |
| 39 | + """Layouts that bake coordinates into ``data.position``. |
| 40 | +
|
| 41 | + All other layouts are hint-only and rely on the runtime renderer. |
| 42 | + """ |
| 43 | + return layout in ("preset", "topological") |
| 44 | + |
| 45 | + |
| 46 | +def topological_positions(elements: CytoscapeElements) -> dict[str, CytoscapePosition]: |
| 47 | + """Compute positions per the topological layering spec. |
| 48 | +
|
| 49 | + Returns a mapping keyed by node ``data.id``. |
| 50 | + """ |
| 51 | + nodes = elements.nodes |
| 52 | + node_ids = [n.data.id for n in nodes] |
| 53 | + index_by_id = {node_id: i for i, node_id in enumerate(node_ids)} |
| 54 | + |
| 55 | + incoming: dict[str, list[str]] = {node_id: [] for node_id in node_ids} |
| 56 | + for edge in elements.edges: |
| 57 | + source = edge.data.source |
| 58 | + target = edge.data.target |
| 59 | + if source not in index_by_id or target not in index_by_id: |
| 60 | + continue |
| 61 | + incoming[target].append(source) |
| 62 | + |
| 63 | + in_degree: dict[str, int] = {node_id: len(srcs) for node_id, srcs in incoming.items()} |
| 64 | + |
| 65 | + dependents: dict[str, list[str]] = {node_id: [] for node_id in node_ids} |
| 66 | + for target, sources in incoming.items(): |
| 67 | + for s in sources: |
| 68 | + dependents[s].append(target) |
| 69 | + |
| 70 | + column: dict[str, int] = {} |
| 71 | + visited: set[str] = set() |
| 72 | + |
| 73 | + # Kahn topo sort, declaration-index tie break. |
| 74 | + queue: list[str] = [node_id for node_id in node_ids if in_degree[node_id] == 0] |
| 75 | + queue.sort(key=lambda nid: index_by_id[nid]) |
| 76 | + |
| 77 | + while queue: |
| 78 | + # Pop lowest declaration index. |
| 79 | + best = 0 |
| 80 | + for i in range(1, len(queue)): |
| 81 | + if index_by_id[queue[i]] < index_by_id[queue[best]]: |
| 82 | + best = i |
| 83 | + node_id = queue.pop(best) |
| 84 | + visited.add(node_id) |
| 85 | + |
| 86 | + sources = incoming[node_id] |
| 87 | + if not sources: |
| 88 | + column[node_id] = 0 |
| 89 | + else: |
| 90 | + max_col = 0 |
| 91 | + for s in sources: |
| 92 | + c = column.get(s) |
| 93 | + if c is not None and c + 1 > max_col: |
| 94 | + max_col = c + 1 |
| 95 | + column[node_id] = max_col |
| 96 | + |
| 97 | + for dep in dependents[node_id]: |
| 98 | + in_degree[dep] -= 1 |
| 99 | + if in_degree[dep] == 0: |
| 100 | + queue.append(dep) |
| 101 | + |
| 102 | + # Cycle fallback: any unvisited node gets column = declaration index. |
| 103 | + for node_id in node_ids: |
| 104 | + if node_id not in visited: |
| 105 | + column[node_id] = index_by_id[node_id] |
| 106 | + |
| 107 | + # Row assignment: per column, declaration order. |
| 108 | + rows_by_column: dict[int, list[str]] = {} |
| 109 | + for node_id in node_ids: |
| 110 | + c = column[node_id] |
| 111 | + rows_by_column.setdefault(c, []).append(node_id) |
| 112 | + |
| 113 | + positions: dict[str, CytoscapePosition] = {} |
| 114 | + for c, ids in rows_by_column.items(): |
| 115 | + for row, node_id in enumerate(ids): |
| 116 | + positions[node_id] = CytoscapePosition(x=c * COL_STRIDE, y=row * ROW_STRIDE) |
| 117 | + return positions |
0 commit comments