Coverage for actress / task.py: 91%
515 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-06 15:02 +0100
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-06 15:02 +0100
1"""Core task/effect scheduler primitives for actress."""
2# pylint: disable=missing-class-docstring,missing-function-docstring,too-many-instance-attributes,global-statement
4from __future__ import annotations
5import asyncio
6import weakref
7from dataclasses import dataclass
8from typing import Any, Generic, Literal, NoReturn, Optional, TypeVar, TypedDict, Union, cast, overload
9from collections.abc import Awaitable, Generator, Callable, Iterable
10from enum import Enum
11from typing_extensions import Self, TypeAlias
T = TypeVar("T")  # value of task returned (on success)
X = TypeVar("X", bound=Exception)  # exception raised by task (failure)
M = TypeVar("M")  # message yielded by task
U = TypeVar("U")  # NOTE(review): no use of `U` is visible in this part of the file - confirm it is still needed
class CurrentInstruction:
    """Type of the `CURRENT` control instruction a task yields to the scheduler."""

    def __repr__(self) -> str:
        """Return the fixed debug label for this instruction."""
        label = "<CURRENT>"
        return label
class SuspendInstruction:
    """Type of the `SUSPEND` control instruction a task yields to the scheduler."""

    def __repr__(self) -> str:
        """Return the fixed debug label for this instruction."""
        label = "<SUSPEND>"
        return label
# Special control instructions recognized by the scheduler.
# They are singletons: the scheduler compares yielded values against them by identity.
CURRENT = CurrentInstruction()
SUSPEND = SuspendInstruction()

# Either of the two control instructions.
Control: TypeAlias = Union[CurrentInstruction, SuspendInstruction]
# Anything a task may yield: a control instruction or a message of type `M`.
Instruction: TypeAlias = Union[Control, M]
# `Controller` and `Task` are declared with the same generator type.
Controller: TypeAlias = Generator[Union[Control, M], Any, T]
Task: TypeAlias = Generator[Union[Control, M], Any, T]  # In python the generator object is also the iterator
37"""
Task is a unit of computation that runs concurrently, a light-weight
process (in Erlang terms). You can spawn a bunch of them and the provided
cooperative scheduler will interleave their execution.
Tasks have three type variables: the first two describe the result of the
computation, `Success` corresponding to the return type and `Failure`
describing the error type (caused by thrown exceptions). The third type
variable, `Message`, describes the type of messages this task may produce.
Please note that Python does not really check exceptions, so the `Failure`
type cannot be guaranteed. Yet, we find them more practical than omitting
them as Python does for `Future` types and its derivatives.
Our tasks are generators (not the generator functions, but what you get by
invoking them) that are executed by the (library-provided) scheduler.
The scheduler recognizes two special `Control` instructions yielded by a generator.
When the scheduler gets a `current` instruction it will resume the generator with
a handle that can be used to resume the running generator after it is suspended.
When a `suspend` instruction is received the scheduler will suspend execution until
it is resumed by queueing it from an outside event.
58"""
# `M` == `Event`
# An effect yields messages of type `M`, accepts anything sent back, and returns nothing.
Effect: TypeAlias = Generator[M, Any, None]
62"""
Effect represents potentially asynchronous operations that result in a set of events.
It is often comprised of multiple `Task`s and represents either a chain of events or a
concurrent set of events (stretched over time).
Effect compares to a `Stream` in JavaScript in the same way as `Task` compares to
`Future` in Python. It is not a representation of an eventual result but rather a
representation of an operation which, if executed, will produce a certain result. `Effect`
can also be compared to an `EventEmitter` in JavaScript, but very often their `Event`
type variable (the `M` type variable in the Python implementation) is a union of various
event types. Unlike `EventEmitter`s, however, `Effect`s have inherent finality to them and
in that regard are more like JavaScript `Streams`.
73"""
@dataclass
class Success(Generic[T]):
    """Result of a successful task."""

    # Value the task returned.
    value: T
    # Discriminator shared with `Failure`; defaults to True here.
    ok: bool = True
@dataclass
class Failure(Generic[X]):
    """Result of a failed task."""

    # Exception the task failed with.
    error: X
    # Discriminator shared with `Success`; defaults to False here.
    ok: bool = False
# Outcome of a task: either `Success` carrying a value or `Failure` carrying an error.
Result: TypeAlias = Union[Success[T], Failure[X]]
@dataclass
class StateHandler(Generic[T, X]):
    """Optional callbacks invoked when a task settles."""

    # Called with the task's return value when it completes.
    onsuccess: Optional[Callable[[T], None]] = None
    # Called with the raised exception when the task fails.
    onfailure: Optional[Callable[[X], None]] = None
# Module-wide counter; `Fork` and `TaskGroup` increment it to obtain their `id`.
ID = 0
"""Unique IDs for entities (`Fork`, `Group`, ...)"""
class ForkOptions(TypedDict):
    """Options accepted by `fork` / `Fork`."""

    # Human-readable name for the fork; `Fork` falls back to "" when it is missing or None.
    name: Optional[str]
class Status(str, Enum):
    """Task execution status."""

    # Not scheduled / nothing running.
    IDLE = "idle"
    # Currently scheduled or being driven.
    ACTIVE = "active"
    # Settled with a result (success or failure).
    FINISHED = "finished"
class Stack(Generic[T, X, M]):
    """Holds a group's tasks: an ordered `active` queue and an `idle` set."""

    def __init__(
        self,
        active: Optional[list[ControllerFork[T, X, M]]] = None,
        idle: Optional[set[ControllerFork[T, X, M]]] = None
    ) -> None:
        # `None` defaults (rather than `[]` / `set()`) mirror the reference JS
        # signature while still giving every instance its own fresh containers.
        self.active: list[ControllerFork[T, X, M]] = [] if active is None else active
        self.idle: set[ControllerFork[T, X, M]] = set() if idle is None else idle

    @staticmethod
    def size(stack: Stack[T, X, M]) -> int:
        """Return how many tasks `stack` holds, active and idle combined."""
        active_count = len(stack.active)
        idle_count = len(stack.idle)
        return active_count + idle_count
def is_async(node: Any) -> bool:
    """
    Checks if a value is awaitable (or its lookalike).

    Returns True for asyncio futures, coroutine objects and anything
    registered as `collections.abc.Awaitable`; False otherwise.
    """
    if asyncio.isfuture(node):
        return True
    if asyncio.iscoroutine(node):
        return True
    return isinstance(node, Awaitable)
def sleep(duration: float = 0) -> Task[Control, None]:
    """
    Suspends execution for the given duration in milliseconds, after which execution
    is resumed (unless it was aborted in the meantime).

    Requires a running asyncio event loop, which provides the timer.

    Args:
        duration: Time to sleep in milliseconds

    Example:
    ```
    def demo():
        print("I'm going to take a small nap")
        yield from sleep(200)
        print("I am back to work")
    ```
    """
    sleeper: Controller[Any, Any] = yield from current()
    event_loop = asyncio.get_running_loop()

    # `call_later` expects seconds while `duration` is given in milliseconds
    delay_seconds = duration / 1000
    timer = event_loop.call_later(delay_seconds, lambda: enqueue(sleeper))

    try:
        yield from suspend()
    finally:
        # Runs on normal wake-up and when the task is aborted or closed while
        # suspended, so a pending timer never re-enqueues the task afterwards.
        timer.cancel()
@overload
def wait(value: Awaitable[T]) -> Task[Control, T]: ...

@overload
def wait(value: T) -> Task[Control, T]: ...

def wait(value: Union[Awaitable[T], T]) -> Task[Control, T]:
    """
    Provides equivalent of `await` in async functions. Specifically it takes a value
    that you can `await` on (that is `Awaitable[T]`, i.e futures, coroutines) and
    suspends execution until it is settled. If it succeeds execution is resumed with
    `T`, otherwise the exception it failed with is raised (awaitables do not encode
    an error type).

    It is useful when you need to deal with potentially async set of operations
    without having to check if thing is an `await`-able at every step.

    Please note that execution is suspended even if given value is not
    awaitable, however scheduler will still resume it in the same tick of the event
    loop after, just processing other scheduled tasks. This avoids problematic
    race conditions that can otherwise occur when values are sometimes awaitable
    and other times are not.

    Awaiting requires a running asyncio event loop.

    Args:
        value: A value or awaitable to wait on

    Returns:
        The resolved value

    Raises:
        Exception: whatever exception the awaitable failed with

    Example:
    ```
    def fetch_json(url, options):
        response = yield from wait(fetch(url, options))
        json = yield from wait(response.json())
        return json
    ```
    """
    task: Controller[Any, Any] = yield from current()
    if is_async(value):
        # no need to track `failed = False` like in reference JS impl because failure
        # can be tracked by only the `Result` object in the `output` variable below
        output: Optional[Result[T, Exception]] = None

        async def handle_async() -> None:
            nonlocal output
            try:
                resolved = await (cast(Awaitable[T], value))
                output = Success(resolved)
            except Exception as error:
                output = Failure(error)
            enqueue(task)

        loop = asyncio.get_running_loop()
        # Schedule the async handler on the loop. The event loop only keeps weak
        # references to tasks, so the returned task is bound to a local: this
        # generator frame stays alive while suspended and keeps the handler from
        # being garbage collected before it finishes.
        pending = loop.create_task(handle_async())  # noqa: F841

        yield from suspend()

        # check for failure with `output` instead of `failure` variable like in the
        # reference implementation
        if isinstance(output, Failure):
            raise output.error
        return cast("Success[T]", output).value

    # this may seem redundant but it is not, by enqueuing this task we allow
    # scheduler to perform other queued tasks first. This way many race conditions
    # can be avoided when values are sometimes promises and other times aren't.
    # unlike `await` however this will resume in the same tick.
    #
    # when the `wake` task is enqueued in func `main()`, it is enqueued in the `Main`
    # group. and when executed, it enqueues `task`. thereby maintaining consistency
    # in suspending immediately while `enqueue(task)` runs async after other tasks
    # as it similarly happens if `value` was `Awaitable`
    main(wake(task))
    yield from suspend()  # suspension happens immediately
    return cast(T, value)
def wake(task: Task[M, T]) -> Task[None, None]:
    """Re-enqueue `task` when this helper task is first run, then yield `None` once."""
    enqueue(task)
    yield
def main(task: Task[None, None]) -> None:
    """
    Starts a main task.

    Args:
        task: A task that produces no output (returns None, never fails, sends no messages)
    """
    # the iterator obtained from the task is its controller
    enqueue(iter(task))
def is_message(value: Instruction[M]) -> bool:
    """Return True unless `value` is one of the `SUSPEND` / `CURRENT` control singletons."""
    return not (value is SUSPEND or value is CURRENT)
def is_instruction(value: Instruction[M]) -> bool:
    """Return True when `value` is a control instruction, i.e. the negation of `is_message`."""
    return not is_message(value)
class Future_(Generic[T, X]):
    """
    Base class for awaitable task handles.

    Provides Promise-like interface for tasks, allowing them to be awaited
    from async contexts.
    """

    def __init__(self, handler: Optional[StateHandler[T, X]] = None):
        self.handler = handler or StateHandler()
        # Settled outcome; stays `None` until one is recorded.
        self.result: Optional[Result[T, X]] = None
        # Lazily created asyncio future, see `_get_promise`.
        self._promise: Optional[asyncio.Future[T]] = None

    def _get_promise(self) -> asyncio.Future[T]:
        """
        Lazily create and cache an asyncio.Future for this task.

        Must be called while an event loop is running.
        """
        cached = self._promise
        if cached is not None:
            return cached

        future: asyncio.Future[T] = asyncio.get_running_loop().create_future()
        settled = self.result

        if settled is not None:
            # Already have a result: hand back a pre-resolved future
            if isinstance(settled, Success):
                future.set_result(settled.value)
            else:
                future.set_exception(settled.error)
        else:
            # Not settled yet: wrap the current handlers so the future is
            # resolved first and the previous callbacks still run afterwards
            previous_onsuccess = self.handler.onsuccess
            previous_onfailure = self.handler.onfailure

            def onsuccess(value: T) -> None:
                if not future.done():
                    future.set_result(value)
                if previous_onsuccess:
                    previous_onsuccess(value)

            def onfailure(error: X) -> None:
                if not future.done():
                    future.set_exception(error)
                if previous_onfailure:
                    previous_onfailure(error)

            self.handler.onsuccess = onsuccess
            self.handler.onfailure = onfailure

        self._promise = future
        return future

    def __await__(self) -> Generator[Any, Any, T]:
        """Wire up the future, activate the task, then await the future."""
        # The promise is obtained before activation so the handlers are in
        # place before the task gets a chance to run
        promise = self._get_promise()

        # Activate the task (runs synchronously in MAIN scheduler)
        self.activate()

        return promise.__await__()

    def activate(self) -> Future_[T, X]:
        """
        Activate the task. Overridden in subclasses.
        """
        return self
class Fork(Future_[T, X], Generic[T, X, M]):
    """
    A handle to a running task that can be awaited.

    Implements both the generator protocol (for use within tasks) and
    the awaitable protocol (for use in async functions).

    A fork settles at most once through `_step`: after `result` has been
    recorded, resuming the exhausted controller again does not replace it.
    """
    def __init__(
        self,
        task: Task[M, T],
        handler: Optional[StateHandler[T, X]] = None,
        options: Optional[ForkOptions] = None,
    ) -> None:
        super().__init__(handler or StateHandler())

        global ID
        ID += 1
        self.id = ID
        self.name: str = "" if options is None else (options.get('name', None) or "")
        self.task = task
        # last instruction yielded by the task, or the `StopIteration` that ended it
        self.state: Union[Instruction[M], StopIteration] = CURRENT
        self.status = Status.IDLE
        # iterator driving `task`; created on first activation
        self.controller: Optional[Controller[M, T]] = None
        # group this fork currently belongs to (`None` resolves to `MAIN`)
        self.group: Optional[Group[T, X, M]] = None

    def resume(self) -> Task[None, None]:
        """Task that re-enqueues this fork when run."""
        resume(self)
        yield None

    def join(self) -> Task[Optional[M], T]:
        """Task that blocks until this fork settles; see module-level `join`."""
        return join(self)

    def abort(self, error: X) -> Task[None, None]:
        """Task that aborts this fork with `error`; see module-level `abort`."""
        return abort(self, error)

    def exit_(self, value: T) -> Task[None, None]:
        """Task that exits this fork with `value`; see module-level `exit_`."""
        return exit_(self, value)

    def activate(self) -> Fork[T, X, M]:
        """
        Activate the task and enqueue it for execution.
        """
        # Only activate if not already active or finished
        if self.controller is None:
            self.controller = iter(self.task)
            self.status = Status.ACTIVE
            enqueue(self)
        return self

    def __iter__(self) -> Generator[Any, Any, Fork[T, X, M]]:
        """
        Make Fork iterable for use with yield from.
        """
        # making __iter__ a generator ensures that `yield from fork(work())` schedules
        # the fork for concurrent execution without enabling synchronous driving of the
        # forked task after activation and then returns immediately (since MAIN group
        # is likely already actively being executed by the scheduler and `work` already
        # queued in `MAIN`s active queue).
        self.activate()
        return self
        yield

    def _panic(self, error: X) -> NoReturn:
        """Record `error` as the failure result, notify `onfailure`, and re-raise it."""
        self.result = Failure(ok=False, error=error)
        self.status = Status.FINISHED
        if self.handler.onfailure is not None:
            self.handler.onfailure(error)
        raise error

    def _step(self, state: Union[Instruction[M], StopIteration]) -> None:
        """Record the latest `state`; on first completion also record the result."""
        self.state = state
        # `StopIteration` signifies end of a generator in Python and holds its return
        # value on success
        if isinstance(state, StopIteration):
            # An exhausted generator raises `StopIteration(None)` on every further
            # resume (e.g. when a finished fork is still sitting in a queue). Such a
            # late signal must not replace the recorded result or re-run handlers.
            if self.status == Status.FINISHED:
                return
            self.result = Success(ok=True, value=state.value)
            self.status = Status.FINISHED
            if self.handler.onsuccess is not None:
                self.handler.onsuccess(state.value)
        # `state` is not returned here like in the reference js implementation because -
        # `state` here can be set to `StopIteration(value: T)` which signals completion
        # in Python, but in order to keep `Fork` behaviour predictable and similar to
        # normal generators (e.g `controller` generators) `StopIteration` isn't returned
        # but raised. So no point in returning `state` (especially on completion),
        # within the class or - via the generator protocol methods - outside the class.
        # just run success handlers, update state and set success value in result.

    def __next__(self) -> Instruction[M]:
        """Advance the task by one step; same as `send(None)`."""
        # note that: `task.send(None) == next(task)`
        return self.send(None)

    def send(self, value: Any) -> Instruction[M]:
        """
        Resume the task with `value` and return the next instruction it yields.

        Raises:
            StopIteration: the task finished (its result has been recorded).
            Exception: the task failed (the failure has been recorded).
        """
        # cast the type for `self.controller` to eliminate the type-checker
        # from inferring that `self.controller` is still `None` after fork
        # activation
        controller = cast(Controller[M, T], self.controller)
        try:
            state = controller.send(value)
            self._step(state)
            return state
        except StopIteration as e:
            # `StopIteration` means task is finished, therefore task `result`, `status`
            # can be set and success handler function ran.
            self._step(e)
            raise
        except Exception as e:
            return self._panic(e)  # type: ignore[arg-type]

    def throw(self, error: Union[Exception, GeneratorExit]) -> Instruction[M]:
        """
        Raise `error` inside the task and return the next instruction it yields.

        Throwing a `StopIteration` instance asks the task to finish successfully
        with `error.value` (this is how `conclude` exits a task).

        Raises:
            StopIteration: the task finished (its result has been recorded).
            Exception: the task failed (the failure has been recorded).
        """
        controller = cast(Controller[M, T], self.controller)
        try:
            state = controller.throw(error)
            self._step(state)
            return state
        except StopIteration as e:
            # `StopIteration` means task is finished, therefore task `result`, `status`
            # can be set and success handler function ran.
            self._step(e)
            raise
        except RuntimeError as e:
            # PEP 479: a `StopIteration` that escapes a generator frame is replaced
            # by `RuntimeError` whose `__cause__` is that `StopIteration`. When it is
            # the very instance we threw, the task ended as requested - treat it as
            # a successful return instead of a failure.
            if isinstance(error, StopIteration) and e.__cause__ is error:
                self._step(error)
                raise error from None
            return self._panic(e)  # type: ignore[arg-type]
        # `GeneratorExit` is outside the scope of `Exception`. it subclasses
        # `BaseException` instead - `Exception`'s parent class. Catch both exc.
        except (Exception, GeneratorExit) as e:
            return self._panic(e)  # type: ignore[arg-type]

    def return_(self, value: T) -> T:
        """
        Force the task to finish successfully with `value` and return `value`.

        If the task yields instead of terminating, its controller is closed.
        """
        controller = cast(Controller[M, T], self.controller)
        stop = StopIteration(value)
        try:
            controller.throw(stop)
        except StopIteration as e:
            self._step(e)
        except RuntimeError as e:
            # see `throw`: PEP 479 turns our escaping `StopIteration` into a
            # `RuntimeError` chained to it, which here means "finished with value"
            if e.__cause__ is not stop:
                self._panic(e)  # type: ignore[arg-type]
            self._step(stop)
        except Exception as e:
            self._panic(e)  # type: ignore[arg-type]
        else:
            # the controller yielded instead of terminating. therefore enforce close
            controller.close()
        return value

    def close(self) -> None:
        """Close the underlying controller; failures while closing are recorded and re-raised."""
        try:
            # self.controller.close() == self.controller.throw(GeneratorExit)
            # also cast the type for `self.controller` to eliminate the type-checker
            # from inferring that `self.controller` is still `None` after fork
            # activation
            cast(Controller[M, T], self.controller).close()
        except Exception as e:
            self._panic(e)  # type: ignore[arg-type]

    def __repr__(self) -> str:
        return f"Fork(id={self.id}, status='{self.status}')"
# type alias added for convenience
# Both accept either a plain generator or a `Fork` wrapping one.
TaskFork: TypeAlias = Union[Task, Fork[T, X, M]]  # type: ignore[type-arg]
ControllerFork: TypeAlias = Union[Controller, Fork[T, X, M]]  # type: ignore[type-arg]
class Main(Generic[T, X, M]):
    """Default or Fallback Task Group."""
    def __init__(self) -> None:
        # IDLE while the scheduler loop in `enqueue` is not running
        self.status = Status.IDLE
        self.stack: Stack[T, X, M] = Stack()
        self.id: Literal[0] = 0
        # always `None`: the main group is the root of the group chain
        self.parent: Optional[TaskGroup[T, X, M]] = None
MAIN: Main = Main()  # type: ignore[type-arg]
"""Singleton main group"""

# Python generator objects cannot carry arbitrary attributes like in the reference JS
# implementation, so loop/group helpers store membership here to route resumed generators
# back to their group.
# Keys are held weakly, so an entry does not keep its generator alive.
_GROUP_MEMBERSHIP: weakref.WeakKeyDictionary[object, Group[Any, Any, Any]] = weakref.WeakKeyDictionary()
class TaskGroup(Generic[T, X, M]):
    """Task group for managing concurrent tasks."""
    def __init__(
        self,
        driver: ControllerFork[T, X, M],
        active: Optional[list[ControllerFork[T, X, M]]] = None,
        idle: Optional[set[ControllerFork[T, X, M]]] = None,
        stack: Optional[Stack[T, X, M]] = None
    ) -> None:
        """
        Args:
            driver: Task that drives (steps) this group.
            active: Initial active queue; takes precedence over `stack`.
            idle: Initial idle set; takes precedence over `stack`.
            stack: Ready-made stack, used only when neither `active` nor `idle` is given.
        """
        global ID

        self.driver = driver
        # the group is nested under whichever group its driver belongs to
        self.parent = TaskGroup.of(driver)

        # `None` defaults (instead of mutable defaults) keep the signature aligned
        # with the reference JS implementation without sharing containers between
        # instances.
        self.stack: Stack[T, X, M]
        if active is not None or idle is not None:
            self.stack = Stack(
                [] if active is None else active,
                set() if idle is None else idle,
            )
        elif stack is not None:
            self.stack = stack
        else:
            self.stack = Stack()

        ID += 1
        self.id = ID

    @staticmethod
    def of(member: ControllerFork[T, X, M]) -> Group[T, X, M]:
        """Return the group `member` belongs to, falling back to `MAIN`."""
        # `Fork`s carry their group in a `group` attribute; plain generators
        # cannot, so they are looked up in `_GROUP_MEMBERSHIP` instead.
        own: Optional[Group[T, X, M]] = getattr(member, 'group', None)
        if own is not None:
            return own
        registered = _GROUP_MEMBERSHIP.get(member)
        if registered is None:
            return cast(Group[T, X, M], MAIN)
        return cast(Group[T, X, M], registered)

    @staticmethod
    def enqueue(member: TaskFork[T, X, M], group: TaskGroup[T, X, M]) -> None:
        """Record `group` as the group of `member` and append it to the group's active queue."""
        try:
            member.group = group  # type: ignore[union-attr]
        except AttributeError:
            # plain generators reject attribute assignment
            _GROUP_MEMBERSHIP[member] = group
        finally:
            group.stack.active.append(member)
# A group is either an explicit `TaskGroup` or the singleton `Main` group.
Group = Union[TaskGroup[T, X, M], Main[T, X, M]]
def enqueue(task: ControllerFork[T, X, M]) -> None:
    """
    Queue `task` in its group, unblock the drivers above it and, if the main
    scheduler is idle, run it until the main active queue is drained.

    Args:
        task: Task controller (generator or `Fork`) to make runnable
    """
    group = TaskGroup.of(task)

    group.stack.active.append(task)
    group.stack.idle.discard(task)

    # then walk up the group chain and unblock their driver tasks
    while group.parent is not None:
        idle = group.parent.stack.idle
        active = group.parent.stack.active

        # only to appease type checkers, since `MAIN` doesn't have a driver and `MAIN`
        # has a parent of `None` so this loop won't run in that case
        driver = getattr(group, "driver", None)
        if driver is not None and driver in idle:
            idle.remove(driver)
            active.append(driver)
        else:
            # if driver was not blocked it must have been unblocked by other task so
            # stop there
            break

        # crawl up to the parent group
        group = group.parent

    # Only the call that finds `MAIN` idle runs the scheduler loop; calls made while
    # it is already running just queue the task and return.
    if MAIN.status == Status.IDLE:
        MAIN.status = Status.ACTIVE
        while True:
            try:
                # messages yielded by top-level tasks are discarded
                for _ in step(MAIN):
                    pass
                MAIN.status = Status.IDLE
                break
            except Exception:
                # Top level task may crash and throw an error, but given this is a main
                # group we do not want to interrupt other unrelated tasks, which is why
                # we discard the error and the task that caused it.
                # NOTE(review): assumes the failing task is still at the head of the
                # queue and that the queue is non-empty - confirm.
                MAIN.stack.active.pop(0)
def step(group: Group[T, X, M]) -> Generator[M, Any, None]:
    """
    Drive the tasks in `group`'s active queue, one at a time from the head, until
    the queue is empty.

    Control instructions are handled here; any other yielded value is a message
    and is yielded to whoever iterates `step`, whose reply is sent back into the
    task. Exceptions other than `StopIteration` raised by a task propagate to the
    caller.
    """
    active = group.stack.active
    task = active[0] if active else None
    if task:
        group.stack.idle.discard(task)
    while task:
        # Keep processing instructions until task is done, sends a `SUSPEND` request or
        # it has been removed from the active queue.
        # ⚠️ Group changes require extra care so please make sure to understand the
        # detail here. It occurs when a spawned task(s) are joined into a group which
        # will change the driver and the group the task belongs to, that is when the
        # conditional statement: `task == active[0]` will become false and the task
        # would need to be dropped immediately otherwise race condition will occur due
        # to task been driven by multiple concurrent schedulers.
        try:
            input_value = None
            # NOTE(review): `active[0]` assumes the queue is not emptied while the
            # task runs - confirm this cannot happen.
            while task == active[0]:
                instruction = task.send(input_value)
                if instruction is SUSPEND:
                    group.stack.idle.add(task)
                    break
                # if task requested a context (which is usually to suspend itself) pass back
                # a task reference and continue.
                if instruction is CURRENT:
                    input_value = task
                    continue
                # otherwise task sent a message which we yield to the driver and
                # continue
                input_value = yield instruction  # type: ignore[misc]
                continue
        except StopIteration:
            # task finished
            pass

        # if task is complete or got suspended we move to a next task
        if active and active[0] == task:
            active.pop(0)

        task = active[0] if active else None
        if task:
            group.stack.idle.discard(task)
def spawn(task: Task[None, None]) -> Task[None, None]:
    """
    Executes a given task concurrently with a current task (task that spawned it).
    Spawned task is detached from the task that spawned it and it can outlive it and/or
    fail without affecting a task that spawned it. If you need to wait on a concurrent
    task completion consider using `fork` instead which can later be `join`ed. If you
    just want a task to block on another task's execution you can just use:
    `yield from work()` directly instead.
    """
    main(task)
    # delegating to an empty iterable keeps this a generator that yields nothing
    yield from ()
def fork(task: Task[M, T], options: ForkOptions | None = None) -> Fork[T, Exception, M]:
    """
    Executes a given task concurrently with current task (the task that initiated fork)
    Forked task is detached from the task that created it and it can outlive it and /
    or fail without affecting it. You do however get a handle for the fork which could
    be used to `join` the task, in which case `join`ing task would block until fork
    finishes execution.

    This is also a primary interface for executing tasks from the outside of the task
    context. Function returns `Fork` which implements `Future` interface so it can be
    awaited. Please note that calling `fork` does not really do anything, it lazily
    starts execution when you either `await fork(work())` from arbitrary context or
    `yield from fork(work())` in another task context.

    Args:
        task: The task to run concurrently
        options: Optional fork options (currently only `name`)

    Returns:
        A not-yet-activated `Fork` handle for `task`
    """
    return Fork(task, options=options)
def current() -> Generator[CurrentInstruction, Controller[M, T], Controller[M, T]]:
    """Yield `CURRENT` and return what is sent back (the scheduler sends the running task's controller)."""
    return (yield CURRENT)
def suspend() -> Generator[SuspendInstruction, Any, None]:
    """Yield `SUSPEND`, asking the scheduler to park the running task until it is enqueued again."""
    yield SUSPEND
def resume(task: Controller[M, T] | Fork[T, X, M]) -> None:
    """Resume a suspended task by enqueueing it; thin wrapper over `enqueue`."""
    enqueue(task)
def conclude(
    handle: ControllerFork[T, X, M],
    result: Result[T, Exception]
) -> Task[None, None]:
    """
    Concludes a given task with a result (either success `value` `T` or `error` `X`)

    The returned task yields nothing; the work happens when it is first run.
    Exceptions raised while concluding are swallowed.

    Args:
        handle: Task controller
        result: Success or failure result
    """
    try:
        task = handle
        if isinstance(result, Success):
            try:
                # force the generator to end and return with `result.value`
                # NOTE(review): per PEP 479 a plain generator re-raises an escaping
                # `StopIteration` as `RuntimeError`, which would be absorbed by the
                # broad handler below rather than by this one - confirm intent.
                state = task.throw(StopIteration(result.value))
            except StopIteration:
                return
        elif isinstance(result, Failure):
            state = task.throw(result.error)
            # the task handled the error and asked for its own handle: provide it
            while state is CURRENT:
                state = task.send(task)
    except Exception:
        # the task terminated (or failed) while being concluded; nothing left to do
        pass
    else:
        # incase `task` has a `finally` block that still yields values into `state`
        if state is SUSPEND:
            idle = TaskGroup.of(task).stack.idle
            idle.add(task)
        elif state is not None:
            enqueue(task)
    return
    # unreachable `yield` makes this function a generator (a task)
    yield
def abort(handle: ControllerFork[T, X, M], error: Exception) -> Task[None, None]:
    """
    Aborts given task with an error. Task error type should match provided error.

    Delegates to `conclude` with a `Failure` wrapping `error`.

    Args:
        handle: Task controller to abort
        error: Error to throw into the task
    """
    yield from conclude(handle, Failure(error))
def exit_(handle: ControllerFork[T, X, M], value: Any) -> Task[None, None]:
    """
    Exits a task successfully with a return value.

    Delegates to `conclude` with a `Success` wrapping `value`.

    Args:
        handle: Task controller to exit
        value: Return value on exit
    """
    yield from conclude(handle, Success(value))
def terminate(handle: ControllerFork[None, X, M]) -> Task[None, None]:
    """
    Terminates a task (only for tasks with void return type). If your task has a
    non-`void` return type you should use `exit_` instead.

    Delegates to `conclude` with `Success(value=None)`.

    Args:
        handle: Task controller to terminate
    """
    yield from conclude(handle, Success(value=None))
def group(forks: list[Fork[T, X, M]]) -> Task[Optional[Instruction[M]], None]:
    """
    Groups multiple forks together and joins them with current task.

    The calling task drives the unfinished forks until none of them is active or
    idle any more. If a fork had already failed, or one fails while being
    driven, the remaining forks are aborted with that error and it is re-raised.
    """
    # nothing to join, nothing to do
    if not forks:
        return

    driver: Controller[Any, Any] = yield from current()
    collective: TaskGroup[T, X, M] = TaskGroup(driver)
    first_failure: Optional[Failure[X]] = None

    for member in forks:
        settled = member.result
        if settled is None:
            # still running: hand it over to the new group
            move(member, collective)
        elif first_failure is None and not settled.ok:
            # only the first error is recorded
            first_failure = cast(Failure, settled)  # type: ignore[type-arg]

    try:
        # surface the first recorded failure through the same cleanup path as
        # failures that happen while stepping
        if first_failure is not None:
            raise first_failure.error
        while True:
            # blocks the calling task (`driver`) while the group's active queue
            # is run to completion
            yield from step(collective)
            # nothing active and nothing idle: every fork is done
            if Stack.size(collective.stack) == 0:
                break
            # some grouped forks are suspended, so the driver suspends as well.
            # `enqueue()` unblocks a task's group drivers before running the
            # scheduler, so the driver wakes up when a suspended fork resumes.
            yield from suspend()
    except Exception as error:
        # iterate over snapshots since aborting may mutate the queues
        for pending in list(collective.stack.active):
            yield from abort(pending, error)

        for pending in list(collective.stack.idle):
            yield from abort(pending, error)
            enqueue(pending)  # `conclude` might add idle tasks back into `idle` queue

        raise error
def move(fork: Fork[T, X, M], group: TaskGroup[T, X, M]) -> None:
    """
    Move a fork from one group to another.

    The fork keeps its scheduling state: an idle fork lands in the target's idle
    set, an active one is appended to the target's active queue. A fork that is
    in neither collection only has its `group` attribute updated.
    """
    origin = TaskGroup.of(fork)
    if origin is group:
        return

    source = origin.stack
    destination = group.stack
    # update the group the task thinks it belongs to
    fork.group = group

    if fork in source.idle:
        source.idle.remove(fork)
        destination.idle.add(fork)
    elif fork in source.active:
        # Moving the task at the head of the queue requires extra care so it does
        # not end up processed by two groups, which would lead to a race. For that
        # reason the `step` loop checks for group changes on each turn.
        source.active.pop(source.active.index(fork))
        destination.active.append(fork)
    # otherwise task is complete
def join(fork: Fork[T, X, M]) -> Task[Optional[Instruction[M]], T]:
    """
    Joins a forked task back into the current task.

    Suspends the current task until the fork completes, then resumes with its result.
    If the fork fails, the error is thrown.

    Args:
        fork: The fork to join

    Returns:
        The fork's result

    Raises:
        The fork's error if it failed
    """
    # a fork that was never started is activated first
    if fork.status == Status.IDLE:
        yield from fork

    # if fork didn't complete, process `fork` in the scheduler and block until
    # completion
    if fork.result is None:
        yield from group([fork])

    outcome: Result[T, X] = fork.result  # type: ignore[assignment]
    if not isinstance(outcome, Success):
        raise outcome.error
    return outcome.value
def send(message: M) -> Effect[M]:
    """
    Task that sends a given message (or rather an effect producing this message).
    Please note, that while you could use `yield message` instead, the reference
    implementation for this library written in TS had risks of breaking changes in the
    TS generator inference which could enable a replacement for `yield *`.
    For uniformity purposes, we decided to stick with the same approach for the Python
    implementation as well.

    Args:
        message: The message to yield to the scheduler
    """
    yield message
def effect(task: Task[None, T]) -> Effect[T]:
    """
    Turns a task (that never fails or sends messages) into an effect of its result.

    The task is run to completion by delegation and its return value is then
    sent as the single message of the resulting effect.
    """
    yield from send((yield from task))  # type: ignore[misc]
def loop(init: Effect[M], next_: Callable[[M], Effect[M]]) -> Task[None, None]:
    """
    Runs `init` and feeds every message stepped out of its task group into
    `next_`, enqueueing the effect `next_` returns into that same group.

    The loop suspends between rounds for as long as `Stack.size` reports a
    non-empty stack for the group, and finishes once it reports zero.

    Args:
        init: Effect that is enqueued first
        next_: Called with each produced message; returns the follow-up effect
    """
    controller: Controller[Any, Any] = yield from current()
    workers: TaskGroup[M, Any, Any] = TaskGroup(controller)
    TaskGroup.enqueue(iter(init), workers)

    pending = True
    while pending:
        for message in step(workers):
            try:
                # in case `next_` only accepts keyword args
                # NOTE(review): a TypeError raised *inside* `next_` is caught here
                # as well, in which case `next_` is invoked a second time
                # positionally.
                follow_up: Effect[M] = next_(**message)  # type: ignore[call-arg]
            except TypeError:
                follow_up = next_(cast(M, message))
            TaskGroup.enqueue(iter(follow_up), workers)

        pending = Stack.size(workers.stack) > 0
        if pending:
            yield from suspend()
# Label attached to messages by `with_tag`: it is stored under the "type" key and
# also used as the key that holds the original message.
Tag: TypeAlias = str
class Tagger(Generic[T, X, M]):
    """
    Iterator that drives a source fork and wraps every message it produces with
    the configured tags, passing scheduler control instructions through as-is.
    """

    def __init__(self, tags: list[str], source: Fork[T, X, M]) -> None:
        self.tags = tags
        self.source = source
        # Populated by `__iter__`; the delegating methods below use it directly.
        self.controller: Optional[Controller] = None  # type: ignore[type-arg]

    def __iter__(self) -> Self:
        # Obtain the underlying iterator once and reuse it on later calls.
        self.controller = self.controller or iter(self.source)
        return self

    def box(
        self, state: Union[Instruction[M], StopIteration]
    ) -> Union[Control, Tagged[M]]:
        """
        Wraps `state` with every tag in `self.tags`, unless it is a control
        instruction (returned unchanged) or a `StopIteration` (its value is
        returned).
        """
        if isinstance(state, StopIteration):
            return state.value  # type: ignore[no-any-return]

        if state is not CURRENT and state is not SUSPEND:
            # Each tag wraps the previous result in a new dict via `with_tag`, so
            # the first tag ends up innermost and the last one outermost.
            boxed = state
            for label in self.tags:
                boxed = with_tag(label, boxed)  # type: ignore[assignment]
            return cast(Tagged[M], boxed)

        return state  # type: ignore[return-value]

    def __next__(self) -> Union[Control, Tagged[M]]:
        inner = cast(Fork[T, X, M], self.controller)
        return self.box(next(inner))

    def close(self) -> None:
        inner = cast(Fork[T, X, M], self.controller)
        inner.close()

    def send(self, instruction: Instruction[M]) -> Union[Control, Tagged[M]]:
        inner = cast(Fork[T, X, M], self.controller)
        return self.box(inner.send(instruction))

    def throw(self, error: Exception) -> Union[Control, Tagged[M]]:
        inner = cast(Fork[T, X, M], self.controller)
        return self.box(inner.throw(error))

    def return_(self, value: T) -> Union[Control, Tagged[T]]:
        inner = cast(Fork[T, X, M], self.controller)
        return self.box(inner.return_(value))  # type: ignore[arg-type, return-value]

    def __str__(self) -> str:
        return "TaggedEffect"
968def _none_effect() -> Effect[None]:
969 return
970 yield # necessary for this func to be recognized as a generator that yields nothing
# Single shared empty effect; other functions in this module recognise it by
# identity (`is NONE_`).
NONE_: Effect[None] = _none_effect()
def none_() -> Effect[None]:
    """
    Returns the empty `Effect`, that is, one that produces no messages. Kind of
    like `[]` or `""` but for effects.

    Every call returns the same module-level `NONE_` object.
    """
    return NONE_
def then_(
    task: Task[M, T],
    resolve: Callable[[T], U],
    reject: Callable[[X], U],
) -> Task[M, U]:
    """
    Runs `task` and maps its outcome through one of the two handlers.

    Args:
        task: The task to delegate to; its messages are passed through
        resolve: Called with the task's return value
        reject: Called with the exception if one is raised

    Returns:
        Whatever the invoked handler returns

    Note that `resolve` runs inside the guarded section, so an `Exception`
    raised by `resolve` itself is also routed to `reject`.
    """
    try:
        outcome = yield from task
        return resolve(outcome)
    except Exception as error:
        return reject(error)  # type: ignore[arg-type]
def all_(tasks: Iterable[Task[M, T]]) -> Task[Any, list[T]]:
    """
    Takes an iterable of tasks and runs them concurrently, returning a list of
    results in the order of the tasks (not the order of completion). If any of the
    tasks fails, an abort is enqueued for every fork that has not completed yet and
    for the calling task, using the same error.
    """
    parent: Controller[Any, Any] = yield from current()
    running: list[Optional[Fork[T, Exception, None]]] = []
    collected: list[Optional[T]] = []
    remaining = 0

    def on_success(position: int) -> Callable[[T], None]:
        def record(value: T) -> None:
            nonlocal remaining
            # Clear the slot so a later failure does not abort a finished fork.
            running[position] = None
            collected[position] = value
            remaining -= 1
            # The last fork to finish wakes the suspended parent up.
            if remaining == 0:
                enqueue(parent)
        return record

    def on_failure(error: Exception) -> None:
        for handle in running:
            if handle is None:
                continue
            enqueue(abort(handle, error))
        enqueue(abort(parent, error))

    for position, task in enumerate(tasks):
        collected.append(None)  # reserve the slot so results stay index-aligned
        spawned = yield from fork(then_(task, on_success(position), on_failure))
        running.append(spawned)  # type: ignore[arg-type]
        remaining += 1

    if remaining > 0:
        yield from suspend()

    return cast(list[T], collected)
# Type of a message after it has been boxed by `with_tag`.
Tagged: TypeAlias = dict[str, Union[Tag, M]]
"""
The dictionary is shaped like: {"type": tag, tag: value}. There is no way to express
that dictionary shape in Python's current type system at the time of writing this
implementation.
"""
def with_tag(tag: Tag, value: M) -> Tagged[M]:
    """
    Boxes `value` into a new dict of the form `{"type": tag, tag: value}`.
    """
    boxed: Tagged[M] = {"type": tag}
    boxed[tag] = value
    return boxed
def tag(effect: Union[ControllerFork[T, X, M], Tagger[T, X, M]], tag: str) -> Effect[Union[Control, Tagged[M]]]:
    """
    Tags an effect by boxing each event with an object that has a `type` field
    corresponding to the given tag and a same-named field holding the original
    message, e.g. given a `nums` effect that produces numbers, `tag(nums, "inc")`
    would create an effect that produces events like `{"type": "inc", "inc": 1}`.

    The shared empty effect `NONE_` is returned unchanged. Tagging an existing
    `Tagger` produces a new `Tagger` over the same source with the tag appended.
    """
    if effect is NONE_:
        return NONE_  # type: ignore[return-value]

    if not isinstance(effect, Tagger):
        return Tagger([tag], effect)  # type:ignore[return-value, arg-type]

    # Already tagged: extend a copy of the tag list instead of nesting taggers.
    extended = effect.tags + [tag]
    return Tagger(tags=extended, source=effect.source)  # type: ignore[return-value]
def listen(sources: dict[Tag, Effect[M]]) -> Effect[Union[Control, Tagged[M]]]:
    """
    Takes several effects and merges them into a single effect of tagged variants
    so that their source can be identified via the `type` field.

    Each effect is tagged with its dictionary key and forked; entries whose effect
    is the shared `NONE_` are skipped. The forks are then run as one group.
    """
    forks: list[Fork] = []  # type: ignore[type-arg]
    for name, eff in sources.items():
        if eff is NONE_:
            continue
        spawned = yield from fork(tag(eff, name))
        forks.append(spawned)
    yield from group(forks)  # type: ignore[misc]
def batch(effects: list[Effect[T]]) -> Effect[T]:
    """
    Takes several effects and combines them into one effect.

    Every effect is forked in the given order and the resulting forks are then
    run together as a single group.
    """
    spawned: list[Fork] = []  # type: ignore[type-arg]
    for member in effects:
        handle = yield from fork(member)
        spawned.append(handle)
    yield from group(spawned)  # type: ignore[misc]
def effects(tasks: list[Task[None, T]]) -> Effect[Optional[T]]:
    """
    Takes several tasks and creates an effect of them all.

    Returns the shared empty effect `NONE_` when `tasks` is empty; otherwise each
    task is converted with `effect` and the results are combined with `batch`.
    """
    if not tasks:
        return NONE_
    return batch([effect(item) for item in tasks])