Coverage for src/pypermission/service/role.py: 99%

197 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2025-12-01 18:06 +0000

1from collections.abc import Sequence 

2 

3from sqlalchemy.exc import IntegrityError 

4from sqlalchemy.orm import Session 

5from sqlalchemy.sql import select 

6 

7from pypermission.exc import PermissionNotGrantedError, PyPermissionError 

8from pypermission.models import ( 

9 FrozenClass, 

10 HierarchyORM, 

11 MemberORM, 

12 Permission, 

13 Policy, 

14 PolicyORM, 

15 RoleORM, 

16) 

17from pypermission.util.exception_handling import process_policy_integrity_error 

18from pypermission.util.input_validation import validate_rbac_parameters 

19 

20################################################################################ 

21#### RoleService 

22################################################################################ 

23 

24 

class RoleService(metaclass=FrozenClass):
    """
    Service functions for **Roles**, their **Hierarchies** and their **Policies**.

    Every operation is a classmethod that works on a caller supplied SQLAlchemy
    session. Writes are flushed to the database but never committed here. When
    a flush fails with an `IntegrityError`, the session is rolled back before a
    `PyPermissionError` is raised.
    """

    # NOTE: we can use the Policy object when dealing with role + permission to let it handle empty role names for us.
    @classmethod
    @validate_rbac_parameters
    def create(cls, *, role: str, db: Session) -> None:
        """
        Create a new **Role**.

        Parameters
        ----------
        role : str
            The **RoleID** of the **Role** to create.
        db : Session
            The SQLAlchemy session.

        Raises
        ------
        PyPermissionError
            If `role` is an empty string.
            If a **Role** with the given **RoleID** already exists.
        """
        try:
            db.add(RoleORM(id=role))
            db.flush()
        except IntegrityError as err:
            # A failed flush leaves the session unusable until it is rolled back.
            db.rollback()
            raise PyPermissionError(
                f"Conflict: Role with ID '{role}' already exists!"
            ) from err

    @classmethod
    @validate_rbac_parameters
    def delete(cls, *, role: str, db: Session) -> None:
        """
        Delete an existing **Role**.

        Parameters
        ----------
        role : str
            The **RoleID** to delete.
        db : Session
            The SQLAlchemy session.

        Raises
        ------
        PyPermissionError
            If `role` is an empty string.
            If a **Role** with the given **RoleID** does not exist.
        """
        role_orm = db.get(RoleORM, role)
        if role_orm is None:
            raise PyPermissionError(f"Role '{role}' does not exist!")
        db.delete(role_orm)
        db.flush()

    @classmethod
    def list(cls, *, db: Session) -> tuple[str, ...]:
        """
        Get all **Roles**.

        Parameters
        ----------
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        tuple[str, ...]
            A tuple containing all **RoleIDs**.
        """
        role_orms = db.scalars(select(RoleORM)).all()
        return tuple(role_orm.id for role_orm in role_orms)

    @classmethod
    @validate_rbac_parameters
    def add_hierarchy(cls, *, parent_role: str, child_role: str, db: Session) -> None:
        """
        Add a parent-child **Hierarchy** between two **Roles**.

        Parameters
        ----------
        parent_role : str
            The parent **RoleID**.
        child_role : str
            The child **RoleID**.
        db : Session
            The SQLAlchemy session.

        Raises
        ------
        PyPermissionError
            If the `parent_role` or `child_role` is an empty string.
            If `parent_role` and `child_role` are identical.
            If one or both **Roles** do not exist in the system.
            If adding the **Hierarchy** would create a cycle in the RBAC **Hierarchy**.
            If the **Hierarchy** already exists.
        """
        if parent_role == child_role:
            raise PyPermissionError(
                f"Conflict: A Role Hierarchy requires distinct RoleIDs, received '{parent_role}' twice!"
            )

        # Both IDs are distinct at this point, so two rows mean both Roles exist.
        roles = db.scalars(
            select(RoleORM.id).where(RoleORM.id.in_([parent_role, child_role]))
        ).all()
        if len(roles) == 1:
            missing_role = child_role if parent_role in roles else parent_role
            raise PyPermissionError(f"Role '{missing_role}' does not exist!")
        elif not roles:
            raise PyPermissionError(
                f"Roles '{parent_role}' and '{child_role}' do not exist!"
            )

        # Cycle detection: collect every Hierarchy edge reachable downwards from
        # `child_role`. If one of them ends in `parent_role`, the parent is
        # already a descendant of the child and the new edge would close a loop.
        root_cte = (
            select(HierarchyORM)
            .where(HierarchyORM.parent_role_id == child_role)
            .cte(recursive=True)
        )

        traversing_cte = root_cte.alias()
        relations_cte = root_cte.union_all(
            select(HierarchyORM).where(
                HierarchyORM.parent_role_id == traversing_cte.c.child_role_id
            )
        )

        critical_leaf_relations = db.execute(
            select(relations_cte).where(relations_cte.c.child_role_id == parent_role)
        ).all()

        if critical_leaf_relations:
            raise PyPermissionError(
                "Conflict: Desired Role Hierarchy would create a cycle!"
            )

        try:
            db.add(HierarchyORM(parent_role_id=parent_role, child_role_id=child_role))
            db.flush()
        except IntegrityError as err:
            # A failed flush leaves the session unusable until it is rolled back.
            db.rollback()
            raise PyPermissionError(
                f"Conflict: Role Hierarchy '{parent_role}' -> '{child_role}' exists!"
            ) from err

    @classmethod
    @validate_rbac_parameters
    def remove_hierarchy(
        cls, *, parent_role: str, child_role: str, db: Session
    ) -> None:
        """
        Remove a parent-child **Hierarchy** between two **Roles**.

        Parameters
        ----------
        parent_role : str
            The parent **RoleID**.
        child_role : str
            The child **RoleID**.
        db : Session
            The SQLAlchemy session.

        Raises
        ------
        PyPermissionError
            If the `parent_role` or `child_role` is an empty string.
            If `parent_role` and `child_role` are identical.
            If one or both **Roles** do not exist in the system.
            If the **Hierarchy** does not exist.
        """
        if parent_role == child_role:
            raise PyPermissionError(
                f"Conflict: A Role Hierarchy requires distinct RoleIDs, received '{parent_role}' twice!"
            )

        hierarchy_orm = db.get(HierarchyORM, (parent_role, child_role))
        if hierarchy_orm is None:
            # Only on the failure path: find out which precise error to report.
            roles = db.scalars(
                select(RoleORM.id).where(RoleORM.id.in_([parent_role, child_role]))
            ).all()
            if len(roles) == 1:
                missing_role = child_role if parent_role in roles else parent_role
                raise PyPermissionError(f"Role '{missing_role}' does not exist!")
            elif not roles:
                raise PyPermissionError(
                    f"Roles '{parent_role}' and '{child_role}' do not exist!"
                )
            else:
                raise PyPermissionError(
                    f"Hierarchy '{parent_role}' -> '{child_role}' does not exist!"
                )

        db.delete(hierarchy_orm)
        db.flush()

    @classmethod
    @validate_rbac_parameters
    def parents(cls, *, role: str, db: Session) -> tuple[str, ...]:
        """
        Get all parent **Roles**.

        Parameters
        ----------
        role : str
            The target **RoleID**.
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        tuple[str, ...]
            A tuple containing all parent **RoleIDs**.

        Raises
        ------
        PyPermissionError
            If `role` is an empty string.
            If the target **Role** does not exist.
        """
        parents = db.scalars(
            select(HierarchyORM.parent_role_id).where(
                HierarchyORM.child_role_id == role
            )
        ).all()
        # The existence lookup is only needed when the result is empty.
        if not parents and db.get(RoleORM, role) is None:
            raise PyPermissionError(f"Role '{role}' does not exist!")
        return tuple(parents)

    @classmethod
    @validate_rbac_parameters
    def children(cls, *, role: str, db: Session) -> tuple[str, ...]:
        """
        Get all child **Roles**.

        Parameters
        ----------
        role : str
            The target **RoleID**.
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        tuple[str, ...]
            A tuple containing all child **RoleIDs**.

        Raises
        ------
        PyPermissionError
            If `role` is an empty string.
            If the target **Role** does not exist.
        """
        children = db.scalars(
            select(HierarchyORM.child_role_id).where(
                HierarchyORM.parent_role_id == role
            )
        ).all()
        # The existence lookup is only needed when the result is empty.
        if not children and db.get(RoleORM, role) is None:
            raise PyPermissionError(f"Role '{role}' does not exist!")
        return tuple(children)

    @classmethod
    @validate_rbac_parameters
    def ascendants(cls, *, role: str, db: Session) -> tuple[str, ...]:
        """
        Get all ascendant **Roles**.

        Parameters
        ----------
        role : str
            The target **RoleID**.
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        tuple[str, ...]
            A tuple containing all ascendant **RoleIDs**.

        Raises
        ------
        PyPermissionError
            If `role` is an empty string.
            If the target **Role** does not exist.
        """
        # Seed with the edges that point at `role`, then keep following each
        # found parent upwards.
        root_cte = (
            select(HierarchyORM)
            .where(HierarchyORM.child_role_id == role)
            .cte(name="root_cte", recursive=True)
        )

        traversing_cte = root_cte.alias()
        relations_cte = root_cte.union_all(
            select(HierarchyORM).where(
                HierarchyORM.child_role_id == traversing_cte.c.parent_role_id
            )
        )

        # UNION ALL may yield the same ascendant via several paths; de-duplicate.
        ascendant_relations = (
            db.scalars(select(relations_cte.c.parent_role_id)).unique().all()
        )

        if not ascendant_relations and db.get(RoleORM, role) is None:
            raise PyPermissionError(f"Role '{role}' does not exist!")
        return tuple(ascendant_relations)

    @classmethod
    @validate_rbac_parameters
    def descendants(cls, *, role: str, db: Session) -> tuple[str, ...]:
        """
        Get all descendant **Roles**.

        Parameters
        ----------
        role : str
            The target **RoleID**.
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        tuple[str, ...]
            A tuple containing all descendant **RoleIDs**.

        Raises
        ------
        PyPermissionError
            If `role` is an empty string.
            If the target **Role** does not exist.
        """
        # Seed with the edges that start at `role`, then keep following each
        # found child downwards.
        root_cte = (
            select(HierarchyORM)
            .where(HierarchyORM.parent_role_id == role)
            .cte(name="root_cte", recursive=True)
        )

        traversing_cte = root_cte.alias()
        relations_cte = root_cte.union_all(
            select(HierarchyORM).where(
                HierarchyORM.parent_role_id == traversing_cte.c.child_role_id
            )
        )

        # UNION ALL may yield the same descendant via several paths; de-duplicate.
        descendant_relations = (
            db.scalars(select(relations_cte.c.child_role_id)).unique().all()
        )

        if not descendant_relations and db.get(RoleORM, role) is None:
            raise PyPermissionError(f"Role '{role}' does not exist!")
        return tuple(descendant_relations)

    @classmethod
    @validate_rbac_parameters
    def subjects(
        cls, *, role: str, include_descendant_subjects: bool = False, db: Session
    ) -> tuple[str, ...]:
        """
        Get all **Subjects** assigned to a **Role**.

        Parameters
        ----------
        role : str
            The target **RoleID**.
        include_descendant_subjects : bool
            Include all **Subjects** for descendant **Roles**.
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        tuple[str, ...]
            A tuple containing all assigned **SubjectIDs**.

        Raises
        ------
        PyPermissionError
            If `role` is an empty string.
            If the target **Role** does not exist.
        """
        if include_descendant_subjects:
            # Set of RoleIDs consisting of `role` itself plus all its descendants.
            root_cte = (
                select(RoleORM.id.label("role_id"))
                .where(RoleORM.id == role)
                .cte(recursive=True)
            )

            traversing_cte = root_cte.alias()
            relations_cte = root_cte.union_all(
                select(HierarchyORM.child_role_id).where(
                    HierarchyORM.parent_role_id == traversing_cte.c.role_id
                )
            )
            # A Subject can be a member of several Roles in the set; de-duplicate.
            subjects = (
                db.scalars(
                    select(MemberORM.subject_id).join(
                        relations_cte, MemberORM.role_id == relations_cte.c.role_id
                    )
                )
                .unique()
                .all()
            )
        else:
            subjects = db.scalars(
                select(MemberORM.subject_id).where(MemberORM.role_id == role)
            ).all()

        if not subjects and db.get(RoleORM, role) is None:
            raise PyPermissionError(f"Role '{role}' does not exist!")
        return tuple(subjects)

    @classmethod
    def grant_permission(
        cls,
        *,
        role: str,
        permission: Permission,
        db: Session,
    ) -> None:
        """
        Grant a **Permission** to a **Role**.

        Parameters
        ----------
        role : str
            The target **RoleID**.
        permission : Permission
            The **Permission** to grant.
        db : Session
            The SQLAlchemy session.

        Raises
        ------
        PyPermissionError
            If `role` is an empty string.
            If the target **Role** does not exist.
            If the **Permission** is already granted to the **Role** (duplicate policy).
        """
        policy = Policy(role=role, permission=permission)  # does the validation for us
        try:
            policy_orm = PolicyORM(
                role_id=role,
                resource_type=permission.resource_type,
                resource_id=permission.resource_id,
                action=permission.action,
            )
            db.add(policy_orm)
            db.flush()
        except IntegrityError as err:
            # A failed flush leaves the session unusable until it is rolled back.
            db.rollback()
            # Translating the database error is delegated to the shared helper.
            process_policy_integrity_error(err=err, policy=policy)

    @classmethod
    def revoke_permission(
        cls,
        *,
        role: str,
        permission: Permission,
        db: Session,
    ) -> None:
        """
        Revoke a **Permission** from a **Role**.

        Parameters
        ----------
        role : str
            The target **RoleID**.
        permission : Permission
            The **Permission** to revoke.
        db : Session
            The SQLAlchemy session.

        Raises
        ------
        PyPermissionError
            If `role` is an empty string.
            If the target **Role** does not exist.
            If the permission was not granted to the role (policy does not exist).
        """
        policy = Policy(role=role, permission=permission)
        policy_tuple = (
            role,
            permission.resource_type,
            permission.resource_id,
            permission.action,
        )
        policy_orm = db.get(PolicyORM, policy_tuple)
        if policy_orm is None:
            # Distinguish "no such policy" from "no such role" for the caller.
            role_orm = db.get(RoleORM, role)
            if role_orm:
                raise PyPermissionError(f"Policy '{str(policy)}' does not exist!")
            raise PyPermissionError(f"Role '{role}' does not exist!")

        db.delete(policy_orm)
        db.flush()

    @classmethod  # validated by policy
    def check_permission(
        cls,
        *,
        role: str,
        permission: Permission,
        db: Session,
    ) -> bool:
        """
        Check if a **Role** has a **Permission**.

        Policies of the **Role** itself and of all its ascendant **Roles** are
        considered. A policy whose **ResourceID** is `"*"` matches as well.

        Parameters
        ----------
        role : str
            The target **RoleID**.
        permission : Permission
            The **Permission** to check for.
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        bool
            True if the **Permission** is granted.

        Raises
        ------
        PyPermissionError
            If `role` is an empty string.
            If the target **Role** does not exist.
        """
        _ = Policy(role=role, permission=permission)  # raises if role empty

        # Set of RoleIDs consisting of `role` itself plus all its ascendants.
        root_cte = (
            select(RoleORM.id.label("role_id"))
            .where(RoleORM.id == role)
            .cte(recursive=True)
        )

        traversing_cte = root_cte.alias()
        relations_cte = root_cte.union_all(
            select(HierarchyORM.parent_role_id).where(
                HierarchyORM.child_role_id == traversing_cte.c.role_id
            )
        )
        # One matching policy is enough, so only the first row is fetched.
        granting_policy = db.scalars(
            select(PolicyORM)
            .join(relations_cte, PolicyORM.role_id == relations_cte.c.role_id)
            .where(
                PolicyORM.resource_type == permission.resource_type,
                PolicyORM.resource_id.in_((permission.resource_id, "*")),
                PolicyORM.action == permission.action,
            )
        ).first()

        if granting_policy is not None:
            return True

        if db.get(RoleORM, role) is None:
            raise PyPermissionError(f"Role '{role}' does not exist!")
        return False

    @classmethod  # validated by check_permission
    def assert_permission(
        cls,
        *,
        role: str,
        permission: Permission,
        db: Session,
    ) -> None:
        """
        Assert that a **Role** has a **Permission**.

        Parameters
        ----------
        role : str
            The target **RoleID**.
        permission : Permission
            The **Permission** to check for.
        db : Session
            The SQLAlchemy session.

        Raises
        ------
        PermissionNotGrantedError
            If the **Permission** is not granted (including inherited permissions).
        PyPermissionError
            If `role` is an empty string.
            If the target **Role** does not exist.
        """
        if not cls.check_permission(role=role, permission=permission, db=db):
            raise PermissionNotGrantedError(
                f"Permission '{permission}' is not granted for Role '{role}'!"
            )

    @classmethod
    @validate_rbac_parameters
    def permissions(
        cls,
        *,
        role: str,
        inherited: bool = True,
        db: Session,
    ) -> tuple[Permission, ...]:
        """
        Get all granted **Permissions** for a **Role**.

        Parameters
        ----------
        role : str
            The target **RoleID**.
        inherited : bool
            Includes all **Permissions** inherited by ascendant **Roles**.
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        tuple[Permission, ...]
            A tuple containing all granted **Permissions**.

        Raises
        ------
        PyPermissionError
            If `role` is an empty string.
            If the target **Role** does not exist.
        """
        policy_orms = _get_policy_orms_for_role(role=role, inherited=inherited, db=db)
        # An existing Role without policies yields an empty tuple, not an error.
        if not policy_orms and db.get(RoleORM, role) is None:
            raise PyPermissionError(f"Role '{role}' does not exist!")

        return tuple(
            Permission(
                resource_type=policy_orm.resource_type,
                resource_id=policy_orm.resource_id,
                action=policy_orm.action,
            )
            for policy_orm in policy_orms
        )

    @classmethod
    @validate_rbac_parameters
    def policies(
        cls,
        *,
        role: str,
        inherited: bool = True,
        db: Session,
    ) -> tuple[Policy, ...]:
        """
        Get all granted **Policies** for a **Role**.

        Parameters
        ----------
        role : str
            The target **RoleID**.
        inherited : bool
            Includes all **Policies** inherited by ascendant **Roles**.
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        tuple[Policy, ...]
            A tuple containing all granted **Policies**.

        Raises
        ------
        PyPermissionError
            If `role` is an empty string.
            If the target **Role** does not exist.
        """
        policy_orms = _get_policy_orms_for_role(role=role, inherited=inherited, db=db)
        # An existing Role without policies yields an empty tuple, not an error.
        if not policy_orms and db.get(RoleORM, role) is None:
            raise PyPermissionError(f"Role '{role}' does not exist!")

        return tuple(
            Policy(
                role=policy_orm.role_id,
                permission=Permission(
                    resource_type=policy_orm.resource_type,
                    resource_id=policy_orm.resource_id,
                    action=policy_orm.action,
                ),
            )
            for policy_orm in policy_orms
        )

    @classmethod
    @validate_rbac_parameters
    def actions_on_resource(
        cls,
        *,
        role: str,
        resource_type: str,
        resource_id: str,
        inherited: bool = True,
        db: Session,
    ) -> tuple[str, ...]:
        """
        Get all **Actions** granted on a **Resource** for a **Role**.

        Policies stored with the **ResourceID** `"*"` are included. Every
        **Action** is reported once, even if several policies grant it.

        Parameters
        ----------
        role : str
            The target **RoleID**.
        resource_type : str
            The **ResourceType** to check.
        resource_id : str
            The **ResourceID** to check.
        inherited : bool
            Includes all **Actions** inherited by ascendant **Roles**.
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        tuple[str, ...]
            A tuple containing all granted **Actions**.

        Raises
        ------
        PyPermissionError
            If `role` is an empty string.
            If the **ResourceType** is an empty string.
            If the target **Role** does not exist.
        """
        if inherited:
            # Set of RoleIDs consisting of `role` itself plus all its ascendants.
            root_cte = (
                select(RoleORM.id.label("role_id"))
                .where(RoleORM.id == role)
                .cte(recursive=True)
            )
            traversing_cte = root_cte.alias()
            relations_cte = root_cte.union_all(
                select(HierarchyORM.parent_role_id).where(
                    HierarchyORM.child_role_id == traversing_cte.c.role_id
                )
            )
            selection = (
                select(PolicyORM.action)
                .join(
                    relations_cte,
                    PolicyORM.role_id == relations_cte.c.role_id,
                )
                .where(
                    PolicyORM.resource_type == resource_type,
                    PolicyORM.resource_id.in_((resource_id, "*")),
                )
            )
        else:
            selection = select(PolicyORM.action).where(
                PolicyORM.role_id == role,
                PolicyORM.resource_type == resource_type,
                PolicyORM.resource_id.in_((resource_id, "*")),
            )
        # The same action can be granted by several policies (concrete id and
        # "*", or the role and one of its ascendants); report each action once.
        result = db.scalars(selection).unique().all()

        if not result and db.get(RoleORM, role) is None:
            raise PyPermissionError(f"Role '{role}' does not exist!")
        return tuple(result)

791 

792 

793################################################################################ 

794#### Util 

795################################################################################ 

796 

797 

def _get_policy_orms_for_role(
    *, role: str, inherited: bool = True, db: Session
) -> Sequence[PolicyORM]:
    """
    Load the policy rows attached to a **Role**.

    Parameters
    ----------
    role : str
        The target **RoleID**.
    inherited : bool
        When true, policies of all ascendant **Roles** are loaded as well.
    db : Session
        The SQLAlchemy session.

    Returns
    -------
    Sequence[PolicyORM]
        The de-duplicated policy rows. Empty if nothing matches; the existence
        of the **Role** is not checked here.
    """
    if not inherited:
        # Only the policies attached directly to the Role itself.
        direct_stmt = select(PolicyORM).where(PolicyORM.role_id == role)
        return db.scalars(direct_stmt).unique().all()

    # Recursive set of RoleIDs: the Role itself, then every parent reachable
    # by walking the Hierarchy upwards.
    seed_cte = (
        select(RoleORM.id.label("role_id"))
        .where(RoleORM.id == role)
        .cte(recursive=True)
    )
    walker = seed_cte.alias()
    lineage_cte = seed_cte.union_all(
        select(HierarchyORM.parent_role_id).where(
            HierarchyORM.child_role_id == walker.c.role_id
        )
    )

    lineage_stmt = select(PolicyORM).join(
        lineage_cte, PolicyORM.role_id == lineage_cte.c.role_id
    )
    return db.scalars(lineage_stmt).unique().all()

828 ) 

829 

830 return policy_orms