- commit
- b34bac8ed1f750b2fb8db0a0358826d0593a7b90
- parent
- 5b257c932b01485db2170c21116f5ef06754b76b
- Author
- Tobias Bengfort <tobias.bengfort@posteo.de>
- Date
- 2026-04-16 17:16
add ThreadPool and in_thread()
Diffstat
| A | tests/test_threads.py | 30 | ++++++++++++++++++++++++++++++ |
| M | xiio/__init__.py | 2 | ++ |
| A | xiio/threads.py | 72 | ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ |
3 files changed, 104 insertions, 0 deletions
diff --git a/tests/test_threads.py b/tests/test_threads.py
@@ -0,0 +1,30 @@ -1 1 import time -1 2 -1 3 import xiio -1 4 from tests.utils import XiioTestCase -1 5 -1 6 -1 7 class TestThreads(XiioTestCase): -1 8 async def test_in_thread(self): -1 9 with self.assert_duration(0.1): -1 10 await xiio.gather([ -1 11 xiio.in_thread(time.sleep, 0.1), -1 12 xiio.in_thread(time.sleep, 0.1), -1 13 xiio.in_thread(time.sleep, 0.1), -1 14 ]) -1 15 -1 16 async def test_in_thread_exception(self): -1 17 def raise_error(): -1 18 raise NotImplementedError -1 19 -1 20 with self.assertRaises(NotImplementedError): -1 21 await xiio.in_thread(raise_error) -1 22 -1 23 async def test_max_workers(self): -1 24 with self.assert_duration(0.2): -1 25 with xiio.ThreadPool(max_workers=2) as tp: -1 26 await xiio.gather([ -1 27 tp.run(time.sleep, 0.1), -1 28 tp.run(time.sleep, 0.1), -1 29 tp.run(time.sleep, 0.1), -1 30 ])
diff --git a/xiio/__init__.py b/xiio/__init__.py
@@ -8,6 +8,8 @@ from .core import CancelledError # noqa 8 8 from .multiplex import TaskGroup # noqa 9 9 from .multiplex import gather # noqa 10 10 from .multiplex import timeout # noqa -1 11 from .threads import ThreadPool # noqa -1 12 from .threads import in_thread # noqa 11 13 from .subprocess import run_process # noqa 12 14 from .signals import subscribe_signals # noqa 13 15 from .compat import on_asyncio # noqa
diff --git a/xiio/threads.py b/xiio/threads.py
@@ -0,0 +1,72 @@
-1 1 import os
-1 2 import queue
-1 3 import threading
-1 4 import typing
-1 5 from collections.abc import Callable
-1 6
-1 7 from .core import READ
-1 8 from .core import Condition
-1 9 from .core import Future
-1 10
-1 11 T = typing.TypeVar('T')
-1 12
-1 13
-1 14 async def future_with_fd(future: Future[T], fd: int) -> T:
-1 15 while not future.done:
-1 16 await Condition(files={fd: READ})
-1 17 os.read(fd, 1)
-1 18 return await future
-1 19
-1 20
-1 21 class ThreadPool:
-1 22 def __init__(self, max_workers: int | None = None):
-1 23 self.max_workers = max_workers or min(32, (os.cpu_count() or 1) + 4)
-1 24 self.queue = queue.Queue()
-1 25 self.workers = []
-1 26
-1 27 def __enter__(self) -> 'ThreadPool':
-1 28 self.r, self.w = os.pipe()
-1 29 return self
-1 30
-1 31 def __exit__(self, *args, **kwargs):
-1 32 for worker in self.workers:
-1 33 worker.join()
-1 34 os.close(self.r)
-1 35 os.close(self.w)
-1 36
-1 37 def _worker(self) -> None:
-1 38 while True:
-1 39 try:
-1 40 fn = self.queue.get(block=False)
-1 41 except queue.Empty:
-1 42 break
-1 43 fn()
-1 44 self.queue.task_done()
-1 45
-1 46 async def run(self, fn: Callable[..., T], *args, **kwargs) -> T:
-1 47 future = Future()
-1 48
-1 49 def wrapper() -> None:
-1 50 try:
-1 51 result = fn(*args, **kwargs)
-1 52 except Exception as e:
-1 53 future.set_exception(e)
-1 54 else:
-1 55 future.set_result(result)
-1 56 finally:
-1 57 os.write(self.w, b'\0')
-1 58
-1 59 self.queue.put(wrapper)
-1 60
-1 61 self.workers = [w for w in self.workers if w.is_alive()]
-1 62 if len(self.workers) < self.max_workers:
-1 63 worker = threading.Thread(target=self._worker)
-1 64 self.workers.append(worker)
-1 65 worker.start()
-1 66
-1 67 return await future_with_fd(future, self.r)
-1 68
-1 69
-1 70 async def in_thread(fn: Callable[..., T], *args, **kwargs) -> T:
-1 71 with ThreadPool() as pool:
-1 72 return await pool.run(fn, *args, **kwargs)