Coverage for src/pypermission/service/subject.py: 100%
119 statements
« prev ^ index » next coverage.py v7.11.3, created at 2025-12-01 18:06 +0000
« prev ^ index » next coverage.py v7.11.3, created at 2025-12-01 18:06 +0000
1from collections.abc import Sequence
3from sqlalchemy.exc import IntegrityError
4from sqlalchemy.orm import Session
5from sqlalchemy.sql import select
7from pypermission.exc import PermissionNotGrantedError, PyPermissionError
8from pypermission.models import (
9 FrozenClass,
10 HierarchyORM,
11 MemberORM,
12 Permission,
13 Policy,
14 PolicyORM,
15 RoleORM,
16 SubjectORM,
17)
18from pypermission.util.exception_handling import process_subject_role_integrity_error
19from pypermission.util.input_validation import validate_rbac_parameters
21################################################################################
22#### SubjectService
23################################################################################
class SubjectService(metaclass=FrozenClass):
    """
    Operations for managing **Subjects** and resolving their access rights.

    Every operation is a classmethod taking keyword-only arguments and working
    on the SQLAlchemy session passed as `db`. Changes are flushed, never
    committed, by this class; when a flush fails with an `IntegrityError` the
    session is rolled back before the error is reported.
    """

    @classmethod
    @validate_rbac_parameters
    def create(cls, *, subject: str, db: Session) -> None:
        """
        Create a new Subject.

        Parameters
        ----------
        subject : str
            The SubjectID of the **Subject** to create.
        db : Session
            The SQLAlchemy session.

        Raises
        ------
        PyPermissionError
            If a **Subject** with the given SubjectID already exists or `subject` is empty string.
        """
        try:
            db.add(SubjectORM(id=subject))
            db.flush()
        except IntegrityError as err:
            # A failed flush invalidates the current transaction, so roll back
            # before reporting the conflict to the caller.
            db.rollback()
            raise PyPermissionError(
                f"Conflict: Subject with ID '{subject}' already exists!"
            ) from err

    @classmethod
    @validate_rbac_parameters
    def delete(cls, *, subject: str, db: Session) -> None:
        """
        Delete an existing Subject.

        Parameters
        ----------
        subject : str
            The SubjectID to delete.
        db : Session
            The SQLAlchemy session.

        Raises
        ------
        PyPermissionError
            If a **Subject** with the given SubjectID does not exist or `subject` is empty string.
        """
        subject_orm = db.get(SubjectORM, subject)
        if subject_orm is None:
            raise PyPermissionError(f"Subject '{subject}' does not exist!")
        db.delete(subject_orm)
        db.flush()

    @classmethod
    @validate_rbac_parameters
    def list(cls, *, db: Session) -> tuple[str, ...]:
        """
        Get all Subjects.

        Parameters
        ----------
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        tuple[str, ...]
            A tuple containing all SubjectIDs.
        """
        return tuple(db.scalars(select(SubjectORM.id)).all())

    @classmethod
    @validate_rbac_parameters
    def assign_role(cls, *, subject: str, role: str, db: Session) -> None:
        """
        Assign a **Subject** to a Role.

        Parameters
        ----------
        subject : str
            The target SubjectID.
        role : str
            The target **RoleID**.
        db : Session
            The SQLAlchemy session.

        Raises
        ------
        PyPermissionError
            If `subject` is empty string.
            If `role` is empty string.
            If the **Subject** does not exist.
            If the **Role** does not exist.
            If the **Role** is already assigned to the **Subject**.
        """
        try:
            db.add(MemberORM(role_id=role, subject_id=subject))
            db.flush()
        except IntegrityError as err:
            db.rollback()
            # NOTE(review): the helper is defined elsewhere; it is expected to
            # translate the integrity error into the matching PyPermissionError.
            process_subject_role_integrity_error(err=err, subject=subject, role=role)

    @classmethod
    @validate_rbac_parameters
    def deassign_role(cls, *, subject: str, role: str, db: Session) -> None:
        """
        Deassign a **Subject** from a Role.

        Parameters
        ----------
        subject : str
            The target SubjectID.
        role : str
            The target **RoleID**.
        db : Session
            The SQLAlchemy session.

        Raises
        ------
        PyPermissionError
            If `subject` is empty string.
            If `role` is empty string.
            If the **Subject** does not exist.
            If the **Role** does not exist.
            If the **Subject** is not assigned to the **Role**.
        """
        member_orm = db.get(MemberORM, (role, subject))
        if member_orm is None:
            # Work out which of the three possible reasons applies so the
            # caller gets a precise message.
            if db.get(SubjectORM, subject) is None:
                raise PyPermissionError(f"Subject '{subject}' does not exist!")
            if db.get(RoleORM, role) is None:
                raise PyPermissionError(f"Role '{role}' does not exist!")
            raise PyPermissionError(
                f"Role '{role}' is not assigned to Subject '{subject}'!"
            )
        db.delete(member_orm)
        db.flush()

    @classmethod
    @validate_rbac_parameters
    def roles(
        cls, *, subject: str, include_ascendant_roles: bool = False, db: Session
    ) -> tuple[str, ...]:
        """
        Get all Roles assigned to a Subject.

        Parameters
        ----------
        subject : str
            The target SubjectID.
        include_ascendant_roles: bool
            Include all ascendant Roles.
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        tuple[str, ...]
            A tuple containing all assigned RoleIDs.

        Raises
        ------
        PyPermissionError
            If `subject` is empty string.
            If the target **Subject** does not exist.
        """
        if include_ascendant_roles:
            # Anchor: the Roles the Subject is a direct member of.
            root_cte = (
                select(MemberORM.role_id)
                .where(MemberORM.subject_id == subject)
                .cte(recursive=True)
            )
            # Recursive step: follow the hierarchy from child Role to parent Role.
            traversing_cte = root_cte.alias()
            relations_cte = root_cte.union_all(
                select(HierarchyORM.parent_role_id).join(
                    traversing_cte,
                    HierarchyORM.child_role_id == traversing_cte.c.role_id,
                )
            )
            # UNION ALL can yield the same Role via several paths; unique() collapses them.
            roles = db.scalars(select(relations_cte)).unique().all()
        else:
            roles = db.scalars(
                select(MemberORM.role_id).where(MemberORM.subject_id == subject)
            ).all()

        # Only an empty result needs the extra lookup that tells an unknown
        # Subject apart from a Subject without Roles.
        if not roles and db.get(SubjectORM, subject) is None:
            raise PyPermissionError(f"Subject '{subject}' does not exist!")
        return tuple(roles)

    @classmethod
    @validate_rbac_parameters
    def check_permission(
        cls,
        *,
        subject: str,
        permission: Permission,
        db: Session,
    ) -> bool:
        """
        Check if a **Subject** has access to a specific **Permission** via its **Role** hierarchy.

        A Policy matches when its resource type and action equal those of
        `permission` and its resource id is either the requested one or the
        wildcard `"*"`.

        Parameters
        ----------
        subject : str
            The target SubjectID.
        permission : Permission
            The **Permission** to check for.
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        bool
            True if the **Permission** is granted.

        Raises
        ------
        PyPermissionError
            If `subject` is empty string.
            If the target **Subject** does not exist.
        """
        root_cte = (
            select(MemberORM.role_id)
            .where(MemberORM.subject_id == subject)
            .cte(recursive=True)
        )
        traversing_cte = root_cte.alias()
        relations_cte = root_cte.union_all(
            select(HierarchyORM.parent_role_id).join(
                traversing_cte, HierarchyORM.child_role_id == traversing_cte.c.role_id
            )
        )

        # Existence is all that matters here, so fetch at most one matching row
        # instead of loading every granting Policy.
        granting_role = db.scalars(
            select(PolicyORM.role_id)
            .join(relations_cte, PolicyORM.role_id == relations_cte.c.role_id)
            .where(
                PolicyORM.resource_type == permission.resource_type,
                PolicyORM.resource_id.in_((permission.resource_id, "*")),
                PolicyORM.action == permission.action,
            )
            .limit(1)
        ).first()
        if granting_role is not None:
            return True

        # No grant found: distinguish "unknown Subject" from "not permitted".
        if db.get(SubjectORM, subject) is None:
            raise PyPermissionError(f"Subject '{subject}' does not exist!")
        return False

    @classmethod  # validated by check_permission
    def assert_permission(
        cls,
        *,
        subject: str,
        permission: Permission,
        db: Session,
    ) -> None:
        """
        Asserts that a **Subject** has access to a specific **Permission** via its **Role** hierarchy.

        Parameters
        ----------
        subject : str
            The target SubjectID.
        permission : Permission
            The **Permission** to check for.
        db : Session
            The SQLAlchemy session.

        Raises
        ------
        PermissionNotGrantedError
            If the **Permission** is not granted.
        PyPermissionError
            If `subject` is empty string.
            If the target **Subject** does not exist.
        """
        if not cls.check_permission(subject=subject, permission=permission, db=db):
            raise PermissionNotGrantedError(
                f"Permission '{permission}' is not granted for Subject '{subject}'!"
            )

    @classmethod
    @validate_rbac_parameters
    def permissions(cls, *, subject: str, db: Session) -> tuple[Permission, ...]:
        """
        Get all **Permissions** a **Subject** has access to via its **Role** hierarchy.

        Parameters
        ----------
        subject : str
            The target SubjectID.
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        tuple[Permission, ...]
            A tuple containing all granted Permissions.

        Raises
        ------
        PyPermissionError
            If `subject` is empty string.
            If the target **Subject** does not exist.
        """
        policy_orms = _get_policy_orms_for_subject(subject=subject, db=db)

        return tuple(
            Permission(
                resource_type=policy_orm.resource_type,
                resource_id=policy_orm.resource_id,
                action=policy_orm.action,
            )
            for policy_orm in policy_orms
        )

    @classmethod
    @validate_rbac_parameters
    def policies(cls, *, subject: str, db: Session) -> tuple[Policy, ...]:
        """
        Get all **Policies** associated to a **Subject** via its **Role** hierarchy.

        Parameters
        ----------
        subject : str
            The target SubjectID.
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        tuple[Policy, ...]
            A tuple containing all granted **Policies**.

        Raises
        ------
        PyPermissionError
            If `subject` is empty string.
            If the target **Subject** does not exist.
        """
        policy_orms = _get_policy_orms_for_subject(subject=subject, db=db)

        return tuple(
            Policy(
                role=policy_orm.role_id,
                permission=Permission(
                    resource_type=policy_orm.resource_type,
                    resource_id=policy_orm.resource_id,
                    action=policy_orm.action,
                ),
            )
            for policy_orm in policy_orms
        )

    @classmethod
    @validate_rbac_parameters
    def actions_on_resource(
        cls,
        *,
        subject: str,
        resource_type: str,
        resource_id: str,
        inherited: bool = True,
        db: Session,
    ) -> tuple[str, ...]:
        """
        Get all Actions granted to a **Subject** on a specific **Resource**.

        Policies whose resource id is the wildcard `"*"` are included in
        addition to those matching `resource_id` exactly.

        Parameters
        ----------
        subject : str
            The target **SubjectID**.
        resource_type : str
            The **ResourceType** of the **Resource**.
        resource_id : str
            The **ResourceID** of the **Resource**.
        inherited : bool
            Whether to include inherited **Actions** from **Role** hierarchies.
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        tuple[str, ...]
            A tuple containing all granted **Action** values.

        Raises
        ------
        PyPermissionError
            If `subject` is empty string.
            If `resource_type` is empty string.
            If the target **Subject** does not exist.
        """
        if inherited:
            # Directly assigned Roles plus every ascendant reachable through
            # the hierarchy (child Role -> parent Role).
            root_cte = (
                select(MemberORM.role_id)
                .where(MemberORM.subject_id == subject)
                .cte(recursive=True)
            )
            traversing_cte = root_cte.alias()
            relations_cte = root_cte.union_all(
                select(HierarchyORM.parent_role_id).join(
                    traversing_cte,
                    HierarchyORM.child_role_id == traversing_cte.c.role_id,
                )
            )
            selection = (
                select(PolicyORM.action)
                .join(relations_cte, PolicyORM.role_id == relations_cte.c.role_id)
                .where(
                    PolicyORM.resource_type == resource_type,
                    PolicyORM.resource_id.in_((resource_id, "*")),
                )
            )
        else:
            # Only Policies of Roles the Subject is a direct member of.
            selection = (
                select(PolicyORM.action)
                .join(MemberORM, MemberORM.role_id == PolicyORM.role_id)
                .where(
                    MemberORM.subject_id == subject,
                    PolicyORM.resource_type == resource_type,
                    PolicyORM.resource_id.in_((resource_id, "*")),
                )
            )

        # The same Action can be granted by several Roles; report it once.
        actions = db.scalars(selection).unique().all()
        if not actions and db.get(SubjectORM, subject) is None:
            raise PyPermissionError(f"Subject '{subject}' does not exist!")
        return tuple(actions)
469################################################################################
470#### Util
471################################################################################
def _get_policy_orms_for_subject(*, subject: str, db: Session) -> Sequence[PolicyORM]:
    """
    Collect every PolicyORM reachable from a **Subject** through its **Role** hierarchy.

    The Subject's directly assigned Roles are expanded with all of their
    ascendant Roles, and the Policies of every Role in that set are returned.

    Parameters
    ----------
    subject : str
        The target SubjectID.
    db : Session
        The SQLAlchemy session.

    Returns
    -------
    Sequence[PolicyORM]
        A Sequence containing all associated PolicyORM objects.

    Raises
    ------
    PyPermissionError
        If the target **Subject** does not exist.
    """
    if not db.get(SubjectORM, subject):
        raise PyPermissionError(f"Subject '{subject}' does not exist!")

    # Anchor member: Roles the Subject belongs to directly.
    direct_roles = (
        select(MemberORM.role_id)
        .where(MemberORM.subject_id == subject)
        .cte(recursive=True)
    )

    # Recursive member: step from each Role already found to its parent Roles.
    frontier = direct_roles.alias()
    parent_step = select(HierarchyORM.parent_role_id).join(
        frontier, HierarchyORM.child_role_id == frontier.c.role_id
    )
    reachable_roles = direct_roles.union_all(parent_step)

    statement = select(PolicyORM).join(
        reachable_roles, PolicyORM.role_id == reachable_roles.c.role_id
    )
    # unique() collapses Policies reached through more than one hierarchy path.
    return db.scalars(statement).unique().all()