Coverage for src/pypermission/util/plot.py: 95%
76 statements
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-14 14:14 +0000
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-14 14:14 +0000
1import networkx as nx
2import plotly.graph_objects as go
4from pypermission.exc import PyPermissionError
def plot_factory(
    *, dag: nx.DiGraph, auto_open: bool = False, file_path: str = "dag.html"
) -> None:
    """
    Render an RBAC DAG as an interactive HTML plot and write it to `file_path`.

    Parameters
    ----------
    dag : nx.DiGraph
        The RBAC system DAG.
    auto_open : bool
        Forwarded to the figure's `write_html`; when true the generated plot
        is opened in the browser.
    file_path : str
        Path and filename of the generated HTML file.

    Raises
    ------
    PyPermissionError
        Raised when the input DAG is empty, i.e. there is nothing to plot.
    """
    # Guard clause: reject an empty graph before any figure is built.
    if len(dag) == 0:
        raise PyPermissionError("RBAC system is empty. Nothing to plot!")

    _build_plotly_figure(dag=dag).write_html(file_path, auto_open=auto_open)
35################################################################################
36#### Util
37################################################################################
# Plot colours keyed by the "type" attribute carried by DAG nodes and edges.
# Entries are listed in node/edge pairs that share one colour.
COLOR_MAP: dict[str, str] = {
    # subjects and the edges typed "member_edge"
    "subject_node": "darkturquoise",
    "member_edge": "darkturquoise",
    # roles and the edges typed "hierarchy_edge"
    "role_node": "coral",
    "hierarchy_edge": "coral",
    # permissions and the edges typed "policy_edge"
    "permission_node": "forestgreen",
    "policy_edge": "forestgreen",
}

# Maps a node identifier to its (x, y) plot coordinates.
NodePositions = dict[str, tuple[float, float]]
def _build_plotly_figure(*, dag: nx.DiGraph) -> go.Figure:
    """
    Assemble the Plotly figure for an RBAC DAG.

    Parameters
    ----------
    dag : nx.DiGraph
        The RBAC system DAG. Every edge is read for its "type" attribute,
        which must be a key of `COLOR_MAP`.

    Returns
    -------
    go.Figure
        Figure whose data holds the three edge traces followed by the three
        node traces, with the legend shown and both axes stripped of grid,
        zero line and tick labels.
    """
    positions = _calc_node_positions(dag=dag)
    node_traces = _build_nodes(dag=dag, node_positions=positions)

    # One colour per edge, looked up through the edge's "type" attribute.
    colors = tuple(COLOR_MAP[dag.edges[edge]["type"]] for edge in dag.edges())
    edge_traces = _build_edges(
        dag=dag, node_positions=positions, edge_colors=colors
    )

    # Edge traces precede node traces in the figure's data list.
    figure = go.Figure(data=[*edge_traces, *node_traces])

    bare_axis = dict(showgrid=False, zeroline=False, showticklabels=False)
    figure.update_layout(
        showlegend=True,
        margin=dict(l=20, r=20, t=20, b=20),
        xaxis=dict(bare_axis),
        yaxis=dict(bare_axis),
    )

    return figure
def _build_nodes(
    *, dag: nx.DiGraph, node_positions: NodePositions
) -> tuple[go.Scatter, go.Scatter, go.Scatter]:
    """
    Build one marker-and-label trace per node kind.

    Parameters
    ----------
    dag : nx.DiGraph
        The RBAC system DAG. Every node must carry a "type" attribute.
    node_positions : NodePositions
        (x, y) coordinates for every node that gets drawn.

    Returns
    -------
    tuple[go.Scatter, go.Scatter, go.Scatter]
        The subject, role and permission traces, in that order.

    Raises
    ------
    KeyError
        If a node has no "type" attribute, or a drawn node is missing from
        `node_positions`.
    """
    # Group the nodes by type in a single pass over the node data.
    nodes_by_type: dict[str, list[str]] = {
        "subject_node": [],
        "role_node": [],
        "permission_node": [],
    }
    for node, data in dag.nodes.data():
        bucket = nodes_by_type.get(data["type"])
        # Nodes of any other type are not drawn.
        if bucket is not None:
            bucket.append(node)

    def _trace(node_type: str, legend_name: str) -> go.Scatter:
        # All three traces share one construction; only the node group,
        # the marker colour and the legend entry differ.
        members = nodes_by_type[node_type]
        return go.Scatter(
            x=tuple(node_positions[node][0] for node in members),
            y=tuple(node_positions[node][1] for node in members),
            mode="markers+text",
            text=tuple(str(node) for node in members),
            textposition="top center",
            textfont=dict(size=20, color="black"),
            marker=dict(size=20, color=COLOR_MAP[node_type]),
            name=legend_name,
        )

    return (
        _trace("subject_node", "Subjects"),
        _trace("role_node", "Roles"),
        _trace("permission_node", "Permissions"),
    )
def _build_edges(
    *, dag: nx.DiGraph, node_positions: NodePositions, edge_colors: tuple[str, ...]
) -> tuple[go.Scatter, go.Scatter, go.Scatter]:
    """
    Build one line trace per edge kind.

    Parameters
    ----------
    dag : nx.DiGraph
        The RBAC system DAG. Every edge must carry a "type" attribute.
    node_positions : NodePositions
        (x, y) coordinates for both endpoints of every edge.
    edge_colors : tuple[str, ...]
        Accepted but not read by this function; the line colours are taken
        from `COLOR_MAP`.

    Returns
    -------
    tuple[go.Scatter, go.Scatter, go.Scatter]
        The member, hierarchy and policy edge traces, in that order.
    """
    # Per edge type: the flattened x and y coordinate lists of its segments.
    segments: dict[str, tuple[list, list]] = {
        "member_edge": ([], []),
        "hierarchy_edge": ([], []),
        "policy_edge": ([], []),
    }

    for source, target, attrs in dag.edges.data():
        source_x, source_y = node_positions[source]
        target_x, target_y = node_positions[target]

        coords = segments.get(attrs["type"])
        # Edges of any other type are not drawn.
        if coords is None:
            continue

        xs, ys = coords
        # The trailing None separates this edge's segment from the next one.
        xs.extend([source_x, target_x, None])
        ys.extend([source_y, target_y, None])

    def _trace(edge_type: str, legend_name: str) -> go.Scatter:
        xs, ys = segments[edge_type]
        return go.Scatter(
            x=xs,
            y=ys,
            line=dict(width=2, color=COLOR_MAP[edge_type]),
            hoverinfo="none",
            mode="lines",
            name=legend_name,
        )

    return (
        _trace("member_edge", "Role assignment"),
        _trace("hierarchy_edge", "Role hierarchy"),
        _trace("policy_edge", "Permission assignment"),
    )
def _calc_node_positions(*, dag: nx.DiGraph) -> dict[str, tuple[float, float]]:
    """
    Compute layered plot coordinates for the nodes of an RBAC DAG.

    Permissions are placed in layer 0 and roles in layers 1..k, where a role
    sits one layer past its deepest role predecessor. Subjects are placed in
    layer k + 1. Layer `n` is drawn at x = -n. Nodes within a layer are spread
    evenly over y in [0, 1]; a lone node is placed at y = 1.

    Parameters
    ----------
    dag : nx.DiGraph
        The RBAC system DAG. Nodes carry a "type" attribute of
        "subject_node", "role_node" or "permission_node"; role-to-role edges
        are expected to be typed "hierarchy_edge".

    Returns
    -------
    dict[str, tuple[float, float]]
        (x, y) coordinates for every subject, role and permission node.
        A DAG without any role node is supported: subjects then occupy
        layer 1.
    """
    role_nodes = [n for n, d in dag.nodes(data=True) if d.get("type") == "role_node"]
    hierarchy_edges = [
        (u, v) for u, v, d in dag.edges(data=True) if d.get("type") == "hierarchy_edge"
    ]

    # Role-only graph, used solely to find the independent role hierarchies.
    role_sdag = nx.DiGraph()
    role_sdag.add_nodes_from(role_nodes)
    role_sdag.add_edges_from(hierarchy_edges)

    # One sub-DAG per hierarchy, smallest first (the sort is stable).
    role_sdags = sorted(
        (dag.subgraph(c).copy() for c in nx.weakly_connected_components(role_sdag)),
        key=lambda g: g.number_of_nodes(),
    )

    role_layers: dict[str, int] = {}
    for sdag in role_sdags:
        # Topological order guarantees role predecessors are layered first.
        for node in nx.topological_sort(sdag):
            role_layers[node] = 1 + max(
                (
                    role_layers[p]
                    for p in dag.predecessors(node)
                    if dag.nodes[p]["type"] == "role_node"
                ),
                default=0,  # no role predecessor -> first role layer
            )

    permission_layer = 0
    # `default=0` keeps a role-less DAG plottable instead of raising
    # ValueError from max() on an empty sequence.
    subject_layer = max(role_layers.values(), default=0) + 1

    layer_x_nodes: dict[int, list[str]] = {
        layer: [] for layer in range(permission_layer, subject_layer + 1)
    }
    for node in nx.topological_sort(dag):
        node_type = dag.nodes[node]["type"]
        if node_type == "subject_node":
            layer_x_nodes[subject_layer].append(node)
        elif node_type == "permission_node":
            layer_x_nodes[permission_layer].append(node)

    for sdag in role_sdags:
        for node in nx.topological_sort(sdag):
            layer_x_nodes[role_layers[node]].append(node)

    node_positions: dict[str, tuple[float, float]] = {}
    for layer, nodes_in_layer in layer_x_nodes.items():
        n_nodes = len(nodes_in_layer)
        ys: tuple[float, ...]
        if n_nodes == 1:
            # A single node cannot be spread; pin it to the top of the layer.
            ys = (1.0,)
        else:
            # An empty layer yields no coordinates (range(0) is empty), so the
            # division below is never evaluated with n_nodes == 0.
            ys = tuple(i / (n_nodes - 1) for i in range(n_nodes))
        x = float(-layer)
        for y, node in zip(ys, nodes_in_layer):
            node_positions[node] = (x, y)
    return node_positions