From aa50a8cde0919f0cf98b66b415403f04e54c7f05 Mon Sep 17 00:00:00 2001 From: Chris Kuehl Date: Mon, 22 Oct 2018 09:50:46 -0700 Subject: [PATCH] Switch to using concurrent.futures --- pre_commit/xargs.py | 47 +++++++++++++++++++++++---------------------- setup.py | 5 ++++- tests/xargs_test.py | 13 +++++++++++++ 3 files changed, 41 insertions(+), 24 deletions(-) diff --git a/pre_commit/xargs.py b/pre_commit/xargs.py index 9c4bc78a..5222d553 100644 --- a/pre_commit/xargs.py +++ b/pre_commit/xargs.py @@ -4,7 +4,6 @@ from __future__ import unicode_literals import contextlib import math -import multiprocessing.pool import sys import concurrent.futures @@ -79,12 +78,12 @@ def partition(cmd, varargs, target_concurrency, _max_length=None): @contextlib.contextmanager -def _threadpool(size): - pool = multiprocessing.pool.ThreadPool(size) - try: - yield pool - finally: - pool.terminate() +def _thread_mapper(maxsize): + if maxsize == 1: + yield map + else: + with concurrent.futures.ThreadPoolExecutor(maxsize) as ex: + yield ex.map def xargs(cmd, varargs, **kwargs): @@ -109,22 +108,24 @@ def xargs(cmd, varargs, **kwargs): def run_cmd_partition(run_cmd): return cmd_output(*run_cmd, encoding=None, retcode=None) - with _threadpool(min(len(partitions), target_concurrency)) as pool: - results = pool.map(run_cmd_partition, partitions) + with _thread_mapper( + min(len(partitions), target_concurrency), + ) as thread_map: + results = thread_map(run_cmd_partition, partitions) - for proc_retcode, proc_out, proc_err in results: - # This is *slightly* too clever so I'll explain it. - # First the xor boolean table: - # T | F | - # +-------+ - # T | F | T | - # --+-------+ - # F | T | F | - # --+-------+ - # When negate is True, it has the effect of flipping the return code - # Otherwise, the retuncode is unchanged - retcode |= bool(proc_retcode) ^ negate - stdout += proc_out - stderr += proc_err + for proc_retcode, proc_out, proc_err in results: + # This is *slightly* too clever so I'll explain it. + # First the xor boolean table: + # T | F | + # +-------+ + # T | F | T | + # --+-------+ + # F | T | F | + # --+-------+ + # When negate is True, it has the effect of flipping the return + # code. Otherwise, the returncode is unchanged. + retcode |= bool(proc_retcode) ^ negate + stdout += proc_out + stderr += proc_err return retcode, stdout, stderr diff --git a/setup.py b/setup.py index 7c0a958f..dd3eb425 100644 --- a/setup.py +++ b/setup.py @@ -47,7 +47,10 @@ setup( 'toml', 'virtualenv', ], - extras_require={':python_version<"3.7"': ['importlib-resources']}, + extras_require={ + ':python_version<"3.2"': ['futures'], + ':python_version<"3.7"': ['importlib-resources'], + }, entry_points={ 'console_scripts': [ 'pre-commit = pre_commit.main:main', diff --git a/tests/xargs_test.py b/tests/xargs_test.py index da3cc74d..ed65ed46 100644 --- a/tests/xargs_test.py +++ b/tests/xargs_test.py @@ -5,6 +5,7 @@ from __future__ import unicode_literals import sys import time +import concurrent.futures import mock import pytest import six @@ -180,3 +181,15 @@ def test_xargs_concurrency(): # It would take 0.5*5=2.5 seconds ot run all of these in serial, so if it # takes less, they must have run concurrently. assert elapsed < 2.5 + + +def test_thread_mapper_concurrency_uses_threadpoolexecutor_map(): + with xargs._thread_mapper(10) as thread_map: + assert isinstance( + thread_map.__self__, concurrent.futures.ThreadPoolExecutor, + ) is True + + +def test_thread_mapper_concurrency_uses_regular_map(): + with xargs._thread_mapper(1) as thread_map: + assert thread_map is map