python_async_loops

Eight different ways to implement an asyncronous loop in python  http://tobib.spline.de/xi/posts/2023-01-29-python-async-loops/
git clone https://git.ce9e.org/python_async_loops.git

commit
e771b338f9273795c40f4eb0369d82c33649ce8b
parent
337621fb83005640c4cfe4dbe47aa7529170ec27
Author
Tobias Bengfort <tobias.bengfort@posteo.de>
Date
2023-01-29 15:29
add implementations

Diffstat

A 01_blocking_loop.py 47 +++++++++++++++++++++++++++++++++++++++++++++++
A 02_busy_loop.py 59 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
A 03_sleepy_loop.py 61 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
A 04_select_loop.py 52 ++++++++++++++++++++++++++++++++++++++++++++++++++++
A 05_callback_loop.py 106 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
A 06_generator_loop.py 132 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
A 07_async_await_loop.py 141 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
A random.sh 5 +++++

8 files changed, 603 insertions, 0 deletions


diff --git a/01_blocking_loop.py b/01_blocking_loop.py

@@ -0,0 +1,47 @@
   -1     1 import datetime
   -1     2 import os
   -1     3 import subprocess
   -1     4 
   -1     5 
   -1     6 class LineReader:
   -1     7     def __init__(self, file):
   -1     8         self.file = file
   -1     9         self.buffer = b''
   -1    10         self.line = ''
   -1    11 
   -1    12     def read_line(self):
   -1    13         chunk = os.read(self.file.fileno(), 1024)
   -1    14         if not chunk:
   -1    15             raise ValueError
   -1    16         self.buffer += chunk
   -1    17         lines = self.buffer.split(b'\n')
   -1    18         if len(lines) > 1:
   -1    19             self.line = lines[-2].decode('utf-8')
   -1    20         self.buffer = lines[-1]
   -1    21 
   -1    22 
   -1    23 def cleanup():
   -1    24     proc1.terminate()
   -1    25     proc2.terminate()
   -1    26     proc1.wait()
   -1    27     proc2.wait()
   -1    28 
   -1    29 
   -1    30 def render():
   -1    31     now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
   -1    32     print(' '.join([now, reader1.line, reader2.line]))
   -1    33 
   -1    34 
   -1    35 proc1 = subprocess.Popen(['./random.sh'], stdout=subprocess.PIPE)
   -1    36 proc2 = subprocess.Popen(['./random.sh'], stdout=subprocess.PIPE)
   -1    37 
   -1    38 reader1 = LineReader(proc1.stdout)
   -1    39 reader2 = LineReader(proc2.stdout)
   -1    40 
   -1    41 try:
   -1    42     while True:
   -1    43         for reader in [reader1, reader2]:
   -1    44             reader.read_line()
   -1    45         render()
   -1    46 finally:
   -1    47     cleanup()

diff --git a/02_busy_loop.py b/02_busy_loop.py

@@ -0,0 +1,59 @@
   -1     1 import datetime
   -1     2 import fcntl
   -1     3 import os
   -1     4 import subprocess
   -1     5 
   -1     6 
   -1     7 class LineReader:
   -1     8     def __init__(self, file):
   -1     9         self.file = file
   -1    10         self.buffer = b''
   -1    11         self.line = ''
   -1    12 
   -1    13     def read_line(self):
   -1    14         chunk = os.read(self.file.fileno(), 1024)
   -1    15         if not chunk:
   -1    16             raise ValueError
   -1    17         self.buffer += chunk
   -1    18         lines = self.buffer.split(b'\n')
   -1    19         if len(lines) > 1:
   -1    20             self.line = lines[-2].decode('utf-8')
   -1    21         self.buffer = lines[-1]
   -1    22 
   -1    23 
   -1    24 def cleanup():
   -1    25     proc1.terminate()
   -1    26     proc2.terminate()
   -1    27     proc1.wait()
   -1    28     proc2.wait()
   -1    29 
   -1    30 
   -1    31 def render():
   -1    32     now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
   -1    33     print(' '.join([now, reader1.line, reader2.line]))
   -1    34 
   -1    35 
   -1    36 def set_nonblock(fd):
   -1    37     fl = fcntl.fcntl(fd, fcntl.F_GETFL)
   -1    38     fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
   -1    39 
   -1    40 
   -1    41 proc1 = subprocess.Popen(['./random.sh'], stdout=subprocess.PIPE)
   -1    42 proc2 = subprocess.Popen(['./random.sh'], stdout=subprocess.PIPE)
   -1    43 
   -1    44 set_nonblock(proc1.stdout.fileno())
   -1    45 set_nonblock(proc2.stdout.fileno())
   -1    46 
   -1    47 reader1 = LineReader(proc1.stdout)
   -1    48 reader2 = LineReader(proc2.stdout)
   -1    49 
   -1    50 try:
   -1    51     while True:
   -1    52         for reader in [reader1, reader2]:
   -1    53             try:
   -1    54                 reader.read_line()
   -1    55             except BlockingIOError:
   -1    56                 pass
   -1    57         render()
   -1    58 finally:
   -1    59     cleanup()

diff --git a/03_sleepy_loop.py b/03_sleepy_loop.py

@@ -0,0 +1,61 @@
   -1     1 import datetime
   -1     2 import fcntl
   -1     3 import os
   -1     4 import subprocess
   -1     5 import time
   -1     6 
   -1     7 
   -1     8 class LineReader:
   -1     9     def __init__(self, file):
   -1    10         self.file = file
   -1    11         self.buffer = b''
   -1    12         self.line = ''
   -1    13 
   -1    14     def read_line(self):
   -1    15         chunk = os.read(self.file.fileno(), 1024)
   -1    16         if not chunk:
   -1    17             raise ValueError
   -1    18         self.buffer += chunk
   -1    19         lines = self.buffer.split(b'\n')
   -1    20         if len(lines) > 1:
   -1    21             self.line = lines[-2].decode('utf-8')
   -1    22         self.buffer = lines[-1]
   -1    23 
   -1    24 
   -1    25 def cleanup():
   -1    26     proc1.terminate()
   -1    27     proc2.terminate()
   -1    28     proc1.wait()
   -1    29     proc2.wait()
   -1    30 
   -1    31 
   -1    32 def render():
   -1    33     now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
   -1    34     print(' '.join([now, reader1.line, reader2.line]))
   -1    35 
   -1    36 
   -1    37 def set_nonblock(fd):
   -1    38     fl = fcntl.fcntl(fd, fcntl.F_GETFL)
   -1    39     fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
   -1    40 
   -1    41 
   -1    42 proc1 = subprocess.Popen(['./random.sh'], stdout=subprocess.PIPE)
   -1    43 proc2 = subprocess.Popen(['./random.sh'], stdout=subprocess.PIPE)
   -1    44 
   -1    45 set_nonblock(proc1.stdout.fileno())
   -1    46 set_nonblock(proc2.stdout.fileno())
   -1    47 
   -1    48 reader1 = LineReader(proc1.stdout)
   -1    49 reader2 = LineReader(proc2.stdout)
   -1    50 
   -1    51 try:
   -1    52     while True:
   -1    53         for reader in [reader1, reader2]:
   -1    54             try:
   -1    55                 reader.read_line()
   -1    56             except BlockingIOError:
   -1    57                 pass
   -1    58         render()
   -1    59         time.sleep(1)
   -1    60 finally:
   -1    61     cleanup()

diff --git a/04_select_loop.py b/04_select_loop.py

@@ -0,0 +1,52 @@
   -1     1 import datetime
   -1     2 import os
   -1     3 import selectors
   -1     4 import subprocess
   -1     5 
   -1     6 
   -1     7 class LineReader:
   -1     8     def __init__(self, file):
   -1     9         self.file = file
   -1    10         self.buffer = b''
   -1    11         self.line = ''
   -1    12 
   -1    13     def read_line(self):
   -1    14         chunk = os.read(self.file.fileno(), 1024)
   -1    15         if not chunk:
   -1    16             raise ValueError
   -1    17         self.buffer += chunk
   -1    18         lines = self.buffer.split(b'\n')
   -1    19         if len(lines) > 1:
   -1    20             self.line = lines[-2].decode('utf-8')
   -1    21         self.buffer = lines[-1]
   -1    22 
   -1    23 
   -1    24 def cleanup():
   -1    25     proc1.terminate()
   -1    26     proc2.terminate()
   -1    27     proc1.wait()
   -1    28     proc2.wait()
   -1    29 
   -1    30 
   -1    31 def render():
   -1    32     now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
   -1    33     print(' '.join([now, reader1.line, reader2.line]))
   -1    34 
   -1    35 
   -1    36 proc1 = subprocess.Popen(['./random.sh'], stdout=subprocess.PIPE)
   -1    37 proc2 = subprocess.Popen(['./random.sh'], stdout=subprocess.PIPE)
   -1    38 
   -1    39 reader1 = LineReader(proc1.stdout)
   -1    40 reader2 = LineReader(proc2.stdout)
   -1    41 
   -1    42 selector = selectors.DefaultSelector()
   -1    43 selector.register(proc1.stdout, selectors.EVENT_READ, reader1)
   -1    44 selector.register(proc2.stdout, selectors.EVENT_READ, reader2)
   -1    45 
   -1    46 try:
   -1    47     while selector.get_map():
   -1    48         for key, mask in selector.select(10):
   -1    49             key.data.read_line()
   -1    50         render()
   -1    51 finally:
   -1    52     cleanup()

diff --git a/05_callback_loop.py b/05_callback_loop.py

@@ -0,0 +1,106 @@
   -1     1 import datetime
   -1     2 import os
   -1     3 import selectors
   -1     4 import subprocess
   -1     5 import time
   -1     6 
   -1     7 
   -1     8 class LineReader:
   -1     9     def __init__(self, file):
   -1    10         self.file = file
   -1    11         self.buffer = b''
   -1    12         self.line = ''
   -1    13 
   -1    14     def read_line(self):
   -1    15         chunk = os.read(self.file.fileno(), 1024)
   -1    16         if not chunk:
   -1    17             raise ValueError
   -1    18         self.buffer += chunk
   -1    19         lines = self.buffer.split(b'\n')
   -1    20         if len(lines) > 1:
   -1    21             self.line = lines[-2].decode('utf-8')
   -1    22         self.buffer = lines[-1]
   -1    23 
   -1    24 
   -1    25 class Loop:
   -1    26     def __init__(self):
   -1    27         self.selector = selectors.DefaultSelector()
   -1    28         self.times = []
   -1    29 
   -1    30     def set_timeout(self, callback, timeout):
   -1    31         now = time.time()
   -1    32         self.times.append((callback, now + timeout))
   -1    33 
   -1    34     def set_interval(self, callback, timeout):
   -1    35         def wrapper():
   -1    36             callback()
   -1    37             self.set_timeout(wrapper, timeout)
   -1    38         self.set_timeout(wrapper, 0)
   -1    39 
   -1    40     def register_file(self, file, callback):
   -1    41         self.selector.register(file, selectors.EVENT_READ, callback)
   -1    42 
   -1    43     def unregister_file(self, file):
   -1    44         self.selector.unregister(file)
   -1    45 
   -1    46     def run(self):
   -1    47         while True:
   -1    48             now = time.time()
   -1    49             timeout = min((t - now for _, t in self.times), default=None)
   -1    50 
   -1    51             for key, mask in self.selector.select(timeout):
   -1    52                 key.data()
   -1    53 
   -1    54             keep = []
   -1    55             now = time.time()
   -1    56             for callback, t in self.times:
   -1    57                 if t < now:
   -1    58                     callback()
   -1    59                 else:
   -1    60                     keep.append((callback, t))
   -1    61             self.times = keep
   -1    62 
   -1    63 
   -1    64 def cleanup():
   -1    65     proc1.terminate()
   -1    66     proc2.terminate()
   -1    67     proc1.wait()
   -1    68     proc2.wait()
   -1    69 
   -1    70 
   -1    71 def render():
   -1    72     now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
   -1    73     print(' '.join([now, reader1.line, reader2.line]))
   -1    74 
   -1    75 
   -1    76 def callback1():
   -1    77     try:
   -1    78         reader1.read_line()
   -1    79     except ValueError:
   -1    80         loop.unregister_file(proc1.stdout)
   -1    81     render()
   -1    82 
   -1    83 
   -1    84 def callback2():
   -1    85     try:
   -1    86         reader2.read_line()
   -1    87     except ValueError:
   -1    88         loop.unregister_file(proc2.stdout)
   -1    89     render()
   -1    90 
   -1    91 
   -1    92 proc1 = subprocess.Popen(['./random.sh'], stdout=subprocess.PIPE)
   -1    93 proc2 = subprocess.Popen(['./random.sh'], stdout=subprocess.PIPE)
   -1    94 
   -1    95 reader1 = LineReader(proc1.stdout)
   -1    96 reader2 = LineReader(proc2.stdout)
   -1    97 
   -1    98 loop = Loop()
   -1    99 loop.register_file(proc1.stdout, callback1)
   -1   100 loop.register_file(proc2.stdout, callback2)
   -1   101 loop.set_interval(render, 10)
   -1   102 
   -1   103 try:
   -1   104     loop.run()
   -1   105 finally:
   -1   106     cleanup()

diff --git a/06_generator_loop.py b/06_generator_loop.py

@@ -0,0 +1,132 @@
   -1     1 import datetime
   -1     2 import os
   -1     3 import selectors
   -1     4 import subprocess
   -1     5 import time
   -1     6 
   -1     7 selector = selectors.DefaultSelector()
   -1     8 data = ['', '', '']
   -1     9 
   -1    10 
   -1    11 class LineReader:
   -1    12     def __init__(self, file):
   -1    13         self.file = file
   -1    14         self.buffer = b''
   -1    15         self.line = ''
   -1    16 
   -1    17     def read_line(self):
   -1    18         chunk = os.read(self.file.fileno(), 1024)
   -1    19         if not chunk:
   -1    20             raise ValueError
   -1    21         self.buffer += chunk
   -1    22         lines = self.buffer.split(b'\n')
   -1    23         if len(lines) > 1:
   -1    24             self.line = lines[-2].decode('utf-8')
   -1    25         self.buffer = lines[-1]
   -1    26 
   -1    27 
   -1    28 class Task:
   -1    29     def __init__(self, gen):
   -1    30         self.gen = gen
   -1    31         self.files = set()
   -1    32         self.times = {0}
   -1    33         self.init = False
   -1    34         self.done = False
   -1    35         self.result = None
   -1    36 
   -1    37     def select(self):
   -1    38         now = time.time()
   -1    39         timeout = min((t - now for t in self.times), default=None)
   -1    40         return {key.fileobj for key, mask in selector.select(timeout)}
   -1    41 
   -1    42     def step(self, files, now):
   -1    43         try:
   -1    44             if self.done:
   -1    45                 return
   -1    46             elif not self.init:
   -1    47                 self.files, self.times = next(self.gen)
   -1    48                 self.init = True
   -1    49             elif any(t < now for t in self.times) or files & self.files:
   -1    50                 self.files, self.times = self.gen.send((files, now))
   -1    51         except StopIteration as e:
   -1    52             self.done = True
   -1    53             self.result = e.value
   -1    54 
   -1    55     def close(self):
   -1    56         self.gen.close()
   -1    57 
   -1    58 
   -1    59 def sleep(t):
   -1    60     yield set(), {time.time() + t}
   -1    61 
   -1    62 
   -1    63 def gather(*generators):
   -1    64     subtasks = [Task(gen) for gen in generators]
   -1    65     try:
   -1    66         while True:
   -1    67             wait_files = set().union(
   -1    68                 *[t.files for t in subtasks if not t.done]
   -1    69             )
   -1    70             wait_times = set().union(
   -1    71                 *[t.times for t in subtasks if not t.done]
   -1    72             )
   -1    73             files, now = yield wait_files, wait_times
   -1    74             for task in subtasks:
   -1    75                 task.step(files, now)
   -1    76             if all(task.done for task in subtasks):
   -1    77                 return [task.result for task in subtasks]
   -1    78     finally:
   -1    79         for task in subtasks:
   -1    80             task.close()
   -1    81 
   -1    82 
   -1    83 def run(gen):
   -1    84     task = Task(gen)
   -1    85     try:
   -1    86         while not task.done:
   -1    87             files = task.select()
   -1    88             task.step(files, time.time())
   -1    89         return task.result
   -1    90     finally:
   -1    91         task.close()
   -1    92 
   -1    93 
   -1    94 def render():
   -1    95     data[0] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
   -1    96     print(' '.join(data))
   -1    97 
   -1    98 
   -1    99 def clock():
   -1   100     while True:
   -1   101         yield from sleep(10)
   -1   102         render()
   -1   103 
   -1   104 
   -1   105 def popen(cmd, i):
   -1   106     proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
   -1   107     reader = LineReader(proc.stdout)
   -1   108     selector.register(proc.stdout, selectors.EVENT_READ)
   -1   109     try:
   -1   110         while True:
   -1   111             yield {proc.stdout}, set()
   -1   112             try:
   -1   113                 reader.read_line()
   -1   114                 data[i] = reader.line
   -1   115                 render()
   -1   116             except ValueError:
   -1   117                 break
   -1   118     finally:
   -1   119         selector.unregister(proc.stdout)
   -1   120         proc.terminate()
   -1   121         proc.wait()
   -1   122 
   -1   123 
   -1   124 def amain():
   -1   125     yield from gather(
   -1   126         popen(['./random.sh'], 1),
   -1   127         popen(['./random.sh'], 2),
   -1   128         clock(),
   -1   129     )
   -1   130 
   -1   131 
   -1   132 run(amain())

diff --git a/07_async_await_loop.py b/07_async_await_loop.py

@@ -0,0 +1,141 @@
   -1     1 import datetime
   -1     2 import os
   -1     3 import selectors
   -1     4 import subprocess
   -1     5 import time
   -1     6 
   -1     7 selector = selectors.DefaultSelector()
   -1     8 data = ['', '', '']
   -1     9 
   -1    10 
   -1    11 class LineReader:
   -1    12     def __init__(self, file):
   -1    13         self.file = file
   -1    14         self.buffer = b''
   -1    15         self.line = ''
   -1    16 
   -1    17     def read_line(self):
   -1    18         chunk = os.read(self.file.fileno(), 1024)
   -1    19         if not chunk:
   -1    20             raise ValueError
   -1    21         self.buffer += chunk
   -1    22         lines = self.buffer.split(b'\n')
   -1    23         if len(lines) > 1:
   -1    24             self.line = lines[-2].decode('utf-8')
   -1    25         self.buffer = lines[-1]
   -1    26         return self.line
   -1    27 
   -1    28 
   -1    29 class AYield:
   -1    30     def __init__(self, value):
   -1    31         self.value = value
   -1    32 
   -1    33     def __await__(self):
   -1    34         return (yield self.value)
   -1    35 
   -1    36 
   -1    37 class Task:
   -1    38     def __init__(self, coro):
   -1    39         self.gen = coro.__await__()
   -1    40         self.files = set()
   -1    41         self.times = {0}
   -1    42         self.init = False
   -1    43         self.done = False
   -1    44         self.result = None
   -1    45 
   -1    46     def select(self):
   -1    47         now = time.time()
   -1    48         timeout = min((t - now for t in self.times), default=None)
   -1    49         return {key.fileobj for key, mask in selector.select(timeout)}
   -1    50 
   -1    51     def step(self, files, now):
   -1    52         try:
   -1    53             if self.done:
   -1    54                 return
   -1    55             elif not self.init:
   -1    56                 self.files, self.times = next(self.gen)
   -1    57                 self.init = True
   -1    58             elif any(t < now for t in self.times) or files & self.files:
   -1    59                 self.files, self.times = self.gen.send((files, now))
   -1    60         except StopIteration as e:
   -1    61             self.done = True
   -1    62             self.result = e.value
   -1    63 
   -1    64     def close(self):
   -1    65         self.gen.close()
   -1    66 
   -1    67 
   -1    68 async def sleep(t):
   -1    69     await AYield((set(), {time.time() + t}))
   -1    70 
   -1    71 
   -1    72 async def gather(*coros):
   -1    73     subtasks = [Task(coro) for coro in coros]
   -1    74     try:
   -1    75         while True:
   -1    76             wait_files = set().union(
   -1    77                 *[t.files for t in subtasks if not t.done]
   -1    78             )
   -1    79             wait_times = set().union(
   -1    80                 *[t.times for t in subtasks if not t.done]
   -1    81             )
   -1    82             files, now = await AYield((wait_files, wait_times))
   -1    83             for task in subtasks:
   -1    84                 task.step(files, now)
   -1    85             if all(task.done for task in subtasks):
   -1    86                 return [task.result for task in subtasks]
   -1    87     finally:
   -1    88         for task in subtasks:
   -1    89             task.close()
   -1    90 
   -1    91 
   -1    92 def run(coro):
   -1    93     task = Task(coro)
   -1    94     try:
   -1    95         while not task.done:
   -1    96             files = task.select()
   -1    97             task.step(files, time.time())
   -1    98         return task.result
   -1    99     finally:
   -1   100         task.close()
   -1   101 
   -1   102 
   -1   103 def render():
   -1   104     data[0] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
   -1   105     print(' '.join(data))
   -1   106 
   -1   107 
   -1   108 async def clock():
   -1   109     while True:
   -1   110         await sleep(10)
   -1   111         render()
   -1   112 
   -1   113 
   -1   114 async def popen(cmd, i):
   -1   115     proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
   -1   116     reader = LineReader(proc.stdout)
   -1   117     selector.register(proc.stdout, selectors.EVENT_READ)
   -1   118     try:
   -1   119         while True:
   -1   120             await AYield(({proc.stdout}, set()))
   -1   121             try:
   -1   122                 reader.read_line()
   -1   123                 data[i] = reader.line
   -1   124                 render()
   -1   125             except ValueError:
   -1   126                 break
   -1   127     finally:
   -1   128         selector.unregister(proc.stdout)
   -1   129         proc.terminate()
   -1   130         proc.wait()
   -1   131 
   -1   132 
   -1   133 async def amain():
   -1   134     await gather(
   -1   135         popen(['./random.sh'], 1),
   -1   136         popen(['./random.sh'], 2),
   -1   137         clock(),
   -1   138     )
   -1   139 
   -1   140 
   -1   141 run(amain())

diff --git a/random.sh b/random.sh

@@ -0,0 +1,5 @@
   -1     1 #!/bin/bash
   -1     2 while true; do
   -1     3     sleep $(($RANDOM % 5))
   -1     4     echo $RANDOM
   -1     5 done