Coverage for src/pypermission/service/role.py: 99%
197 statements
« prev ^ index » next coverage.py v7.11.3, created at 2025-12-01 18:06 +0000
« prev ^ index » next coverage.py v7.11.3, created at 2025-12-01 18:06 +0000
1from collections.abc import Sequence
3from sqlalchemy.exc import IntegrityError
4from sqlalchemy.orm import Session
5from sqlalchemy.sql import select
7from pypermission.exc import PermissionNotGrantedError, PyPermissionError
8from pypermission.models import (
9 FrozenClass,
10 HierarchyORM,
11 MemberORM,
12 Permission,
13 Policy,
14 PolicyORM,
15 RoleORM,
16)
17from pypermission.util.exception_handling import process_policy_integrity_error
18from pypermission.util.input_validation import validate_rbac_parameters
20################################################################################
21#### RoleService
22################################################################################
class RoleService(metaclass=FrozenClass):
    """
    Service for managing **Roles**, their **Hierarchies** and their **Policies**.

    Every method is a classmethod taking keyword-only arguments and an explicit
    SQLAlchemy session. Mutating methods flush the session but never commit it.
    """

    # NOTE: we can use the Policy object when dealing with role + permission to let it handle empty role names for us.
    @classmethod
    @validate_rbac_parameters
    def create(cls, *, role: str, db: Session) -> None:
        """
        Create a new **Role**.

        Parameters
        ----------
        role : str
            The **RoleID** of the **Role** to create.
        db : Session
            The SQLAlchemy session.

        Raises
        ------
        PyPermissionError
            If `role` is an empty string.
            If a **Role** with the given **RoleID** already exists.
        """
        try:
            db.add(RoleORM(id=role))
            db.flush()
        except IntegrityError as err:
            # A failed flush leaves the transaction unusable until it is rolled back.
            db.rollback()
            raise PyPermissionError(
                f"Conflict: Role with ID '{role}' already exists!"
            ) from err

    @classmethod
    @validate_rbac_parameters
    def delete(cls, *, role: str, db: Session) -> None:
        """
        Delete an existing **Role**.

        Parameters
        ----------
        role : str
            The **RoleID** to delete.
        db : Session
            The SQLAlchemy session.

        Raises
        ------
        PyPermissionError
            If `role` is an empty string.
            If a **Role** with the given **RoleID** does not exist.
        """
        role_orm = db.get(RoleORM, role)
        if role_orm is None:
            raise PyPermissionError(f"Role '{role}' does not exist!")
        db.delete(role_orm)
        db.flush()

    @classmethod
    def list(cls, *, db: Session) -> tuple[str, ...]:
        """
        Get all **Roles**.

        Parameters
        ----------
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        tuple[str, ...]
            A tuple containing all **RoleIDs**.
        """
        role_orms = db.scalars(select(RoleORM)).all()
        return tuple(role_orm.id for role_orm in role_orms)

    @classmethod
    @validate_rbac_parameters
    def add_hierarchy(cls, *, parent_role: str, child_role: str, db: Session) -> None:
        """
        Add a parent-child **Hierarchy** between two **Roles**.

        Parameters
        ----------
        parent_role : str
            The parent **RoleID**.
        child_role : str
            The child **RoleID**.
        db : Session
            The SQLAlchemy session.

        Raises
        ------
        PyPermissionError
            If the `parent_role` or `child_role` is an empty string.
            If `parent_role` and `child_role` are identical.
            If one or both **Roles** do not exist in the system.
            If adding the **Hierarchy** would create a cycle in the RBAC **Hierarchy**.
            If the **Hierarchy** already exists.
        """
        if parent_role == child_role:
            raise PyPermissionError(
                f"Conflict: A Role Hierarchy requires distinct RoleIDs, received '{parent_role}' twice!"
            )

        # Both roles must exist; report exactly which one(s) are missing.
        roles = db.scalars(
            select(RoleORM.id).where(RoleORM.id.in_([parent_role, child_role]))
        ).all()
        if len(roles) == 1:
            missing_role = child_role if parent_role in roles else parent_role
            raise PyPermissionError(f"Role '{missing_role}' does not exist!")
        elif not roles:
            raise PyPermissionError(
                f"Roles '{parent_role}' and '{child_role}' do not exist!"
            )

        # Cycle check: walk every relation reachable downward from `child_role`.
        # If one of them ends in `parent_role`, then `parent_role` is already a
        # descendant of `child_role` and the new edge would close a cycle.
        root_cte = (
            select(HierarchyORM)
            .where(HierarchyORM.parent_role_id == child_role)
            .cte(recursive=True)
        )

        traversing_cte = root_cte.alias()
        relations_cte = root_cte.union_all(
            select(HierarchyORM).where(
                HierarchyORM.parent_role_id == traversing_cte.c.child_role_id
            )
        )

        critical_leaf_relations = db.execute(
            select(relations_cte).where(relations_cte.c.child_role_id == parent_role)
        ).all()

        if critical_leaf_relations:
            raise PyPermissionError(
                "Conflict: Desired Role Hierarchy would create a cycle!"
            )

        try:
            db.add(HierarchyORM(parent_role_id=parent_role, child_role_id=child_role))
            db.flush()
        except IntegrityError as err:
            # A failed flush leaves the transaction unusable until it is rolled back.
            db.rollback()
            raise PyPermissionError(
                f"Conflict: Role Hierarchy '{parent_role}' -> '{child_role}' exists!"
            ) from err

    @classmethod
    @validate_rbac_parameters
    def remove_hierarchy(
        cls, *, parent_role: str, child_role: str, db: Session
    ) -> None:
        """
        Remove a parent-child **Hierarchy** between two **Roles**.

        Parameters
        ----------
        parent_role : str
            The parent **RoleID**.
        child_role : str
            The child **RoleID**.
        db : Session
            The SQLAlchemy session.

        Raises
        ------
        PyPermissionError
            If the `parent_role` or `child_role` is an empty string.
            If `parent_role` and `child_role` are identical.
            If one or both **Roles** do not exist in the system.
            If the **Hierarchy** does not exist.
        """
        if parent_role == child_role:
            raise PyPermissionError(
                f"Conflict: A Role Hierarchy requires distinct RoleIDs, received '{parent_role}' twice!"
            )

        hierarchy_orm = db.get(HierarchyORM, (parent_role, child_role))
        if hierarchy_orm is None:
            # Only look the roles up on the failure path, to pick the most
            # specific error message.
            roles = db.scalars(
                select(RoleORM.id).where(RoleORM.id.in_([parent_role, child_role]))
            ).all()
            if len(roles) == 1:
                missing_role = child_role if parent_role in roles else parent_role
                raise PyPermissionError(f"Role '{missing_role}' does not exist!")
            elif not roles:
                raise PyPermissionError(
                    f"Roles '{parent_role}' and '{child_role}' do not exist!"
                )
            else:
                raise PyPermissionError(
                    f"Hierarchy '{parent_role}' -> '{child_role}' does not exist!"
                )

        db.delete(hierarchy_orm)
        db.flush()

    @classmethod
    @validate_rbac_parameters
    def parents(cls, *, role: str, db: Session) -> tuple[str, ...]:
        """
        Get all parent **Roles**.

        Parameters
        ----------
        role : str
            The target **RoleID**.
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        tuple[str, ...]
            A tuple containing all parent **RoleIDs**.

        Raises
        ------
        PyPermissionError
            If `role` is an empty string.
            If the target **Role** does not exist.
        """
        parents = db.scalars(
            select(HierarchyORM.parent_role_id).where(
                HierarchyORM.child_role_id == role
            )
        ).all()
        # An empty result is ambiguous: the role may simply have no parents.
        if not parents and db.get(RoleORM, role) is None:
            raise PyPermissionError(f"Role '{role}' does not exist!")
        return tuple(parents)

    @classmethod
    @validate_rbac_parameters
    def children(cls, *, role: str, db: Session) -> tuple[str, ...]:
        """
        Get all child **Roles**.

        Parameters
        ----------
        role : str
            The target **RoleID**.
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        tuple[str, ...]
            A tuple containing all child **RoleIDs**.

        Raises
        ------
        PyPermissionError
            If `role` is an empty string.
            If the target **Role** does not exist.
        """
        children = db.scalars(
            select(HierarchyORM.child_role_id).where(
                HierarchyORM.parent_role_id == role
            )
        ).all()
        # An empty result is ambiguous: the role may simply have no children.
        if not children and db.get(RoleORM, role) is None:
            raise PyPermissionError(f"Role '{role}' does not exist!")
        return tuple(children)

    @classmethod
    @validate_rbac_parameters
    def ascendants(cls, *, role: str, db: Session) -> tuple[str, ...]:
        """
        Get all ascendant **Roles**.

        Parameters
        ----------
        role : str
            The target **RoleID**.
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        tuple[str, ...]
            A tuple containing all ascendant **RoleIDs**.

        Raises
        ------
        PyPermissionError
            If `role` is an empty string.
            If the target **Role** does not exist.
        """
        # Recursive walk upward: start at the relations whose child is `role`,
        # then keep following each parent to its own parents.
        root_cte = (
            select(HierarchyORM)
            .where(HierarchyORM.child_role_id == role)
            .cte(name="root_cte", recursive=True)
        )

        traversing_cte = root_cte.alias()
        relations_cte = root_cte.union_all(
            select(HierarchyORM).where(
                HierarchyORM.child_role_id == traversing_cte.c.parent_role_id
            )
        )

        # A role reachable via several paths appears more than once; de-duplicate.
        ascendant_relations = (
            db.scalars(select(relations_cte.c.parent_role_id)).unique().all()
        )

        if not ascendant_relations and db.get(RoleORM, role) is None:
            raise PyPermissionError(f"Role '{role}' does not exist!")
        return tuple(ascendant_relations)

    @classmethod
    @validate_rbac_parameters
    def descendants(cls, *, role: str, db: Session) -> tuple[str, ...]:
        """
        Get all descending **Roles**.

        Parameters
        ----------
        role : str
            The target **RoleID**.
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        tuple[str, ...]
            A tuple containing all descending **RoleIDs**.

        Raises
        ------
        PyPermissionError
            If `role` is an empty string.
            If the target **Role** does not exist.
        """
        # Recursive walk downward: start at the relations whose parent is `role`,
        # then keep following each child to its own children.
        root_cte = (
            select(HierarchyORM)
            .where(HierarchyORM.parent_role_id == role)
            .cte(name="root_cte", recursive=True)
        )

        traversing_cte = root_cte.alias()
        relations_cte = root_cte.union_all(
            select(HierarchyORM).where(
                HierarchyORM.parent_role_id == traversing_cte.c.child_role_id
            )
        )

        # A role reachable via several paths appears more than once; de-duplicate.
        descendant_relations = (
            db.scalars(select(relations_cte.c.child_role_id)).unique().all()
        )

        if not descendant_relations and db.get(RoleORM, role) is None:
            raise PyPermissionError(f"Role '{role}' does not exist!")
        return tuple(descendant_relations)

    @classmethod
    @validate_rbac_parameters
    def subjects(
        cls, *, role: str, include_descendant_subjects: bool = False, db: Session
    ) -> tuple[str, ...]:
        """
        Get all **Subjects** assigned to a **Role**.

        Parameters
        ----------
        role : str
            The target **RoleID**.
        include_descendant_subjects : bool
            Include all **Subjects** for descendant **Roles**. Defaults to False.
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        tuple[str, ...]
            A tuple containing all assigned **SubjectIDs**.

        Raises
        ------
        PyPermissionError
            If `role` is an empty string.
            If the target **Role** does not exist.
        """
        if include_descendant_subjects:
            # Seed the recursion with the role itself, then add every descendant.
            root_cte = (
                select(RoleORM.id.label("role_id"))
                .where(RoleORM.id == role)
                .cte(recursive=True)
            )

            traversing_cte = root_cte.alias()
            relations_cte = root_cte.union_all(
                select(HierarchyORM.child_role_id).where(
                    HierarchyORM.parent_role_id == traversing_cte.c.role_id
                )
            )
            # A subject may be a member of several roles in the subtree.
            subjects = (
                db.scalars(
                    select(MemberORM.subject_id).join(
                        relations_cte, MemberORM.role_id == relations_cte.c.role_id
                    )
                )
                .unique()
                .all()
            )
        else:
            subjects = db.scalars(
                select(MemberORM.subject_id).where(MemberORM.role_id == role)
            ).all()

        if not subjects and db.get(RoleORM, role) is None:
            raise PyPermissionError(f"Role '{role}' does not exist!")
        return tuple(subjects)

    @classmethod
    def grant_permission(
        cls,
        *,
        role: str,
        permission: Permission,
        db: Session,
    ) -> None:
        """
        Grant a **Permission** to a **Role**.

        Parameters
        ----------
        role : str
            The target **RoleID**.
        permission : Permission
            The **Permission** to grant.
        db : Session
            The SQLAlchemy session.

        Raises
        ------
        PyPermissionError
            If `role` is an empty string.
            If the target **Role** does not exist.
            If the **Permission** is already granted to the **Role** (duplicate policy).
        """
        policy = Policy(role=role, permission=permission)  # does the validation for us
        try:
            policy_orm = PolicyORM(
                role_id=role,
                resource_type=permission.resource_type,
                resource_id=permission.resource_id,
                action=permission.action,
            )
            db.add(policy_orm)
            db.flush()
        except IntegrityError as err:
            # A failed flush leaves the transaction unusable until it is rolled back.
            db.rollback()
            # Delegates turning the integrity error into the library's own error.
            process_policy_integrity_error(err=err, policy=policy)

    @classmethod
    def revoke_permission(
        cls,
        *,
        role: str,
        permission: Permission,
        db: Session,
    ) -> None:
        """
        Revoke a **Permission** from a **Role**.

        Parameters
        ----------
        role : str
            The target **RoleID**.
        permission : Permission
            The **Permission** to revoke.
        db : Session
            The SQLAlchemy session.

        Raises
        ------
        PyPermissionError
            If `role` is an empty string.
            If the target **Role** does not exist.
            If the permission was not granted to the role (policy does not exist).
        """
        policy = Policy(role=role, permission=permission)
        policy_tuple = (
            role,
            permission.resource_type,
            permission.resource_id,
            permission.action,
        )
        policy_orm = db.get(PolicyORM, policy_tuple)
        if policy_orm is None:
            # Distinguish "role is missing" from "role exists, policy is missing".
            role_orm = db.get(RoleORM, role)
            if role_orm:
                raise PyPermissionError(f"Policy '{str(policy)}' does not exist!")
            raise PyPermissionError(f"Role '{role}' does not exist!")

        db.delete(policy_orm)
        db.flush()

    @classmethod  # validated by policy
    def check_permission(
        cls,
        *,
        role: str,
        permission: Permission,
        db: Session,
    ) -> bool:
        """
        Check if a **Role** has a **Permission**.

        Policies of the **Role** itself and of all its ascendant **Roles** are
        considered. A policy whose **ResourceID** is `"*"` matches any
        **ResourceID** of the same **ResourceType** and **Action**.

        Parameters
        ----------
        role : str
            The target **RoleID**.
        permission : Permission
            The **Permission** to check for.
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        bool
            True if the **Permission** is granted.

        Raises
        ------
        PyPermissionError
            If `role` is an empty string.
            If the target **Role** does not exist.
        """
        _ = Policy(role=role, permission=permission)  # raises if role empty
        # Seed the recursion with the role itself, then add every ascendant.
        root_cte = (
            select(RoleORM.id.label("role_id"))
            .where(RoleORM.id == role)
            .cte(recursive=True)
        )

        traversing_cte = root_cte.alias()
        relations_cte = root_cte.union_all(
            select(HierarchyORM.parent_role_id).where(
                HierarchyORM.child_role_id == traversing_cte.c.role_id
            )
        )
        policy_orms = db.scalars(
            select(PolicyORM)
            .join(relations_cte, PolicyORM.role_id == relations_cte.c.role_id)
            .where(
                PolicyORM.resource_type == permission.resource_type,
                PolicyORM.resource_id.in_((permission.resource_id, "*")),
                PolicyORM.action == permission.action,
            )
        ).all()

        if policy_orms:
            return True

        # No matching policy: either the role is unknown or it just lacks the grant.
        if db.get(RoleORM, role) is None:
            raise PyPermissionError(f"Role '{role}' does not exist!")
        return False

    @classmethod  # validated by check_permission
    def assert_permission(
        cls,
        *,
        role: str,
        permission: Permission,
        db: Session,
    ) -> None:
        """
        Assert that a **Role** has a **Permission**.

        Parameters
        ----------
        role : str
            The target **RoleID**.
        permission : Permission
            The **Permission** to check for.
        db : Session
            The SQLAlchemy session.

        Raises
        ------
        PermissionNotGrantedError
            If the **Permission** is not granted (including inherited permissions).
        PyPermissionError
            If `role` is an empty string.
            If the target **Role** does not exist.
        """
        if not cls.check_permission(role=role, permission=permission, db=db):
            raise PermissionNotGrantedError(
                f"Permission '{permission}' is not granted for Role '{role}'!"
            )

    @classmethod
    @validate_rbac_parameters
    def permissions(
        cls,
        *,
        role: str,
        inherited: bool = True,
        db: Session,
    ) -> tuple[Permission, ...]:
        """
        Get all granted **Permissions** for a **Role**.

        Parameters
        ----------
        role : str
            The target **RoleID**.
        inherited : bool
            Includes all **Permissions** inherited by ascendant **Roles**.
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        tuple[Permission, ...]
            A tuple containing all granted **Permissions**.

        Raises
        ------
        PyPermissionError
            If `role` is an empty string.
            If the target **Role** does not exist.
        """
        policy_orms = _get_policy_orms_for_role(role=role, inherited=inherited, db=db)
        # An empty result is ambiguous: the role may simply have no policies.
        if not policy_orms and db.get(RoleORM, role) is None:
            raise PyPermissionError(f"Role '{role}' does not exist!")

        return tuple(
            Permission(
                resource_type=policy_orm.resource_type,
                resource_id=policy_orm.resource_id,
                action=policy_orm.action,
            )
            for policy_orm in policy_orms
        )

    @classmethod
    @validate_rbac_parameters
    def policies(
        cls,
        *,
        role: str,
        inherited: bool = True,
        db: Session,
    ) -> tuple[Policy, ...]:
        """
        Get all granted **Policies** for a **Role**.

        Parameters
        ----------
        role : str
            The target **RoleID**.
        inherited : bool
            Includes all **Policies** inherited by ascendant **Roles**.
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        tuple[Policy, ...]
            A tuple containing all granted **Policies**.

        Raises
        ------
        PyPermissionError
            If `role` is an empty string.
            If the target **Role** does not exist.
        """
        policy_orms = _get_policy_orms_for_role(role=role, inherited=inherited, db=db)

        # An empty result is ambiguous: the role may simply have no policies.
        if not policy_orms and db.get(RoleORM, role) is None:
            raise PyPermissionError(f"Role '{role}' does not exist!")

        return tuple(
            Policy(
                role=policy_orm.role_id,
                permission=Permission(
                    resource_type=policy_orm.resource_type,
                    resource_id=policy_orm.resource_id,
                    action=policy_orm.action,
                ),
            )
            for policy_orm in policy_orms
        )

    @classmethod
    @validate_rbac_parameters
    def actions_on_resource(
        cls,
        *,
        role: str,
        resource_type: str,
        resource_id: str,
        inherited: bool = True,
        db: Session,
    ) -> tuple[str, ...]:
        """
        Get all **Actions** granted on a **Resource** for a **Role**.

        Policies whose **ResourceID** is `"*"` are included in addition to those
        matching `resource_id` exactly. Each **Action** is returned once, even
        if it is granted by several matching policies.

        Parameters
        ----------
        role : str
            The target **RoleID**.
        resource_type : str
            The **ResourceType** to check.
        resource_id : str
            The **ResourceID** to check.
        inherited : bool
            Includes all **Actions** inherited by ascendant **Roles**.
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        tuple[str, ...]
            A tuple containing all granted **Actions**.

        Raises
        ------
        PyPermissionError
            If `role` is an empty string.
            If the **ResourceType** is an empty string.
            If the target **Role** does not exist.
        """
        if inherited:
            # Seed the recursion with the role itself, then add every ascendant.
            root_cte = (
                select(RoleORM.id.label("role_id"))
                .where(RoleORM.id == role)
                .cte(recursive=True)
            )
            traversing_cte = root_cte.alias()
            relations_cte = root_cte.union_all(
                select(HierarchyORM.parent_role_id).where(
                    HierarchyORM.child_role_id == traversing_cte.c.role_id
                )
            )
            selection = (
                select(PolicyORM.action)
                .join(
                    relations_cte,
                    PolicyORM.role_id == relations_cte.c.role_id,
                )
                .where(
                    PolicyORM.resource_type == resource_type,
                    PolicyORM.resource_id.in_((resource_id, "*")),
                )
            )
        else:
            selection = select(PolicyORM.action).where(
                PolicyORM.role_id == role,
                PolicyORM.resource_type == resource_type,
                PolicyORM.resource_id.in_((resource_id, "*")),
            )
        # The same action can be granted on both the concrete resource and "*",
        # or by several roles in the chain; report each action only once.
        result = db.scalars(selection).unique().all()

        if not result and db.get(RoleORM, role) is None:
            raise PyPermissionError(f"Role '{role}' does not exist!")
        return tuple(result)
793################################################################################
794#### Util
795################################################################################
def _get_policy_orms_for_role(
    *, role: str, inherited: bool = True, db: Session
) -> Sequence[PolicyORM]:
    """
    Load the policy rows attached to a **Role**.

    Parameters
    ----------
    role : str
        The target **RoleID**.
    inherited : bool
        When True, also load the policies of every ascendant **Role**.
    db : Session
        The SQLAlchemy session.

    Returns
    -------
    Sequence[PolicyORM]
        The matching policy rows, de-duplicated. Empty if nothing matches;
        this helper does not check whether the **Role** exists.
    """
    if not inherited:
        direct_query = select(PolicyORM).where(PolicyORM.role_id == role)
        return db.scalars(direct_query).unique().all()

    # Seed the recursion with the role itself ...
    anchor = (
        select(RoleORM.id.label("role_id"))
        .where(RoleORM.id == role)
        .cte(recursive=True)
    )
    # ... then repeatedly step from each collected role to its parents.
    walker = anchor.alias()
    lineage = anchor.union_all(
        select(HierarchyORM.parent_role_id).where(
            HierarchyORM.child_role_id == walker.c.role_id
        )
    )

    inherited_query = select(PolicyORM).join(
        lineage, PolicyORM.role_id == lineage.c.role_id
    )
    return db.scalars(inherited_query).unique().all()