python_async_loops

Eight different ways to implement an asyncronous loop in python  http://tobib.spline.de/xi/posts/2023-01-29-python-async-loops/
git clone https://git.ce9e.org/python_async_loops.git

commit
ca80ef33ab9cc8fd7033858bd60c5f767e124326
parent
ca72ba1be3057594e39072768ade224b0bb54482
Author
Tobias Bengfort <tobias.bengfort@posteo.de>
Date
2023-01-31 19:48
refactor Task

Diffstat

M 06_generator_loop.py 28 ++++++++++++++++++----------
M 07_async_await_loop.py 28 ++++++++++++++++++----------

2 files changed, 36 insertions, 20 deletions


diff --git a/06_generator_loop.py b/06_generator_loop.py

@@ -29,23 +29,28 @@ class Task:
   29    29     def __init__(self, gen):
   30    30         self.gen = gen
   31    31         self.files = set()
   32    -1         self.times = {0}
   33    -1         self.init = False
   -1    32         self.times = {}
   34    33         self.done = False
   35    34         self.result = None
   36    35 
   37    -1     def step(self, files, now):
   -1    36     def set_result(self, result):
   -1    37         self.done = True
   -1    38         self.result = result
   -1    39 
   -1    40     def init(self):
   -1    41         try:
   -1    42             self.files, self.times = next(self.gen)
   -1    43         except StopIteration as e:
   -1    44             self.set_result(e.value)
   -1    45 
   -1    46     def wakeup(self, files, now):
   38    47         try:
   39    48             if self.done:
   40    49                 return
   41    -1             elif not self.init:
   42    -1                 self.files, self.times = next(self.gen)
   43    -1                 self.init = True
   44    50             elif any(t < now for t in self.times) or files & self.files:
   45    51                 self.files, self.times = self.gen.send((files, now))
   46    52         except StopIteration as e:
   47    -1             self.done = True
   48    -1             self.result = e.value
   -1    53             self.set_result(e.value)
   49    54 
   50    55     def close(self):
   51    56         self.gen.close()
@@ -54,11 +59,12 @@ class Task:
   54    59 def run(gen):
   55    60     task = Task(gen)
   56    61     try:
   -1    62         task.init()
   57    63         while not task.done:
   58    64             now = time.time()
   59    65             timeout = min((t - now for t in task.times), default=None)
   60    66             files = {key.fileobj for key, mask in selector.select(timeout)}
   61    -1             task.step(files, time.time())
   -1    67             task.wakeup(files, time.time())
   62    68         return task.result
   63    69     finally:
   64    70         task.close()
@@ -71,6 +77,8 @@ def sleep(t):
   71    77 def gather(*generators):
   72    78     subtasks = [Task(gen) for gen in generators]
   73    79     try:
   -1    80         for task in subtasks:
   -1    81             task.init()
   74    82         while True:
   75    83             wait_files = set().union(
   76    84                 *[t.files for t in subtasks if not t.done]
@@ -80,7 +88,7 @@ def gather(*generators):
   80    88             )
   81    89             files, now = yield wait_files, wait_times
   82    90             for task in subtasks:
   83    -1                 task.step(files, now)
   -1    91                 task.wakeup(files, now)
   84    92             if all(task.done for task in subtasks):
   85    93                 return [task.result for task in subtasks]
   86    94     finally:

diff --git a/07_async_await_loop.py b/07_async_await_loop.py

@@ -38,23 +38,28 @@ class Task:
   38    38     def __init__(self, coro):
   39    39         self.gen = coro.__await__()
   40    40         self.files = set()
   41    -1         self.times = {0}
   42    -1         self.init = False
   -1    41         self.times = {}
   43    42         self.done = False
   44    43         self.result = None
   45    44 
   46    -1     def step(self, files, now):
   -1    45     def set_result(self, result):
   -1    46         self.done = True
   -1    47         self.result = result
   -1    48 
   -1    49     def init(self):
   -1    50         try:
   -1    51             self.files, self.times = next(self.gen)
   -1    52         except StopIteration as e:
   -1    53             self.set_result(e.value)
   -1    54 
   -1    55     def wakeup(self, files, now):
   47    56         try:
   48    57             if self.done:
   49    58                 return
   50    -1             elif not self.init:
   51    -1                 self.files, self.times = next(self.gen)
   52    -1                 self.init = True
   53    59             elif any(t < now for t in self.times) or files & self.files:
   54    60                 self.files, self.times = self.gen.send((files, now))
   55    61         except StopIteration as e:
   56    -1             self.done = True
   57    -1             self.result = e.value
   -1    62             self.set_result(e.value)
   58    63 
   59    64     def close(self):
   60    65         self.gen.close()
@@ -63,11 +68,12 @@ class Task:
   63    68 def run(coro):
   64    69     task = Task(coro)
   65    70     try:
   -1    71         task.init()
   66    72         while not task.done:
   67    73             now = time.time()
   68    74             timeout = min((t - now for t in task.times), default=None)
   69    75             files = {key.fileobj for key, mask in selector.select(timeout)}
   70    -1             task.step(files, time.time())
   -1    76             task.wakeup(files, time.time())
   71    77         return task.result
   72    78     finally:
   73    79         task.close()
@@ -80,6 +86,8 @@ async def sleep(t):
   80    86 async def gather(*coros):
   81    87     subtasks = [Task(coro) for coro in coros]
   82    88     try:
   -1    89         for task in subtasks:
   -1    90             task.init()
   83    91         while True:
   84    92             wait_files = set().union(
   85    93                 *[t.files for t in subtasks if not t.done]
@@ -89,7 +97,7 @@ async def gather(*coros):
   89    97             )
   90    98             files, now = await AYield((wait_files, wait_times))
   91    99             for task in subtasks:
   92    -1                 task.step(files, now)
   -1   100                 task.wakeup(files, now)
   93   101             if all(task.done for task in subtasks):
   94   102                 return [task.result for task in subtasks]
   95   103     finally: