Coverage for src/pypermission/util/role.py: 100%
49 statements
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-14 14:14 +0000
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-14 14:14 +0000
1import networkx as nx
2from sqlalchemy.orm import Session
3from sqlalchemy.sql import select
5from pypermission.exc import PyPermissionError
6from pypermission.models import HierarchyORM, MemberORM, Permission, PolicyORM, RoleORM
8################################################################################
9#### Role dag tools
10################################################################################
def role_dag(
    *,
    root_roles: set[str] | None = None,
    include_subjects: bool = True,
    include_permissions: bool = True,
    db: Session,
) -> nx.DiGraph:
    """
    Build a directed graph of the role hierarchy as seen from `root_roles`.

    The graph holds the root roles and every role reachable from them by
    following hierarchy relations from child to parent, i.e. their ancestors.
    Descendants of the root roles are **not** part of the result.

    Every node and edge carries a ``type`` attribute:

    * roles: ``"role_node"``, connected by ``"hierarchy_edge"`` edges
      pointing from parent role to child role,
    * subjects: ``"subject_node"``, reached by ``"member_edge"`` edges
      pointing from role to subject,
    * permissions: ``"permission_node"``, reached by ``"policy_edge"`` edges
      pointing from role to the string form of the permission.

    Parameters
    ----------
    root_roles : set[str] | None
        Roles the graph is built from. If ``None``, all existing Roles and
        all hierarchy relations are included.
    include_subjects : bool
        Add the Subjects assigned to the collected Roles.
    include_permissions : bool
        Add the Permissions granted to the collected Roles.
    db : Session
        SQLAlchemy session used for all lookups.

    Returns
    -------
    nx.DiGraph
        The generated DAG.

    Raises
    ------
    PyPermissionError
        If at least one of the requested `root_roles` does not exist.
    """
    role_ids, hierarchy_edges = _get_roles_and_hierarchies(
        root_roles=root_roles, db=db
    )

    graph = nx.DiGraph()
    graph.add_nodes_from(role_ids, type="role_node")
    graph.add_edges_from(hierarchy_edges, type="hierarchy_edge")

    # NOTE(review): roles, subjects and permission strings share one node
    # namespace in the graph; identical identifiers would end up as a single
    # node whose ``type`` is the last one written - confirm ids cannot clash.
    if include_subjects:
        subject_ids, member_edges = _get_subjects_and_members(roles=role_ids, db=db)
        graph.add_nodes_from(subject_ids, type="subject_node")
        graph.add_edges_from(member_edges, type="member_edge")

    if include_permissions:
        permission_ids, policy_edges = _get_permissions_and_polices(
            roles=role_ids, db=db
        )
        graph.add_nodes_from(permission_ids, type="permission_node")
        graph.add_edges_from(policy_edges, type="policy_edge")

    return graph
59################################################################################
60#### Util
61################################################################################
def _get_roles_and_hierarchies(
    *, root_roles: set[str] | None, db: Session
) -> tuple[set[str], set[tuple[str, str]]]:
    """
    Collect the role ids and hierarchy relations relevant for `root_roles`.

    Parameters
    ----------
    root_roles : set[str] | None
        Roles to start from. If ``None``, every Role and every hierarchy
        relation stored in the database is returned.
    db : Session
        SQLAlchemy session used for all lookups.

    Returns
    -------
    tuple[set[str], set[tuple[str, str]]]
        The role ids and the ``(parent_role_id, child_role_id)`` relations.
        With explicit `root_roles` these are the root roles plus all of
        their ancestors and the relations connecting them.

    Raises
    ------
    PyPermissionError
        If at least one of the requested `root_roles` does not exist.
    """
    if root_roles is None:
        # No restriction requested: dump both tables completely.
        every_role = db.scalars(select(RoleORM)).all()
        every_role_id = {entry.id for entry in every_role}

        every_relation = db.scalars(select(HierarchyORM)).all()
        every_edge = {
            (entry.parent_role_id, entry.child_role_id) for entry in every_relation
        }
        return every_role_id, every_edge

    # Make sure each requested root role really exists before traversing.
    lookup = select(RoleORM).where(RoleORM.id.in_(root_roles))
    found_ids = {entry.id for entry in db.scalars(lookup).all()}

    unknown_roles = root_roles ^ found_ids
    if unknown_roles:
        if len(unknown_roles) == 1:
            message = f"Requested role does not exist: {unknown_roles}!"
        else:
            message = f"Requested roles do not exist: {unknown_roles}!"
        raise PyPermissionError(message)

    # Recursive CTE, anchor part: relations whose child is one of the roots.
    anchor = (
        select(HierarchyORM)
        .where(HierarchyORM.child_role_id.in_(root_roles))
        .cte(name="root_cte", recursive=True)
    )

    # Recursive part: an alias of the CTE lets the query refer to the rows
    # found so far; each step moves from a found parent to its own parents.
    previous_level = anchor.alias()
    next_level = select(HierarchyORM).where(
        HierarchyORM.child_role_id == previous_level.c.parent_role_id
    )
    ancestry = anchor.union_all(next_level)

    relation_rows = db.execute(
        select(ancestry.c.parent_role_id, ancestry.c.child_role_id)
    ).all()

    # Root roles without any parent never show up in a relation, so they are
    # merged in explicitly.
    collected_ids = root_roles | {
        role_id for relation in relation_rows for role_id in relation
    }
    # The result rows are used directly as (parent, child) pairs; the set
    # also removes duplicates produced by UNION ALL.
    collected_edges = set(relation_rows)  # type: ignore

    return collected_ids, collected_edges
def _get_subjects_and_members(
    *, roles: set[str], db: Session
) -> tuple[set[str], set[tuple[str, str]]]:
    """
    Collect the Subjects assigned to any of the given `roles`.

    Parameters
    ----------
    roles : set[str]
        Role ids whose member assignments are looked up.
    db : Session
        SQLAlchemy session used for the lookup.

    Returns
    -------
    tuple[set[str], set[tuple[str, str]]]
        The subject ids and the ``(role_id, subject_id)`` member relations.
    """
    query = select(MemberORM).where(MemberORM.role_id.in_(roles))
    assignments = db.scalars(query).all()

    member_edges = {(entry.role_id, entry.subject_id) for entry in assignments}
    # Every subject appears as the second element of at least one edge.
    subject_ids = {subject_id for _, subject_id in member_edges}

    return subject_ids, member_edges
def _get_permissions_and_polices(
    *, roles: set[str], db: Session
) -> tuple[set[str], set[tuple[str, str]]]:
    """
    Collect the Permissions granted to any of the given `roles`.

    Each policy row is turned into a `Permission` built from its
    ``resource_type``, ``resource_id`` and ``action``; the permission is
    represented by its ``str()`` form.

    Parameters
    ----------
    roles : set[str]
        Role ids whose policies are looked up.
    db : Session
        SQLAlchemy session used for the lookup.

    Returns
    -------
    tuple[set[str], set[tuple[str, str]]]
        The permission strings and the ``(role_id, permission)`` policy
        relations.
    """
    query = select(PolicyORM).where(PolicyORM.role_id.in_(roles))
    granted = db.scalars(query).all()

    permission_ids: set[str] = set()
    policy_edges: set[tuple[str, str]] = set()
    for entry in granted:
        permission_text = str(
            Permission(
                resource_type=entry.resource_type,
                resource_id=entry.resource_id,
                action=entry.action,
            )
        )
        permission_ids.add(permission_text)
        policy_edges.add((entry.role_id, permission_text))

    return permission_ids, policy_edges