Switch to using concurrent.futures

This commit is contained in:
Chris Kuehl 2018-10-22 09:50:46 -07:00 committed by Chris Kuehl
parent 231f6013bb
commit aa50a8cde0
3 changed files with 41 additions and 24 deletions

View file

@ -4,7 +4,6 @@ from __future__ import unicode_literals
import contextlib
import math
import multiprocessing.pool
import sys
import concurrent.futures
@ -79,12 +78,12 @@ def partition(cmd, varargs, target_concurrency, _max_length=None):
@contextlib.contextmanager
def _threadpool(size):
pool = multiprocessing.pool.ThreadPool(size)
try:
yield pool
finally:
pool.terminate()
def _thread_mapper(maxsize):
if maxsize == 1:
yield map
else:
with concurrent.futures.ThreadPoolExecutor(maxsize) as ex:
yield ex.map
def xargs(cmd, varargs, **kwargs):
@ -109,22 +108,24 @@ def xargs(cmd, varargs, **kwargs):
def run_cmd_partition(run_cmd):
return cmd_output(*run_cmd, encoding=None, retcode=None)
with _threadpool(min(len(partitions), target_concurrency)) as pool:
results = pool.map(run_cmd_partition, partitions)
with _thread_mapper(
min(len(partitions), target_concurrency),
) as thread_map:
results = thread_map(run_cmd_partition, partitions)
for proc_retcode, proc_out, proc_err in results:
# This is *slightly* too clever so I'll explain it.
# First the xor boolean table:
# T | F |
# +-------+
# T | F | T |
# --+-------+
# F | T | F |
# --+-------+
# When negate is True, it has the effect of flipping the return code
# Otherwise, the retuncode is unchanged
retcode |= bool(proc_retcode) ^ negate
stdout += proc_out
stderr += proc_err
for proc_retcode, proc_out, proc_err in results:
# This is *slightly* too clever so I'll explain it.
# First the xor boolean table:
# T | F |
# +-------+
# T | F | T |
# --+-------+
# F | T | F |
# --+-------+
# When negate is True, it has the effect of flipping the return
# code. Otherwise, the returncode is unchanged.
retcode |= bool(proc_retcode) ^ negate
stdout += proc_out
stderr += proc_err
return retcode, stdout, stderr

View file

@ -47,7 +47,10 @@ setup(
'toml',
'virtualenv',
],
extras_require={':python_version<"3.7"': ['importlib-resources']},
extras_require={
':python_version<"3.2"': ['futures'],
':python_version<"3.7"': ['importlib-resources'],
},
entry_points={
'console_scripts': [
'pre-commit = pre_commit.main:main',

View file

@ -5,6 +5,7 @@ from __future__ import unicode_literals
import sys
import time
import concurrent.futures
import mock
import pytest
import six
@ -180,3 +181,15 @@ def test_xargs_concurrency():
# It would take 0.5*5=2.5 seconds ot run all of these in serial, so if it
# takes less, they must have run concurrently.
assert elapsed < 2.5
def test_thread_mapper_concurrency_uses_threadpoolexecutor_map():
with xargs._thread_mapper(10) as thread_map:
assert isinstance(
thread_map.__self__, concurrent.futures.ThreadPoolExecutor,
) is True
def test_thread_mapper_concurrency_uses_regular_map():
with xargs._thread_mapper(1) as thread_map:
assert thread_map is map