- commit
- 47d701f6d787451e3cbb15c5ef1f6dd28ffb617e
- parent
- 2ca58a870010e789abb171a8cba9f0547fa1d920
- Author
- Tobias Bengfort <tobias.bengfort@posteo.de>
- Date
- 2026-02-26 22:15
add gather() for multiplexing
Diffstat
| M | tests.py | 99 | ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++- |
| M | xiio.py | 72 | ++++++++++++++++++++++++++++++++++++++++++++++++++++++++----- |
2 files changed, 165 insertions, 6 deletions
diff --git a/tests.py b/tests.py
@@ -8,6 +8,24 @@ from unittest import mock 8 8 import xiio 9 9 10 10 -1 11 async def return_later(seconds, value): -1 12 await xiio.sleep(seconds) -1 13 return value -1 14 -1 15 -1 16 async def raise_later(seconds, exc): -1 17 await xiio.sleep(seconds) -1 18 raise exc -1 19 -1 20 -1 21 def interrupt_select(self): -1 22 timeout = self.time - time.monotonic() -1 23 if timeout > 0: -1 24 time.sleep(timeout / 2) -1 25 raise KeyboardInterrupt -1 26 return {} -1 27 -1 28 11 29 class XiioTestCase(unittest.TestCase): 12 30 def _callTestMethod(self, method): # noqa 13 31 if not inspect.iscoroutinefunction(method): @@ -105,6 +123,85 @@ class TestFuture(XiioTestCase): 105 123 await future 106 124 107 125 -1 126 class TestGather(XiioTestCase): -1 127 async def test_sync_values(self): -1 128 async def return_immediately(value): -1 129 return value -1 130 -1 131 with self.assert_duration(0): -1 132 result = await xiio.gather([ -1 133 return_immediately(1), -1 134 return_immediately(2), -1 135 ]) -1 136 self.assertEqual(result, [1, 2]) -1 137 -1 138 async def test_async_values(self): -1 139 with self.assert_duration(0.2): -1 140 result = await xiio.gather([ -1 141 return_later(0.2, 1), -1 142 return_later(0.1, 2), -1 143 ]) -1 144 self.assertEqual(result, [1, 2]) -1 145 -1 146 async def test_raise_on_error(self): -1 147 with self.assertRaises(ValueError): -1 148 with self.assert_duration(0.2): -1 149 await xiio.gather([ -1 150 return_later(0.1, 1), -1 151 raise_later(0.2, ValueError), -1 152 ]) -1 153 -1 154 async def test_cancel_others_on_error(self): -1 155 with self.assertRaises(ValueError): -1 156 with self.assert_duration(0.1): -1 157 await xiio.gather([ -1 158 return_later(0.2, 1), -1 159 raise_later(0.1, ValueError), -1 160 ]) -1 161 -1 162 async def test_cleanup_others_on_error(self): -1 163 async def foo(): -1 164 try: -1 165 await xiio.sleep(0.2) -1 166 finally: -1 167 await xiio.sleep(0.2) -1 168 -1 169 with self.assertRaises(ValueError): -1 170 with self.assert_duration(0.3): -1 171 await xiio.gather([ -1 172 foo(), -1 173 raise_later(0.1, ValueError), -1 174 ]) -1 175 -1 176 async def test_swallow_exceptions_during_cancellation(self): -1 177 async def foo(): -1 178 try: -1 179 await xiio.sleep(0.3) -1 180 finally: -1 181 raise ValueError -1 182 -1 183 with self.assertRaises(TypeError): -1 184 with self.assert_duration(0.1): -1 185 await xiio.gather([ -1 186 foo(), -1 187 raise_later(0.1, TypeError), -1 188 ]) -1 189 -1 190 async def test_cleanup_on_error_while_paused(self): -1 191 stack = [] -1 192 -1 193 async def foo(): -1 194 try: -1 195 await xiio.sleep(0.1) -1 196 finally: -1 197 stack.append(1) -1 198 -1 199 with mock.patch('xiio.Condition.select', new=interrupt_select): -1 200 with self.assertRaises(KeyboardInterrupt): -1 201 await xiio.gather([foo()]) -1 202 self.assertEqual(stack, [1]) -1 203 -1 204 108 205 class TestRun(XiioTestCase): 109 206 def test_sleep(self): 110 207 async def foo(): @@ -124,7 +221,7 @@ class TestRun(XiioTestCase): 124 221 finally: 125 222 stack.append(1) 126 223127 -1 with mock.patch('xiio.Condition.select', side_effect=KeyboardInterrupt):-1 224 with mock.patch('xiio.Condition.select', new=interrupt_select): 128 225 with self.assertRaises(KeyboardInterrupt): 129 226 xiio.run(foo()) 130 227 self.assertEqual(stack, [1])
diff --git a/xiio.py b/xiio.py
@@ -14,6 +14,10 @@ Gen = Generator['Condition', Files, T] 14 14 Coro = Coroutine['Condition', Files, T] 15 15 16 16 -1 17 class CancelledError(BaseException): -1 18 pass -1 19 -1 20 17 21 class Condition: 18 22 def __init__( 19 23 self, @@ -105,16 +109,74 @@ class Future(typing.Generic[T]): 105 109 return typing.cast(T, self.result) 106 110 107 111 -1 112 class Task(typing.Generic[T]): -1 113 def __init__(self, gen: Gen[T]): -1 114 self.gen = gen -1 115 self._condition: Condition | None = None -1 116 self.result: T | None = None -1 117 self._cancel_soon: bool = False -1 118 -1 119 @property -1 120 def condition(self) -> Condition: -1 121 return self._condition or Condition(time=-math.inf) -1 122 -1 123 def resume(self, state: Files | BaseException) -> None: -1 124 if self._cancel_soon: -1 125 self._cancel_soon = False -1 126 self._condition = self.gen.throw(CancelledError()) -1 127 elif isinstance(state, BaseException): -1 128 self._condition = self.gen.throw(state) -1 129 elif not self._condition: -1 130 self._condition = next(self.gen) -1 131 elif self.condition.fulfilled(state): -1 132 self._condition = self.gen.send(state) -1 133 -1 134 def cancel(self) -> None: -1 135 self._cancel_soon = True -1 136 self._condition = None -1 137 -1 138 -1 139 async def gather(coros: list[Coro[T]]) -> list[T]: -1 140 tasks = [Task(coro.__await__()) for coro in coros] -1 141 remaining = tasks[:] -1 142 exc = None -1 143 -1 144 while remaining: -1 145 try: -1 146 state = await Condition.combine( -1 147 [task.condition for task in remaining] -1 148 ) -1 149 except BaseException as e: -1 150 state = e -1 151 -1 152 for task in remaining[:]: -1 153 try: -1 154 task.resume(state) -1 155 except StopIteration as e: -1 156 remaining.remove(task) -1 157 task.result = e.value -1 158 except BaseException as e: -1 159 remaining.remove(task) -1 160 if not exc: -1 161 exc = e -1 162 for task in remaining: -1 163 task.cancel() -1 164 -1 165 if exc: -1 166 raise exc -1 167 -1 168 return [typing.cast(T, task.result) for task in tasks] -1 169 -1 170 108 171 def run(coro: Coro[T]) -> T:109 -1 gen = coro.__await__()-1 172 task = Task(coro.__await__()) 110 173 try:111 -1 condition = next(gen)112 174 while True: 113 175 try:114 -1 files = condition.select()-1 176 files = task.condition.select() 115 177 except BaseException as e:116 -1 condition = gen.throw(e)-1 178 task.resume(e) 117 179 else:118 -1 condition = gen.send(files)-1 180 task.resume(files) 119 181 except StopIteration as e: 120 182 return typing.cast(T, e.value)