show color in hook outputs when attached to a tty

This commit is contained in:
Anthony Sottile 2019-10-12 15:57:40 -07:00
parent c8620f35e1
commit 7c3404ef1f
27 changed files with 200 additions and 76 deletions

View file

@ -117,29 +117,28 @@ class CalledProcessError(RuntimeError):
__str__ = to_text
def cmd_output_b(*cmd, **kwargs):
retcode = kwargs.pop('retcode', 0)
popen_kwargs = {
'stdin': subprocess.PIPE,
'stdout': subprocess.PIPE,
'stderr': subprocess.PIPE,
}
def _cmd_kwargs(*cmd, **kwargs):
# py2/py3 on windows are more strict about the types here
cmd = tuple(five.n(arg) for arg in cmd)
kwargs['env'] = {
five.n(key): five.n(value)
for key, value in kwargs.pop('env', {}).items()
} or None
popen_kwargs.update(kwargs)
for arg in ('stdin', 'stdout', 'stderr'):
kwargs.setdefault(arg, subprocess.PIPE)
return cmd, kwargs
def cmd_output_b(*cmd, **kwargs):
retcode = kwargs.pop('retcode', 0)
cmd, kwargs = _cmd_kwargs(*cmd, **kwargs)
try:
cmd = parse_shebang.normalize_cmd(cmd)
except parse_shebang.ExecutableNotFoundError as e:
returncode, stdout_b, stderr_b = e.to_output()
else:
proc = subprocess.Popen(cmd, **popen_kwargs)
proc = subprocess.Popen(cmd, **kwargs)
stdout_b, stderr_b = proc.communicate()
returncode = proc.returncode
@ -158,6 +157,72 @@ def cmd_output(*cmd, **kwargs):
return returncode, stdout, stderr
if os.name != 'nt': # pragma: windows no cover
from os import openpty
import termios
class Pty(object):
def __init__(self):
self.r = self.w = None
def __enter__(self):
self.r, self.w = openpty()
# tty flags normally change \n to \r\n
attrs = termios.tcgetattr(self.r)
attrs[1] &= ~(termios.ONLCR | termios.OPOST)
termios.tcsetattr(self.r, termios.TCSANOW, attrs)
return self
def close_w(self):
if self.w is not None:
os.close(self.w)
self.w = None
def close_r(self):
assert self.r is not None
os.close(self.r)
self.r = None
def __exit__(self, exc_type, exc_value, traceback):
self.close_w()
self.close_r()
def cmd_output_p(*cmd, **kwargs):
assert kwargs.pop('retcode') is None
assert kwargs['stderr'] == subprocess.STDOUT, kwargs['stderr']
cmd, kwargs = _cmd_kwargs(*cmd, **kwargs)
try:
cmd = parse_shebang.normalize_cmd(cmd)
except parse_shebang.ExecutableNotFoundError as e:
return e.to_output()
with open(os.devnull) as devnull, Pty() as pty:
kwargs.update({'stdin': devnull, 'stdout': pty.w, 'stderr': pty.w})
proc = subprocess.Popen(cmd, **kwargs)
pty.close_w()
buf = b''
while True:
try:
bts = os.read(pty.r, 4096)
except OSError as e:
if e.errno == errno.EIO:
bts = b''
else:
raise
else:
buf += bts
if not bts:
break
return proc.wait(), buf, None
else: # pragma: no cover
cmd_output_p = cmd_output_b
def rmtree(path):
"""On windows, rmtree fails for readonly dirs."""
def handle_remove_readonly(func, path, exc):