drop python 3.6 support

python 3.6 reached end of life on 2021-12-23
This commit is contained in:
Anthony Sottile 2022-01-18 17:36:17 -05:00
parent d3bdf1403d
commit 04de6a2e57
111 changed files with 401 additions and 286 deletions

View file

@ -1,3 +1,5 @@
from __future__ import annotations
import concurrent.futures
import contextlib
import math
@ -8,11 +10,8 @@ from typing import Any
from typing import Callable
from typing import Generator
from typing import Iterable
from typing import List
from typing import MutableMapping
from typing import Optional
from typing import Sequence
from typing import Tuple
from typing import TypeVar
from pre_commit import parse_shebang
@ -23,7 +22,7 @@ TArg = TypeVar('TArg')
TRet = TypeVar('TRet')
def _environ_size(_env: Optional[MutableMapping[str, str]] = None) -> int:
def _environ_size(_env: MutableMapping[str, str] | None = None) -> int:
environ = _env if _env is not None else getattr(os, 'environb', os.environ)
size = 8 * len(environ) # number of pointers in `envp`
for k, v in environ.items():
@ -62,8 +61,8 @@ def partition(
cmd: Sequence[str],
varargs: Sequence[str],
target_concurrency: int,
_max_length: Optional[int] = None,
) -> Tuple[Tuple[str, ...], ...]:
_max_length: int | None = None,
) -> tuple[tuple[str, ...], ...]:
_max_length = _max_length or _get_platform_max_length()
# Generally, we try to partition evenly into at least `target_concurrency`
@ -73,7 +72,7 @@ def partition(
cmd = tuple(cmd)
ret = []
ret_cmd: List[str] = []
ret_cmd: list[str] = []
# Reversed so arguments are in order
varargs = list(reversed(varargs))
@ -115,14 +114,14 @@ def _thread_mapper(maxsize: int) -> Generator[
def xargs(
cmd: Tuple[str, ...],
cmd: tuple[str, ...],
varargs: Sequence[str],
*,
color: bool = False,
target_concurrency: int = 1,
_max_length: int = _get_platform_max_length(),
**kwargs: Any,
) -> Tuple[int, bytes]:
) -> tuple[int, bytes]:
"""A simplified implementation of xargs.
color: Make a pty if on a platform that supports it
@ -152,8 +151,8 @@ def xargs(
partitions = partition(cmd, varargs, target_concurrency, _max_length)
def run_cmd_partition(
run_cmd: Tuple[str, ...],
) -> Tuple[int, bytes, Optional[bytes]]:
run_cmd: tuple[str, ...],
) -> tuple[int, bytes, bytes | None]:
return cmd_fn(
*run_cmd, retcode=None, stderr=subprocess.STDOUT, **kwargs,
)