- commit
- 2ca58a870010e789abb171a8cba9f0547fa1d920
- parent
- 3261d16114cc24b5d12975c008a3a87cf7f98b7c
- Author
- Tobias Bengfort <tobias.bengfort@posteo.de>
- Date
- 2026-02-26 21:31
allow to combine and check conditions We can not multiplex over multiple coroutines yet. To do that, we have to wait for a combined condition from multiple coroutines, and then decide which condition is fulfilled so we know which coroutine to resume. So here we are building the necessary infrastructure.
Diffstat
| M | tests.py | 63 | ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ |
| M | xiio.py | 40 | +++++++++++++++++++++++++++++++--------- |
2 files changed, 94 insertions, 9 deletions
diff --git a/tests.py b/tests.py
@@ -28,6 +28,69 @@ class XiioTestCase(unittest.TestCase): 28 28 self.assertAlmostEqual(actual, expected, places=places) 29 29 30 30 -1 31 class TestConditionCombine(XiioTestCase): -1 32 def test_time(self): -1 33 result = xiio.Condition.combine([ -1 34 xiio.Condition(), -1 35 xiio.Condition(time=1), -1 36 xiio.Condition(time=3), -1 37 xiio.Condition(time=-2), -1 38 ]) -1 39 self.assertEqual(result.time, -2) -1 40 -1 41 def test_futures(self): -1 42 f1 = xiio.Future() -1 43 f2 = xiio.Future() -1 44 _f3 = xiio.Future() -1 45 -1 46 result = xiio.Condition.combine([ -1 47 xiio.Condition(), -1 48 xiio.Condition(futures={f1}), -1 49 xiio.Condition(futures={f1, f2}), -1 50 ]) -1 51 -1 52 self.assertEqual(result.futures, {f1, f2}) -1 53 -1 54 def test_files(self): -1 55 result = xiio.Condition.combine([ -1 56 xiio.Condition(), -1 57 xiio.Condition(files={1: xiio.READ, 2: xiio.READ}), -1 58 xiio.Condition(files={1: xiio.WRITE}), -1 59 ]) -1 60 -1 61 self.assertEqual(result.files, { -1 62 1: xiio.READ|xiio.WRITE, -1 63 2: xiio.READ, -1 64 }) -1 65 -1 66 -1 67 class TestConditionFulfilled(XiioTestCase): -1 68 def test_files(self): -1 69 condition = xiio.Condition(files={1: xiio.READ}) -1 70 self.assertTrue(condition.fulfilled({1: xiio.READ, 2: xiio.READ})) -1 71 -1 72 def test_files_wrong_mode(self): -1 73 condition = xiio.Condition(files={1: xiio.READ}) -1 74 self.assertFalse(condition.fulfilled({1: xiio.WRITE, 2: xiio.READ})) -1 75 -1 76 def test_future_not_done(self): -1 77 future = xiio.Future() -1 78 condition = xiio.Condition(futures={future}) -1 79 self.assertFalse(condition.fulfilled({})) -1 80 -1 81 def test_future_result(self): -1 82 future = xiio.Future() -1 83 future.set_result(1) -1 84 condition = xiio.Condition(futures={future}) -1 85 self.assertTrue(condition.fulfilled({})) -1 86 -1 87 def test_future_exception(self): -1 88 future = xiio.Future() -1 89 future.set_exception(ValueError) -1 90 condition = xiio.Condition(futures={future}) -1 91 self.assertTrue(condition.fulfilled({})) -1 92 -1 93 31 94 class TestFuture(XiioTestCase): 32 95 async def test_set_result(self): 33 96 future = xiio.Future()
diff --git a/xiio.py b/xiio.py
@@ -10,8 +10,8 @@ from selectors import EVENT_WRITE as WRITE 10 10 11 11 T = typing.TypeVar('T') 12 12 Files = dict[int, int]13 -1 Gen = Generator['Condition', None, T]14 -1 Coro = Coroutine['Condition', None, T]-1 13 Gen = Generator['Condition', Files, T] -1 14 Coro = Coroutine['Condition', Files, T] 15 15 16 16 17 17 class Condition: @@ -26,17 +26,39 @@ class Condition: 26 26 self.futures = futures or set() 27 27 self.time = time 28 2829 -1 def __await__(self) -> Gen[None]:30 -1 yield self31 -132 -1 def select(self) -> None:-1 29 def __await__(self) -> Gen[Files]: -1 30 return (yield self) -1 31 -1 32 @classmethod -1 33 def combine(cls, conditions: list['Condition']) -> 'Condition': -1 34 result = cls() -1 35 for condition in conditions: -1 36 for fileno, events in condition.files.items(): -1 37 result.files.setdefault(fileno, 0) -1 38 result.files[fileno] |= events -1 39 result.futures |= condition.futures -1 40 result.time = min(result.time, condition.time) -1 41 return result -1 42 -1 43 def fulfilled(self, files: Files) -> bool: -1 44 return ( -1 45 self.time <= time.monotonic() -1 46 or any(future.done for future in self.futures) -1 47 or any( -1 48 files.get(fileno, 0) & events -1 49 for fileno, events in self.files.items() -1 50 ) -1 51 ) -1 52 -1 53 def select(self) -> Files: 33 54 timeout = self.time - time.monotonic() 34 55 if any(future.done for future in self.futures): 35 56 timeout = 0 36 57 with selectors.DefaultSelector() as sel: 37 58 for fileno, events in self.files.items(): 38 59 sel.register(fileno, events)39 -1 sel.select(None if timeout == math.inf else timeout)-1 60 selected = sel.select(None if timeout == math.inf else timeout) -1 61 return {key.fd: events for key, events in selected} 40 62 41 63 42 64 async def sleep(seconds: float) -> None: @@ -89,10 +111,10 @@ def run(coro: Coro[T]) -> T: 89 111 condition = next(gen) 90 112 while True: 91 113 try:92 -1 condition.select()-1 114 files = condition.select() 93 115 except BaseException as e: 94 116 condition = gen.throw(e) 95 117 else:96 -1 condition = next(gen)-1 118 condition = gen.send(files) 97 119 except StopIteration as e: 98 120 return typing.cast(T, e.value)