Coverage for src/pypermission/service/subject.py: 100%

132 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2025-11-14 14:14 +0000

1from collections.abc import Sequence 

2 

3from sqlalchemy.exc import IntegrityError 

4from sqlalchemy.orm import Session 

5from sqlalchemy.sql import select 

6 

7from pypermission.exc import PermissionNotGrantedError, PyPermissionError 

8from pypermission.models import ( 

9 FrozenClass, 

10 HierarchyORM, 

11 MemberORM, 

12 Permission, 

13 Policy, 

14 PolicyORM, 

15 RoleORM, 

16 SubjectORM, 

17) 

18from pypermission.util.exception_handling import process_subject_role_integrity_error 

19 

20################################################################################ 

21#### SubjectService 

22################################################################################ 

23 

24 

25class SubjectService(metaclass=FrozenClass): 

26 

27 @classmethod 

28 def create(cls, *, subject: str, db: Session) -> None: 

29 """ 

30 Create a new Subject. 

31 

32 Parameters 

33 ---------- 

34 subject : str 

35 The SubjectID of the Subject to create. 

36 db : Session 

37 The SQLAlchemy session. 

38 

39 Raises 

40 ------ 

41 PyPermissionError 

42 If a Subject with the given SubjectID already exists or `subject` is empty string. 

43 """ 

44 if subject == "": 

45 raise PyPermissionError("Subject name cannot be empty!") 

46 try: 

47 subject_orm = SubjectORM(id=subject) 

48 db.add(subject_orm) 

49 db.flush() 

50 except IntegrityError: 

51 db.rollback() 

52 raise PyPermissionError(f"Subject '{subject}' already exists!") 

53 

54 @classmethod 

55 def delete(cls, *, subject: str, db: Session) -> None: 

56 """ 

57 Delete an existing Subject. 

58 

59 Parameters 

60 ---------- 

61 subject : str 

62 The SubjectID to delete. 

63 db : Session 

64 The SQLAlchemy session. 

65 

66 Raises 

67 ------ 

68 PyPermissionError 

69 If a Subject with the given SubjectID does not exist or `subject` is empty string. 

70 """ 

71 if subject == "": 

72 raise PyPermissionError("Subject name cannot be empty!") 

73 subject_orm = db.get(SubjectORM, subject) 

74 if subject_orm is None: 

75 raise PyPermissionError(f"Subject '{subject}' does not exist!") 

76 db.delete(subject_orm) 

77 db.flush() 

78 

79 @classmethod 

80 def list(cls, *, db: Session) -> tuple[str, ...]: 

81 """ 

82 Get all Subjects. 

83 

84 Parameters 

85 ---------- 

86 db : Session 

87 The SQLAlchemy session. 

88 

89 Returns 

90 ------- 

91 tuple[str, ...] 

92 A tuple containing all SubjectIDs. 

93 """ 

94 subjects = db.scalars(select(SubjectORM.id)).all() 

95 return tuple(subjects) 

96 

97 @classmethod 

98 def assign_role(cls, *, subject: str, role: str, db: Session) -> None: 

99 """ 

100 Assign a Subject to a Role. 

101 

102 Parameters 

103 ---------- 

104 subject : str 

105 The target SubjectID. 

106 role : str 

107 The target RoleID. 

108 db : Session 

109 The SQLAlchemy session. 

110 

111 Raises 

112 ------ 

113 PyPermissionError 

114 If `subject` is empty string. 

115 If `role` is empty string. 

116 If the Subject does not exist. 

117 If the Role does not exist. 

118 If the Subject was assigned to Role before. TODO 

119 """ 

120 if role == "": 

121 raise PyPermissionError("Role name cannot be empty!") 

122 if subject == "": 

123 raise PyPermissionError("Subject name cannot be empty!") 

124 try: 

125 member_orm = MemberORM(role_id=role, subject_id=subject) 

126 db.add(member_orm) 

127 db.flush() 

128 except IntegrityError as err: 

129 db.rollback() 

130 process_subject_role_integrity_error(err=err, subject=subject, role=role) 

131 

132 @classmethod 

133 def deassign_role(cls, *, subject: str, role: str, db: Session) -> None: 

134 """ 

135 Deassign a Subject from a Role. 

136 

137 Parameters 

138 ---------- 

139 subject : str 

140 The target SubjectID. 

141 role : str 

142 The target RoleID. 

143 db : Session 

144 The SQLAlchemy session. 

145 

146 Raises 

147 ------ 

148 PyPermissionError 

149 If `subject` is empty string. 

150 If `role` is empty string. 

151 If the Subject does not exist. 

152 If the Role does not exist. 

153 If the Subject is not assigned to the Role. TODO 

154 """ 

155 if role == "": 

156 raise PyPermissionError("Role name cannot be empty!") 

157 if subject == "": 

158 raise PyPermissionError("Subject name cannot be empty!") 

159 # TODO raise IntegrityError if subject or role is unknown and if possible via ORM 

160 member_orm = db.get(MemberORM, (role, subject)) 

161 if member_orm is None: 

162 subject_orm = db.get(SubjectORM, subject) 

163 if subject_orm is None: 

164 raise PyPermissionError(f"Subject '{subject}' does not exist!") 

165 role_orm = db.get(RoleORM, role) 

166 if role_orm is None: 

167 raise PyPermissionError(f"Role '{role}' does not exist!") 

168 raise PyPermissionError( 

169 f"Role '{role}' is not assigned to Subject '{subject}'!" 

170 ) 

171 db.delete(member_orm) 

172 db.flush() 

173 

174 @classmethod 

175 def roles( 

176 cls, *, subject: str, include_ascendant_roles: bool = False, db: Session 

177 ) -> tuple[str, ...]: 

178 """ 

179 Get all Roles assigned to a Subject. 

180 

181 Parameters 

182 ---------- 

183 subject : str 

184 The target SubjectID. 

185 include_ascendant_roles: bool 

186 Include all ascendant Roles. 

187 db : Session 

188 The SQLAlchemy session. 

189 

190 Returns 

191 ------- 

192 tuple[str, ...] 

193 A tuple containing all assigned RoleIDs. 

194 

195 Raises 

196 ------ 

197 PyPermissionError 

198 If `subject` is empty string. 

199 If the target Subject does not exist. 

200 """ 

201 if subject == "": 

202 raise PyPermissionError("Subject name cannot be empty!") 

203 if include_ascendant_roles: 

204 root_cte = ( 

205 select(MemberORM.role_id) 

206 .where(MemberORM.subject_id == subject) 

207 .cte(recursive=True) 

208 ) 

209 traversing_cte = root_cte.alias() 

210 relations_cte = root_cte.union_all( 

211 select(HierarchyORM.parent_role_id).join( 

212 traversing_cte, 

213 HierarchyORM.child_role_id == traversing_cte.c.role_id, 

214 ) 

215 ) 

216 selection = select(relations_cte) 

217 roles = db.scalars(selection).unique().all() 

218 else: 

219 roles = db.scalars( 

220 select(MemberORM.role_id).where(MemberORM.subject_id == subject) 

221 ).all() 

222 

223 if len(roles) == 0 and db.get(SubjectORM, subject) is None: 

224 raise PyPermissionError(f"Subject '{subject}' does not exist!") 

225 return tuple(roles) 

226 

227 @classmethod 

228 def check_permission( 

229 cls, 

230 *, 

231 subject: str, 

232 permission: Permission, 

233 db: Session, 

234 ) -> bool: 

235 """ 

236 Check if a Subject has access to a specific Permission via its Role hierarchy. 

237 

238 Parameters 

239 ---------- 

240 subject : str 

241 The target SubjectID. 

242 permission : Permission 

243 The Permission to check for. 

244 db : Session 

245 The SQLAlchemy session. 

246 

247 Returns 

248 ------- 

249 bool 

250 True if the Permission is granted. 

251 

252 Raises 

253 ------ 

254 PyPermissionError 

255 If `subject` is empty string. 

256 If the target Subject does not exist. TODO 

257 """ 

258 # TODO raise IntegrityError if subject is unknown and if possible via ORM 

259 if subject == "": 

260 raise PyPermissionError("Subject name cannot be empty!") 

261 root_cte = ( 

262 select(MemberORM.role_id) 

263 .where(MemberORM.subject_id == subject) 

264 .cte(recursive=True) 

265 ) 

266 

267 traversing_cte = root_cte.alias() 

268 relations_cte = root_cte.union_all( 

269 select(HierarchyORM.parent_role_id).join( 

270 traversing_cte, HierarchyORM.child_role_id == traversing_cte.c.role_id 

271 ) 

272 ) 

273 

274 policy_orms = db.scalars( 

275 select(PolicyORM) 

276 .join(relations_cte, PolicyORM.role_id == relations_cte.c.role_id) 

277 .where( 

278 PolicyORM.resource_type == permission.resource_type, 

279 PolicyORM.resource_id.in_((permission.resource_id, "*")), 

280 PolicyORM.action == permission.action, 

281 ) 

282 ).all() 

283 if len(policy_orms) > 0: 

284 return True 

285 subject_orm = db.get(SubjectORM, subject) 

286 if subject_orm is None: 

287 raise PyPermissionError(f"Subject '{subject}' does not exist!") 

288 return False 

289 

290 @classmethod 

291 def assert_permission( 

292 cls, 

293 *, 

294 subject: str, 

295 permission: Permission, 

296 db: Session, 

297 ) -> None: 

298 """ 

299 Asserts that a Subject has access to a specific Permission via its Role hierarchy. 

300 

301 Parameters 

302 ---------- 

303 subject : str 

304 The target SubjectID. 

305 permission : Permission 

306 The Permission to check for. 

307 db : Session 

308 The SQLAlchemy session. 

309 

310 Raises 

311 ------ 

312 PyPermissionNotGrantedError 

313 If the Permission is not granted. 

314 PyPermissionError 

315 If `subject` is empty string. 

316 If the target Subject does not exist. 

317 """ 

318 if not cls.check_permission(subject=subject, permission=permission, db=db): 

319 raise PermissionNotGrantedError( 

320 f"Permission '{permission}' is not granted for Subject '{subject}'!" 

321 ) 

322 

323 @classmethod 

324 def permissions(cls, *, subject: str, db: Session) -> tuple[Permission, ...]: 

325 """ 

326 Get all Permissions a Subject has access to via its Role hierarchy. 

327 

328 Parameters 

329 ---------- 

330 subject : str 

331 The target SubjectID. 

332 db : Session 

333 The SQLAlchemy session. 

334 

335 Returns 

336 ------- 

337 tuple[Permission, ...] 

338 A tuple containing all granted Permissions. 

339 

340 Raises 

341 ------ 

342 PyPermissionError 

343 If `subject` is empty string. 

344 If the target Subject does not exist. 

345 """ 

346 if subject == "": 

347 raise PyPermissionError("Subject name cannot be empty!") 

348 policy_orms = _get_policy_orms_for_subject(subject=subject, db=db) 

349 

350 return tuple( 

351 Permission( 

352 resource_type=policy_orm.resource_type, 

353 resource_id=policy_orm.resource_id, 

354 action=policy_orm.action, 

355 ) 

356 for policy_orm in policy_orms 

357 ) 

358 

359 @classmethod 

360 def policies(cls, *, subject: str, db: Session) -> tuple[Policy, ...]: 

361 """ 

362 Get all Policies associated to a Subject via its Role hierarchy. 

363 

364 Parameters 

365 ---------- 

366 subject : str 

367 The target SubjectID. 

368 db : Session 

369 The SQLAlchemy session. 

370 

371 Returns 

372 ------- 

373 tuple[Policies, ...] 

374 A tuple containing all granted Policies. 

375 

376 Raises 

377 ------ 

378 PyPermissionError 

379 If `subject` is empty string. 

380 If the target Subject does not exist. 

381 """ 

382 if subject == "": 

383 raise PyPermissionError("Subject name cannot be empty!") 

384 policy_orms = _get_policy_orms_for_subject(subject=subject, db=db) 

385 

386 return tuple( 

387 Policy( 

388 role=policy_orm.role_id, 

389 permission=Permission( 

390 resource_type=policy_orm.resource_type, 

391 resource_id=policy_orm.resource_id, 

392 action=policy_orm.action, 

393 ), 

394 ) 

395 for policy_orm in policy_orms 

396 ) 

397 

398 @classmethod 

399 def actions_on_resource( 

400 cls, 

401 *, 

402 subject: str, 

403 resource_type: str, 

404 resource_id: str, 

405 inherited: bool = True, 

406 db: Session, 

407 ) -> tuple[str, ...]: 

408 """ 

409 Get all Actions granted to a **Subject** on a specific **Resource**. 

410 

411 Parameters 

412 ---------- 

413 subject : str 

414 The target **SubjectID**. 

415 resource_type : str 

416 The **ResourceType** of the **Resource**. 

417 resource_id : str 

418 The **ResourceID** of the **Resource**. 

419 inherited : bool 

420 Whether to include inherited **Actions** from **Role** hierarchies. 

421 db : Session 

422 The SQLAlchemy session. 

423 

424 Returns 

425 ------- 

426 tuple[str, ...] 

427 A tuple containing all granted **Action** values. 

428 

429 Raises 

430 ------ 

431 PyPermissionError 

432 If `subject` is empty string. 

433 If `resource_type` is empty string. 

434 If the target **Subject** does not exist. 

435 """ 

436 if subject == "": 

437 raise PyPermissionError("Subject name cannot be empty!") 

438 if resource_type == "": 

439 raise PyPermissionError("Resource type cannot be empty!") 

440 if inherited: 

441 root_cte = ( 

442 select(MemberORM.role_id) 

443 .where(MemberORM.subject_id == subject) 

444 .cte(recursive=True) 

445 ) 

446 traversing_cte = root_cte.alias() 

447 relations_cte = root_cte.union_all( 

448 select(HierarchyORM.parent_role_id).join( 

449 traversing_cte, 

450 HierarchyORM.child_role_id == traversing_cte.c.role_id, 

451 ) 

452 ) 

453 actions = ( 

454 db.scalars( 

455 select(PolicyORM.action, PolicyORM.role_id) 

456 .join(relations_cte, PolicyORM.role_id == relations_cte.c.role_id) 

457 .where( 

458 PolicyORM.resource_type == resource_type, 

459 PolicyORM.resource_id.in_((resource_id, "*")), 

460 ) 

461 ) 

462 .unique() 

463 .all() 

464 ) 

465 tuple(actions) 

466 else: 

467 selection = ( 

468 select(PolicyORM.action, PolicyORM.role_id) 

469 .join(MemberORM, MemberORM.role_id == PolicyORM.role_id) 

470 .where( 

471 MemberORM.subject_id == subject, 

472 PolicyORM.resource_type == resource_type, 

473 PolicyORM.resource_id.in_((resource_id, "*")), 

474 ) 

475 ) 

476 actions = db.scalars(selection).unique().all() 

477 if len(actions) == 0 and db.get(SubjectORM, subject) is None: 

478 raise PyPermissionError(f"Subject '{subject}' does not exist!") 

479 return tuple(actions) 

480 

481 

482################################################################################ 

483#### Util 

484################################################################################ 

485 

486 

487def _get_policy_orms_for_subject(*, subject: str, db: Session) -> Sequence[PolicyORM]: 

488 """ 

489 Get all PolicyORM objects associated to a Subject via its Role hierarchy. 

490 

491 Parameters 

492 ---------- 

493 subject : str 

494 The target SubjectID. 

495 db : Session 

496 The SQLAlchemy session. 

497 

498 Returns 

499 ------- 

500 Sequence[PolicyORM] 

501 A Sequence containing all associated PolicyORM objects. 

502 

503 Raises 

504 ------ 

505 PyPermissionError 

506 If the target Subject does not exist. 

507 """ 

508 subject_orm = db.get(SubjectORM, subject) 

509 if not subject_orm: 

510 raise PyPermissionError(f"Subject '{subject}' does not exist!") 

511 root_cte = ( 

512 select(MemberORM.role_id) 

513 .where(MemberORM.subject_id == subject) 

514 .cte(recursive=True) 

515 ) 

516 

517 traversing_cte = root_cte.alias() 

518 relations_cte = root_cte.union_all( 

519 select(HierarchyORM.parent_role_id).join( 

520 traversing_cte, HierarchyORM.child_role_id == traversing_cte.c.role_id 

521 ) 

522 ) 

523 

524 policy_orms = ( 

525 db.scalars( 

526 select(PolicyORM).join( 

527 relations_cte, PolicyORM.role_id == relations_cte.c.role_id 

528 ) 

529 ) 

530 .unique() 

531 .all() 

532 ) 

533 

534 return policy_orms