Coverage for src/pypermission/service/role.py: 100%

219 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2025-11-14 14:14 +0000

1from collections.abc import Sequence 

2 

3from sqlalchemy.exc import IntegrityError 

4from sqlalchemy.orm import Session 

5from sqlalchemy.sql import select 

6 

7from pypermission.exc import PermissionNotGrantedError, PyPermissionError 

8from pypermission.models import ( 

9 FrozenClass, 

10 HierarchyORM, 

11 MemberORM, 

12 Permission, 

13 Policy, 

14 PolicyORM, 

15 RoleORM, 

16) 

17from pypermission.util.exception_handling import process_policy_integrity_error 

18 

19################################################################################ 

20#### RoleService 

21################################################################################ 

22 

23 

class RoleService(metaclass=FrozenClass):
    """
    Service API for managing Roles, their hierarchy and their Permissions.

    Every method is a keyword-only classmethod that works on a caller-supplied
    SQLAlchemy session. Mutating methods flush their changes but never commit;
    committing is left to the caller.
    """

    @classmethod
    def create(cls, *, role: str, db: Session) -> None:
        """
        Create a new Role.

        Parameters
        ----------
        role : str
            The RoleID of the Role to create.
        db : Session
            The SQLAlchemy session.

        Raises
        ------
        PyPermissionError
            If `role` is an empty string.
            If a Role with the given RoleID already exists.

        Notes
        -----
        When the flush fails with an integrity error the session is rolled
        back before the error is raised.
        """
        if role == "":
            raise PyPermissionError("Role name cannot be empty!")
        try:
            db.add(RoleORM(id=role))
            db.flush()
        except IntegrityError as err:
            db.rollback()
            raise PyPermissionError(f"Role '{role}' already exists!") from err

    @classmethod
    def delete(cls, *, role: str, db: Session) -> None:
        """
        Delete an existing Role.

        Parameters
        ----------
        role : str
            The RoleID to delete.
        db : Session
            The SQLAlchemy session.

        Raises
        ------
        PyPermissionError
            If `role` is an empty string.
            If a Role with the given RoleID does not exist.
        """
        if role == "":
            raise PyPermissionError("Role name cannot be empty!")
        role_orm = db.get(RoleORM, role)
        if role_orm is None:
            raise PyPermissionError(f"Role '{role}' does not exist!")
        db.delete(role_orm)
        db.flush()

    @classmethod
    def list(cls, *, db: Session) -> tuple[str, ...]:
        """
        Get all Roles.

        Parameters
        ----------
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        tuple[str, ...]
            A tuple containing all RoleIDs.
        """
        role_orms = db.scalars(select(RoleORM)).all()
        return tuple(role_orm.id for role_orm in role_orms)

    @classmethod
    def add_hierarchy(cls, *, parent_role: str, child_role: str, db: Session) -> None:
        """
        Add a parent-child hierarchy between two Roles.

        Parameters
        ----------
        parent_role : str
            The parent RoleID.
        child_role : str
            The child RoleID.
        db : Session
            The SQLAlchemy session.

        Raises
        ------
        PyPermissionError
            If `parent_role` or `child_role` is an empty string.
            If arguments `parent_role` and `child_role` are equal.
            If one or both Roles do not exist.
            If adding the hierarchy would create a cycle.
            If the hierarchy already exists.

        Notes
        -----
        When the flush fails with an integrity error the session is rolled
        back before the error is raised.
        """
        if parent_role == "":
            raise PyPermissionError(
                "Role name cannot be empty, but `parent_role` is empty!"
            )
        if child_role == "":
            raise PyPermissionError(
                "Role name cannot be empty, but `child_role` is empty!"
            )
        if parent_role == child_role:
            raise PyPermissionError(f"RoleIDs must not be equal: '{parent_role}'!")

        # Both RoleIDs differ at this point, so 0, 1 or 2 rows can come back.
        roles = db.scalars(
            select(RoleORM.id).where(RoleORM.id.in_([parent_role, child_role]))
        ).all()
        if len(roles) == 1:
            missing_role = child_role if parent_role in roles else parent_role
            raise PyPermissionError(f"Role '{missing_role}' does not exist!")
        elif len(roles) == 0:
            raise PyPermissionError(
                f"Roles '{parent_role}' and '{child_role}' do not exist!"
            )

        # Cycle detection: walk every relation below `child_role`. If any of
        # them ends in `parent_role`, the new edge would close a loop.
        root_cte = (
            select(HierarchyORM)
            .where(HierarchyORM.parent_role_id == child_role)
            .cte(recursive=True)
        )
        traversing_cte = root_cte.alias()
        relations_cte = root_cte.union_all(
            select(HierarchyORM).where(
                HierarchyORM.parent_role_id == traversing_cte.c.child_role_id
            )
        )
        critical_leaf_relations = db.execute(
            select(relations_cte).where(relations_cte.c.child_role_id == parent_role)
        ).all()
        if critical_leaf_relations:
            raise PyPermissionError("Desired hierarchy would create a cycle!")

        try:
            db.add(HierarchyORM(parent_role_id=parent_role, child_role_id=child_role))
            db.flush()
        except IntegrityError as err:
            db.rollback()
            raise PyPermissionError(
                f"Hierarchy '{parent_role}' -> '{child_role}' exists!"
            ) from err

    @classmethod
    def remove_hierarchy(
        cls, *, parent_role: str, child_role: str, db: Session
    ) -> None:
        """
        Remove a parent-child hierarchy between two Roles.

        Parameters
        ----------
        parent_role : str
            The parent RoleID.
        child_role : str
            The child RoleID.
        db : Session
            The SQLAlchemy session.

        Raises
        ------
        PyPermissionError
            If `parent_role` or `child_role` is an empty string.
            If arguments `parent_role` and `child_role` are equal.
            If one or both Roles do not exist.
            If the hierarchy does not exist.
        """
        if parent_role == "":
            raise PyPermissionError(
                "Role name cannot be empty, but `parent_role` is empty!"
            )
        if child_role == "":
            raise PyPermissionError(
                "Role name cannot be empty, but `child_role` is empty!"
            )
        if parent_role == child_role:
            raise PyPermissionError(f"RoleIDs must not be equal: '{parent_role}'!")

        hierarchy_orm = db.get(HierarchyORM, (parent_role, child_role))
        if hierarchy_orm is None:
            # Work out why the relation is missing so the error is precise.
            roles = db.scalars(
                select(RoleORM.id).where(RoleORM.id.in_([parent_role, child_role]))
            ).all()
            if len(roles) == 1:
                missing_role = child_role if parent_role in roles else parent_role
                raise PyPermissionError(f"Role '{missing_role}' does not exist!")
            elif len(roles) == 0:
                raise PyPermissionError(
                    f"Roles '{parent_role}' and '{child_role}' do not exist!"
                )
            else:
                raise PyPermissionError(
                    f"Hierarchy '{parent_role}' -> '{child_role}' does not exist!"
                )

        db.delete(hierarchy_orm)
        db.flush()

    @classmethod
    def parents(cls, *, role: str, db: Session) -> tuple[str, ...]:
        """
        Get all parent Roles.

        Parameters
        ----------
        role : str
            The target RoleID.
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        tuple[str, ...]
            A tuple containing all parent RoleIDs.

        Raises
        ------
        PyPermissionError
            If `role` is an empty string.
            If the target Role does not exist.
        """
        if role == "":
            raise PyPermissionError("Role name cannot be empty!")
        parents = db.scalars(
            select(HierarchyORM.parent_role_id).where(
                HierarchyORM.child_role_id == role
            )
        ).all()
        # The Role lookup is only needed when no relation mentions the Role.
        if not parents and db.get(RoleORM, role) is None:
            raise PyPermissionError(f"Role '{role}' does not exist!")
        return tuple(parents)

    @classmethod
    def children(cls, *, role: str, db: Session) -> tuple[str, ...]:
        """
        Get all child Roles.

        Parameters
        ----------
        role : str
            The target RoleID.
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        tuple[str, ...]
            A tuple containing all child RoleIDs.

        Raises
        ------
        PyPermissionError
            If `role` is an empty string.
            If the target Role does not exist.
        """
        if role == "":
            raise PyPermissionError("Role name cannot be empty!")
        children = db.scalars(
            select(HierarchyORM.child_role_id).where(
                HierarchyORM.parent_role_id == role
            )
        ).all()
        # The Role lookup is only needed when no relation mentions the Role.
        if not children and db.get(RoleORM, role) is None:
            raise PyPermissionError(f"Role '{role}' does not exist!")
        return tuple(children)

    @classmethod
    def ancestors(cls, *, role: str, db: Session) -> tuple[str, ...]:
        """
        Get all ancestor Roles.

        Parameters
        ----------
        role : str
            The target RoleID.
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        tuple[str, ...]
            A tuple containing all ancestor RoleIDs, without duplicates.

        Raises
        ------
        PyPermissionError
            If `role` is an empty string.
            If the target Role does not exist.
        """
        if role == "":
            raise PyPermissionError("Role name cannot be empty!")

        # Recursive walk upwards: start at the relations whose child is
        # `role`, then keep following each parent's own parent relations.
        root_cte = (
            select(HierarchyORM)
            .where(HierarchyORM.child_role_id == role)
            .cte(name="root_cte", recursive=True)
        )
        traversing_cte = root_cte.alias()
        relations_cte = root_cte.union_all(
            select(HierarchyORM).where(
                HierarchyORM.child_role_id == traversing_cte.c.parent_role_id
            )
        )

        # UNION ALL can reach one ancestor via several paths; deduplicate.
        ancestor_relations = (
            db.scalars(select(relations_cte.c.parent_role_id)).unique().all()
        )

        if not ancestor_relations and db.get(RoleORM, role) is None:
            raise PyPermissionError(f"Role '{role}' does not exist!")
        return tuple(ancestor_relations)

    @classmethod
    def descendants(cls, *, role: str, db: Session) -> tuple[str, ...]:
        """
        Get all descending Roles.

        Parameters
        ----------
        role : str
            The target RoleID.
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        tuple[str, ...]
            A tuple containing all descending RoleIDs, without duplicates.

        Raises
        ------
        PyPermissionError
            If `role` is an empty string.
            If the target Role does not exist.
        """
        if role == "":
            raise PyPermissionError("Role name cannot be empty!")

        # Recursive walk downwards: start at the relations whose parent is
        # `role`, then keep following each child's own child relations.
        root_cte = (
            select(HierarchyORM)
            .where(HierarchyORM.parent_role_id == role)
            .cte(name="root_cte", recursive=True)
        )
        traversing_cte = root_cte.alias()
        relations_cte = root_cte.union_all(
            select(HierarchyORM).where(
                HierarchyORM.parent_role_id == traversing_cte.c.child_role_id
            )
        )

        # UNION ALL can reach one descendant via several paths; deduplicate.
        descendant_relations = (
            db.scalars(select(relations_cte.c.child_role_id)).unique().all()
        )

        if not descendant_relations and db.get(RoleORM, role) is None:
            raise PyPermissionError(f"Role '{role}' does not exist!")
        return tuple(descendant_relations)

    @classmethod
    def subjects(
        cls, *, role: str, include_descendant_subjects: bool = False, db: Session
    ) -> tuple[str, ...]:
        """
        Get all Subjects assigned to a Role.

        Parameters
        ----------
        role : str
            The target RoleID.
        include_descendant_subjects: bool
            Include all Subjects for descendant Roles.
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        tuple[str, ...]
            A tuple containing all assigned SubjectIDs.

        Raises
        ------
        PyPermissionError
            If `role` is an empty string.
            If the target Role does not exist.
        """
        if role == "":
            raise PyPermissionError("Role name cannot be empty!")

        if include_descendant_subjects:
            # Collect `role` itself plus every Role below it, then join the
            # memberships against that set of RoleIDs.
            root_cte = (
                select(RoleORM.id.label("role_id"))
                .where(RoleORM.id == role)
                .cte(recursive=True)
            )
            traversing_cte = root_cte.alias()
            relations_cte = root_cte.union_all(
                select(HierarchyORM.child_role_id).where(
                    HierarchyORM.parent_role_id == traversing_cte.c.role_id
                )
            )
            subjects = (
                db.scalars(
                    select(MemberORM.subject_id).join(
                        relations_cte, MemberORM.role_id == relations_cte.c.role_id
                    )
                )
                .unique()
                .all()
            )
        else:
            subjects = db.scalars(
                select(MemberORM.subject_id).where(MemberORM.role_id == role)
            ).all()

        if not subjects and db.get(RoleORM, role) is None:
            raise PyPermissionError(f"Role '{role}' does not exist!")
        return tuple(subjects)

    @classmethod
    def grant_permission(
        cls,
        *,
        role: str,
        permission: Permission,
        db: Session,
    ) -> None:
        """
        Grant a Permission to a Role.

        Parameters
        ----------
        role : str
            The target RoleID.
        permission : Permission
            The Permission to grant.
        db : Session
            The SQLAlchemy session.

        Raises
        ------
        PyPermissionError
            If `role` is an empty string.

        Notes
        -----
        When the flush fails with an integrity error the session is rolled
        back and the error is handed to `process_policy_integrity_error`.
        That helper is presumably what reports an unknown Role or an already
        granted Permission - confirm against its implementation.
        """
        if role == "":
            raise PyPermissionError("Role name cannot be empty!")
        try:
            policy_orm = PolicyORM(
                role_id=role,
                resource_type=permission.resource_type,
                resource_id=permission.resource_id,
                action=permission.action,
            )
            db.add(policy_orm)
            db.flush()
        except IntegrityError as err:
            db.rollback()
            process_policy_integrity_error(err=err, role=role, permission=permission)

    @classmethod
    def revoke_permission(
        cls,
        *,
        role: str,
        permission: Permission,
        db: Session,
    ) -> None:
        """
        Revoke a Permission from a Role.

        Parameters
        ----------
        role : str
            The target RoleID.
        permission : Permission
            The Permission to revoke.
        db : Session
            The SQLAlchemy session.

        Raises
        ------
        PyPermissionError
            If `role` is an empty string.
            If the target Role does not exist.
            If the Permission is not granted directly to the Role.
        """
        if role == "":
            raise PyPermissionError("Role name cannot be empty!")

        policy_key = (
            role,
            permission.resource_type,
            permission.resource_id,
            permission.action,
        )
        policy_orm = db.get(PolicyORM, policy_key)
        if policy_orm is None:
            # Tell a missing Role apart from a missing Policy.
            if db.get(RoleORM, role):
                raise PyPermissionError(
                    f"Permission '{str(permission)}' does not exist!"
                )
            raise PyPermissionError(f"Role '{role}' does not exist!")

        db.delete(policy_orm)
        db.flush()

    @classmethod
    def check_permission(
        cls,
        *,
        role: str,
        permission: Permission,
        db: Session,
    ) -> bool:
        """
        Check if a Role has a Permission.

        Policies of the Role itself and of all its ancestor Roles are
        considered. A Policy whose resource ID is ``"*"`` matches any
        resource ID.

        Parameters
        ----------
        role : str
            The target RoleID.
        permission : Permission
            The Permission to check for.
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        bool
            True if the Permission is granted.

        Raises
        ------
        PyPermissionError
            If `role` is an empty string.
            If the target Role does not exist.
        """
        if role == "":
            raise PyPermissionError("Role name cannot be empty!")

        # `role` itself plus, recursively, every Role above it.
        root_cte = (
            select(RoleORM.id.label("role_id"))
            .where(RoleORM.id == role)
            .cte(recursive=True)
        )
        traversing_cte = root_cte.alias()
        relations_cte = root_cte.union_all(
            select(HierarchyORM.parent_role_id).where(
                HierarchyORM.child_role_id == traversing_cte.c.role_id
            )
        )
        policy_orms = db.scalars(
            select(PolicyORM)
            .join(relations_cte, PolicyORM.role_id == relations_cte.c.role_id)
            .where(
                PolicyORM.resource_type == permission.resource_type,
                PolicyORM.resource_id.in_((permission.resource_id, "*")),
                PolicyORM.action == permission.action,
            )
        ).all()

        if policy_orms:
            return True
        # No match: distinguish "not granted" from "unknown Role".
        if db.get(RoleORM, role) is None:
            raise PyPermissionError(f"Role '{role}' does not exist!")
        return False

    @classmethod
    def assert_permission(
        cls,
        *,
        role: str,
        permission: Permission,
        db: Session,
    ) -> None:
        """
        Assert that a Role has a Permission.

        Parameters
        ----------
        role : str
            The target RoleID.
        permission : Permission
            The Permission to check for.
        db : Session
            The SQLAlchemy session.

        Raises
        ------
        PermissionNotGrantedError
            If the Permission is not granted.
        PyPermissionError
            If `role` is an empty string.
            If the target Role does not exist.
        """
        if role == "":
            raise PyPermissionError("Role name cannot be empty!")
        if not cls.check_permission(role=role, permission=permission, db=db):
            raise PermissionNotGrantedError(
                f"Permission '{permission}' is not granted for Role '{role}'!"
            )

    @classmethod
    def permissions(
        cls,
        *,
        role: str,
        inherited: bool = True,
        db: Session,
    ) -> tuple[Permission, ...]:
        """
        Get all granted Permissions for a Role.

        Parameters
        ----------
        role : str
            The target RoleID.
        inherited : bool
            Includes all Permissions inherited by ancestor Roles.
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        tuple[Permission, ...]
            A tuple containing all granted Permissions.

        Raises
        ------
        PyPermissionError
            If `role` is an empty string.
            If the target Role does not exist.
        """
        if role == "":
            raise PyPermissionError("Role name cannot be empty!")
        policy_orms = _get_policy_orms_for_role(role=role, inherited=inherited, db=db)
        if not policy_orms and db.get(RoleORM, role) is None:
            raise PyPermissionError(f"Role '{role}' does not exist!")

        return tuple(
            Permission(
                resource_type=policy_orm.resource_type,
                resource_id=policy_orm.resource_id,
                action=policy_orm.action,
            )
            for policy_orm in policy_orms
        )

    @classmethod
    def policies(
        cls,
        *,
        role: str,
        inherited: bool = True,
        db: Session,
    ) -> tuple[Policy, ...]:
        """
        Get all granted Policies for a Role.

        Parameters
        ----------
        role : str
            The target RoleID.
        inherited : bool
            Includes all Policies inherited by ancestor Roles.
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        tuple[Policy, ...]
            A tuple containing all granted Policies.

        Raises
        ------
        PyPermissionError
            If `role` is empty string.
            If the target Role does not exist.
        """
        if role == "":
            raise PyPermissionError("Role name cannot be empty!")
        policy_orms = _get_policy_orms_for_role(role=role, inherited=inherited, db=db)
        if not policy_orms and db.get(RoleORM, role) is None:
            raise PyPermissionError(f"Role '{role}' does not exist!")

        return tuple(
            Policy(
                role=policy_orm.role_id,
                permission=Permission(
                    resource_type=policy_orm.resource_type,
                    resource_id=policy_orm.resource_id,
                    action=policy_orm.action,
                ),
            )
            for policy_orm in policy_orms
        )

    @classmethod
    def actions_on_resource(
        cls,
        *,
        role: str,
        resource_type: str,
        resource_id: str,
        inherited: bool = True,
        db: Session,
    ) -> tuple[str, ...]:
        """
        Get all **Actions** granted on a **Resource** for a **Role**.

        Policies whose resource ID is ``"*"`` are treated as matching the
        requested **ResourceID**.

        Parameters
        ----------
        role : str
            The target **RoleID**.
        resource_type : str
            The **ResourceType** to check.
        resource_id : str
            The **ResourceID** to check.
        inherited : bool
            Includes all **Actions** inherited by ancestor **Roles**.
        db : Session
            The SQLAlchemy session.

        Returns
        -------
        tuple[str, ...]
            A tuple containing all granted action IDs, without duplicates.

        Raises
        ------
        PyPermissionError
            If `role` is empty string.
            If `resource_type` is empty string.
            If the target **Role** does not exist.
        """
        if role == "":
            raise PyPermissionError("Role name cannot be empty!")
        if resource_type == "":
            raise PyPermissionError("Resource type cannot be empty!")

        if inherited:
            # `role` itself plus, recursively, every Role above it.
            root_cte = (
                select(RoleORM.id.label("role_id"))
                .where(RoleORM.id == role)
                .cte(recursive=True)
            )
            traversing_cte = root_cte.alias()
            relations_cte = root_cte.union_all(
                select(HierarchyORM.parent_role_id).where(
                    HierarchyORM.child_role_id == traversing_cte.c.role_id
                )
            )
            selection = (
                select(PolicyORM.action)
                .join(
                    relations_cte,
                    PolicyORM.role_id == relations_cte.c.role_id,
                )
                .where(
                    PolicyORM.resource_type == resource_type,
                    PolicyORM.resource_id.in_((resource_id, "*")),
                )
            )
        else:
            selection = select(PolicyORM.action).where(
                PolicyORM.role_id == role,
                PolicyORM.resource_type == resource_type,
                PolicyORM.resource_id.in_((resource_id, "*")),
            )

        # One action can match several times: via a "*" Policy and a specific
        # one, via several Roles, or via an ancestor that UNION ALL reached
        # over more than one path. Report each action once.
        result = db.scalars(selection).unique().all()

        if not result and db.get(RoleORM, role) is None:
            raise PyPermissionError(f"Role '{role}' does not exist!")
        return tuple(result)

801 

802 

803################################################################################ 

804#### Util 

805################################################################################ 

806 

807 

def _get_policy_orms_for_role(
    *, role: str, inherited: bool = True, db: Session
) -> Sequence[PolicyORM]:
    """
    Load the Policy rows of a Role.

    Parameters
    ----------
    role : str
        The target RoleID.
    inherited : bool
        When true, Policies of all ancestor Roles are included as well.
    db : Session
        The SQLAlchemy session.

    Returns
    -------
    Sequence[PolicyORM]
        The matching Policy rows, without duplicates. The result is empty
        when nothing matches; no check for the Role's existence is made here.
    """
    # TODO raise IntegrityError if role is unknown and if possible via ORM
    if not inherited:
        direct_stmt = select(PolicyORM).where(PolicyORM.role_id == role)
        return db.scalars(direct_stmt).unique().all()

    # Anchor on the Role itself, then recursively climb to every parent.
    anchor_cte = (
        select(RoleORM.id.label("role_id"))
        .where(RoleORM.id == role)
        .cte(recursive=True)
    )
    walker = anchor_cte.alias()
    parent_step = select(HierarchyORM.parent_role_id).where(
        HierarchyORM.child_role_id == walker.c.role_id
    )
    lineage_cte = anchor_cte.union_all(parent_step)

    lineage_stmt = select(PolicyORM).join(
        lineage_cte, PolicyORM.role_id == lineage_cte.c.role_id
    )
    return db.scalars(lineage_stmt).unique().all()