From 441be3fb5957e00a65b1b6aae94084d8f2d782c9 Mon Sep 17 00:00:00 2001 From: Yury Selivanov Date: Sun, 17 Dec 2017 21:42:35 -0500 Subject: [PATCH 1/5] bpo-32355: Optimize asyncio.gather() --- Lib/asyncio/tasks.py | 93 +++++++++++-------- .../2017-12-17-21-42-24.bpo-32355.tbaTWA.rst | 1 + 2 files changed, 53 insertions(+), 41 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2017-12-17-21-42-24.bpo-32355.tbaTWA.rst diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index 275141c65e7e22..e1342ee76bad57 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -575,8 +575,7 @@ def cancel(self): def gather(*coros_or_futures, loop=None, return_exceptions=False): - """Return a future aggregating results from the given coroutines - or futures. + """Return a future aggregating results from the given coroutines/futures. Coroutines will be wrapped in a future and scheduled in the event loop. They will not necessarily be scheduled in the same order as @@ -605,56 +604,68 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False): outer.set_result([]) return outer - arg_to_fut = {} - for arg in set(coros_or_futures): - if not futures.isfuture(arg): - fut = ensure_future(arg, loop=loop) - if loop is None: - loop = fut._loop - # The caller cannot control this future, the "destroy pending task" - # warning should not be emitted. - fut._log_destroy_pending = False - else: - fut = arg - if loop is None: - loop = fut._loop - elif fut._loop is not loop: - raise ValueError("futures are tied to different event loops") - arg_to_fut[arg] = fut - - children = [arg_to_fut[arg] for arg in coros_or_futures] - nchildren = len(children) - outer = _GatheringFuture(children, loop=loop) - nfinished = 0 - results = [None] * nchildren - - def _done_callback(i, fut): + def _done_callback(fut): nonlocal nfinished + nfinished += 1 + if outer.done(): if not fut.cancelled(): # Mark exception retrieved. fut.exception() return - if fut.cancelled(): - res = futures.CancelledError() - if not return_exceptions: - outer.set_exception(res) + if not return_exceptions: + if fut.cancelled(): + exc = futures.CancelledError() + outer.set_exception(exc) return - elif fut._exception is not None: - res = fut.exception() # Mark exception retrieved. - if not return_exceptions: - outer.set_exception(res) + elif fut._exception is not None: + outer.set_exception(fut._exception) return - else: - res = fut._result - results[i] = res - nfinished += 1 - if nfinished == nchildren: + + if nfinished == nfuts: + # All futures are done; create a list of results + # and set it to the 'outer' future. + results = [] + + for fut in children: + if fut.cancelled(): + res = futures.CancelledError() + elif fut._exception is not None: + res = fut._exception + else: + res = fut._result + results.append(res) + outer.set_result(results) - for i, fut in enumerate(children): - fut.add_done_callback(functools.partial(_done_callback, i)) + arg_to_fut = {} + children = [] + nfuts = 0 + nfinished = 0 + for arg in coros_or_futures: + if arg not in arg_to_fut: + fut = ensure_future(arg, loop=loop) + if loop is None: + loop = fut._loop + if fut is not arg: + # 'arg' was not a Future, therefore, 'fut' is a new + # Future created specifically for 'arg'. Since the caller + # can't control it, disable the "destroy pending task" + # warning. + fut._log_destroy_pending = False + + nfuts += 1 + arg_to_fut[arg] = fut + fut.add_done_callback(_done_callback) + + else: + # There's a duplicate Future object in coros_or_futures. + fut = arg_to_fut[arg] + + children.append(fut) + + outer = _GatheringFuture(children, loop=loop) return outer diff --git a/Misc/NEWS.d/next/Library/2017-12-17-21-42-24.bpo-32355.tbaTWA.rst b/Misc/NEWS.d/next/Library/2017-12-17-21-42-24.bpo-32355.tbaTWA.rst new file mode 100644 index 00000000000000..ca908e97803aeb --- /dev/null +++ b/Misc/NEWS.d/next/Library/2017-12-17-21-42-24.bpo-32355.tbaTWA.rst @@ -0,0 +1 @@ +Optimize asyncio.gather(); now up to 15% faster. From 8bd98529a53a38b798a719c74bc04735fe82e40b Mon Sep 17 00:00:00 2001 From: Yury Selivanov Date: Mon, 18 Dec 2017 12:54:35 -0500 Subject: [PATCH 2/5] Use only public Future API --- Lib/asyncio/tasks.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index e1342ee76bad57..70ea4a6f536c9d 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -619,9 +619,11 @@ def _done_callback(fut): exc = futures.CancelledError() outer.set_exception(exc) return - elif fut._exception is not None: - outer.set_exception(fut._exception) - return + else: + exc = fut.exception() + if exc is not None: + outer.set_exception(exc) + return if nfinished == nfuts: # All futures are done; create a list of results @@ -631,10 +633,10 @@ def _done_callback(fut): for fut in children: if fut.cancelled(): res = futures.CancelledError() - elif fut._exception is not None: - res = fut._exception else: - res = fut._result + res = fut.exception() + if res is None: + res = fut.result() results.append(res) outer.set_result(results) From 8ee2b69ada0de64742ca9ad0fe976c04b0025b50 Mon Sep 17 00:00:00 2001 From: Yury Selivanov Date: Mon, 18 Dec 2017 12:57:53 -0500 Subject: [PATCH 3/5] Add a couple of comments clarifying cancelled() calls --- Lib/asyncio/tasks.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index 70ea4a6f536c9d..ff8a486b544c94 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -616,6 +616,9 @@ def _done_callback(fut): if not return_exceptions: if fut.cancelled(): + # Check if 'fut' is cancelled first, as + # 'fut.exception()' will *raise* a CancelledError + # instead of returning it. exc = futures.CancelledError() outer.set_exception(exc) return @@ -632,6 +635,9 @@ def _done_callback(fut): for fut in children: if fut.cancelled(): + # Check if 'fut' is cancelled first, as + # 'fut.exception()' will *raise* a CancelledError + # instead of returning it. res = futures.CancelledError() else: res = fut.exception() From b10fbe9b0665f9496ef6d81da05b91637c75abf6 Mon Sep 17 00:00:00 2001 From: Yury Selivanov Date: Mon, 18 Dec 2017 13:00:36 -0500 Subject: [PATCH 4/5] Use public Future API in base_events.py --- Lib/asyncio/base_events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index 398497de09fce7..ad1d46e5d5a481 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -159,7 +159,7 @@ def _ipaddr_info(host, port, family, type, proto): def _run_until_complete_cb(fut): - exc = fut._exception + exc = fut.exception() if isinstance(exc, BaseException) and not isinstance(exc, Exception): # Issue #22429: run_forever() already finished, no need to # stop it. From a41d3e936182a2b54f58296384c2e736152882b1 Mon Sep 17 00:00:00 2001 From: Yury Selivanov Date: Mon, 18 Dec 2017 17:14:30 -0500 Subject: [PATCH 5/5] Fix cancelled future case --- Lib/asyncio/base_events.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index ad1d46e5d5a481..d1ec45dd7762a4 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -159,11 +159,12 @@ def _ipaddr_info(host, port, family, type, proto): def _run_until_complete_cb(fut): - exc = fut.exception() - if isinstance(exc, BaseException) and not isinstance(exc, Exception): - # Issue #22429: run_forever() already finished, no need to - # stop it. - return + if not fut.cancelled(): + exc = fut.exception() + if isinstance(exc, BaseException) and not isinstance(exc, Exception): + # Issue #22429: run_forever() already finished, no need to + # stop it. + return fut._loop.stop()