Coverage for src/pypermission/service/subject.py: 100%

119 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2025-12-01 18:06 +0000

1from collections.abc import Sequence 

2 

3from sqlalchemy.exc import IntegrityError 

4from sqlalchemy.orm import Session 

5from sqlalchemy.sql import select 

6 

7from pypermission.exc import PermissionNotGrantedError, PyPermissionError 

8from pypermission.models import ( 

9 FrozenClass, 

10 HierarchyORM, 

11 MemberORM, 

12 Permission, 

13 Policy, 

14 PolicyORM, 

15 RoleORM, 

16 SubjectORM, 

17) 

18from pypermission.util.exception_handling import process_subject_role_integrity_error 

19from pypermission.util.input_validation import validate_rbac_parameters 

20 

21################################################################################ 

22#### SubjectService 

23################################################################################ 

24 

25 

26class SubjectService(metaclass=FrozenClass): 

27 

28 @classmethod 

29 @validate_rbac_parameters 

30 def create(cls, *, subject: str, db: Session) -> None: 

31 """ 

32 Create a new Subject. 

33 

34 Parameters 

35 ---------- 

36 subject : str 

37 The SubjectID of the **Subject** to create. 

38 db : Session 

39 The SQLAlchemy session. 

40 

41 Raises 

42 ------ 

43 PyPermissionError 

44 If a **Subject** with the given SubjectID already exists or `subject` is empty string. 

45 """ 

46 try: 

47 subject_orm = SubjectORM(id=subject) 

48 db.add(subject_orm) 

49 db.flush() 

50 except IntegrityError: 

51 db.rollback() 

52 raise PyPermissionError( 

53 f"Conflict: Subject with ID '{subject}' already exists!" 

54 ) 

55 

56 @classmethod 

57 @validate_rbac_parameters 

58 def delete(cls, *, subject: str, db: Session) -> None: 

59 """ 

60 Delete an existing Subject. 

61 

62 Parameters 

63 ---------- 

64 subject : str 

65 The SubjectID to delete. 

66 db : Session 

67 The SQLAlchemy session. 

68 

69 Raises 

70 ------ 

71 PyPermissionError 

72 If a **Subject** with the given SubjectID does not exist or `subject` is empty string. 

73 """ 

74 subject_orm = db.get(SubjectORM, subject) 

75 if subject_orm is None: 

76 raise PyPermissionError(f"Subject '{subject}' does not exist!") 

77 db.delete(subject_orm) 

78 db.flush() 

79 

80 @classmethod 

81 @validate_rbac_parameters 

82 def list(cls, *, db: Session) -> tuple[str, ...]: 

83 """ 

84 Get all Subjects. 

85 

86 Parameters 

87 ---------- 

88 db : Session 

89 The SQLAlchemy session. 

90 

91 Returns 

92 ------- 

93 tuple[str, ...] 

94 A tuple containing all SubjectIDs. 

95 """ 

96 subjects = db.scalars(select(SubjectORM.id)).all() 

97 return tuple(subjects) 

98 

99 @classmethod 

100 @validate_rbac_parameters 

101 def assign_role(cls, *, subject: str, role: str, db: Session) -> None: 

102 """ 

103 Assign a **Subject** to a Role. 

104 

105 Parameters 

106 ---------- 

107 subject : str 

108 The target SubjectID. 

109 role : str 

110 The target **RoleID**. 

111 db : Session 

112 The SQLAlchemy session. 

113 

114 Raises 

115 ------ 

116 PyPermissionError 

117 If `subject` is empty string. 

118 If `role` is empty string. 

119 If the **Subject** does not exist. 

120 If the **Role** does not exist. 

121 If the **Role** is already assigned to the **Subject**. 

122 """ 

123 try: 

124 member_orm = MemberORM(role_id=role, subject_id=subject) 

125 db.add(member_orm) 

126 db.flush() 

127 except IntegrityError as err: 

128 db.rollback() 

129 process_subject_role_integrity_error(err=err, subject=subject, role=role) 

130 

131 @classmethod 

132 @validate_rbac_parameters 

133 def deassign_role(cls, *, subject: str, role: str, db: Session) -> None: 

134 """ 

135 Deassign a **Subject** from a Role. 

136 

137 Parameters 

138 ---------- 

139 subject : str 

140 The target SubjectID. 

141 role : str 

142 The target **RoleID**. 

143 db : Session 

144 The SQLAlchemy session. 

145 

146 Raises 

147 ------ 

148 PyPermissionError 

149 If `subject` is empty string. 

150 If `role` is empty string. 

151 If the **Subject** does not exist. 

152 If the **Role** does not exist. 

153 If the **Subject** is not assigned to the **Role**. 

154 """ 

155 member_orm = db.get(MemberORM, (role, subject)) 

156 if member_orm is None: 

157 subject_orm = db.get(SubjectORM, subject) 

158 if subject_orm is None: 

159 raise PyPermissionError(f"Subject '{subject}' does not exist!") 

160 role_orm = db.get(RoleORM, role) 

161 if role_orm is None: 

162 raise PyPermissionError(f"Role '{role}' does not exist!") 

163 raise PyPermissionError( 

164 f"Role '{role}' is not assigned to Subject '{subject}'!" 

165 ) 

166 db.delete(member_orm) 

167 db.flush() 

168 

169 @classmethod 

170 @validate_rbac_parameters 

171 def roles( 

172 cls, *, subject: str, include_ascendant_roles: bool = False, db: Session 

173 ) -> tuple[str, ...]: 

174 """ 

175 Get all Roles assigned to a Subject. 

176 

177 Parameters 

178 ---------- 

179 subject : str 

180 The target SubjectID. 

181 include_ascendant_roles: bool 

182 Include all ascendant Roles. 

183 db : Session 

184 The SQLAlchemy session. 

185 

186 Returns 

187 ------- 

188 tuple[str, ...] 

189 A tuple containing all assigned RoleIDs. 

190 

191 Raises 

192 ------ 

193 PyPermissionError 

194 If `subject` is empty string. 

195 If the target **Subject** does not exist. 

196 """ 

197 if include_ascendant_roles: 

198 root_cte = ( 

199 select(MemberORM.role_id) 

200 .where(MemberORM.subject_id == subject) 

201 .cte(recursive=True) 

202 ) 

203 traversing_cte = root_cte.alias() 

204 relations_cte = root_cte.union_all( 

205 select(HierarchyORM.parent_role_id).join( 

206 traversing_cte, 

207 HierarchyORM.child_role_id == traversing_cte.c.role_id, 

208 ) 

209 ) 

210 selection = select(relations_cte) 

211 roles = db.scalars(selection).unique().all() 

212 else: 

213 roles = db.scalars( 

214 select(MemberORM.role_id).where(MemberORM.subject_id == subject) 

215 ).all() 

216 

217 if len(roles) == 0 and db.get(SubjectORM, subject) is None: 

218 raise PyPermissionError(f"Subject '{subject}' does not exist!") 

219 return tuple(roles) 

220 

221 @classmethod 

222 @validate_rbac_parameters 

223 def check_permission( 

224 cls, 

225 *, 

226 subject: str, 

227 permission: Permission, 

228 db: Session, 

229 ) -> bool: 

230 """ 

231 Check if a **Subject** has access to a specific **Permission** via its **Role** hierarchy. 

232 

233 Parameters 

234 ---------- 

235 subject : str 

236 The target SubjectID. 

237 permission : Permission 

238 The **Permission** to check for. 

239 db : Session 

240 The SQLAlchemy session. 

241 

242 Returns 

243 ------- 

244 bool 

245 True if the **Permission** is granted. 

246 

247 Raises 

248 ------ 

249 PyPermissionError 

250 If `subject` is empty string. 

251 If the target **Subject** does not exist. 

252 """ 

253 root_cte = ( 

254 select(MemberORM.role_id) 

255 .where(MemberORM.subject_id == subject) 

256 .cte(recursive=True) 

257 ) 

258 

259 traversing_cte = root_cte.alias() 

260 relations_cte = root_cte.union_all( 

261 select(HierarchyORM.parent_role_id).join( 

262 traversing_cte, HierarchyORM.child_role_id == traversing_cte.c.role_id 

263 ) 

264 ) 

265 

266 policy_orms = db.scalars( 

267 select(PolicyORM) 

268 .join(relations_cte, PolicyORM.role_id == relations_cte.c.role_id) 

269 .where( 

270 PolicyORM.resource_type == permission.resource_type, 

271 PolicyORM.resource_id.in_((permission.resource_id, "*")), 

272 PolicyORM.action == permission.action, 

273 ) 

274 ).all() 

275 if len(policy_orms) > 0: 

276 return True 

277 subject_orm = db.get(SubjectORM, subject) 

278 if subject_orm is None: 

279 raise PyPermissionError(f"Subject '{subject}' does not exist!") 

280 return False 

281 

282 @classmethod # validated by check_permission 

283 def assert_permission( 

284 cls, 

285 *, 

286 subject: str, 

287 permission: Permission, 

288 db: Session, 

289 ) -> None: 

290 """ 

291 Asserts that a **Subject** has access to a specific **Permission** via its **Role** hierarchy. 

292 

293 Parameters 

294 ---------- 

295 subject : str 

296 The target SubjectID. 

297 permission : Permission 

298 The **Permission** to check for. 

299 db : Session 

300 The SQLAlchemy session. 

301 

302 Raises 

303 ------ 

304 PermissionNotGrantedError 

305 If the **Permission** is not granted. 

306 PyPermissionError 

307 If `subject` is empty string. 

308 If the target **Subject** does not exist. 

309 """ 

310 if not cls.check_permission(subject=subject, permission=permission, db=db): 

311 raise PermissionNotGrantedError( 

312 f"Permission '{permission}' is not granted for Subject '{subject}'!" 

313 ) 

314 

315 @classmethod 

316 @validate_rbac_parameters 

317 def permissions(cls, *, subject: str, db: Session) -> tuple[Permission, ...]: 

318 """ 

319 Get all **Permissions** a **Subject** has access to via its **Role** hierarchy. 

320 

321 Parameters 

322 ---------- 

323 subject : str 

324 The target SubjectID. 

325 db : Session 

326 The SQLAlchemy session. 

327 

328 Returns 

329 ------- 

330 tuple[Permission, ...] 

331 A tuple containing all granted Permissions. 

332 

333 Raises 

334 ------ 

335 PyPermissionError 

336 If `subject` is empty string. 

337 If the target **Subject** does not exist. 

338 """ 

339 policy_orms = _get_policy_orms_for_subject(subject=subject, db=db) 

340 

341 return tuple( 

342 Permission( 

343 resource_type=policy_orm.resource_type, 

344 resource_id=policy_orm.resource_id, 

345 action=policy_orm.action, 

346 ) 

347 for policy_orm in policy_orms 

348 ) 

349 

350 @classmethod 

351 @validate_rbac_parameters 

352 def policies(cls, *, subject: str, db: Session) -> tuple[Policy, ...]: 

353 """ 

354 Get all **Policies** associated to a **Subject** via its **Role** hierarchy. 

355 

356 Parameters 

357 ---------- 

358 subject : str 

359 The target SubjectID. 

360 db : Session 

361 The SQLAlchemy session. 

362 

363 Returns 

364 ------- 

365 tuple[Policy, ...] 

366 A tuple containing all granted **Policies**. 

367 

368 Raises 

369 ------ 

370 PyPermissionError 

371 If `subject` is empty string. 

372 If the target **Subject** does not exist. 

373 """ 

374 policy_orms = _get_policy_orms_for_subject(subject=subject, db=db) 

375 

376 return tuple( 

377 Policy( 

378 role=policy_orm.role_id, 

379 permission=Permission( 

380 resource_type=policy_orm.resource_type, 

381 resource_id=policy_orm.resource_id, 

382 action=policy_orm.action, 

383 ), 

384 ) 

385 for policy_orm in policy_orms 

386 ) 

387 

388 @classmethod 

389 @validate_rbac_parameters 

390 def actions_on_resource( 

391 cls, 

392 *, 

393 subject: str, 

394 resource_type: str, 

395 resource_id: str, 

396 inherited: bool = True, 

397 db: Session, 

398 ) -> tuple[str, ...]: 

399 """ 

400 Get all Actions granted to a **Subject** on a specific **Resource**. 

401 

402 Parameters 

403 ---------- 

404 subject : str 

405 The target **SubjectID**. 

406 resource_type : str 

407 The **ResourceType** of the **Resource**. 

408 resource_id : str 

409 The **ResourceID** of the **Resource**. 

410 inherited : bool 

411 Whether to include inherited **Actions** from **Role** hierarchies. 

412 db : Session 

413 The SQLAlchemy session. 

414 

415 Returns 

416 ------- 

417 tuple[str, ...] 

418 A tuple containing all granted **Action** values. 

419 

420 Raises 

421 ------ 

422 PyPermissionError 

423 If `subject` is empty string. 

424 If `resource_type` is empty string. 

425 If the target **Subject** does not exist. 

426 """ 

427 if inherited: 

428 root_cte = ( 

429 select(MemberORM.role_id) 

430 .where(MemberORM.subject_id == subject) 

431 .cte(recursive=True) 

432 ) 

433 traversing_cte = root_cte.alias() 

434 relations_cte = root_cte.union_all( 

435 select(HierarchyORM.parent_role_id).join( 

436 traversing_cte, 

437 HierarchyORM.child_role_id == traversing_cte.c.role_id, 

438 ) 

439 ) 

440 actions = ( 

441 db.scalars( 

442 select(PolicyORM.action, PolicyORM.role_id) 

443 .join(relations_cte, PolicyORM.role_id == relations_cte.c.role_id) 

444 .where( 

445 PolicyORM.resource_type == resource_type, 

446 PolicyORM.resource_id.in_((resource_id, "*")), 

447 ) 

448 ) 

449 .unique() 

450 .all() 

451 ) 

452 tuple(actions) 

453 else: 

454 selection = ( 

455 select(PolicyORM.action, PolicyORM.role_id) 

456 .join(MemberORM, MemberORM.role_id == PolicyORM.role_id) 

457 .where( 

458 MemberORM.subject_id == subject, 

459 PolicyORM.resource_type == resource_type, 

460 PolicyORM.resource_id.in_((resource_id, "*")), 

461 ) 

462 ) 

463 actions = db.scalars(selection).unique().all() 

464 if len(actions) == 0 and db.get(SubjectORM, subject) is None: 

465 raise PyPermissionError(f"Subject '{subject}' does not exist!") 

466 return tuple(actions) 

467 

468 

469################################################################################ 

470#### Util 

471################################################################################ 

472 

473 

474def _get_policy_orms_for_subject(*, subject: str, db: Session) -> Sequence[PolicyORM]: 

475 """ 

476 Get all PolicyORM objects associated to a **Subject** via its **Role** hierarchy. 

477 

478 Parameters 

479 ---------- 

480 subject : str 

481 The target SubjectID. 

482 db : Session 

483 The SQLAlchemy session. 

484 

485 Returns 

486 ------- 

487 Sequence[PolicyORM] 

488 A Sequence containing all associated PolicyORM objects. 

489 

490 Raises 

491 ------ 

492 PyPermissionError 

493 If the target **Subject** does not exist. 

494 """ 

495 subject_orm = db.get(SubjectORM, subject) 

496 if not subject_orm: 

497 raise PyPermissionError(f"Subject '{subject}' does not exist!") 

498 root_cte = ( 

499 select(MemberORM.role_id) 

500 .where(MemberORM.subject_id == subject) 

501 .cte(recursive=True) 

502 ) 

503 

504 traversing_cte = root_cte.alias() 

505 relations_cte = root_cte.union_all( 

506 select(HierarchyORM.parent_role_id).join( 

507 traversing_cte, HierarchyORM.child_role_id == traversing_cte.c.role_id 

508 ) 

509 ) 

510 

511 policy_orms = ( 

512 db.scalars( 

513 select(PolicyORM).join( 

514 relations_cte, PolicyORM.role_id == relations_cte.c.role_id 

515 ) 

516 ) 

517 .unique() 

518 .all() 

519 ) 

520 

521 return policy_orms