Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for tool_shed.util.hg_util
import logging
import os
import subprocess
import tempfile
from datetime import datetime
from time import gmtime
from galaxy.tool_shed.util import basic_util
from galaxy.tool_shed.util.hg_util import (
clone_repository,
copy_file_from_manifest,
get_changectx_for_changeset,
get_config_from_disk,
get_ctx_file_path_from_manifest,
get_file_context_from_ctx,
pull_repository,
reversed_lower_upper_bounded_changelog,
reversed_upper_bounded_changelog,
update_repository,
)
from galaxy.util import unicodify
log = logging.getLogger(__name__)
INITIAL_CHANGELOG_HASH = '000000000000'
[docs]def add_changeset(repo_path, path_to_filename_in_archive):
try:
subprocess.check_output(['hg', 'add', path_to_filename_in_archive], stderr=subprocess.STDOUT, cwd=repo_path)
except Exception as e:
error_message = "Error adding '{}' to repository: {}".format(path_to_filename_in_archive, unicodify(e))
if isinstance(e, subprocess.CalledProcessError):
error_message += "\nOutput was:\n%s" % unicodify(e.output)
raise Exception(error_message)
[docs]def archive_repository_revision(app, repository, archive_dir, changeset_revision):
'''Create an un-versioned archive of a repository.'''
repo_path = repository.repo_path(app)
try:
subprocess.check_output(['hg', 'archive', '-r', changeset_revision, archive_dir], stderr=subprocess.STDOUT, cwd=repo_path)
except Exception as e:
error_message = "Error attempting to archive revision '{}' of repository '{}': {}".format(changeset_revision, repository.name, unicodify(e))
if isinstance(e, subprocess.CalledProcessError):
error_message += "\nOutput was:\n%s" % unicodify(e.output)
log.exception(error_message)
raise Exception(error_message)
[docs]def commit_changeset(repo_path, full_path_to_changeset, username, message):
try:
subprocess.check_output(['hg', 'commit', '-u', username, '-m', message, full_path_to_changeset], stderr=subprocess.STDOUT, cwd=repo_path)
except Exception as e:
error_message = "Error committing '{}' to repository: {}".format(full_path_to_changeset, unicodify(e))
if isinstance(e, subprocess.CalledProcessError):
if e.returncode == 1 and 'nothing changed' in unicodify(e.output):
return
error_message += "\nOutput was:\n%s" % unicodify(e.output)
raise Exception(error_message)
def get_hgrc_path(repo_path):
return os.path.join(repo_path, '.hg', 'hgrc')
[docs]def create_hgrc_file(app, repository):
# Since we support both http and https, we set `push_ssl` to False to
# override the default (which is True) in the Mercurial API.
# The hg purge extension purges all files and directories not being tracked
# by Mercurial in the current repository. It will remove unknown files and
# empty directories. This is not currently used because it is not supported
# in the Mercurial API.
repo_path = repository.repo_path(app)
hgrc_path = get_hgrc_path(repo_path)
with open(hgrc_path, 'w') as fp:
fp.write('[paths]\n')
fp.write('default = .\n')
fp.write('default-push = .\n')
fp.write('[web]\n')
fp.write('allow_push = %s\n' % repository.user.username)
fp.write('name = %s\n' % repository.name)
fp.write('push_ssl = false\n')
fp.write('[extensions]\n')
fp.write('hgext.purge=')
[docs]def get_named_tmpfile_from_ctx(ctx, filename, dir):
"""
Return a named temporary file created from a specified file with a given name included in a repository
changeset revision.
"""
filename = basic_util.strip_path(filename)
for ctx_file in ctx.files():
ctx_file_name = basic_util.strip_path(unicodify(ctx_file))
if filename == ctx_file_name:
try:
# If the file was moved, its destination file contents will be returned here.
fctx = ctx[ctx_file]
except LookupError:
# Continue looking in case the file was moved.
fctx = None
continue
if fctx:
fh = tempfile.NamedTemporaryFile('wb', prefix="tmp-toolshed-gntfc", dir=dir)
tmp_filename = fh.name
fh.close()
fh = open(tmp_filename, 'wb')
fh.write(fctx.data())
fh.close()
return tmp_filename
return None
[docs]def get_readable_ctx_date(ctx):
"""Convert the date of the changeset (the received ctx) to a human-readable date."""
t, tz = ctx.date()
date = datetime(*gmtime(float(t) - tz)[:6])
ctx_date = date.strftime("%Y-%m-%d")
return ctx_date
[docs]def get_repository_heads(repo):
"""Return current repository heads, which are changesets with no child changesets."""
heads = [repo[h] for h in repo.heads(None)]
return heads
[docs]def get_reversed_changelog_changesets(repo):
"""Return a list of changesets in reverse order from that provided by the repository manifest."""
reversed_changelog = []
for changeset in repo.changelog:
reversed_changelog.insert(0, changeset)
return reversed_changelog
[docs]def get_revision_label(app, repository, changeset_revision, include_date=True, include_hash=True):
"""
Return a string consisting of the human readable changeset rev and the changeset revision string
which includes the revision date if the receive include_date is True.
"""
repo = repository.hg_repo
ctx = get_changectx_for_changeset(repo, changeset_revision)
if ctx:
return get_revision_label_from_ctx(ctx, include_date=include_date, include_hash=include_hash)
else:
if include_hash:
return "-1:%s" % changeset_revision
else:
return "-1"
[docs]def get_rev_label_changeset_revision_from_repository_metadata(app, repository_metadata, repository=None,
include_date=True, include_hash=True):
if repository is None:
repository = repository_metadata.repository
repo = repository.hg_repo
changeset_revision = repository_metadata.changeset_revision
ctx = get_changectx_for_changeset(repo, changeset_revision)
if ctx:
rev = '%04d' % ctx.rev()
if include_date:
changeset_revision_date = get_readable_ctx_date(ctx)
if include_hash:
label = "{}:{} ({})".format(str(ctx.rev()), changeset_revision, changeset_revision_date)
else:
label = "{} ({})".format(str(ctx.rev()), changeset_revision_date)
else:
if include_hash:
label = "{}:{}".format(str(ctx.rev()), changeset_revision)
else:
label = "%s" % str(ctx.rev())
else:
rev = '-1'
if include_hash:
label = "-1:%s" % changeset_revision
else:
label = "-1"
return rev, label, changeset_revision
[docs]def get_revision_label_from_ctx(ctx, include_date=True, include_hash=True):
if include_date:
if include_hash:
return '%s:%s <i><font color="#666666">(%s)</font></i>' % \
(str(ctx.rev()), str(ctx), str(get_readable_ctx_date(ctx)))
else:
return '%s <i><font color="#666666">(%s)</font></i>' % \
(str(ctx.rev()), str(get_readable_ctx_date(ctx)))
else:
if include_hash:
return '{}:{}'.format(str(ctx.rev()), str(ctx))
else:
return str(ctx.rev())
[docs]def get_rev_label_from_changeset_revision(repo, changeset_revision, include_date=True, include_hash=True):
"""
Given a changeset revision hash, return two strings, the changeset rev and the changeset revision hash
which includes the revision date if the receive include_date is True.
"""
ctx = get_changectx_for_changeset(repo, changeset_revision)
if ctx:
rev = '%04d' % ctx.rev()
label = get_revision_label_from_ctx(ctx, include_date=include_date)
else:
rev = '-1'
label = "-1:%s" % changeset_revision
return rev, label
[docs]def remove_file(repo_path, selected_file, force=True):
cmd = ['hg', 'remove']
if force:
cmd.append('--force')
cmd.append(selected_file)
try:
subprocess.check_output(cmd, stderr=subprocess.STDOUT, cwd=repo_path)
except Exception as e:
error_message = "Error removing file '{}': {}".format(selected_file, unicodify(e))
if isinstance(e, subprocess.CalledProcessError):
error_message += "\nOutput was:\n%s" % unicodify(e.output)
raise Exception(error_message)
[docs]def init_repository(repo_path):
"""
Create a new Mercurial repository in the given directory.
"""
try:
subprocess.check_output(['hg', 'init'], stderr=subprocess.STDOUT, cwd=repo_path)
except Exception as e:
error_message = 'Error initializing repository: %s' % unicodify(e)
if isinstance(e, subprocess.CalledProcessError):
error_message += "\nOutput was:\n%s" % unicodify(e.output)
raise Exception(error_message)
[docs]def changeset2rev(repo_path, changeset_revision):
"""
Return the revision number (as an int) corresponding to a specified changeset revision.
"""
try:
rev = subprocess.check_output(['hg', 'id', '-r', changeset_revision, '-n'], stderr=subprocess.STDOUT, cwd=repo_path)
except Exception as e:
error_message = "Error looking for changeset '{}': {}".format(changeset_revision, unicodify(e))
if isinstance(e, subprocess.CalledProcessError):
error_message += "\nOutput was:\n%s" % unicodify(e.output)
raise Exception(error_message)
return int(rev.strip())
__all__ = (
'add_changeset',
'archive_repository_revision',
'clone_repository',
'commit_changeset',
'copy_file_from_manifest',
'create_hgrc_file',
'get_changectx_for_changeset',
'get_config_from_disk',
'get_ctx_file_path_from_manifest',
'get_file_context_from_ctx',
'get_named_tmpfile_from_ctx',
'get_readable_ctx_date',
'get_repository_heads',
'get_reversed_changelog_changesets',
'get_revision_label',
'get_rev_label_changeset_revision_from_repository_metadata',
'get_revision_label_from_ctx',
'get_rev_label_from_changeset_revision',
'pull_repository',
'remove_file',
'reversed_lower_upper_bounded_changelog',
'reversed_upper_bounded_changelog',
'update_repository',
'init_repository',
'changeset2rev',
)