Coverage for actress / task.py: 91%

515 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-06 15:02 +0100

1"""Core task/effect scheduler primitives for actress.""" 

2# pylint: disable=missing-class-docstring,missing-function-docstring,too-many-instance-attributes,global-statement 

3 

4from __future__ import annotations 

5import asyncio 

6import weakref 

7from dataclasses import dataclass 

8from typing import Any, Generic, Literal, NoReturn, Optional, TypeVar, TypedDict, Union, cast, overload 

9from collections.abc import Awaitable, Generator, Callable, Iterable 

10from enum import Enum 

11from typing_extensions import Self, TypeAlias 

12 

# Type variables shared by every generic in this module.
T = TypeVar("T")  # value of task returned (on success)
X = TypeVar("X", bound=Exception)  # exception raised by task (failure)
M = TypeVar("M")  # message yielded by task
U = TypeVar("U")  # spare helper type variable

17 

18 

class CurrentInstruction:
    """Sentinel instruction type; yielding its singleton (`CURRENT`) asks the
    scheduler to send back a handle to the currently running task."""
    def __repr__(self) -> str:
        return '<CURRENT>'

22 

23 

class SuspendInstruction:
    """Sentinel instruction type; yielding its singleton (`SUSPEND`) parks the
    current task until something re-enqueues it."""
    def __repr__(self) -> str:
        return '<SUSPEND>'

27 

28 

# Special control instructions recognized by the scheduler.
CURRENT = CurrentInstruction()
SUSPEND = SuspendInstruction()

# Union of the two scheduler control sentinels.
Control: TypeAlias = Union[CurrentInstruction, SuspendInstruction]
# Anything a task may yield: a control sentinel or a message `M`.
Instruction: TypeAlias = Union[Control, M]
Controller: TypeAlias = Generator[Union[Control, M], Any, T]
Task: TypeAlias = Generator[Union[Control, M], Any, T] # In python the generator object is also the iterator
"""
Task is a unit of computation that runs concurrently, a light-weight
process (in Erlang terms). You can spawn a bunch of them and the provided
cooperative scheduler will interleave their execution.

Tasks have three type variables, the first two describing the result of the
computation: `Success` that corresponds to the return type and `Failure`
describing an error type (caused by thrown exceptions). The third type
variable `Message` describes the type of messages this task may produce.

Please note that Python does not really check exceptions so the `Failure`
type can not be guaranteed. Yet, we find them more practical than omitting
them as Python does for `Future` types and its derivatives.

Our tasks are generators (not the generator functions, but what you get
invoking them) that are executed by a (library provided) scheduler.
The scheduler recognizes two special `Control` instructions yielded by the
generator. When the scheduler gets the `current` instruction it will resume
the generator with a handle that can be used to resume the generator after
it is suspended. When the `suspend` instruction is received the scheduler
will suspend execution until the task is resumed by enqueueing it from an
outside event.
"""

59 

# `M` == `Event`
Effect: TypeAlias = Generator[M, Any, None]
"""
Effect represents potentially asynchronous operations that result in a set of events.
It is often comprised of multiple `Task`s and represents either a chain of events or a
concurrent set of events (stretched over time).
Effect compares to a `Stream` in Javascript in the same way as `Task` compares to
`Future` in Python. It is not a representation of an eventual result but rather a
representation of an operation which, if executed, will produce a certain result.
`Effect` can also be compared to an `EventEmitter` in Javascript, but very often their
`Event` type variable (the `M` type variable in the Python implementation) is a union
of various event types; unlike `EventEmitter`s however, `Effect`s have an inherent
finality to them and in that regard are more like Javascript `Stream`s.
"""

74 

@dataclass
class Success(Generic[T]):
    "Result of a successful task."
    value: T
    # discriminant; kept as an init field because callers construct with `ok=True`
    ok: bool = True

80 

81 

@dataclass
class Failure(Generic[X]):
    "Result of a failed task."
    error: X
    # discriminant; kept as an init field because callers construct with `ok=False`
    ok: bool = False

87 

88 

# Tagged union of a settled task outcome: either Success[T] or Failure[X].
Result: TypeAlias = Union[Success[T], Failure[X]]

90 

91 

@dataclass
class StateHandler(Generic[T, X]):
    """Completion callbacks fired when a task settles (see Future_._get_promise)."""
    onsuccess: Optional[Callable[[T], None]] = None  # called with the success value
    onfailure: Optional[Callable[[X], None]] = None  # called with the raised error

96 

ID = 0
"""Unique IDs for entities (`Fork`, `Group`, ...); bumped via `global ID` in constructors."""

99 

100 

class ForkOptions(TypedDict, total=False):
    """Optional settings accepted by `fork` / `Fork`.

    `total=False` because consumers read the key defensively
    (`options.get('name', None)` in `Fork.__init__`), i.e. `name` may be absent.
    This is a backward-compatible loosening of the type.
    """
    name: Optional[str]  # human-readable label for the fork

103 

104 

class Status(str, Enum):
    """Task execution status."""
    IDLE = "idle"  # created but not yet activated
    ACTIVE = "active"  # activated and schedulable
    FINISHED = "finished"  # settled with a Success or Failure result

110 

111 

class Stack(Generic[T, X, M]):
    """Holds a group's runnable (`active`) and suspended (`idle`) tasks."""
    def __init__(
        self,
        active: Optional[list[ControllerFork[T, X, M]]] = None,
        idle: Optional[set[ControllerFork[T, X, M]]] = None
    ) -> None:
        # Fresh containers are created per call (never in the signature) so
        # separate Stack instances can never share queue state — the classic
        # mutable-default-argument trap, mirrored from the reference JS port.
        self.active: list[ControllerFork[T, X, M]] = [] if active is None else active
        self.idle: set[ControllerFork[T, X, M]] = set() if idle is None else idle

    @staticmethod
    def size(stack: Stack[T, X, M]) -> int:
        """Total number of tasks (runnable + suspended) held by `stack`."""
        return len(stack.active) + len(stack.idle)

132 

133 

def is_async(node: Any) -> bool:
    """
    Checks if a value is awaitable (or its lookalike).

    Returns:
        True when `node` is an asyncio future, a coroutine, or anything
        implementing the `Awaitable` protocol; False otherwise.
    """
    # `asyncio.isfuture` also recognizes duck-typed Future lookalikes
    # (objects exposing `_asyncio_future_blocking`), hence all three checks.
    # Return the boolean expression directly instead of if/return True/False.
    return (
        asyncio.isfuture(node) or
        asyncio.iscoroutine(node) or
        isinstance(node, Awaitable)
    )

145 

def sleep(duration: float = 0) -> Task[Control, None]:
    """
    Suspends execution for the given duration in milliseconds, after which execution
    is resumed (unless it was aborted in the meantime).

    Args:
        duration: Time to sleep in milliseconds

    Example:
        ```
        def demo():
            print("I'm going to take a small nap")
            yield from sleep(200)
            print("I am back to work")
        ```
    """
    # obtain a handle to ourselves so the timer callback can re-enqueue us
    task: Controller[Any, Any] = yield from current()
    loop = asyncio.get_running_loop()

    # `loop.call_later` takes seconds, so convert the millisecond duration
    handle = loop.call_later(duration/1000, lambda: enqueue(task))

    try:
        yield from suspend()
    finally:
        # cancel the timer if the task was resumed/aborted early so the stale
        # callback cannot re-enqueue the task a second time
        handle.cancel()

172 

@overload
def wait(value: Awaitable[T]) -> Task[Control, T]: ...

@overload
def wait(value: T) -> Task[Control, T]: ...

def wait(value: Union[Awaitable[T], T]) -> Task[Control, T]:
    """
    Provides equivalent of `await` in async functions. Specifically it takes a value
    that you can `await` on (that is `Awaitable[T]`, i.e. futures, coroutines) and
    suspends execution until the future is settled. If the future succeeds execution
    is resumed with `T` otherwise an error of type `X` is thrown (which is by default
    `Exception` since futures do not encode error type).

    It is useful when you need to deal with potentially async set of operations
    without having to check if thing is an `await`-able at every step.

    Please note that execution is suspended even if given value is not a
    promise, however scheduler will still resume it in the same tick of the event
    loop, just after processing other scheduled tasks. This avoids problematic
    race conditions that can otherwise occur when values are sometimes promises
    and other times are not.

    Args:
        value: A value or awaitable to wait on

    Returns:
        The resolved value

    Raises:
        Exception if the future fails

    Example:
        ```
        def fetch_json(url, options):
            response = yield from wait(fetch(url, options))
            json = yield from wait(response.json())
            return json
        ```
    """
    # handle to ourselves so the async bridge below can re-enqueue this task
    task: Controller[Any, Any] = yield from current()
    if is_async(value):
        # no need to track `failed = False` like in reference JS impl because failure
        # can be tracked by only the `Result` object in the `output` variable below
        output: Result[T, Exception] = None # type: ignore[assignment]

        async def handle_async() -> None:
            # bridge coroutine: settle `output` and wake the suspended task
            nonlocal output
            try:
                resolved = await (cast(Awaitable[T], value))
                output = Success(resolved)
            except Exception as error:
                output = Failure(error)
            enqueue(task)

        loop = asyncio.get_running_loop()
        # schedule the async handler on the loop
        loop.create_task(handle_async())

        yield from suspend()

        # check for failure with `output` instead of `failure` variable like in the
        # reference implementation
        if isinstance(output, Failure):
            raise output.error
        return output.value

    # this may seem redundant but it is not, by enqueuing this task we allow
    # scheduler to perform other queued tasks first. This way many race conditions
    # can be avoided when values are sometimes promises and other times aren't.
    # unlike `await` however this will resume in the same tick.
    #
    # when the `wake` task is enqueued in func `main()`, it is enqueued in the `Main`
    # group. and when executed, it enqueues `task`. thereby maintaining consistency
    # in suspending immediately while `enqueue(task)` runs async after other tasks
    # as it similarly happens if `value` was `Awaitable`
    main(wake(task))
    yield from suspend() # suspension happens immediately
    return cast(T, value)

252 

def wake(task: Task[M, T]) -> Task[None, None]:
    """Helper task that, when driven, re-enqueues `task` and finishes."""
    enqueue(task)
    yield

256 

def main(task: Task[None, None]) -> None:
    """
    Starts a main task.

    Args:
        task: A task that produces no output (returns None, never fails, sends no messages)
    """
    # `iter` on a generator returns the generator itself; that iterator is the
    # controller the scheduler drives.
    enqueue(iter(task))

266 

def is_message(value: Instruction[M]) -> bool:
    """Return True when `value` is a payload message rather than a Control sentinel.

    Idiom fix: return the boolean expression directly instead of the
    if/return-True/return-False ladder. Identity checks are correct here
    because CURRENT and SUSPEND are module-level singletons.
    """
    return value is not SUSPEND and value is not CURRENT

271 

def is_instruction(value: Instruction[M]) -> bool:
    """Return True when `value` is one of the scheduler Control sentinels."""
    # inlined complement of `is_message`: the sentinels are singletons, so
    # identity comparison is sufficient
    return value is SUSPEND or value is CURRENT

274 

275 

class Future_(Generic[T, X]):
    """
    Base class for awaitable task handles.

    Provides Promise-like interface for tasks, allowing them to be awaited
    from async contexts.
    """

    def __init__(self, handler: Optional[StateHandler[T, X]] = None):
        # completion callbacks; a fresh StateHandler when none supplied
        self.handler = handler or StateHandler()
        # settled outcome (Success/Failure), None while still running
        self.result: Optional[Result[T, X]] = None
        # cached asyncio.Future bridging this handle into async/await land
        self._promise: Optional[asyncio.Future[T]] = None

    def _get_promise(self) -> asyncio.Future[T]:
        """
        Lazily create and cache an asyncio.Future for this task.
        """
        if self._promise is not None:
            return self._promise

        # If we already have a result, create a pre-resolved future
        if self.result is not None:
            loop = asyncio.get_running_loop()
            future: asyncio.Future[T] = loop.create_future()

            if isinstance(self.result, Success):
                future.set_result(self.result.value)
            else:
                future.set_exception(self.result.error)

            self._promise = future
            return future

        # Otherwise, create a future and wire up handlers
        loop = asyncio.get_running_loop()
        future = loop.create_future()

        # Store original handlers so user-supplied callbacks are chained,
        # not clobbered, by the future-settling wrappers below
        original_onsuccess = self.handler.onsuccess
        original_onfailure = self.handler.onfailure

        def onsuccess(value: T) -> None:
            # settle the future first, then forward to the original callback
            if not future.done():
                future.set_result(value)
            if original_onsuccess:
                original_onsuccess(value)

        def onfailure(error: X) -> None:
            if not future.done():
                future.set_exception(error)
            if original_onfailure:
                original_onfailure(error)

        self.handler.onsuccess = onsuccess
        self.handler.onfailure = onfailure

        self._promise = future
        return future

    def __await__(self) -> Generator[Any, Any, T]:
        # Get the promise and await it
        promise = self._get_promise()

        # Activate the task (runs synchronously in MAIN scheduler)
        self.activate()

        return promise.__await__()

    def activate(self) -> Future_[T, X]:
        """
        Activate the task. Overridden in subclasses.
        """
        return self

349 

350 

class Fork(Future_[T, X], Generic[T, X, M]):
    """
    A handle to a running task that can be awaited.

    Implements both the generator protocol (for use within tasks) and
    the awaitable protocol (for use in async functions).
    """
    def __init__(
        self,
        task: Task[M, T],
        handler: Optional[StateHandler[T, X]] = None,
        options: Optional[ForkOptions] = None,
    ) -> None:
        super().__init__(handler or StateHandler())

        # allocate a unique id from the module-level counter
        global ID
        ID += 1
        self.id = ID
        # optional human-readable label; empty string when absent
        self.name: str = "" if options is None else (options.get('name', None) or "")
        self.task = task
        # last instruction produced by the underlying controller
        self.state: Union[Instruction[M], StopIteration] = CURRENT
        self.status = Status.IDLE
        # lazily created iterator over `task`; None until `activate()`
        self.controller: Optional[Controller[M, T]] = None
        # group this fork currently belongs to (None is treated as MAIN)
        self.group: Optional[Group[T, X, M]] = None


    def resume(self) -> Task[None, None]:
        # task-shaped wrapper over module-level `resume`
        resume(self)
        yield None

    def join(self) -> Task[Optional[M], T]:
        # delegate to module-level `join`
        return join(self)

    def abort(self, error: X) -> Task[None, None]:
        # delegate to module-level `abort`
        return abort(self, error)

    def exit_(self, value: T) -> Task[None, None]:
        # delegate to module-level `exit_`
        return exit_(self, value)

    def activate(self) -> Fork[T, X, M]:
        """
        Activate the task and enqueue it for execution.
        """
        # Only activate if not already active or finished
        if self.controller is None:
            self.controller = iter(self.task)
            self.status = Status.ACTIVE
            enqueue(self)
        return self

    def __iter__(self) -> Generator[Any, Any, Fork[T, X, M]]:
        """
        Make Fork iterable for use with yield from.
        """
        # making __iter__ a generator ensures that `yield from fork(work())` schedules
        # the fork for concurrent execution without enabling synchronous driving of the
        # forked task after activation and then returns immediately (since MAIN group
        # is likely already actively being executed by the scheduler and `work` already
        # queued in `MAIN`s active queue).
        self.activate()
        return self
        yield

    def _panic(self, error: X) -> NoReturn:
        # record failure, run the failure callback, then re-raise to the driver
        self.result = Failure(ok=False, error=error)
        self.status = Status.FINISHED
        if self.handler.onfailure is not None:
            self.handler.onfailure(error)
        raise error

    def _step(self, state: Union[Instruction[M], StopIteration]) -> None:
        # record the instruction and detect completion
        self.state = state
        # `StopIteration` signifies end of a generator in Python and holds its return
        # value on success
        if isinstance(state, StopIteration):
            self.result = Success(ok=True, value=state.value)
            self.status = Status.FINISHED
            if self.handler.onsuccess is not None:
                self.handler.onsuccess(state.value)
        # `state` is not returned here like in the reference js implementation because -
        # `state` here can be set to `StopIteration(value: T)` which signals completion
        # in Python, but in order to keep `Fork` behaviour predictable and similar to
        # normal generators (e.g `controller` generators) `StopIteration` isn't returned
        # but raised. So no point in returning `state` (especially on completion),
        # within the class or - via the generator protocol methods - outside the class.
        # just run success handlers, update state and set success value in result.

    def __next__(self) -> Instruction[M]:
        try:
            # note that: `task.send(None) == next(task)`
            # also cast the type for `self.controller` to eliminate the type-checker
            # from inferring that `self.controller` is still `None` after fork
            # activation
            state = cast(Controller[M, T], self.controller).send(None)
            self._step(state)
            return state
        except StopIteration as e:
            # `StopIteration` means task is finished, therefore task `result`, `status`
            # can be set and success handler function ran.
            self._step(e)
            raise
        except Exception as e:
            return self._panic(e) # type: ignore[arg-type]

    def send(self, value: Any) -> Instruction[M]:
        try:
            # cast the type for `self.controller` to eliminate the type-checker
            # from inferring that `self.controller` is still `None` after fork
            # activation
            state = cast(Controller[M, T], self.controller).send(value)
            self._step(state)
            return state
        except StopIteration as e:
            # `StopIteration` means task is finished, therefore task `result`, `status`
            # can be set and success handler function ran.
            self._step(e)
            raise
        except Exception as e:
            return self._panic(e) # type: ignore[arg-type]

    def throw(self, error: Union[Exception, GeneratorExit]) -> Instruction[M]:
        try:
            # also cast the type for `self.controller` to eliminate the type-checker
            # from inferring that `self.controller` is still `None` after fork
            # activation
            state = cast(Controller[M, T], self.controller).throw(error)
            self._step(state)
            return state
        except StopIteration as e:
            # `StopIteration` means task is finished, therefore task `result`, `status`
            # can be set and success handler function ran.
            self._step(e)
            raise
        # `GeneratorExit` is outside the scope of `Exception`. it subclasses
        # `BaseException` instead - `Exception`'s parent class. Catch both exc.
        except (Exception, GeneratorExit) as e:
            return self._panic(e) # type: ignore[arg-type]

    def return_(self, value: T) -> T:
        try:
            # also cast the type for `self.controller` to eliminate the type-checker
            # from inferring that `self.controller` is still `None` after fork
            # activation
            cast(Controller[M, T], self.controller).throw(StopIteration(value))
        except StopIteration as e:
            self._step(e)
        except Exception as e:
            self._panic(e) # type: ignore[arg-type]
        else:
            # the controller yielded instead of terminating. therefore enforce close
            cast(Controller[M, T], self.controller).close()
        return value

    def close(self) -> None:
        try:
            # self.controller.close() == self.controller.throw(GeneratorExit)
            # also cast the type for `self.controller` to eliminate the type-checker
            # from inferring that `self.controller` is still `None` after fork
            # activation
            cast(Controller[M, T], self.controller).close()
        except (Exception) as e:
            self._panic(e) # type: ignore[arg-type]

    def __repr__(self) -> str:
        return f"Fork(id={self.id}, status='{self.status}')"

516 

# type aliases added for convenience: a schedulable unit is either a raw
# generator (Task / Controller) or a Fork handle wrapping one
TaskFork: TypeAlias = Union[Task, Fork[T, X, M]] # type: ignore[type-arg]
ControllerFork: TypeAlias = Union[Controller, Fork[T, X, M]] # type: ignore[type-arg]

520 

class Main(Generic[T, X, M]):
    """Default or Fallback Task Group."""
    def __init__(self) -> None:
        self.status = Status.IDLE
        self.stack: Stack[T, X, M] = Stack()
        # id 0 is reserved for the main group (user groups start at 1)
        self.id: Literal[0] = 0
        # always None: terminates the walk-up loop in `enqueue`
        self.parent: Optional[TaskGroup[T, X, M]] = None

528 

529 

MAIN: Main = Main() # type: ignore[type-arg]
"""Singleton main group"""

# Python generator objects cannot carry arbitrary attributes like in the reference JS
# implementation, so loop/group helpers store membership here to route resumed generators
# back to their group. Weak keys let finished generators be garbage collected.
_GROUP_MEMBERSHIP: weakref.WeakKeyDictionary[object, Group[Any, Any, Any]] = weakref.WeakKeyDictionary()

537 

538 

class TaskGroup(Generic[T, X, M]):
    """Task group for managing concurrent tasks."""
    def __init__(
        self,
        driver: ControllerFork[T, X, M],
        active: Optional[list[ControllerFork[T, X, M]]] = None,
        idle: Optional[set[ControllerFork[T, X, M]]] = None,
        stack: Optional[Stack[T, X, M]] = None
    ) -> None:
        # `driver` is the task that drains this group; the driver's own group
        # becomes our parent so `enqueue` can unblock it transitively
        self.driver = driver
        self.parent = TaskGroup.of(driver)

        # gymnastics to align with reference JS implementation without sacrificing
        # safety. Using mutable types as default args in Python can lead to weird errors
        # that arise from a shared state created at definition time as opposed to each
        # time the function is called.
        active_eval = active if active is not None else []
        idle_eval = idle if idle is not None else set()
        if active is not None or idle is not None:
            self.stack = Stack(active_eval, idle_eval)
        elif stack is not None:
            self.stack = stack
        else:
            self.stack = Stack()

        global ID
        ID += 1
        self.id = ID

    @staticmethod
    def of(member: ControllerFork[T, X, M]) -> Group[T, X, M]:
        """Resolve the group `member` currently belongs to (MAIN by default)."""
        # since `Generator` objects don't have the `group` attribute if `member`
        # is not a `Fork` then its group is the default `MAIN` group
        group: Optional[Group[T, X, M]] = getattr(member, 'group', None)
        if group is not None:
            return group
        mapped = _GROUP_MEMBERSHIP.get(member)
        return cast(Group[T, X, M], mapped if mapped is not None else MAIN)


    @staticmethod
    def enqueue(member: TaskFork[T, X, M], group: TaskGroup[T, X, M]) -> None:
        """Record `member`'s membership in `group` and append it to the active queue."""
        try:
            member.group = group # type: ignore[union-attr]
        except AttributeError:
            # raw generators cannot carry attributes; fall back to the weak side-table
            _GROUP_MEMBERSHIP[member] = group
        finally:
            group.stack.active.append(member)

587 

588 

# A group is either a user-created TaskGroup or the fallback Main singleton.
Group = Union[TaskGroup[T, X, M], Main[T, X, M]]

590 

def enqueue(task: ControllerFork[T, X, M]) -> None:
    """
    Schedule `task` as runnable in its group and kick the scheduler.

    Moves the task from the idle set into the active queue, unblocks suspended
    driver tasks up the group chain, and — when the MAIN group is idle —
    synchronously drains MAIN right here.
    """
    group = TaskGroup.of(task)

    group.stack.active.append(task)
    group.stack.idle.discard(task)

    # then walk up the group chain and unblock their driver tasks
    while group.parent is not None:
        idle = group.parent.stack.idle
        active = group.parent.stack.active

        # only to appease type checkers, since `MAIN` doesn't have a driver and `MAIN`
        # has a parent of `None` so this loop won't run in that case
        driver = getattr(group, "driver", None)
        if driver is not None and driver in idle:
            idle.remove(driver)
            active.append(driver)
        else:
            # if driver was not blocked it must have been unblocked by other task so
            # stop there
            break

        # crawl up to the parent group
        group = group.parent

    if MAIN.status == Status.IDLE:
        MAIN.status = Status.ACTIVE
        while True:
            try:
                # drain MAIN, discarding any messages top-level tasks yield
                for _ in step(MAIN):
                    pass
                MAIN.status = Status.IDLE
                break
            except Exception:
                # Top level task may crash and throw an error, but given this is a main
                # group we do not want to interrupt other unrelated tasks, which is why
                # we discard the error and the task that caused it.
                MAIN.stack.active.pop(0)

629 

def step(group: Group[T, X, M]) -> Generator[M, Any, None]:
    """
    Drive the group's active queue, re-yielding messages tasks produce.

    Each head-of-queue task runs until it finishes, suspends, or is moved to
    another group; messages it yields are forwarded to the caller (the driver).
    """
    active = group.stack.active
    task = active[0] if active else None
    if task:
        group.stack.idle.discard(task)
    while task:
        # Keep processing instructions until task is done, sends a `SUSPEND` request or
        # it has been removed from the active queue.
        # ⚠️ Group changes require extra care so please make sure to understand the
        # detail here. It occurs when a spawned task(s) are joined into a group which
        # will change the driver and the group the task belongs to, that is when the
        # conditional statement: `task == active[0]` will become false and the task
        # would need to be dropped immediately otherwise a race condition will occur
        # due to the task being driven by multiple concurrent schedulers.
        try:
            input_value = None
            while task == active[0]:
                instruction = task.send(input_value)
                if instruction is SUSPEND:
                    group.stack.idle.add(task)
                    break
                # if task requested a context (which is usually to suspend itself) pass back
                # a task reference and continue.
                if instruction is CURRENT:
                    input_value = task
                    continue
                # otherwise task sent a message which we yield to the driver and
                # continue
                input_value = yield instruction # type: ignore[misc]
                continue
        except StopIteration:
            # task finished
            pass

        # if task is complete or got suspended we move to a next task
        if active and active[0] == task:
            active.pop(0)

        task = active[0] if active else None
        if task:
            group.stack.idle.discard(task)

671 

def spawn(task: Task[None, None]) -> Task[None, None]:
    """
    Runs `task` concurrently with the task that spawned it. The spawned task is
    fully detached: it may outlive its parent and may fail without affecting it.
    If you need to wait on completion of a concurrent task use `fork` (which can
    later be `join`ed) instead; if you simply want to block on another task,
    use `yield from work()` directly.
    """
    main(task)
    # an (unreachable) yield keeps this function a generator so that
    # `yield from spawn(...)` composes inside other tasks
    if False:
        yield

684 

def fork(task: Task[M, T], options: ForkOptions | None = None) -> Fork[T, Exception, M]:
    """
    Runs `task` concurrently with the task that initiated the fork. The forked
    task is detached: it may outlive its creator and may fail without affecting
    it. The returned `Fork` handle can be `join`ed, in which case the joining
    task blocks until the fork finishes.

    This is also the primary interface for executing tasks from outside a task
    context: `Fork` implements the `Future` interface, so it can be awaited.
    Note that `fork` itself is lazy — execution only starts once you either
    `await fork(work())` from an async context or `yield from fork(work())`
    inside another task.
    """
    handle = Fork(task, options=options)
    return handle

700 

def current() -> Generator[CurrentInstruction, Controller[M, T], Controller[M, T]]:
    """Yield the CURRENT sentinel; the scheduler replies by sending back a
    handle to the presently running task, which is returned to the caller."""
    handle = yield CURRENT
    return handle

703 

def suspend() -> Generator[SuspendInstruction, Any, None]:
    """Yield the SUSPEND sentinel, parking the current task until something
    re-enqueues it (see `enqueue` / `resume`)."""
    yield SUSPEND

706 

def resume(task: Controller[M, T] | Fork[T, X, M]) -> None:
    """Re-enqueue a suspended task so the scheduler picks it up again."""
    enqueue(task)

709 

def conclude(
    handle: ControllerFork[T, X, M],
    result: Result[T, Exception]
) -> Task[None, None]:
    """
    Concludes a given task with a result (either success `value` `T` or `error` `X`).

    Forces the underlying generator to terminate by throwing either a
    `StopIteration(value)` (success) or the error itself (failure) into it,
    then services any instructions its `finally` blocks may still yield.

    Args:
        handle: Task controller
        result: Success or failure result
    """
    try:
        task = handle
        if isinstance(result, Success):
            try:
                # force the generator to end and return with `result.value`
                state = task.throw(StopIteration(result.value))
            except StopIteration:
                # generator terminated cleanly — nothing left to do
                return
        elif isinstance(result, Failure):
            state = task.throw(result.error)
        # the task may still ask for its own handle while unwinding
        while state is CURRENT:
            state = task.send(task)
    except Exception:
        # the thrown error escaped the task (or cleanup raised) — swallow it;
        # the task's own handlers have already observed the outcome
        pass
    else:
        # in case `task` has a `finally` block that still yields values into `state`
        if state is SUSPEND:
            idle = TaskGroup.of(task).stack.idle
            idle.add(task)
        elif state is not None:
            enqueue(task)
    return
    yield

744 

745 

def abort(handle: ControllerFork[T, X, M], error: Exception) -> Task[None, None]:
    """
    Aborts the given task with an error. The task's declared error type should
    match the provided error.

    Args:
        handle: Task controller to abort
        error: Error to throw into the task
    """
    failure: Failure[Exception] = Failure(error)
    yield from conclude(handle, failure)

755 

756 

def exit_(handle: ControllerFork[T, X, M], value: Any) -> Task[None, None]:
    """
    Exits a task successfully with a return value.

    Args:
        handle: Task controller to exit
        value: Return value on exit
    """
    outcome = Success(value)
    yield from conclude(handle, outcome)

766 

767 

def terminate(handle: ControllerFork[None, X, M]) -> Task[None, None]:
    """
    Terminates a task (only for tasks with a void return type). For tasks with
    a non-`void` return type use `exit_` instead.

    Args:
        handle: Task controller to terminate
    """
    yield from conclude(handle, Success(None))

777 

778 

def group(forks: list[Fork[T, X, M]]) -> Task[Optional[Instruction[M]], None]:
    """
    Groups multiple forks together and joins them with the current task.

    The caller becomes the driver of a new TaskGroup; unfinished forks are
    moved into it and driven to completion. The first failure (either already
    recorded or raised while driving) aborts every remaining fork and is
    re-raised to the caller.
    """
    # abort early if there's no work to do
    if len(forks) == 0:
        return

    self_: Controller[Any, Any] = yield from current()
    # NOTE: local `group` / loop variable `fork` intentionally shadow the
    # module-level functions of the same name (mirrors the reference JS port)
    group: TaskGroup[T, X, M] = TaskGroup(self_)
    failure: Optional[Failure[X]] = None

    for fork in forks:
        result = fork.result
        if result is not None:
            # only the first error should be recorded, so `failure` has to be `None`
            if not result.ok and failure is None:
                failure = cast(Failure, result) # type: ignore[type-arg]
            continue
        move(fork, group)

    # keep work looping until there is no more work to be done
    try:
        # raise the exception that caused the first recorded failure result
        if failure:
            raise failure.error
        while True:
            # blocks the calling task: `self_`, to run all tasks in the active queue of
            # the group to completion
            yield from step(group)
            # but there might be suspended tasks in `group.stack.idle`
            if Stack.size(group.stack) > 0:
                # if there are grouped forked tasks that are suspended, then suspend
                # driver too.
                # NOTE: that `enqueue()` resumes the driver when suspended forked task
                # resumes. since `enqueue()` unblocks the drivers of a task's group
                # before starting the scheduler and processing the resumed task in it.
                yield from suspend()
            else:
                break
    except Exception as error:
        # only iterate over a copy of active/idle queue to be safe
        for task in list(group.stack.active):
            yield from abort(task, error)

        for task in list(group.stack.idle):
            yield from abort(task, error)
            enqueue(task) # `conclude` might add idle tasks back into `idle` queue

        raise error

829 

830 

def move(fork: Fork[T, X, M], group: TaskGroup[T, X, M]) -> None:
    """Move a fork from one group to another."""
    from_ = TaskGroup.of(fork)
    if from_ is not group:
        active, idle = (from_.stack.active, from_.stack.idle)
        target = group.stack
        fork.group = group
        # if it is idle just move from one group to the other and update the group task
        # thinks it belongs to.
        if fork in idle:
            idle.remove(fork)
            target.idle.add(fork)
        elif fork in active:
            index = active.index(fork)
            # if task is in the job queue, we move it to a target job queue. Moving top
            # task in the queue requires extra care so it does not end up processed by
            # two groups which would lead to race. For that reason `step` loop checks
            # for group changes on each turn.
            # NOTE(review): `list.index` can never be negative here (the `elif`
            # guard guarantees membership); the check is a JS-port artifact.
            if index >= 0:
                active.pop(index)
                target.active.append(fork)
        # otherwise task is complete

def join(fork: Fork[T, X, M]) -> Task[Optional[Instruction[M]], T]:
    """
    Joins a forked task back into the current task.

    Suspends the current task until the fork completes, then resumes with its result.
    If the fork fails, the error is thrown.

    Args:
        fork: The fork to join

    Returns:
        The fork's result

    Raises:
        The fork's error if it failed
    """
    # An idle fork has not been driven yet: iterate it inline so its
    # instructions flow through the current task.
    if fork.status == Status.IDLE:
        yield from fork

    # if fork didn't complete, process `fork` in the scheduler and block until
    # completion
    if fork.result is None:
        yield from group([fork])

    # By now the fork is guaranteed to have a result (success or failure).
    result: Result[T, X] = fork.result  # type: ignore[assignment]
    if isinstance(result, Success):
        return result.value
    raise result.error

882 

def send(message: M) -> Effect[M]:
    """
    Task that sends a given message (or rather an effect producing this message).

    Please note that while you could use `yield message` instead, the reference
    implementation for this library written in TS had risks of breaking changes in
    the TS generator inference which could enable a replacement for `yield *`.
    For uniformity purposes, we decided to stick with the same approach for the
    Python implementation as well.
    """
    yield message

893 

def effect(task: Task[None, T]) -> Effect[T]:
    """
    Turns a task (that never fails or sends messages) into an effect of its result.
    """
    # Drive the task to completion, then emit its return value as this
    # effect's single message (equivalent to `yield from send(result)`).
    result = yield from task  # type: ignore[misc]
    yield result

900 

def loop(init: Effect[M], next_: Callable[[M], Effect[M]]) -> Task[None, None]:
    # Event-loop style driver: run `init`, feed each produced message through
    # `next_` to obtain a follow-up effect, and enqueue that effect into the
    # same group — repeating until no work remains.
    controller: Controller[Any, Any] = yield from current()
    group: TaskGroup[M, Any, Any] = TaskGroup(controller)
    TaskGroup.enqueue(iter(init), group)

    while True:
        for msg in step(group):
            try:
                # in case `next` only accepts keyword args
                effect: Effect[M] = next_(**msg)  # type: ignore[call-arg]
            except TypeError:
                # NOTE(review): a TypeError raised *inside* `next_` is
                # indistinguishable from a signature mismatch here, so it is
                # silently retried with a positional call — confirm this is
                # the intended behavior.
                effect = next_(cast(M, msg))
            TaskGroup.enqueue(iter(effect), group)
        # If grouped tasks are parked idle, suspend this driver; `enqueue()`
        # resumes it when a suspended task wakes. Otherwise the loop is done.
        if Stack.size(group.stack) > 0:
            yield from suspend()
        else:
            break

918 

# Discriminator key used in `with_tag` envelopes ({"type": tag, tag: value}).
Tag: TypeAlias = str

920 

921 

class Tagger(Generic[T, X, M]):
    """
    Generator wrapper that boxes every non-control message of a source fork
    into a tagged envelope (`{"type": tag, tag: value}`), applying one layer
    per tag in `tags`. Control instructions (CURRENT / SUSPEND) and the
    final return value pass through unboxed.
    """

    def __init__(self, tags: list[str], source: Fork[T, X, M]) -> None:
        self.tags = tags
        self.source = source
        # Lazily initialised iterator over `source`; see `__iter__`.
        self.controller: Optional[Controller] = None  # type: ignore[type-arg]

    def __iter__(self)-> Self:
        # First iteration snapshots an iterator over the source fork.
        if not self.controller:
            self.controller = iter(self.source)
        return self

    def box(
        self, state: Union[Instruction[M], StopIteration]
    ) -> Union[Control, Tagged[M]]:
        """Wrap a single instruction in the tag envelope(s)."""
        # Completion: surface the generator's return value unchanged.
        if isinstance(state, StopIteration):
            return state.value  # type: ignore[no-any-return]

        # Control instructions are scheduler-internal and stay unboxed.
        if state is CURRENT or state is SUSPEND:
            return state  # type: ignore[return-value]

        # Apply the envelopes inner-to-outer, one per registered tag.
        boxed = state
        for name in self.tags:
            boxed = with_tag(name, boxed)  # type: ignore[assignment]
        return cast(Tagged[M], boxed)

    def __next__(self) -> Union[Control, Tagged[M]]:
        controller = cast(Fork[T, X, M], self.controller)
        return self.box(next(controller))

    def close(self) -> None:
        cast(Fork[T, X, M], self.controller).close()

    def send(self, instruction: Instruction[M]) -> Union[Control, Tagged[M]]:
        controller = cast(Fork[T, X, M], self.controller)
        return self.box(controller.send(instruction))

    def throw(self, error: Exception) -> Union[Control, Tagged[M]]:
        controller = cast(Fork[T, X, M], self.controller)
        return self.box(controller.throw(error))

    def return_(self, value: T) -> Union[Control, Tagged[T]]:
        controller = cast(Fork[T, X, M], self.controller)
        return self.box(controller.return_(value))  # type: ignore[arg-type, return-value]

    def __str__(self) -> str:
        return "TaggedEffect"

966 

967 

968def _none_effect() -> Effect[None]: 

969 return 

970 yield # necessary for this func to be recognized as a generator that yields nothing 

971 

972NONE_: Effect[None] = _none_effect() 

973 

def none_() -> Effect[None]:
    """
    The empty effect: produces no messages — the effect analogue of `[]` or
    `""`. Always hands back the shared `NONE_` singleton, so callers may
    detect emptiness by identity.
    """
    return NONE_

980 

def then_(
    task: Task[M, T],
    resolve: Callable[[T], U],
    reject: Callable[[X], U],
) -> Task[M, U]:
    """
    Map a task's outcome: on success apply `resolve` to the returned value,
    on failure apply `reject` to the raised exception; the mapped value
    becomes this task's result.
    """
    try:
        value = yield from task
        # `resolve` runs inside the `try`, so an exception it raises is also
        # routed to `reject` (mirrors the original semantics).
        return resolve(value)
    except Exception as error:
        return reject(error)  # type: ignore[arg-type]

990 

def all_(tasks: Iterable[Task[M, T]]) -> Task[Any, list[T]]:
    """
    Takes iterable of tasks and runs them concurrently, returning an array of results in
    an order of the tasks (not the order of completion). If any of the tasks fail all
    the rest are aborted and error is thrown into the calling task.
    """
    self_: Controller[Any, Any] = yield from current()
    forks: list[Optional[Fork[T, Exception, None]]] = []
    results: list[Optional[T]] = []
    count_ = 0  # number of forks still pending

    def succeed(idx: int) -> Callable[[T], None]:
        # Completion-callback factory bound to slot `idx`: records the result
        # in order and resumes the caller once every fork has finished.
        def handler(value: T) -> None:
            nonlocal count_
            forks[idx] = None  # clear the slot so `fail` won't abort it
            results[idx] = value
            count_ -= 1
            if count_ == 0:
                enqueue(self_)
        return handler

    def fail(error: Exception) -> None:
        # First failure aborts every still-pending fork, then the caller.
        for handle in forks:
            if handle is not None:
                enqueue(abort(handle, error))
        enqueue(abort(self_, error))

    for i, task in enumerate(tasks):
        results.append(None)  # keeps the results list at a size of len(tasks)
        # NOTE(review): the fork handle is appended only *after* `fork(...)`
        # returns — if a fork could complete synchronously within it,
        # `handler`/`fail` would observe a too-short `forks` list; confirm
        # `fork` always defers execution.
        fk = (yield from fork(then_(task, succeed(i), fail)))
        forks.append(fk)  # type: ignore[arg-type]
        count_ += 1

    if count_ > 0:
        # Block until the final `succeed` handler re-enqueues this task (or
        # `fail` aborts it with the error).
        yield from suspend()

    # All slots are filled at this point, so the Optional[T] entries are T.
    return cast(list[T], results)

1028 

1029 

Tagged: TypeAlias = dict[str, Union[Tag, M]]
"""
Envelope produced by `with_tag`, shaped like: {"type": tag, tag: value}. There is
no way to express that dictionary shape in Python's current type system at the
time of writing this implementation.
"""

1036 

def with_tag(tag: Tag, value: M) -> Tagged[M]:
    """Box `value` into a tagged envelope: `{"type": tag, tag: value}`."""
    boxed: Tagged[M] = {"type": tag}
    boxed[tag] = value
    return boxed

1039 

def tag(effect: Union[ControllerFork[T, X, M], Tagger[T, X, M]], tag: str) -> Effect[Union[Control, Tagged[M]]]:
    """
    Box every message of `effect` in a `{"type": tag, tag: message}` envelope,
    so its origin can be recovered from the `type` field — e.g. for a `nums`
    effect producing numbers, `tag(nums, "inc")` yields events like
    `{"type": "inc", "inc": 1}`.
    """
    # The empty effect carries no messages: tagging it is a no-op, return the
    # shared sentinel unchanged so identity checks keep working.
    if effect is NONE_:
        return NONE_  # type: ignore[return-value]
    # Re-tagging a Tagger flattens into a single wrapper with an extra tag
    # layer instead of nesting wrappers.
    if isinstance(effect, Tagger):
        return Tagger(tags=[*effect.tags, tag], source=effect.source)  # type: ignore[return-value]
    return Tagger([tag], effect)  # type:ignore[return-value, arg-type]

1052 

def listen(sources: dict[Tag, Effect[M]]) -> Effect[Union[Control, Tagged[M]]]:
    """
    Merge several named effects into a single effect of tagged variants, so
    the source of each message can be identified via its `type` field.
    """
    spawned: list[Fork] = []  # type: ignore[type-arg]
    for name, source in sources.items():
        # Empty effects produce nothing; skip forking them entirely.
        if source is not NONE_:
            handle = yield from fork(tag(source, name))
            spawned.append(handle)
    yield from group(spawned)  # type: ignore[misc]

1066 

def batch(effects: list[Effect[T]]) -> Effect[T]:
    """
    Combine several effects into a single effect that produces all of their
    messages.
    """
    spawned: list[Fork] = []  # type: ignore[type-arg]
    for each in effects:
        handle = yield from fork(each)
        spawned.append(handle)
    yield from group(spawned)  # type: ignore[misc]

1075 

def effects(tasks: list[Task[None, T]]) -> Effect[Optional[T]]:
    """
    Combine several tasks into a single effect of all their results.
    """
    # Guard: no tasks means the shared empty-effect sentinel.
    if not tasks:
        return NONE_
    return batch([effect(task) for task in tasks])