Coverage for src/pypermission/service/subject.py: 100%
132 statements
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-14 14:14 +0000
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-14 14:14 +0000
1from collections.abc import Sequence
3from sqlalchemy.exc import IntegrityError
4from sqlalchemy.orm import Session
5from sqlalchemy.sql import select
7from pypermission.exc import PermissionNotGrantedError, PyPermissionError
8from pypermission.models import (
9 FrozenClass,
10 HierarchyORM,
11 MemberORM,
12 Permission,
13 Policy,
14 PolicyORM,
15 RoleORM,
16 SubjectORM,
17)
18from pypermission.util.exception_handling import process_subject_role_integrity_error
20################################################################################
21#### SubjectService
22################################################################################
25class SubjectService(metaclass=FrozenClass):
27 @classmethod
28 def create(cls, *, subject: str, db: Session) -> None:
29 """
30 Create a new Subject.
32 Parameters
33 ----------
34 subject : str
35 The SubjectID of the Subject to create.
36 db : Session
37 The SQLAlchemy session.
39 Raises
40 ------
41 PyPermissionError
42 If a Subject with the given SubjectID already exists or `subject` is empty string.
43 """
44 if subject == "":
45 raise PyPermissionError("Subject name cannot be empty!")
46 try:
47 subject_orm = SubjectORM(id=subject)
48 db.add(subject_orm)
49 db.flush()
50 except IntegrityError:
51 db.rollback()
52 raise PyPermissionError(f"Subject '{subject}' already exists!")
54 @classmethod
55 def delete(cls, *, subject: str, db: Session) -> None:
56 """
57 Delete an existing Subject.
59 Parameters
60 ----------
61 subject : str
62 The SubjectID to delete.
63 db : Session
64 The SQLAlchemy session.
66 Raises
67 ------
68 PyPermissionError
69 If a Subject with the given SubjectID does not exist or `subject` is empty string.
70 """
71 if subject == "":
72 raise PyPermissionError("Subject name cannot be empty!")
73 subject_orm = db.get(SubjectORM, subject)
74 if subject_orm is None:
75 raise PyPermissionError(f"Subject '{subject}' does not exist!")
76 db.delete(subject_orm)
77 db.flush()
79 @classmethod
80 def list(cls, *, db: Session) -> tuple[str, ...]:
81 """
82 Get all Subjects.
84 Parameters
85 ----------
86 db : Session
87 The SQLAlchemy session.
89 Returns
90 -------
91 tuple[str, ...]
92 A tuple containing all SubjectIDs.
93 """
94 subjects = db.scalars(select(SubjectORM.id)).all()
95 return tuple(subjects)
97 @classmethod
98 def assign_role(cls, *, subject: str, role: str, db: Session) -> None:
99 """
100 Assign a Subject to a Role.
102 Parameters
103 ----------
104 subject : str
105 The target SubjectID.
106 role : str
107 The target RoleID.
108 db : Session
109 The SQLAlchemy session.
111 Raises
112 ------
113 PyPermissionError
114 If `subject` is empty string.
115 If `role` is empty string.
116 If the Subject does not exist.
117 If the Role does not exist.
118 If the Subject was assigned to Role before. TODO
119 """
120 if role == "":
121 raise PyPermissionError("Role name cannot be empty!")
122 if subject == "":
123 raise PyPermissionError("Subject name cannot be empty!")
124 try:
125 member_orm = MemberORM(role_id=role, subject_id=subject)
126 db.add(member_orm)
127 db.flush()
128 except IntegrityError as err:
129 db.rollback()
130 process_subject_role_integrity_error(err=err, subject=subject, role=role)
132 @classmethod
133 def deassign_role(cls, *, subject: str, role: str, db: Session) -> None:
134 """
135 Deassign a Subject from a Role.
137 Parameters
138 ----------
139 subject : str
140 The target SubjectID.
141 role : str
142 The target RoleID.
143 db : Session
144 The SQLAlchemy session.
146 Raises
147 ------
148 PyPermissionError
149 If `subject` is empty string.
150 If `role` is empty string.
151 If the Subject does not exist.
152 If the Role does not exist.
153 If the Subject is not assigned to the Role. TODO
154 """
155 if role == "":
156 raise PyPermissionError("Role name cannot be empty!")
157 if subject == "":
158 raise PyPermissionError("Subject name cannot be empty!")
159 # TODO raise IntegrityError if subject or role is unknown and if possible via ORM
160 member_orm = db.get(MemberORM, (role, subject))
161 if member_orm is None:
162 subject_orm = db.get(SubjectORM, subject)
163 if subject_orm is None:
164 raise PyPermissionError(f"Subject '{subject}' does not exist!")
165 role_orm = db.get(RoleORM, role)
166 if role_orm is None:
167 raise PyPermissionError(f"Role '{role}' does not exist!")
168 raise PyPermissionError(
169 f"Role '{role}' is not assigned to Subject '{subject}'!"
170 )
171 db.delete(member_orm)
172 db.flush()
174 @classmethod
175 def roles(
176 cls, *, subject: str, include_ascendant_roles: bool = False, db: Session
177 ) -> tuple[str, ...]:
178 """
179 Get all Roles assigned to a Subject.
181 Parameters
182 ----------
183 subject : str
184 The target SubjectID.
185 include_ascendant_roles: bool
186 Include all ascendant Roles.
187 db : Session
188 The SQLAlchemy session.
190 Returns
191 -------
192 tuple[str, ...]
193 A tuple containing all assigned RoleIDs.
195 Raises
196 ------
197 PyPermissionError
198 If `subject` is empty string.
199 If the target Subject does not exist.
200 """
201 if subject == "":
202 raise PyPermissionError("Subject name cannot be empty!")
203 if include_ascendant_roles:
204 root_cte = (
205 select(MemberORM.role_id)
206 .where(MemberORM.subject_id == subject)
207 .cte(recursive=True)
208 )
209 traversing_cte = root_cte.alias()
210 relations_cte = root_cte.union_all(
211 select(HierarchyORM.parent_role_id).join(
212 traversing_cte,
213 HierarchyORM.child_role_id == traversing_cte.c.role_id,
214 )
215 )
216 selection = select(relations_cte)
217 roles = db.scalars(selection).unique().all()
218 else:
219 roles = db.scalars(
220 select(MemberORM.role_id).where(MemberORM.subject_id == subject)
221 ).all()
223 if len(roles) == 0 and db.get(SubjectORM, subject) is None:
224 raise PyPermissionError(f"Subject '{subject}' does not exist!")
225 return tuple(roles)
227 @classmethod
228 def check_permission(
229 cls,
230 *,
231 subject: str,
232 permission: Permission,
233 db: Session,
234 ) -> bool:
235 """
236 Check if a Subject has access to a specific Permission via its Role hierarchy.
238 Parameters
239 ----------
240 subject : str
241 The target SubjectID.
242 permission : Permission
243 The Permission to check for.
244 db : Session
245 The SQLAlchemy session.
247 Returns
248 -------
249 bool
250 True if the Permission is granted.
252 Raises
253 ------
254 PyPermissionError
255 If `subject` is empty string.
256 If the target Subject does not exist. TODO
257 """
258 # TODO raise IntegrityError if subject is unknown and if possible via ORM
259 if subject == "":
260 raise PyPermissionError("Subject name cannot be empty!")
261 root_cte = (
262 select(MemberORM.role_id)
263 .where(MemberORM.subject_id == subject)
264 .cte(recursive=True)
265 )
267 traversing_cte = root_cte.alias()
268 relations_cte = root_cte.union_all(
269 select(HierarchyORM.parent_role_id).join(
270 traversing_cte, HierarchyORM.child_role_id == traversing_cte.c.role_id
271 )
272 )
274 policy_orms = db.scalars(
275 select(PolicyORM)
276 .join(relations_cte, PolicyORM.role_id == relations_cte.c.role_id)
277 .where(
278 PolicyORM.resource_type == permission.resource_type,
279 PolicyORM.resource_id.in_((permission.resource_id, "*")),
280 PolicyORM.action == permission.action,
281 )
282 ).all()
283 if len(policy_orms) > 0:
284 return True
285 subject_orm = db.get(SubjectORM, subject)
286 if subject_orm is None:
287 raise PyPermissionError(f"Subject '{subject}' does not exist!")
288 return False
290 @classmethod
291 def assert_permission(
292 cls,
293 *,
294 subject: str,
295 permission: Permission,
296 db: Session,
297 ) -> None:
298 """
299 Asserts that a Subject has access to a specific Permission via its Role hierarchy.
301 Parameters
302 ----------
303 subject : str
304 The target SubjectID.
305 permission : Permission
306 The Permission to check for.
307 db : Session
308 The SQLAlchemy session.
310 Raises
311 ------
312 PyPermissionNotGrantedError
313 If the Permission is not granted.
314 PyPermissionError
315 If `subject` is empty string.
316 If the target Subject does not exist.
317 """
318 if not cls.check_permission(subject=subject, permission=permission, db=db):
319 raise PermissionNotGrantedError(
320 f"Permission '{permission}' is not granted for Subject '{subject}'!"
321 )
323 @classmethod
324 def permissions(cls, *, subject: str, db: Session) -> tuple[Permission, ...]:
325 """
326 Get all Permissions a Subject has access to via its Role hierarchy.
328 Parameters
329 ----------
330 subject : str
331 The target SubjectID.
332 db : Session
333 The SQLAlchemy session.
335 Returns
336 -------
337 tuple[Permission, ...]
338 A tuple containing all granted Permissions.
340 Raises
341 ------
342 PyPermissionError
343 If `subject` is empty string.
344 If the target Subject does not exist.
345 """
346 if subject == "":
347 raise PyPermissionError("Subject name cannot be empty!")
348 policy_orms = _get_policy_orms_for_subject(subject=subject, db=db)
350 return tuple(
351 Permission(
352 resource_type=policy_orm.resource_type,
353 resource_id=policy_orm.resource_id,
354 action=policy_orm.action,
355 )
356 for policy_orm in policy_orms
357 )
359 @classmethod
360 def policies(cls, *, subject: str, db: Session) -> tuple[Policy, ...]:
361 """
362 Get all Policies associated to a Subject via its Role hierarchy.
364 Parameters
365 ----------
366 subject : str
367 The target SubjectID.
368 db : Session
369 The SQLAlchemy session.
371 Returns
372 -------
373 tuple[Policies, ...]
374 A tuple containing all granted Policies.
376 Raises
377 ------
378 PyPermissionError
379 If `subject` is empty string.
380 If the target Subject does not exist.
381 """
382 if subject == "":
383 raise PyPermissionError("Subject name cannot be empty!")
384 policy_orms = _get_policy_orms_for_subject(subject=subject, db=db)
386 return tuple(
387 Policy(
388 role=policy_orm.role_id,
389 permission=Permission(
390 resource_type=policy_orm.resource_type,
391 resource_id=policy_orm.resource_id,
392 action=policy_orm.action,
393 ),
394 )
395 for policy_orm in policy_orms
396 )
398 @classmethod
399 def actions_on_resource(
400 cls,
401 *,
402 subject: str,
403 resource_type: str,
404 resource_id: str,
405 inherited: bool = True,
406 db: Session,
407 ) -> tuple[str, ...]:
408 """
409 Get all Actions granted to a **Subject** on a specific **Resource**.
411 Parameters
412 ----------
413 subject : str
414 The target **SubjectID**.
415 resource_type : str
416 The **ResourceType** of the **Resource**.
417 resource_id : str
418 The **ResourceID** of the **Resource**.
419 inherited : bool
420 Whether to include inherited **Actions** from **Role** hierarchies.
421 db : Session
422 The SQLAlchemy session.
424 Returns
425 -------
426 tuple[str, ...]
427 A tuple containing all granted **Action** values.
429 Raises
430 ------
431 PyPermissionError
432 If `subject` is empty string.
433 If `resource_type` is empty string.
434 If the target **Subject** does not exist.
435 """
436 if subject == "":
437 raise PyPermissionError("Subject name cannot be empty!")
438 if resource_type == "":
439 raise PyPermissionError("Resource type cannot be empty!")
440 if inherited:
441 root_cte = (
442 select(MemberORM.role_id)
443 .where(MemberORM.subject_id == subject)
444 .cte(recursive=True)
445 )
446 traversing_cte = root_cte.alias()
447 relations_cte = root_cte.union_all(
448 select(HierarchyORM.parent_role_id).join(
449 traversing_cte,
450 HierarchyORM.child_role_id == traversing_cte.c.role_id,
451 )
452 )
453 actions = (
454 db.scalars(
455 select(PolicyORM.action, PolicyORM.role_id)
456 .join(relations_cte, PolicyORM.role_id == relations_cte.c.role_id)
457 .where(
458 PolicyORM.resource_type == resource_type,
459 PolicyORM.resource_id.in_((resource_id, "*")),
460 )
461 )
462 .unique()
463 .all()
464 )
465 tuple(actions)
466 else:
467 selection = (
468 select(PolicyORM.action, PolicyORM.role_id)
469 .join(MemberORM, MemberORM.role_id == PolicyORM.role_id)
470 .where(
471 MemberORM.subject_id == subject,
472 PolicyORM.resource_type == resource_type,
473 PolicyORM.resource_id.in_((resource_id, "*")),
474 )
475 )
476 actions = db.scalars(selection).unique().all()
477 if len(actions) == 0 and db.get(SubjectORM, subject) is None:
478 raise PyPermissionError(f"Subject '{subject}' does not exist!")
479 return tuple(actions)
482################################################################################
483#### Util
484################################################################################
487def _get_policy_orms_for_subject(*, subject: str, db: Session) -> Sequence[PolicyORM]:
488 """
489 Get all PolicyORM objects associated to a Subject via its Role hierarchy.
491 Parameters
492 ----------
493 subject : str
494 The target SubjectID.
495 db : Session
496 The SQLAlchemy session.
498 Returns
499 -------
500 Sequence[PolicyORM]
501 A Sequence containing all associated PolicyORM objects.
503 Raises
504 ------
505 PyPermissionError
506 If the target Subject does not exist.
507 """
508 subject_orm = db.get(SubjectORM, subject)
509 if not subject_orm:
510 raise PyPermissionError(f"Subject '{subject}' does not exist!")
511 root_cte = (
512 select(MemberORM.role_id)
513 .where(MemberORM.subject_id == subject)
514 .cte(recursive=True)
515 )
517 traversing_cte = root_cte.alias()
518 relations_cte = root_cte.union_all(
519 select(HierarchyORM.parent_role_id).join(
520 traversing_cte, HierarchyORM.child_role_id == traversing_cte.c.role_id
521 )
522 )
524 policy_orms = (
525 db.scalars(
526 select(PolicyORM).join(
527 relations_cte, PolicyORM.role_id == relations_cte.c.role_id
528 )
529 )
530 .unique()
531 .all()
532 )
534 return policy_orms