Coverage for src/pypermission/service/role.py: 100%
219 statements
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-14 14:14 +0000
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-14 14:14 +0000
1from collections.abc import Sequence
3from sqlalchemy.exc import IntegrityError
4from sqlalchemy.orm import Session
5from sqlalchemy.sql import select
7from pypermission.exc import PermissionNotGrantedError, PyPermissionError
8from pypermission.models import (
9 FrozenClass,
10 HierarchyORM,
11 MemberORM,
12 Permission,
13 Policy,
14 PolicyORM,
15 RoleORM,
16)
17from pypermission.util.exception_handling import process_policy_integrity_error
19################################################################################
20#### RoleService
21################################################################################
class RoleService(metaclass=FrozenClass):
    # Collection of classmethods operating on Roles: creation and deletion,
    # the Role hierarchy (parents/children/ancestors/descendants), Subject
    # membership lookups and Permission/Policy management.
    #
    # Every method takes the SQLAlchemy session as the keyword-only argument
    # `db` and only flushes; committing is left to the caller.
    #
    # NOTE: the methods that catch `IntegrityError` call `db.rollback()`,
    # which rolls back the whole current transaction of the session, not
    # just the failed statement.

    @classmethod
    def create(cls, *, role: str, db: Session) -> None:
        """
        Create a new Role.

        Parameters
        ----------
        role : str
            The RoleID of the Role to create.
        db : Session
            The SQLAlchemy session.

        Raises
        ------
        PyPermissionError
            If `role` is empty string.
            If a Role with the given RoleID already exists.
        """
        if role == "":
            raise PyPermissionError("Role name cannot be empty!")
        try:
            role_orm = RoleORM(id=role)
            db.add(role_orm)
            db.flush()
        except IntegrityError as err:
            db.rollback()
            raise PyPermissionError(f"Role '{role}' already exists!") from err

    @classmethod
    def delete(cls, *, role: str, db: Session) -> None:
        """
        Delete an existing Role.

        Parameters
        ----------
        role : str
            The RoleID to delete.
        db : Session
            The SQLAlchemy session.

        Raises
        ------
        PyPermissionError
            If `role` is empty string.
            If a Role with the given RoleID does not exist.
        """
        if role == "":
            raise PyPermissionError("Role name cannot be empty!")
        role_orm = db.get(RoleORM, role)
        if role_orm is None:
            raise PyPermissionError(f"Role '{role}' does not exist!")
        db.delete(role_orm)
        db.flush()

    @classmethod
    def list(cls, *, db: Session) -> tuple[str, ...]:
        """
        Get all Roles.

        Parameters
        ----------
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        tuple[str, ...]
            A tuple containing all RoleIDs.
        """
        # Only the id column is needed, so avoid loading full ORM entities.
        return tuple(db.scalars(select(RoleORM.id)).all())

    @classmethod
    def add_hierarchy(cls, *, parent_role: str, child_role: str, db: Session) -> None:
        """
        Add a parent-child hierarchy between two Roles.

        Parameters
        ----------
        parent_role : str
            The parent RoleID.
        child_role : str
            The child RoleID.
        db : Session
            The SQLAlchemy session.

        Raises
        ------
        PyPermissionError
            If `parent_role` or `child_role` is empty string.
            If arguments `parent_role` and `child_role` are equal.
            If one or both Roles do not exist.
            If adding the hierarchy would create a cycle.
            If the hierarchy already exists.
        """
        if parent_role == "":
            raise PyPermissionError(
                "Role name cannot be empty, but `parent_role` is empty!"
            )
        if child_role == "":
            raise PyPermissionError(
                "Role name cannot be empty, but `child_role` is empty!"
            )
        if parent_role == child_role:
            raise PyPermissionError(f"RoleIDs must not be equal: '{parent_role}'!")

        # Both Roles must exist before the relation can be checked or added.
        roles = db.scalars(
            select(RoleORM.id).where(RoleORM.id.in_([parent_role, child_role]))
        ).all()
        if len(roles) == 1:
            missing_role = child_role if parent_role in roles else parent_role
            raise PyPermissionError(f"Role '{missing_role}' does not exist!")
        elif len(roles) == 0:
            raise PyPermissionError(
                f"Roles '{parent_role}' and '{child_role}' do not exist!"
            )

        # Cycle detection: walk every relation reachable downwards from
        # `child_role`. If `parent_role` shows up as a child in that walk, it
        # is already a descendant of `child_role` and the new edge would
        # close a cycle.
        root_cte = (
            select(HierarchyORM)
            .where(HierarchyORM.parent_role_id == child_role)
            .cte(recursive=True)
        )

        traversing_cte = root_cte.alias()
        relations_cte = root_cte.union_all(
            select(HierarchyORM).where(
                HierarchyORM.parent_role_id == traversing_cte.c.child_role_id
            )
        )

        critical_leaf_relations = db.execute(
            select(relations_cte).where(relations_cte.c.child_role_id == parent_role)
        ).all()

        if critical_leaf_relations:
            raise PyPermissionError("Desired hierarchy would create a cycle!")

        try:
            hierarchy_orm = HierarchyORM(
                parent_role_id=parent_role, child_role_id=child_role
            )
            db.add(hierarchy_orm)
            db.flush()
        except IntegrityError as err:
            db.rollback()
            raise PyPermissionError(
                f"Hierarchy '{parent_role}' -> '{child_role}' exists!"
            ) from err

    @classmethod
    def remove_hierarchy(
        cls, *, parent_role: str, child_role: str, db: Session
    ) -> None:
        """
        Remove a parent-child hierarchy between two Roles.

        Parameters
        ----------
        parent_role : str
            The parent RoleID.
        child_role : str
            The child RoleID.
        db : Session
            The SQLAlchemy session.

        Raises
        ------
        PyPermissionError
            If `parent_role` or `child_role` is empty string.
            If arguments `parent_role` and `child_role` are equal.
            If one or both Roles do not exist.
            If the hierarchy does not exist.
        """
        if parent_role == "":
            raise PyPermissionError(
                "Role name cannot be empty, but `parent_role` is empty!"
            )
        if child_role == "":
            raise PyPermissionError(
                "Role name cannot be empty, but `child_role` is empty!"
            )

        if parent_role == child_role:
            raise PyPermissionError(f"RoleIDs must not be equal: '{parent_role}'!")

        hierarchy_orm = db.get(HierarchyORM, (parent_role, child_role))
        if hierarchy_orm is None:
            # Only on failure: find out which error message is the accurate one.
            roles = db.scalars(
                select(RoleORM.id).where(RoleORM.id.in_([parent_role, child_role]))
            ).all()
            if len(roles) == 1:
                missing_role = child_role if parent_role in roles else parent_role
                raise PyPermissionError(f"Role '{missing_role}' does not exist!")
            elif len(roles) == 0:
                raise PyPermissionError(
                    f"Roles '{parent_role}' and '{child_role}' do not exist!"
                )
            else:
                raise PyPermissionError(
                    f"Hierarchy '{parent_role}' -> '{child_role}' does not exist!"
                )

        db.delete(hierarchy_orm)
        db.flush()

    @classmethod
    def parents(cls, *, role: str, db: Session) -> tuple[str, ...]:
        """
        Get all parent Roles.

        Parameters
        ----------
        role : str
            The target RoleID.
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        tuple[str, ...]
            A tuple containing all parent RoleIDs.

        Raises
        ------
        PyPermissionError
            If `role` is empty string.
            If the target Role does not exist.
        """
        if role == "":
            raise PyPermissionError("Role name cannot be empty!")
        parents = db.scalars(
            select(HierarchyORM.parent_role_id).where(
                HierarchyORM.child_role_id == role
            )
        ).all()
        # An empty result is ambiguous: the Role may simply have no parents,
        # or it may not exist at all. Only then is the extra lookup needed.
        if not parents and db.get(RoleORM, role) is None:
            raise PyPermissionError(f"Role '{role}' does not exist!")
        return tuple(parents)

    @classmethod
    def children(cls, *, role: str, db: Session) -> tuple[str, ...]:
        """
        Get all child Roles.

        Parameters
        ----------
        role : str
            The target RoleID.
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        tuple[str, ...]
            A tuple containing all child RoleIDs.

        Raises
        ------
        PyPermissionError
            If `role` is empty string.
            If the target Role does not exist.
        """
        if role == "":
            raise PyPermissionError("Role name cannot be empty!")
        children = db.scalars(
            select(HierarchyORM.child_role_id).where(
                HierarchyORM.parent_role_id == role
            )
        ).all()
        # Empty result: distinguish "no children" from "unknown Role".
        if not children and db.get(RoleORM, role) is None:
            raise PyPermissionError(f"Role '{role}' does not exist!")
        return tuple(children)

    @classmethod
    def ancestors(cls, *, role: str, db: Session) -> tuple[str, ...]:
        """
        Get all ancestor Roles.

        Parameters
        ----------
        role : str
            The target RoleID.
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        tuple[str, ...]
            A tuple containing all ancestor RoleIDs.

        Raises
        ------
        PyPermissionError
            If `role` is empty string.
            If the target Role does not exist.
        """
        if role == "":
            raise PyPermissionError("Role name cannot be empty!")
        # Recursive walk upwards: start with the relations in which `role` is
        # the child, then keep following each found parent to its own parents.
        root_cte = (
            select(HierarchyORM)
            .where(HierarchyORM.child_role_id == role)
            .cte(name="root_cte", recursive=True)
        )

        traversing_cte = root_cte.alias()
        relations_cte = root_cte.union_all(
            select(HierarchyORM).where(
                HierarchyORM.child_role_id == traversing_cte.c.parent_role_id
            )
        )

        # UNION ALL can reach the same ancestor via several paths, hence unique().
        ancestor_relations = (
            db.scalars(select(relations_cte.c.parent_role_id)).unique().all()
        )

        if not ancestor_relations and db.get(RoleORM, role) is None:
            raise PyPermissionError(f"Role '{role}' does not exist!")
        return tuple(ancestor_relations)

    @classmethod
    def descendants(cls, *, role: str, db: Session) -> tuple[str, ...]:
        """
        Get all descending Roles.

        Parameters
        ----------
        role : str
            The target RoleID.
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        tuple[str, ...]
            A tuple containing all descending RoleIDs.

        Raises
        ------
        PyPermissionError
            If `role` is empty string.
            If the target Role does not exist.
        """
        if role == "":
            raise PyPermissionError("Role name cannot be empty!")
        # Recursive walk downwards: start with the relations in which `role`
        # is the parent, then keep following each found child to its children.
        root_cte = (
            select(HierarchyORM)
            .where(HierarchyORM.parent_role_id == role)
            .cte(name="root_cte", recursive=True)
        )

        traversing_cte = root_cte.alias()
        relations_cte = root_cte.union_all(
            select(HierarchyORM).where(
                HierarchyORM.parent_role_id == traversing_cte.c.child_role_id
            )
        )

        # UNION ALL can reach the same descendant via several paths, hence unique().
        descendant_relations = (
            db.scalars(select(relations_cte.c.child_role_id)).unique().all()
        )

        if not descendant_relations and db.get(RoleORM, role) is None:
            raise PyPermissionError(f"Role '{role}' does not exist!")
        return tuple(descendant_relations)

    @classmethod
    def subjects(
        cls, *, role: str, include_descendant_subjects: bool = False, db: Session
    ) -> tuple[str, ...]:
        """
        Get all Subjects assigned to a Role.

        Parameters
        ----------
        role : str
            The target RoleID.
        include_descendant_subjects: bool
            Include all Subjects for descendant Roles.
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        tuple[str, ...]
            A tuple containing all assigned SubjectIDs.

        Raises
        ------
        PyPermissionError
            If `role` is empty string.
            If the target Role does not exist.
        """
        if role == "":
            raise PyPermissionError("Role name cannot be empty!")
        if include_descendant_subjects:
            # Recursive CTE yielding `role` itself plus all of its descendants.
            root_cte = (
                select(RoleORM.id.label("role_id"))
                .where(RoleORM.id == role)
                .cte(recursive=True)
            )

            traversing_cte = root_cte.alias()
            relations_cte = root_cte.union_all(
                select(HierarchyORM.child_role_id).where(
                    HierarchyORM.parent_role_id == traversing_cte.c.role_id
                )
            )
            # A Subject may be a member of several of these Roles, hence unique().
            subjects = (
                db.scalars(
                    select(MemberORM.subject_id).join(
                        relations_cte, MemberORM.role_id == relations_cte.c.role_id
                    )
                )
                .unique()
                .all()
            )
        else:
            subjects = db.scalars(
                select(MemberORM.subject_id).where(MemberORM.role_id == role)
            ).all()

        if not subjects and db.get(RoleORM, role) is None:
            raise PyPermissionError(f"Role '{role}' does not exist!")
        return tuple(subjects)

    @classmethod
    def grant_permission(
        cls,
        *,
        role: str,
        permission: Permission,
        db: Session,
    ) -> None:
        """
        Grant a Permission to a Role.

        Parameters
        ----------
        role : str
            The target RoleID.
        permission : Permission
            The Permission to grant.
        db : Session
            The SQLAlchemy session.

        Raises
        ------
        PyPermissionError
            If `role` is empty string.
            If the target Role does not exist.
            If the Permission was granted before.
        """
        if role == "":
            raise PyPermissionError("Role name cannot be empty!")
        try:
            policy_orm = PolicyORM(
                role_id=role,
                resource_type=permission.resource_type,
                resource_id=permission.resource_id,
                action=permission.action,
            )
            db.add(policy_orm)
            db.flush()
        except IntegrityError as err:
            db.rollback()
            # NOTE(review): the helper is presumably responsible for raising
            # the matching PyPermissionError (unknown Role vs. Permission
            # already granted) - confirm against its implementation.
            process_policy_integrity_error(err=err, role=role, permission=permission)

    @classmethod
    def revoke_permission(
        cls,
        *,
        role: str,
        permission: Permission,
        db: Session,
    ) -> None:
        """
        Revoke a Permission from a Role.

        Parameters
        ----------
        role : str
            The target RoleID.
        permission : Permission
            The Permission to revoke.
        db : Session
            The SQLAlchemy session.

        Raises
        ------
        PyPermissionError
            If `role` is empty string.
            If the target Role does not exist.
            If the Permission was not granted before.
        """
        if role == "":
            raise PyPermissionError("Role name cannot be empty!")
        # Composite key used to look the Policy up via the session.
        policy_tuple = (
            role,
            permission.resource_type,
            permission.resource_id,
            permission.action,
        )
        policy_orm = db.get(
            PolicyORM,
            policy_tuple,
        )
        if policy_orm is None:
            # Pick the accurate message: missing Permission vs. missing Role.
            role_orm = db.get(RoleORM, role)
            if role_orm:
                raise PyPermissionError(
                    f"Permission '{permission!s}' does not exist!"
                )
            raise PyPermissionError(f"Role '{role}' does not exist!")

        db.delete(policy_orm)
        db.flush()

    @classmethod
    def check_permission(
        cls,
        *,
        role: str,
        permission: Permission,
        db: Session,
    ) -> bool:
        """
        Check if a Role has a Permission.

        The Permission counts as granted if the Role itself or any of its
        ancestor Roles holds a Policy with the same resource type and action
        whose resource ID is either the requested one or the wildcard "*".

        Parameters
        ----------
        role : str
            The target RoleID.
        permission : Permission
            The Permission to check for.
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        bool
            True if the Permission is granted.

        Raises
        ------
        PyPermissionError
            If `role` is empty string.
            If the target Role does not exist.
        """
        if role == "":
            raise PyPermissionError("Role name cannot be empty!")
        # Recursive CTE yielding `role` itself plus all of its ancestors.
        root_cte = (
            select(RoleORM.id.label("role_id"))
            .where(RoleORM.id == role)
            .cte(recursive=True)
        )

        traversing_cte = root_cte.alias()
        relations_cte = root_cte.union_all(
            select(HierarchyORM.parent_role_id).where(
                HierarchyORM.child_role_id == traversing_cte.c.role_id
            )
        )
        # Only existence matters, so one matching Policy is enough.
        matching_policy = db.scalars(
            select(PolicyORM)
            .join(relations_cte, PolicyORM.role_id == relations_cte.c.role_id)
            .where(
                PolicyORM.resource_type == permission.resource_type,
                PolicyORM.resource_id.in_((permission.resource_id, "*")),
                PolicyORM.action == permission.action,
            )
        ).first()

        if matching_policy is None:
            # No match: distinguish "not granted" from "unknown Role".
            role_orm = db.get(RoleORM, role)
            if role_orm is None:
                raise PyPermissionError(f"Role '{role}' does not exist!")
            return False
        return True

    @classmethod
    def assert_permission(
        cls,
        *,
        role: str,
        permission: Permission,
        db: Session,
    ) -> None:
        """
        Assert that a Role has a Permission.

        Parameters
        ----------
        role : str
            The target RoleID.
        permission : Permission
            The Permission to check for.
        db : Session
            The SQLAlchemy session.

        Raises
        ------
        PermissionNotGrantedError
            If the Permission is not granted.
        PyPermissionError
            If `role` is empty string.
            If the target Role does not exist.
        """
        if role == "":
            raise PyPermissionError("Role name cannot be empty!")
        if not cls.check_permission(role=role, permission=permission, db=db):
            raise PermissionNotGrantedError(
                f"Permission '{permission}' is not granted for Role '{role}'!"
            )

    @classmethod
    def permissions(
        cls,
        *,
        role: str,
        inherited: bool = True,
        db: Session,
    ) -> tuple[Permission, ...]:
        """
        Get all granted Permissions for a Role.

        Parameters
        ----------
        role : str
            The target RoleID.
        inherited : bool
            Includes all Permissions inherited by ancestor Roles.
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        tuple[Permission, ...]
            A tuple containing all granted Permissions. A Permission granted
            on several levels of the hierarchy is contained only once.

        Raises
        ------
        PyPermissionError
            If `role` is empty string.
            If the target Role does not exist.
        """
        if role == "":
            raise PyPermissionError("Role name cannot be empty!")
        policy_orms = _get_policy_orms_for_role(role=role, inherited=inherited, db=db)
        if not policy_orms:
            role_orm = db.get(RoleORM, role)
            if role_orm is None:
                raise PyPermissionError(f"Role '{role}' does not exist!")

        # Policies are unique per Role, but the same Permission may be granted
        # to the Role and to one of its ancestors. Collapse those duplicates
        # while keeping the order in which they were first seen.
        granted: dict[tuple[str, str, str], Permission] = {}
        for policy_orm in policy_orms:
            key = (
                policy_orm.resource_type,
                policy_orm.resource_id,
                policy_orm.action,
            )
            if key not in granted:
                granted[key] = Permission(
                    resource_type=policy_orm.resource_type,
                    resource_id=policy_orm.resource_id,
                    action=policy_orm.action,
                )
        return tuple(granted.values())

    @classmethod
    def policies(
        cls,
        *,
        role: str,
        inherited: bool = True,
        db: Session,
    ) -> tuple[Policy, ...]:
        """
        Get all granted Policies for a Role.

        Parameters
        ----------
        role : str
            The target RoleID.
        inherited : bool
            Includes all Policies inherited by ancestor Roles.
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        tuple[Policy, ...]
            A tuple containing all granted Policies.

        Raises
        ------
        PyPermissionError
            If `role` is empty string.
            If the target Role does not exist.
        """
        if role == "":
            raise PyPermissionError("Role name cannot be empty!")
        policy_orms = _get_policy_orms_for_role(role=role, inherited=inherited, db=db)

        if not policy_orms:
            role_orm = db.get(RoleORM, role)
            if role_orm is None:
                raise PyPermissionError(f"Role '{role}' does not exist!")

        return tuple(
            Policy(
                role=policy_orm.role_id,
                permission=Permission(
                    resource_type=policy_orm.resource_type,
                    resource_id=policy_orm.resource_id,
                    action=policy_orm.action,
                ),
            )
            for policy_orm in policy_orms
        )

    @classmethod
    def actions_on_resource(
        cls,
        *,
        role: str,
        resource_type: str,
        resource_id: str,
        inherited: bool = True,
        db: Session,
    ) -> tuple[str, ...]:
        """
        Get all **Actions** granted on a **Resource** for a **Role**.

        Policies on the wildcard **ResourceID** "*" are taken into account
        in addition to the ones on the given `resource_id`.

        Parameters
        ----------
        role : str
            The target **RoleID**.
        resource_type : str
            The **ResourceType** to check.
        resource_id : str
            The **ResourceID** to check.
        inherited : bool
            Includes all **Actions** inherited by ancestor **Roles**.
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        tuple[str, ...]
            A tuple containing all granted action IDs, each one only once.

        Raises
        ------
        PyPermissionError
            If `role` is empty string.
            If `resource_type` is empty string.
            If the target **Role** does not exist.
        """
        if role == "":
            raise PyPermissionError("Role name cannot be empty!")
        if resource_type == "":
            raise PyPermissionError("Resource type cannot be empty!")

        if inherited:
            # Recursive CTE yielding `role` itself plus all of its ancestors.
            root_cte = (
                select(RoleORM.id.label("role_id"))
                .where(RoleORM.id == role)
                .cte(recursive=True)
            )
            traversing_cte = root_cte.alias()
            relations_cte = root_cte.union_all(
                select(HierarchyORM.parent_role_id).where(
                    HierarchyORM.child_role_id == traversing_cte.c.role_id
                )
            )
            selection = (
                select(PolicyORM.action)
                .join(
                    relations_cte,
                    PolicyORM.role_id == relations_cte.c.role_id,
                )
                .where(
                    PolicyORM.resource_type == resource_type,
                    PolicyORM.resource_id.in_((resource_id, "*")),
                )
            )
        else:
            selection = select(PolicyORM.action).where(
                PolicyORM.role_id == role,
                PolicyORM.resource_type == resource_type,
                PolicyORM.resource_id.in_((resource_id, "*")),
            )
        # The same action can match more than once (concrete ResourceID and
        # wildcard, several Roles of the hierarchy, or an ancestor reached via
        # more than one path), so duplicates are dropped.
        result = db.scalars(selection).unique().all()

        if not result:
            role_orm = db.get(RoleORM, role)
            if role_orm is None:
                raise PyPermissionError(f"Role '{role}' does not exist!")
        return tuple(result)
803################################################################################
804#### Util
805################################################################################
def _get_policy_orms_for_role(
    *, role: str, inherited: bool = True, db: Session
) -> Sequence[PolicyORM]:
    """
    Load the Policy rows of a Role.

    Parameters
    ----------
    role : str
        The target RoleID.
    inherited : bool
        If true, Policies of all ancestor Roles are included as well.
    db : Session
        The SQLAlchemy session.

    Returns
    -------
    Sequence[PolicyORM]
        The matching Policy rows without duplicates. The sequence is empty
        if nothing matches; an unknown Role is not detected here.
    """
    # TODO raise IntegrityError if role is unknown and if possible via ORM
    if not inherited:
        # Only the Policies attached directly to the Role.
        direct_query = select(PolicyORM).where(PolicyORM.role_id == role)
        return db.scalars(direct_query).unique().all()

    # Recursive CTE: seeded with the Role itself, then repeatedly extended by
    # the parents of every RoleID found so far.
    seed_cte = (
        select(RoleORM.id.label("role_id"))
        .where(RoleORM.id == role)
        .cte(recursive=True)
    )
    walker = seed_cte.alias()
    parent_step = select(HierarchyORM.parent_role_id).where(
        HierarchyORM.child_role_id == walker.c.role_id
    )
    lineage_cte = seed_cte.union_all(parent_step)

    inherited_query = select(PolicyORM).join(
        lineage_cte, PolicyORM.role_id == lineage_cte.c.role_id
    )
    # An ancestor reachable via several paths shows up repeatedly in the CTE;
    # unique() collapses the resulting duplicate Policy rows.
    return db.scalars(inherited_query).unique().all()