- commit
- c383ca69e8b62de8bd033d2b664192d7a70ea35e
- parent
- d241b8a53cab1f54563072f953803756c32cbf02
- Author
- Tobias Bengfort <tobias.bengfort@posteo.de>
- Date
- 2026-04-11 06:56
add on_asyncio() compat function
Diffstat
| A | tests/test_compat.py | 67 | ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ |
| M | xiio/__init__.py | 1 | + |
| A | xiio/compat.py | 59 | +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ |
| M | xiio/core.py | 14 | ++++++++++---- |
4 files changed, 137 insertions, 4 deletions
diff --git a/tests/test_compat.py b/tests/test_compat.py
@@ -0,0 +1,67 @@ -1 1 import asyncio -1 2 import contextlib -1 3 import os -1 4 import time -1 5 import unittest -1 6 -1 7 import xiio -1 8 -1 9 -1 10 class TestOnAsyncIO(unittest.IsolatedAsyncioTestCase): -1 11 @contextlib.contextmanager -1 12 def assert_duration(self, expected, *, places=2): -1 13 start = time.monotonic() -1 14 try: -1 15 yield -1 16 finally: -1 17 actual = time.monotonic() - start -1 18 self.assertAlmostEqual(actual, expected, places=places) -1 19 -1 20 async def test_sleep(self): -1 21 with self.assert_duration(0.1): -1 22 await xiio.on_asyncio(xiio.sleep(0.1)) -1 23 -1 24 async def test_sleep_both(self): -1 25 with self.assert_duration(0.1, places=1): -1 26 await asyncio.gather( -1 27 asyncio.sleep(0.1), -1 28 xiio.on_asyncio(xiio.sleep(0.1)), -1 29 ) -1 30 -1 31 async def test_cancel(self): -1 32 with self.assert_duration(0.1): -1 33 with self.assertRaises(TimeoutError): -1 34 async with asyncio.timeout(0.1): -1 35 await xiio.on_asyncio(xiio.sleep(0.5)) -1 36 -1 37 async def test_read(self): -1 38 loop = asyncio.get_running_loop() -1 39 r, w = os.pipe() -1 40 try: -1 41 def on_write(): -1 42 os.write(w, b'foo') -1 43 loop.remove_writer(w) -1 44 -1 45 loop.add_writer(w, on_write) -1 46 result = await xiio.on_asyncio(xiio.read(r, 10)) -1 47 self.assertEqual(result, b'foo') -1 48 finally: -1 49 os.close(r) -1 50 os.close(w) -1 51 -1 52 async def test_write(self): -1 53 loop = asyncio.get_running_loop() -1 54 future = loop.create_future() -1 55 r, w = os.pipe() -1 56 try: -1 57 def on_read(): -1 58 future.set_result(os.read(r, 10)) -1 59 loop.remove_reader(r) -1 60 -1 61 loop.add_reader(r, on_read) -1 62 await xiio.on_asyncio(xiio.write(w, b'foo')) -1 63 result = await future -1 64 self.assertEqual(result, b'foo') -1 65 finally: -1 66 os.close(r) -1 67 os.close(w)
diff --git a/xiio/__init__.py b/xiio/__init__.py
@@ -10,3 +10,4 @@ from .multiplex import gather # noqa 10 10 from .multiplex import timeout # noqa 11 11 from .subprocess import run_process # noqa 12 12 from .signals import subscribe_signals # noqa -1 13 from .compat import on_asyncio # noqa
diff --git a/xiio/compat.py b/xiio/compat.py
@@ -0,0 +1,59 @@
-1 1 import asyncio
-1 2 import os
-1 3 import typing
-1 4
-1 5 from .core import READ
-1 6 from .core import WRITE
-1 7 from .core import Coro
-1 8 from .core import Files
-1 9 from .core import Task
-1 10
-1 11 T = typing.TypeVar('T')
-1 12
-1 13
-1 14 async def asyncio_select(files: Files, timeout: float | None) -> Files:
-1 15 if timeout is not None and timeout <= 0:
-1 16 return {}
-1 17
-1 18 loop = asyncio.get_running_loop()
-1 19 future = loop.create_future()
-1 20
-1 21 def callback(fd, events):
-1 22 if not future.done():
-1 23 future.set_result({fd: events})
-1 24
-1 25 # duplicate file descriptors so we don't remove existing callbacks
-1 26 dups = {fd: os.dup(fd) for fd in files}
-1 27
-1 28 for fd, events in files.items():
-1 29 if events & READ:
-1 30 loop.add_reader(dups[fd], callback, fd, READ)
-1 31 if events & WRITE:
-1 32 loop.add_writer(dups[fd], callback, fd, WRITE)
-1 33
-1 34 try:
-1 35 async with asyncio.timeout(timeout):
-1 36 return await future
-1 37 except TimeoutError:
-1 38 return {}
-1 39 finally:
-1 40 for dup in dups.values():
-1 41 loop.remove_reader(dup)
-1 42 loop.remove_writer(dup)
-1 43 os.close(dup)
-1 44
-1 45
-1 46 async def on_asyncio(coro: Coro[T]) -> T:
-1 47 task = Task(coro.__await__())
-1 48 try:
-1 49 while True:
-1 50 try:
-1 51 files = await asyncio_select(
-1 52 task.condition.files, task.condition.timeout
-1 53 )
-1 54 except BaseException as e:
-1 55 task.resume(e)
-1 56 else:
-1 57 task.resume(files)
-1 58 except StopIteration as e:
-1 59 return typing.cast(T, e.value)
diff --git a/xiio/core.py b/xiio/core.py
@@ -33,6 +33,15 @@ class Condition: 33 33 def __await__(self) -> Gen[Files]: 34 34 return (yield self) 35 35 -1 36 @property -1 37 def timeout(self) -> float | None: -1 38 if any(future.done for future in self.futures): -1 39 return 0 -1 40 elif self.time == math.inf: -1 41 return None -1 42 else: -1 43 return self.time - time.monotonic() -1 44 36 45 @classmethod 37 46 def combine(cls, conditions: list['Condition']) -> 'Condition': 38 47 result = cls() @@ -55,13 +64,10 @@ class Condition: 55 64 ) 56 65 57 66 def select(self) -> Files:58 -1 timeout = self.time - time.monotonic()59 -1 if any(future.done for future in self.futures):60 -1 timeout = 061 67 with selectors.DefaultSelector() as sel: 62 68 for fileno, events in self.files.items(): 63 69 sel.register(fileno, events)64 -1 selected = sel.select(None if timeout == math.inf else timeout)-1 70 selected = sel.select(self.timeout) 65 71 return {key.fd: events for key, events in selected} 66 72 67 73