xiio

really simple async runtime
git clone https://git.ce9e.org/xiio.git

commit
37911ea6ed293cd987fad904b07ab4b027857604
parent
e8447dd8daf816df9be346b112c65e57e538fcd5
Author
Tobias Bengfort <tobias.bengfort@posteo.de>
Date
2026-04-12 13:36
add run_process()

Mostly the same interface as `subprocess.run()` from the standard
library, just async.

Uses a pidfd to integrate into the event loop, so it is linux-specific.

https://docs.python.org/3/library/subprocess.html
https://docs.python.org/3/library/asyncio-subprocess.html
https://trio.readthedocs.io/en/stable/reference-io.html#trio.Process

Diffstat

A tests/test_subprocess.py 25 +++++++++++++++++++++++++
M xiio/__init__.py 1 +
A xiio/subprocess.py 39 +++++++++++++++++++++++++++++++++++++++

3 files changed, 65 insertions, 0 deletions


diff --git a/tests/test_subprocess.py b/tests/test_subprocess.py

@@ -0,0 +1,25 @@
   -1     1 import subprocess
   -1     2 
   -1     3 import xiio
   -1     4 from tests.utils import XiioTestCase
   -1     5 
   -1     6 
   -1     7 class TestRunProcess(XiioTestCase):
   -1     8     async def test_run_process(self):
   -1     9         cmd = ['sh', '-c', 'sleep 0.1 && echo "Hello World"']
   -1    10 
   -1    11         with self.assert_duration(0.1, places=1):
   -1    12             result = await xiio.run_process(cmd, capture_output=True)
   -1    13         self.assertEqual(result.returncode, 0)
   -1    14         self.assertEqual(result.stdout, b'Hello World\n')
   -1    15 
   -1    16     async def test_check(self):
   -1    17         await xiio.run_process(['true'], check=True)
   -1    18         with self.assertRaises(subprocess.CalledProcessError):
   -1    19             await xiio.run_process(['false'], check=True)
   -1    20 
   -1    21     async def test_cancel(self):
   -1    22         with self.assert_duration(0.1, places=1):
   -1    23             with self.assertRaises(TimeoutError):
   -1    24                 async with xiio.timeout(0.1):
   -1    25                     await xiio.run_process(['sleep', '0.5'])

diff --git a/xiio/__init__.py b/xiio/__init__.py

@@ -8,3 +8,4 @@ from .core import CancelledError  # noqa
    8     8 from .multiplex import TaskGroup  # noqa
    9     9 from .multiplex import gather  # noqa
   10    10 from .multiplex import timeout  # noqa
   -1    11 from .subprocess import run_process  # noqa

diff --git a/xiio/subprocess.py b/xiio/subprocess.py

@@ -0,0 +1,39 @@
   -1     1 import os
   -1     2 import subprocess
   -1     3 import typing
   -1     4 
   -1     5 from .core import READ
   -1     6 from .core import Condition
   -1     7 
   -1     8 
   -1     9 def kill_process(proc: subprocess.Popen, timeout: float = 1) -> None:
   -1    10     proc.terminate()
   -1    11     try:
   -1    12         proc.wait(timeout)
   -1    13     except subprocess.TimeoutExpired:  # pragma: no cover
   -1    14         proc.kill()
   -1    15 
   -1    16 
   -1    17 async def run_process(
   -1    18     cmd: list[str], *, capture_output: bool = False, check: bool = False, **kwargs
   -1    19 ) -> subprocess.CompletedProcess:
   -1    20     if capture_output:
   -1    21         kwargs['stdout'] = subprocess.PIPE
   -1    22         kwargs['stderr'] = subprocess.PIPE
   -1    23 
   -1    24     with subprocess.Popen(cmd, **kwargs) as proc:
   -1    25         try:
   -1    26             # concurrency: as long as no one reaps the process (e.g. by
   -1    27             # calling `waitpid()`) it is safe to use `pidfd_open()`
   -1    28             pidfd = os.pidfd_open(proc.pid)
   -1    29             await Condition(files={pidfd: READ})
   -1    30             retcode = typing.cast(int, proc.poll())
   -1    31             stdout, stderr = proc.communicate()
   -1    32         except:
   -1    33             kill_process(proc)
   -1    34             raise
   -1    35 
   -1    36     result = subprocess.CompletedProcess(cmd, retcode, stdout, stderr)
   -1    37     if check:
   -1    38         result.check_returncode()
   -1    39     return result