Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for tool_shed.util.hg_util

import logging
import os
import subprocess
import tempfile
from datetime import datetime
from time import gmtime

from tool_shed.util import basic_util

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Sentinel changeset hash. reversed_lower_upper_bounded_changelog() treats a
# lower bound equal to this value as "no lower bound" and starts collecting
# from the first changeset in the changelog.
INITIAL_CHANGELOG_HASH = '000000000000'


def add_changeset(repo_path, path_to_filename_in_archive):
    """
    Run ``hg add`` for ``path_to_filename_in_archive`` with ``repo_path`` as the
    working directory.

    Any failure is re-raised as a plain ``Exception`` whose message names the
    file; when the ``hg`` process itself exited non-zero, its captured output
    (stdout and stderr combined) is appended to the message.
    """
    hg_add = ['hg', 'add', path_to_filename_in_archive]
    try:
        subprocess.check_output(hg_add, stderr=subprocess.STDOUT, cwd=repo_path)
    except subprocess.CalledProcessError as err:
        failure = "Error adding '%s' to repository: %s" % (path_to_filename_in_archive, err)
        raise Exception(failure + "\nOutput was:\n%s" % err.output)
    except Exception as err:
        raise Exception("Error adding '%s' to repository: %s" % (path_to_filename_in_archive, err))
def archive_repository_revision(app, repository, archive_dir, changeset_revision):
    """
    Create an un-versioned archive of a repository.

    Runs ``hg archive -r <changeset_revision> <archive_dir>`` inside the
    directory returned by ``repository.repo_path(app)``.  On any failure the
    error is logged with a traceback and re-raised as a plain ``Exception``;
    the captured ``hg`` output is included when the process exited non-zero.
    """
    working_dir = repository.repo_path(app)
    hg_archive = ['hg', 'archive', '-r', changeset_revision, archive_dir]
    try:
        subprocess.check_output(hg_archive, stderr=subprocess.STDOUT, cwd=working_dir)
    except Exception as err:
        failure = "Error attempting to archive revision '%s' of repository '%s': %s" % (changeset_revision, repository.name, err)
        if isinstance(err, subprocess.CalledProcessError):
            failure = failure + "\nOutput was:\n%s" % err.output
        log.exception(failure)
        raise Exception(failure)
def clone_repository(repository_clone_url, repository_file_dir, ctx_rev=None):
    """
    Clone the repository up to the specified changeset_revision.  No subsequent
    revisions will be present in the cloned repository.

    Returns a ``(success, error_message)`` pair: ``(True, None)`` when
    ``hg clone`` succeeds, otherwise ``(False, <message>)`` after logging the
    message.  This function does not raise on failure.
    """
    # Only restrict the clone to a revision when one was supplied.
    revision_args = ['-r', ctx_rev] if ctx_rev else []
    hg_clone = ['hg', 'clone'] + revision_args + [repository_clone_url, repository_file_dir]
    try:
        subprocess.check_output(hg_clone, stderr=subprocess.STDOUT)
    except Exception as err:
        failure = 'Error cloning repository: %s' % err
        if isinstance(err, subprocess.CalledProcessError):
            failure = failure + "\nOutput was:\n%s" % err.output
        log.error(failure)
        return False, failure
    return True, None
def commit_changeset(repo_path, full_path_to_changeset, username, message):
    """
    Run ``hg commit`` for ``full_path_to_changeset`` as ``username`` with the
    given commit ``message``, using ``repo_path`` as the working directory.

    A non-zero exit with return code 1 whose output contains
    ``nothing changed`` is treated as a successful no-op and the function
    returns normally.  Every other failure is re-raised as a plain
    ``Exception``; the captured ``hg`` output is appended to the message when
    the process exited non-zero.
    """
    try:
        subprocess.check_output(['hg', 'commit', '-u', username, '-m', message, full_path_to_changeset], stderr=subprocess.STDOUT, cwd=repo_path)
    except Exception as e:
        error_message = "Error committing '%s' to repository: %s" % (full_path_to_changeset, e)
        if isinstance(e, subprocess.CalledProcessError):
            # check_output() captures raw bytes, so the marker must be a bytes
            # literal: testing a text string against bytes raises TypeError on
            # Python 3.  On Python 2 a bytes literal is a plain str, so this
            # comparison behaves exactly as before there.
            if e.returncode == 1 and b'nothing changed' in e.output:
                return
            error_message += "\nOutput was:\n%s" % e.output
        raise Exception(error_message)
def copy_file_from_manifest(repo, changeset_revision, filename, dir):
    """
    Copy the latest version of the file named filename from the repository
    manifest to the directory to which dir refers.

    Changesets are examined from ``changeset_revision`` backwards.  The first
    changeset that yields a usable file context (neither missing nor the
    ``'DELETED'`` marker) is written to ``os.path.join(dir, filename)`` and
    that path is returned.  Returns ``None`` when no changeset provides the
    file.
    """
    for changeset in reversed_upper_bounded_changelog(repo, changeset_revision):
        changeset_ctx = repo.changectx(changeset)
        fctx = get_file_context_from_ctx(changeset_ctx, filename)
        if fctx and fctx not in ['DELETED']:
            file_path = os.path.join(dir, filename)
            # Read the contents before opening the destination so a failing
            # read does not leave an empty file behind, and let the context
            # manager close the handle even if the write fails.
            contents = fctx.data()
            with open(file_path, 'wb') as fh:
                fh.write(contents)
            return file_path
    return None
def create_hgrc_file(app, repository):
    """
    Write the ``.hg/hgrc`` configuration file for ``repository``.

    The file is created (or overwritten) under ``repository.repo_path(app)``
    and contains ``[paths]``, ``[web]`` and ``[extensions]`` sections built
    from ``repository.user.username`` and ``repository.name``.
    """
    # Since we support both http and https, we set `push_ssl` to False to
    # override the default (which is True) in the Mercurial API.
    # The hg purge extension purges all files and directories not being tracked
    # by Mercurial in the current repository. It will remove unknown files and
    # empty directories. This is not currently used because it is not supported
    # in the Mercurial API.
    repo_path = repository.repo_path(app)
    hgrc_path = os.path.join(repo_path, '.hg', 'hgrc')
    # The content is text, so open in text mode: writing str objects to a file
    # opened with 'wb' raises TypeError on Python 3.
    with open(hgrc_path, 'w') as fp:
        fp.write('[paths]\n')
        fp.write('default = .\n')
        fp.write('default-push = .\n')
        fp.write('[web]\n')
        fp.write('allow_push = %s\n' % repository.user.username)
        fp.write('name = %s\n' % repository.name)
        fp.write('push_ssl = false\n')
        fp.write('[extensions]\n')
        fp.write('hgext.purge=')
def get_changectx_for_changeset(repo, changeset_revision, **kwd):
    """
    Retrieve a specified changectx from a repository.

    Walks ``repo.changelog`` in order and returns the first change context
    whose string form equals ``changeset_revision``, or ``None`` when there is
    no match.  Extra keyword arguments are accepted and ignored.
    """
    contexts = (repo.changectx(changeset) for changeset in repo.changelog)
    # The generator is lazy, so the scan stops at the first matching context.
    return next((ctx for ctx in contexts if str(ctx) == changeset_revision), None)
def get_config_from_disk(config_file, relative_install_dir):
    """
    Search ``relative_install_dir`` recursively for a file named
    ``config_file`` and return its absolute path, or ``None`` if not found.

    Directories whose path contains ``.hg`` are skipped.
    """
    for current_dir, _subdirs, filenames in os.walk(relative_install_dir):
        if '.hg' in current_dir:
            continue
        found = next((name for name in filenames if name == config_file), None)
        if found is not None:
            return os.path.abspath(os.path.join(current_dir, found))
    return None
def get_ctx_file_path_from_manifest(filename, repo, changeset_revision):
    """
    Get the ctx file path for the latest revision of filename from the
    repository manifest up to the value of changeset_revision.

    Returns a ``(change_context, path)`` pair for the first match found while
    walking changesets backwards from ``changeset_revision``, comparing only
    the path-stripped file names.  Returns ``(None, None)`` when nothing
    matches.
    """
    wanted_name = basic_util.strip_path(filename)
    for changeset in reversed_upper_bounded_changelog(repo, changeset_revision):
        manifest_ctx = repo.changectx(changeset)
        matching_path = next(
            (path for path in manifest_ctx.files() if basic_util.strip_path(path) == wanted_name),
            None,
        )
        if matching_path is not None:
            return manifest_ctx, matching_path
    return None, None
def get_file_context_from_ctx(ctx, filename):
    """
    Return the mercurial file context for a specified file.

    Returns the file context of the first entry in ``ctx.files()`` whose
    path-stripped name matches ``filename`` and that can be looked up in
    ``ctx``.  If at least one matching entry raised ``LookupError`` and none
    could be looked up, the string ``'DELETED'`` is returned.  Returns ``None``
    when no entry matches at all.
    """
    # We have to be careful in determining if we found the correct file because multiple files with
    # the same name may be in different directories within ctx if the files were moved within the change
    # set.  For example, in the following ctx.files() list, the former may have been moved to the latter:
    # ['tmap_wrapper_0.0.19/tool_data_table_conf.xml.sample', 'tmap_wrapper_0.3.3/tool_data_table_conf.xml.sample'].
    # Another scenario is that the file has been deleted.
    wanted_name = basic_util.strip_path(filename)
    lookup_failed = False
    for candidate in ctx.files():
        if basic_util.strip_path(candidate) != wanted_name:
            continue
        try:
            # If the file was moved, its destination will be returned here.
            return ctx[candidate]
        except LookupError:
            # Remember the failure, and keep looking in case the file was moved instead of deleted.
            lookup_failed = True
    return 'DELETED' if lookup_failed else None
def get_named_tmpfile_from_ctx(ctx, filename, dir):
    """
    Return a named temporary file created from a specified file with a given
    name included in a repository changeset revision.

    The first entry of ``ctx.files()`` whose path-stripped name matches
    ``filename`` and that resolves to a truthy file context is written to a
    new file in ``dir`` (name prefix ``tmp-toolshed-gntfc``), and that file's
    path is returned.  The caller is responsible for removing the file.
    Returns ``None`` when no matching, resolvable entry exists.
    """
    filename = basic_util.strip_path(filename)
    for ctx_file in ctx.files():
        ctx_file_name = basic_util.strip_path(ctx_file)
        if filename != ctx_file_name:
            continue
        try:
            # If the file was moved, its destination file contents will be returned here.
            fctx = ctx[ctx_file]
        except LookupError:
            # Continue looking in case the file was moved.
            continue
        if fctx:
            # Reserve a unique name, then reopen it as a regular file so the
            # result keeps ordinary (umask-derived) permissions as before.
            fh = tempfile.NamedTemporaryFile('wb', prefix="tmp-toolshed-gntfc", dir=dir)
            tmp_filename = fh.name
            fh.close()
            # The context manager closes the handle even if reading the file
            # data or writing it raises.
            with open(tmp_filename, 'wb') as fh:
                fh.write(fctx.data())
            return tmp_filename
    return None
def get_readable_ctx_date(ctx):
    """
    Convert the date of the changeset (the received ctx) to a human-readable date.

    ``ctx.date()`` must return a ``(timestamp, offset)`` pair.  The offset is
    subtracted from the timestamp and the result is formatted as
    ``YYYY-MM-DD``.
    """
    timestamp, tz_offset = ctx.date()
    adjusted_fields = gmtime(float(timestamp) - tz_offset)
    # Only year..second are needed to build the datetime.
    return datetime(*adjusted_fields[:6]).strftime("%Y-%m-%d")
def get_repo_for_repository(app, repository=None, repo_path=None):
    """
    Open and return a Mercurial repository object.

    The location is taken from ``repository.repo_path(app)`` when
    ``repository`` is given, otherwise from ``repo_path``.  Returns ``None``
    when neither argument is supplied.
    """
    # Import from mercurial here to let Galaxy start under Python 3
    from mercurial import (
        hg,
        ui
    )
    if repository is not None:
        location = repository.repo_path(app)
    elif repo_path is not None:
        location = repo_path
    else:
        return None
    return hg.repository(ui.ui(), location)
def get_repository_heads(repo):
    """
    Return current repository heads, which are changesets with no child changesets.

    Each entry returned by ``repo.heads(None)`` is looked up in ``repo`` and
    the resulting objects are returned as a list in the same order.
    """
    heads = []
    for head_node in repo.heads(None):
        heads.append(repo[head_node])
    return heads
def get_reversed_changelog_changesets(repo):
    """
    Return a list of the changesets in ``repo.changelog`` in reverse order.

    A new list is always returned; ``repo.changelog`` itself is only iterated.
    """
    # Copy once and reverse in place: linear time, instead of the quadratic
    # cost of inserting every changeset at the front of a list.
    reversed_changelog = list(repo.changelog)
    reversed_changelog.reverse()
    return reversed_changelog
def get_revision_label(app, repository, changeset_revision, include_date=True, include_hash=True):
    """
    Return a string consisting of the human read-able changeset rev and the
    changeset revision string which includes the revision date if the received
    include_date is True.

    When ``changeset_revision`` cannot be found in the repository, the label
    is ``"-1:<changeset_revision>"`` (or just ``"-1"`` when ``include_hash``
    is false).
    """
    hg_repo = get_repo_for_repository(app, repository=repository)
    found_ctx = get_changectx_for_changeset(hg_repo, changeset_revision)
    if found_ctx:
        return get_revision_label_from_ctx(found_ctx, include_date=include_date, include_hash=include_hash)
    if include_hash:
        return "-1:%s" % changeset_revision
    return "-1"
def get_rev_label_changeset_revision_from_repository_metadata(app, repository_metadata, repository=None,
                                                              include_date=True, include_hash=True):
    """
    Return ``(rev, label, changeset_revision)`` for a repository metadata record.

    ``rev`` is the zero-padded (4 digit) revision number and ``label`` is a
    plain-text label made of the revision number, optionally the changeset
    hash, and optionally the changeset date.  When the changeset cannot be
    found, ``rev`` is ``'-1'`` and the label is ``"-1:<hash>"`` or ``"-1"``.
    ``repository`` defaults to ``repository_metadata.repository``.
    """
    if repository is None:
        repository = repository_metadata.repository
    hg_repo = get_repo_for_repository(app, repository=repository)
    changeset_revision = repository_metadata.changeset_revision
    found_ctx = get_changectx_for_changeset(hg_repo, changeset_revision)
    if not found_ctx:
        # Unknown changeset: the date cannot be included, only the hash.
        missing_label = "-1:%s" % changeset_revision if include_hash else "-1"
        return '-1', missing_label, changeset_revision
    rev = '%04d' % found_ctx.rev()
    rev_text = str(found_ctx.rev())
    if include_date and include_hash:
        label = "%s:%s (%s)" % (rev_text, changeset_revision, get_readable_ctx_date(found_ctx))
    elif include_date:
        label = "%s (%s)" % (rev_text, get_readable_ctx_date(found_ctx))
    elif include_hash:
        label = "%s:%s" % (rev_text, changeset_revision)
    else:
        label = "%s" % rev_text
    return rev, label, changeset_revision
def get_revision_label_from_ctx(ctx, include_date=True, include_hash=True):
    """
    Build a display label for a change context.

    The label always starts with the revision number.  The string form of
    ``ctx`` is appended after a colon when ``include_hash`` is true, and the
    readable changeset date is appended inside HTML markup when
    ``include_date`` is true.
    """
    rev_text = str(ctx.rev())
    if not include_date:
        if include_hash:
            return '%s:%s' % (rev_text, str(ctx))
        return '%s' % rev_text
    date_text = str(get_readable_ctx_date(ctx))
    if include_hash:
        return '%s:%s <i><font color="#666666">(%s)</font></i>' % (rev_text, str(ctx), date_text)
    return '%s <i><font color="#666666">(%s)</font></i>' % (rev_text, date_text)
def get_rev_label_from_changeset_revision(repo, changeset_revision, include_date=True, include_hash=True):
    """
    Given a changeset revision hash, return two strings, the changeset rev and
    the changeset revision label.

    ``rev`` is the zero-padded (4 digit) revision number, or ``'-1'`` when the
    changeset is not found in ``repo``.  The label includes the revision date
    when ``include_date`` is true and the changeset hash when ``include_hash``
    is true.
    """
    ctx = get_changectx_for_changeset(repo, changeset_revision)
    if ctx:
        rev = '%04d' % ctx.rev()
        # Forward include_hash as well; it used to be accepted but ignored,
        # unlike in get_revision_label().
        label = get_revision_label_from_ctx(ctx, include_date=include_date, include_hash=include_hash)
    else:
        rev = '-1'
        # Same fallback as get_revision_label() for an unknown changeset.
        if include_hash:
            label = "-1:%s" % changeset_revision
        else:
            label = "-1"
    return rev, label
def pull_repository(repo_path, repository_clone_url, ctx_rev):
    """
    Pull changes from a remote repository to a local one.

    Runs ``hg pull -r <ctx_rev> <repository_clone_url>`` with ``repo_path`` as
    the working directory.  Any failure is re-raised as a plain ``Exception``;
    the captured ``hg`` output is appended when the process exited non-zero.
    """
    hg_pull = ['hg', 'pull', '-r', ctx_rev, repository_clone_url]
    try:
        subprocess.check_output(hg_pull, stderr=subprocess.STDOUT, cwd=repo_path)
    except subprocess.CalledProcessError as err:
        failure = "Error pulling revision '%s': %s" % (ctx_rev, err)
        raise Exception(failure + "\nOutput was:\n%s" % err.output)
    except Exception as err:
        raise Exception("Error pulling revision '%s': %s" % (ctx_rev, err))
def remove_file(repo_path, selected_file, force=True):
    """
    Run ``hg remove`` for ``selected_file`` with ``repo_path`` as the working
    directory, adding ``--force`` when ``force`` is true.

    Any failure is re-raised as a plain ``Exception``; the captured ``hg``
    output is appended when the process exited non-zero.
    """
    force_args = ['--force'] if force else []
    hg_remove = ['hg', 'remove'] + force_args + [selected_file]
    try:
        subprocess.check_output(hg_remove, stderr=subprocess.STDOUT, cwd=repo_path)
    except subprocess.CalledProcessError as err:
        failure = "Error removing file '%s': %s" % (selected_file, err)
        raise Exception(failure + "\nOutput was:\n%s" % err.output)
    except Exception as err:
        raise Exception("Error removing file '%s': %s" % (selected_file, err))
def reversed_lower_upper_bounded_changelog(repo, excluded_lower_bounds_changeset_revision, included_upper_bounds_changeset_revision):
    """
    Return a reversed list of changesets in the repository changelog after the
    excluded_lower_bounds_changeset_revision, but up to and including the
    included_upper_bounds_changeset_revision.  The value of
    excluded_lower_bounds_changeset_revision will be the value of
    INITIAL_CHANGELOG_HASH if no valid changesets exist before
    included_upper_bounds_changeset_revision.

    If the upper bound is never encountered, every changeset after the lower
    bound is returned.  If the lower bound is never encountered (and is not
    INITIAL_CHANGELOG_HASH), the result is empty.
    """
    # To set excluded_lower_bounds_changeset_revision, calling methods should do the following, where the value
    # of changeset_revision is a downloadable changeset_revision.
    # excluded_lower_bounds_changeset_revision = \
    #     metadata_util.get_previous_metadata_changeset_revision(app, repository, changeset_revision, downloadable=?)
    appending_started = excluded_lower_bounds_changeset_revision == INITIAL_CHANGELOG_HASH
    # Collect in changelog order and reverse once at the end; inserting each
    # changeset at the front of the list would make the loop quadratic.
    collected = []
    for changeset in repo.changelog:
        changeset_hash = str(repo.changectx(changeset))
        if appending_started:
            collected.append(changeset)
        elif changeset_hash == excluded_lower_bounds_changeset_revision:
            # The lower bound itself is excluded; collecting starts with the next changeset.
            appending_started = True
        if changeset_hash == included_upper_bounds_changeset_revision:
            break
    collected.reverse()
    return collected
def reversed_upper_bounded_changelog(repo, included_upper_bounds_changeset_revision):
    """
    Return a reversed list of changesets in the repository changelog up to and
    including the included_upper_bounds_changeset_revision.

    This delegates to reversed_lower_upper_bounded_changelog() with
    INITIAL_CHANGELOG_HASH as the lower bound.
    """
    return reversed_lower_upper_bounded_changelog(
        repo,
        excluded_lower_bounds_changeset_revision=INITIAL_CHANGELOG_HASH,
        included_upper_bounds_changeset_revision=included_upper_bounds_changeset_revision,
    )
def update_repository(repo_path, ctx_rev=None):
    """
    Update the cloned repository to changeset_revision.  It is critical that the
    installed repository is updated to the desired changeset_revision before
    metadata is set because the process for setting metadata uses the
    repository files on disk.

    Runs ``hg update`` (with ``-r <ctx_rev>`` when ``ctx_rev`` is given) using
    ``repo_path`` as the working directory.  Any failure is re-raised as a
    plain ``Exception``; the captured ``hg`` output is appended when the
    process exited non-zero.
    """
    # TODO: We may have files on disk in the repo directory that aren't being tracked, so they must be removed.
    # The codes used to show the status of files are as follows.
    # M = modified
    # A = added
    # R = removed
    # C = clean
    # ! = deleted, but still tracked
    # ? = not tracked
    # I = ignored
    # It would be nice if we could use mercurial's purge extension to remove untracked files.  The problem is that
    # purging is not supported by the mercurial API.
    revision_args = ['-r', ctx_rev] if ctx_rev else []
    hg_update = ['hg', 'update'] + revision_args
    try:
        subprocess.check_output(hg_update, stderr=subprocess.STDOUT, cwd=repo_path)
    except subprocess.CalledProcessError as err:
        failure = 'Error updating repository: %s' % err
        raise Exception(failure + "\nOutput was:\n%s" % err.output)
    except Exception as err:
        raise Exception('Error updating repository: %s' % err)
def init_repository(repo_path):
    """
    Create a new Mercurial repository in the given directory.

    Runs ``hg init`` with ``repo_path`` as the working directory.  Any failure
    is re-raised as a plain ``Exception``; the captured ``hg`` output is
    appended when the process exited non-zero.
    """
    hg_init = ['hg', 'init']
    try:
        subprocess.check_output(hg_init, stderr=subprocess.STDOUT, cwd=repo_path)
    except subprocess.CalledProcessError as err:
        failure = 'Error initializing repository: %s' % err
        raise Exception(failure + "\nOutput was:\n%s" % err.output)
    except Exception as err:
        raise Exception('Error initializing repository: %s' % err)
def changeset2rev(repo_path, changeset_revision):
    """
    Return the revision number corresponding to a specified changeset revision.

    Runs ``hg id -r <changeset_revision> -n`` with ``repo_path`` as the working
    directory and converts the stripped output to ``int``.  A failure to run
    the command is re-raised as a plain ``Exception``; the captured ``hg``
    output is appended when the process exited non-zero.
    """
    hg_id = ['hg', 'id', '-r', changeset_revision, '-n']
    try:
        raw_rev = subprocess.check_output(hg_id, stderr=subprocess.STDOUT, cwd=repo_path)
    except subprocess.CalledProcessError as err:
        failure = "Error looking for changeset '%s': %s" % (changeset_revision, err)
        raise Exception(failure + "\nOutput was:\n%s" % err.output)
    except Exception as err:
        raise Exception("Error looking for changeset '%s': %s" % (changeset_revision, err))
    return int(raw_rev.strip())