Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for tool_shed.util.hg_util

import logging
import os
import subprocess
import tempfile
from datetime import datetime
from time import gmtime

from galaxy.util import unicodify
from tool_shed.util import basic_util

log = logging.getLogger(__name__)

INITIAL_CHANGELOG_HASH = '000000000000'


[docs]def add_changeset(repo_path, path_to_filename_in_archive): try: subprocess.check_output(['hg', 'add', path_to_filename_in_archive], stderr=subprocess.STDOUT, cwd=repo_path) except Exception as e: error_message = "Error adding '%s' to repository: %s" % (path_to_filename_in_archive, unicodify(e)) if isinstance(e, subprocess.CalledProcessError): error_message += "\nOutput was:\n%s" % e.output raise Exception(error_message)
[docs]def archive_repository_revision(app, repository, archive_dir, changeset_revision): '''Create an un-versioned archive of a repository.''' repo_path = repository.repo_path(app) try: subprocess.check_output(['hg', 'archive', '-r', changeset_revision, archive_dir], stderr=subprocess.STDOUT, cwd=repo_path) except Exception as e: error_message = "Error attempting to archive revision '%s' of repository '%s': %s" % (changeset_revision, repository.name, unicodify(e)) if isinstance(e, subprocess.CalledProcessError): error_message += "\nOutput was:\n%s" % unicodify(e.output) log.exception(error_message) raise Exception(error_message)
[docs]def clone_repository(repository_clone_url, repository_file_dir, ctx_rev=None): """ Clone the repository up to the specified changeset_revision. No subsequent revisions will be present in the cloned repository. """ cmd = ['hg', 'clone'] if ctx_rev: cmd.extend(['-r', str(ctx_rev)]) cmd.extend([repository_clone_url, repository_file_dir]) # Make sure the destination path actually exists before attempting to clone if not os.path.exists(repository_file_dir): os.makedirs(repository_file_dir) try: subprocess.check_output(cmd, stderr=subprocess.STDOUT) return True, None except Exception as e: error_message = 'Error cloning repository: %s' % unicodify(e) if isinstance(e, subprocess.CalledProcessError): error_message += "\nOutput was:\n%s" % unicodify(e.output) log.error(error_message) return False, error_message
[docs]def commit_changeset(repo_path, full_path_to_changeset, username, message): try: subprocess.check_output(['hg', 'commit', '-u', username, '-m', message, full_path_to_changeset], stderr=subprocess.STDOUT, cwd=repo_path) except Exception as e: error_message = "Error committing '%s' to repository: %s" % (full_path_to_changeset, unicodify(e)) if isinstance(e, subprocess.CalledProcessError): if e.returncode == 1 and 'nothing changed' in unicodify(e.output): return error_message += "\nOutput was:\n%s" % unicodify(e.output) raise Exception(error_message)
[docs]def copy_file_from_manifest(repo, changeset_revision, filename, dir): """ Copy the latest version of the file named filename from the repository manifest to the directory to which dir refers. """ for changeset in reversed_upper_bounded_changelog(repo, changeset_revision): changeset_ctx = repo.changectx(changeset) fctx = get_file_context_from_ctx(changeset_ctx, filename) if fctx and fctx not in ['DELETED']: file_path = os.path.join(dir, filename) fh = open(file_path, 'wb') fh.write(fctx.data()) fh.close() return file_path return None
[docs]def create_hgrc_file(app, repository): # Since we support both http and https, we set `push_ssl` to False to # override the default (which is True) in the Mercurial API. # The hg purge extension purges all files and directories not being tracked # by Mercurial in the current repository. It will remove unknown files and # empty directories. This is not currently used because it is not supported # in the Mercurial API. repo_path = repository.repo_path(app) hgrc_path = os.path.join(repo_path, '.hg', 'hgrc') with open(hgrc_path, 'w') as fp: fp.write('[paths]\n') fp.write('default = .\n') fp.write('default-push = .\n') fp.write('[web]\n') fp.write('allow_push = %s\n' % repository.user.username) fp.write('name = %s\n' % repository.name) fp.write('push_ssl = false\n') fp.write('[extensions]\n') fp.write('hgext.purge=')
[docs]def get_changectx_for_changeset(repo, changeset_revision, **kwd): """Retrieve a specified changectx from a repository.""" for changeset in repo.changelog: ctx = repo.changectx(changeset) if str(ctx) == changeset_revision: return ctx return None
[docs]def get_config_from_disk(config_file, relative_install_dir): for root, dirs, files in os.walk(relative_install_dir): if root.find('.hg') < 0: for name in files: if name == config_file: return os.path.abspath(os.path.join(root, name)) return None
[docs]def get_ctx_file_path_from_manifest(filename, repo, changeset_revision): """ Get the ctx file path for the latest revision of filename from the repository manifest up to the value of changeset_revision. """ stripped_filename = basic_util.strip_path(filename) for changeset in reversed_upper_bounded_changelog(repo, changeset_revision): manifest_ctx = repo.changectx(changeset) for ctx_file in manifest_ctx.files(): ctx_file_name = basic_util.strip_path(ctx_file) if ctx_file_name == stripped_filename: return manifest_ctx, ctx_file return None, None
[docs]def get_file_context_from_ctx(ctx, filename): """Return the mercurial file context for a specified file.""" # We have to be careful in determining if we found the correct file because multiple files with # the same name may be in different directories within ctx if the files were moved within the change # set. For example, in the following ctx.files() list, the former may have been moved to the latter: # ['tmap_wrapper_0.0.19/tool_data_table_conf.xml.sample', 'tmap_wrapper_0.3.3/tool_data_table_conf.xml.sample']. # Another scenario is that the file has been deleted. deleted = False filename = basic_util.strip_path(filename) for ctx_file in ctx.files(): ctx_file_name = basic_util.strip_path(ctx_file) if filename == ctx_file_name: try: # If the file was moved, its destination will be returned here. fctx = ctx[ctx_file] return fctx except LookupError: # Set deleted for now, and continue looking in case the file was moved instead of deleted. deleted = True if deleted: return 'DELETED' return None
[docs]def get_named_tmpfile_from_ctx(ctx, filename, dir): """ Return a named temporary file created from a specified file with a given name included in a repository changeset revision. """ filename = basic_util.strip_path(filename) for ctx_file in ctx.files(): ctx_file_name = basic_util.strip_path(ctx_file) if filename == ctx_file_name: try: # If the file was moved, its destination file contents will be returned here. fctx = ctx[ctx_file] except LookupError: # Continue looking in case the file was moved. fctx = None continue if fctx: fh = tempfile.NamedTemporaryFile('wb', prefix="tmp-toolshed-gntfc", dir=dir) tmp_filename = fh.name fh.close() fh = open(tmp_filename, 'wb') fh.write(fctx.data()) fh.close() return tmp_filename return None
[docs]def get_readable_ctx_date(ctx): """Convert the date of the changeset (the received ctx) to a human-readable date.""" t, tz = ctx.date() date = datetime(*gmtime(float(t) - tz)[:6]) ctx_date = date.strftime("%Y-%m-%d") return ctx_date
[docs]def get_repo_for_repository(app, repository=None, repo_path=None): # Import from mercurial here to let Galaxy start under Python 3 from mercurial import ( hg, ui ) if repository is not None: return hg.repository(ui.ui(), repository.repo_path(app)) if repo_path is not None: return hg.repository(ui.ui(), repo_path)
[docs]def get_repository_heads(repo): """Return current repository heads, which are changesets with no child changesets.""" heads = [repo[h] for h in repo.heads(None)] return heads
[docs]def get_reversed_changelog_changesets(repo): """Return a list of changesets in reverse order from that provided by the repository manifest.""" reversed_changelog = [] for changeset in repo.changelog: reversed_changelog.insert(0, changeset) return reversed_changelog
[docs]def get_revision_label(app, repository, changeset_revision, include_date=True, include_hash=True): """ Return a string consisting of the human read-able changeset rev and the changeset revision string which includes the revision date if the receive include_date is True. """ repo = get_repo_for_repository(app, repository=repository) ctx = get_changectx_for_changeset(repo, changeset_revision) if ctx: return get_revision_label_from_ctx(ctx, include_date=include_date, include_hash=include_hash) else: if include_hash: return "-1:%s" % changeset_revision else: return "-1"
[docs]def get_rev_label_changeset_revision_from_repository_metadata(app, repository_metadata, repository=None, include_date=True, include_hash=True): if repository is None: repository = repository_metadata.repository repo = get_repo_for_repository(app, repository=repository) changeset_revision = repository_metadata.changeset_revision ctx = get_changectx_for_changeset(repo, changeset_revision) if ctx: rev = '%04d' % ctx.rev() if include_date: changeset_revision_date = get_readable_ctx_date(ctx) if include_hash: label = "%s:%s (%s)" % (str(ctx.rev()), changeset_revision, changeset_revision_date) else: label = "%s (%s)" % (str(ctx.rev()), changeset_revision_date) else: if include_hash: label = "%s:%s" % (str(ctx.rev()), changeset_revision) else: label = "%s" % str(ctx.rev()) else: rev = '-1' if include_hash: label = "-1:%s" % changeset_revision else: label = "-1" return rev, label, changeset_revision
[docs]def get_revision_label_from_ctx(ctx, include_date=True, include_hash=True): if include_date: if include_hash: return '%s:%s <i><font color="#666666">(%s)</font></i>' % \ (str(ctx.rev()), str(ctx), str(get_readable_ctx_date(ctx))) else: return '%s <i><font color="#666666">(%s)</font></i>' % \ (str(ctx.rev()), str(get_readable_ctx_date(ctx))) else: if include_hash: return '%s:%s' % (str(ctx.rev()), str(ctx)) else: return '%s' % str(ctx.rev())
[docs]def get_rev_label_from_changeset_revision(repo, changeset_revision, include_date=True, include_hash=True): """ Given a changeset revision hash, return two strings, the changeset rev and the changeset revision hash which includes the revision date if the receive include_date is True. """ ctx = get_changectx_for_changeset(repo, changeset_revision) if ctx: rev = '%04d' % ctx.rev() label = get_revision_label_from_ctx(ctx, include_date=include_date) else: rev = '-1' label = "-1:%s" % changeset_revision return rev, label
[docs]def pull_repository(repo_path, repository_clone_url, ctx_rev): """Pull changes from a remote repository to a local one.""" try: subprocess.check_output(['hg', 'pull', '-r', ctx_rev, repository_clone_url], stderr=subprocess.STDOUT, cwd=repo_path) except Exception as e: error_message = "Error pulling revision '%s': %s" % (ctx_rev, unicodify(e)) if isinstance(e, subprocess.CalledProcessError): error_message += "\nOutput was:\n%s" % unicodify(e.output) raise Exception(error_message)
[docs]def remove_file(repo_path, selected_file, force=True): cmd = ['hg', 'remove'] if force: cmd.append('--force') cmd.append(selected_file) try: subprocess.check_output(cmd, stderr=subprocess.STDOUT, cwd=repo_path) except Exception as e: error_message = "Error removing file '%s': %s" % (selected_file, unicodify(e)) if isinstance(e, subprocess.CalledProcessError): error_message += "\nOutput was:\n%s" % unicodify(e.output) raise Exception(error_message)
[docs]def reversed_lower_upper_bounded_changelog(repo, excluded_lower_bounds_changeset_revision, included_upper_bounds_changeset_revision): """ Return a reversed list of changesets in the repository changelog after the excluded_lower_bounds_changeset_revision, but up to and including the included_upper_bounds_changeset_revision. The value of excluded_lower_bounds_changeset_revision will be the value of INITIAL_CHANGELOG_HASH if no valid changesets exist before included_upper_bounds_changeset_revision. """ # To set excluded_lower_bounds_changeset_revision, calling methods should do the following, where the value # of changeset_revision is a downloadable changeset_revision. # excluded_lower_bounds_changeset_revision = \ # metadata_util.get_previous_metadata_changeset_revision(app, repository, changeset_revision, downloadable=?) if excluded_lower_bounds_changeset_revision == INITIAL_CHANGELOG_HASH: appending_started = True else: appending_started = False reversed_changelog = [] for changeset in repo.changelog: changeset_hash = str(repo.changectx(changeset)) if appending_started: reversed_changelog.insert(0, changeset) if changeset_hash == excluded_lower_bounds_changeset_revision and not appending_started: appending_started = True if changeset_hash == included_upper_bounds_changeset_revision: break return reversed_changelog
[docs]def reversed_upper_bounded_changelog(repo, included_upper_bounds_changeset_revision): """ Return a reversed list of changesets in the repository changelog up to and including the included_upper_bounds_changeset_revision. """ return reversed_lower_upper_bounded_changelog(repo, INITIAL_CHANGELOG_HASH, included_upper_bounds_changeset_revision)
[docs]def update_repository(repo_path, ctx_rev=None): """ Update the cloned repository to changeset_revision. It is critical that the installed repository is updated to the desired changeset_revision before metadata is set because the process for setting metadata uses the repository files on disk. """ # TODO: We may have files on disk in the repo directory that aren't being tracked, so they must be removed. # The codes used to show the status of files are as follows. # M = modified # A = added # R = removed # C = clean # ! = deleted, but still tracked # ? = not tracked # I = ignored # It would be nice if we could use mercurial's purge extension to remove untracked files. The problem is that # purging is not supported by the mercurial API. cmd = ['hg', 'update'] if ctx_rev: cmd.extend(['-r', ctx_rev]) try: subprocess.check_output(cmd, stderr=subprocess.STDOUT, cwd=repo_path) except Exception as e: error_message = 'Error updating repository: %s' % unicodify(e) if isinstance(e, subprocess.CalledProcessError): error_message += "\nOutput was:\n%s" % unicodify(e.output) raise Exception(error_message)
[docs]def init_repository(repo_path): """ Create a new Mercurial repository in the given directory. """ try: subprocess.check_output(['hg', 'init'], stderr=subprocess.STDOUT, cwd=repo_path) except Exception as e: error_message = 'Error initializing repository: %s' % unicodify(e) if isinstance(e, subprocess.CalledProcessError): error_message += "\nOutput was:\n%s" % unicodify(e.output) raise Exception(error_message)
[docs]def changeset2rev(repo_path, changeset_revision): """ Return the revision number (as an int) corresponding to a specified changeset revision. """ try: rev = subprocess.check_output(['hg', 'id', '-r', changeset_revision, '-n'], stderr=subprocess.STDOUT, cwd=repo_path) except Exception as e: error_message = "Error looking for changeset '%s': %s" % (changeset_revision, unicodify(e)) if isinstance(e, subprocess.CalledProcessError): error_message += "\nOutput was:\n%s" % unicodify(e.output) raise Exception(error_message) return int(rev.strip())