Coverage for src/pypermission/util/role.py: 100%

49 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2025-11-14 14:14 +0000

1import networkx as nx 

2from sqlalchemy.orm import Session 

3from sqlalchemy.sql import select 

4 

5from pypermission.exc import PyPermissionError 

6from pypermission.models import HierarchyORM, MemberORM, Permission, PolicyORM, RoleORM 

7 

8################################################################################ 

9#### Role dag tools 

10################################################################################ 

11 

12 

13def role_dag( 

14 *, 

15 root_roles: set[str] | None = None, 

16 include_subjects: bool = True, 

17 include_permissions: bool = True, 

18 db: Session, 

19) -> nx.DiGraph: 

20 """ 

21 Generate a directed acyclic graph (DAG) that reflects the role hierarchy 

22 from the viewpoint of the `root_roles`. 

23 By definition, the resulting DAG does **not** contain any roles that are 

24 descendants of those root roles. 

25 

26 Parameters 

27 ---------- 

28 root_roles : set[str] | None 

29 Root Roles of the generated DAG. If its None, all existing Roles will be included. 

30 include_subjects : bool 

31 Include assigned Subjects in the DAG. 

32 include_permissions : bool 

33 Include granted Permissions in the DAG. 

34 

35 Returns 

36 ------- 

37 nx.DiGraph 

38 The generated DAG. 

39 """ 

40 roles, hierarchies = _get_roles_and_hierarchies(root_roles=root_roles, db=db) 

41 

42 dag = nx.DiGraph() 

43 dag.add_nodes_from(roles, type="role_node") 

44 dag.add_edges_from(hierarchies, type="hierarchy_edge") 

45 

46 if include_subjects: 

47 subjects, members = _get_subjects_and_members(roles=roles, db=db) 

48 dag.add_nodes_from(subjects, type="subject_node") 

49 dag.add_edges_from(members, type="member_edge") 

50 

51 if include_permissions: 

52 subjects, members = _get_permissions_and_polices(roles=roles, db=db) 

53 dag.add_nodes_from(subjects, type="permission_node") 

54 dag.add_edges_from(members, type="policy_edge") 

55 

56 return dag 

57 

58 

59################################################################################ 

60#### Util 

61################################################################################ 

62 

63 

64def _get_roles_and_hierarchies( 

65 *, root_roles: set[str] | None, db: Session 

66) -> tuple[set[str], set[tuple[str, str]]]: 

67 if root_roles is None: 

68 role_orms = db.scalars(select(RoleORM)).all() 

69 roles = set(role_orm.id for role_orm in role_orms) 

70 

71 hierarchy_orms = db.scalars(select(HierarchyORM)).all() 

72 hierarchies = set( 

73 (hierarchy_orm.parent_role_id, hierarchy_orm.child_role_id) 

74 for hierarchy_orm in hierarchy_orms 

75 ) 

76 return roles, hierarchies 

77 

78 role_ormes = db.scalars(select(RoleORM).where(RoleORM.id.in_(root_roles))).all() 

79 roles = set(role_orme.id for role_orme in role_ormes) 

80 

81 if unknown_roles := root_roles ^ roles: 

82 if len(unknown_roles) == 1: 

83 raise PyPermissionError(f"Requested role does not exist: {unknown_roles}!") 

84 else: 

85 raise PyPermissionError(f"Requested roles do not exist: {unknown_roles}!") 

86 

87 root_cte = ( 

88 select(HierarchyORM) 

89 .where(HierarchyORM.child_role_id.in_(root_roles)) 

90 .cte(name="root_cte", recursive=True) 

91 ) 

92 

93 traversing_cte = root_cte.alias() 

94 relations_cte = root_cte.union_all( 

95 select(HierarchyORM).where( 

96 HierarchyORM.child_role_id == traversing_cte.c.parent_role_id 

97 ) 

98 ) 

99 

100 ancestor_relations = db.execute( 

101 select(relations_cte.c.parent_role_id, relations_cte.c.child_role_id) 

102 ).all() 

103 

104 roles = root_roles | {role for pair in ancestor_relations for role in pair} 

105 hierarchies = set(ancestor_relations) # type: ignore 

106 

107 return roles, hierarchies 

108 

109 

110def _get_subjects_and_members( 

111 *, roles: set[str], db: Session 

112) -> tuple[set[str], set[tuple[str, str]]]: 

113 

114 member_orms = db.scalars( 

115 select(MemberORM).where(MemberORM.role_id.in_(roles)) 

116 ).all() 

117 members = set( 

118 (member_orm.role_id, member_orm.subject_id) for member_orm in member_orms 

119 ) 

120 

121 subjects = {member_orm.subject_id for member_orm in member_orms} 

122 

123 return subjects, members 

124 

125 

126def _get_permissions_and_polices( 

127 *, roles: set[str], db: Session 

128) -> tuple[set[str], set[tuple[str, str]]]: 

129 

130 policy_orms = db.scalars( 

131 select(PolicyORM).where(PolicyORM.role_id.in_(roles)) 

132 ).all() 

133 policies = set( 

134 ( 

135 policy_orm.role_id, 

136 str( 

137 Permission( 

138 resource_type=policy_orm.resource_type, 

139 resource_id=policy_orm.resource_id, 

140 action=policy_orm.action, 

141 ) 

142 ), 

143 ) 

144 for policy_orm in policy_orms 

145 ) 

146 

147 permissions = set(policy[1] for policy in policies) 

148 

149 return permissions, policies