Implement pre-commit autoupdate --freeze

This commit is contained in:
Anthony Sottile 2019-12-28 12:25:30 -08:00
parent 32ce682238
commit 8a3c740f9e
5 changed files with 314 additions and 260 deletions

View file

@ -1,18 +1,17 @@
from __future__ import print_function
from __future__ import unicode_literals
import collections
import os.path
import re
import six
from aspy.yaml import ordered_dump
from aspy.yaml import ordered_load
from cfgv import remove_defaults
import pre_commit.constants as C
from pre_commit import git
from pre_commit import output
from pre_commit.clientlib import CONFIG_SCHEMA
from pre_commit.clientlib import InvalidManifestError
from pre_commit.clientlib import load_config
from pre_commit.clientlib import load_manifest
@ -25,39 +24,44 @@ from pre_commit.util import cmd_output_b
from pre_commit.util import tmpdir
class RevInfo(collections.namedtuple('RevInfo', ('repo', 'rev', 'frozen'))):
__slots__ = ()
@classmethod
def from_config(cls, config):
return cls(config['repo'], config['rev'], None)
def update(self, tags_only, freeze):
if tags_only:
tag_cmd = ('git', 'describe', 'FETCH_HEAD', '--tags', '--abbrev=0')
else:
tag_cmd = ('git', 'describe', 'FETCH_HEAD', '--tags', '--exact')
with tmpdir() as tmp:
git.init_repo(tmp, self.repo)
cmd_output_b('git', 'fetch', 'origin', 'HEAD', '--tags', cwd=tmp)
try:
rev = cmd_output(*tag_cmd, cwd=tmp)[1].strip()
except CalledProcessError:
cmd = ('git', 'rev-parse', 'FETCH_HEAD')
rev = cmd_output(*cmd, cwd=tmp)[1].strip()
frozen = None
if freeze:
exact = cmd_output('git', 'rev-parse', rev, cwd=tmp)[1].strip()
if exact != rev:
rev, frozen = exact, rev
return self._replace(rev=rev, frozen=frozen)
class RepositoryCannotBeUpdatedError(RuntimeError):
pass
def _update_repo(repo_config, store, tags_only):
"""Updates a repository to the tip of `master`. If the repository cannot
be updated because a hook that is configured does not exist in `master`,
this raises a RepositoryCannotBeUpdatedError
Args:
repo_config - A config for a repository
"""
with tmpdir() as repo_path:
git.init_repo(repo_path, repo_config['repo'])
cmd_output_b('git', 'fetch', 'origin', 'HEAD', '--tags', cwd=repo_path)
tag_cmd = ('git', 'describe', 'FETCH_HEAD', '--tags')
if tags_only:
tag_cmd += ('--abbrev=0',)
else:
tag_cmd += ('--exact',)
try:
rev = cmd_output(*tag_cmd, cwd=repo_path)[1].strip()
except CalledProcessError:
tag_cmd = ('git', 'rev-parse', 'FETCH_HEAD')
rev = cmd_output(*tag_cmd, cwd=repo_path)[1].strip()
# Don't bother trying to update if our rev is the same
if rev == repo_config['rev']:
return repo_config
def _check_hooks_still_exist_at_rev(repo_config, info, store):
try:
path = store.clone(repo_config['repo'], rev)
path = store.clone(repo_config['repo'], info.rev)
manifest = load_manifest(os.path.join(path, C.MANIFEST_FILE))
except InvalidManifestError as e:
raise RepositoryCannotBeUpdatedError(six.text_type(e))
@ -71,94 +75,91 @@ def _update_repo(repo_config, store, tags_only):
'{}'.format(', '.join(sorted(hooks_missing))),
)
# Construct a new config with the head rev
new_config = repo_config.copy()
new_config['rev'] = rev
return new_config
REV_LINE_RE = re.compile(r'^(\s+)rev:(\s*)([^\s#]+)(.*)(\r?\n)$', re.DOTALL)
REV_LINE_FMT = '{}rev:{}{}{}{}'
REV_LINE_RE = re.compile(r'^(\s+)rev:(\s*)([^\s#]+)(.*)$', re.DOTALL)
REV_LINE_FMT = '{}rev:{}{}{}'
def _write_new_config_file(path, output):
def _original_lines(path, rev_infos, retry=False):
"""detect `rev:` lines or reformat the file"""
with open(path) as f:
original_contents = f.read()
output = remove_defaults(output, CONFIG_SCHEMA)
new_contents = ordered_dump(output, **C.YAML_DUMP_KWARGS)
original = f.read()
lines = original_contents.splitlines(True)
rev_line_indices_reversed = list(
reversed([
i for i, line in enumerate(lines) if REV_LINE_RE.match(line)
]),
)
lines = original.splitlines(True)
idxs = [i for i, line in enumerate(lines) if REV_LINE_RE.match(line)]
if len(idxs) == len(rev_infos):
return lines, idxs
elif retry:
raise AssertionError('could not find rev lines')
else:
with open(path, 'w') as f:
f.write(ordered_dump(ordered_load(original), **C.YAML_DUMP_KWARGS))
return _original_lines(path, rev_infos, retry=True)
for line in new_contents.splitlines(True):
if REV_LINE_RE.match(line):
# It's possible we didn't identify the rev lines in the original
if not rev_line_indices_reversed:
break
line_index = rev_line_indices_reversed.pop()
original_line = lines[line_index]
orig_match = REV_LINE_RE.match(original_line)
new_match = REV_LINE_RE.match(line)
lines[line_index] = REV_LINE_FMT.format(
orig_match.group(1), orig_match.group(2),
new_match.group(3), orig_match.group(4),
)
# If we failed to intelligently rewrite the rev lines, fall back to the
# pretty-formatted yaml output
to_write = ''.join(lines)
if remove_defaults(ordered_load(to_write), CONFIG_SCHEMA) != output:
to_write = new_contents
def _write_new_config(path, rev_infos):
lines, idxs = _original_lines(path, rev_infos)
for idx, rev_info in zip(idxs, rev_infos):
if rev_info is None:
continue
match = REV_LINE_RE.match(lines[idx])
assert match is not None
new_rev_s = ordered_dump({'rev': rev_info.rev}, **C.YAML_DUMP_KWARGS)
new_rev = new_rev_s.split(':', 1)[1].strip()
if rev_info.frozen is not None:
comment = ' # {}'.format(rev_info.frozen)
else:
comment = match.group(4)
lines[idx] = REV_LINE_FMT.format(
match.group(1), match.group(2), new_rev, comment, match.group(5),
)
with open(path, 'w') as f:
f.write(to_write)
f.write(''.join(lines))
def autoupdate(config_file, store, tags_only, repos=()):
def autoupdate(config_file, store, tags_only, freeze, repos=()):
"""Auto-update the pre-commit config to the latest versions of repos."""
migrate_config(config_file, quiet=True)
retv = 0
output_repos = []
rev_infos = []
changed = False
input_config = load_config(config_file)
for repo_config in input_config['repos']:
if (
repo_config['repo'] in {LOCAL, META} or
# Skip updating any repo_configs that aren't for the specified repo
repos and repo_config['repo'] not in repos
):
output_repos.append(repo_config)
config = load_config(config_file)
for repo_config in config['repos']:
if repo_config['repo'] in {LOCAL, META}:
continue
output.write('Updating {}...'.format(repo_config['repo']))
info = RevInfo.from_config(repo_config)
if repos and info.repo not in repos:
rev_infos.append(None)
continue
output.write('Updating {}...'.format(info.repo))
new_info = info.update(tags_only=tags_only, freeze=freeze)
try:
new_repo_config = _update_repo(repo_config, store, tags_only)
_check_hooks_still_exist_at_rev(repo_config, new_info, store)
except RepositoryCannotBeUpdatedError as error:
output.write_line(error.args[0])
output_repos.append(repo_config)
rev_infos.append(None)
retv = 1
continue
if new_repo_config['rev'] != repo_config['rev']:
if new_info.rev != info.rev:
changed = True
output.write_line(
'updating {} -> {}.'.format(
repo_config['rev'], new_repo_config['rev'],
),
)
output_repos.append(new_repo_config)
if new_info.frozen:
updated_to = '{} (frozen)'.format(new_info.frozen)
else:
updated_to = new_info.rev
msg = 'updating {} -> {}.'.format(info.rev, updated_to)
output.write_line(msg)
rev_infos.append(new_info)
else:
output.write_line('already up to date.')
output_repos.append(repo_config)
rev_infos.append(None)
if changed:
output_config = input_config.copy()
output_config['repos'] = output_repos
_write_new_config_file(config_file, output_config)
_write_new_config(config_file, rev_infos)
return retv