Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for tool_shed.util.hg_util

import logging
import os
import shutil
import subprocess
import tempfile
from datetime import datetime
from time import gmtime

from galaxy.tool_shed.util import basic_util
from galaxy.tool_shed.util.hg_util import (
    clone_repository,
    copy_file_from_manifest,
    get_changectx_for_changeset,
    get_config_from_disk,
    get_ctx_file_path_from_manifest,
    get_file_context_from_ctx,
    pull_repository,
    reversed_lower_upper_bounded_changelog,
    reversed_upper_bounded_changelog,
    update_repository,
)
from galaxy.util import unicodify

log = logging.getLogger(__name__)

INITIAL_CHANGELOG_HASH = "000000000000"


[docs]def add_changeset(repo_path, path_to_filename_in_archive): try: subprocess.check_output(["hg", "add", path_to_filename_in_archive], stderr=subprocess.STDOUT, cwd=repo_path) except Exception as e: error_message = f"Error adding '{path_to_filename_in_archive}' to repository: {unicodify(e)}" if isinstance(e, subprocess.CalledProcessError): error_message += f"\nOutput was:\n{unicodify(e.output)}" raise Exception(error_message)
[docs]def archive_repository_revision(app, repository, archive_dir, changeset_revision): """Create an un-versioned archive of a repository.""" repo_path = repository.repo_path(app) try: subprocess.check_output( ["hg", "archive", "-r", changeset_revision, archive_dir], stderr=subprocess.STDOUT, cwd=repo_path ) except Exception as e: error_message = f"Error attempting to archive revision '{changeset_revision}' of repository '{repository.name}': {unicodify(e)}" if isinstance(e, subprocess.CalledProcessError): error_message += f"\nOutput was:\n{unicodify(e.output)}" log.exception(error_message) raise Exception(error_message)
[docs]def commit_changeset(repo_path: str, full_path_to_changeset: str, username: str, message: str) -> None: try: subprocess.check_output( ["hg", "commit", "-u", username, "-m", message, full_path_to_changeset], stderr=subprocess.STDOUT, cwd=repo_path, ) except Exception as e: error_message = f"Error committing '{full_path_to_changeset}' to repository: {unicodify(e)}" if isinstance(e, subprocess.CalledProcessError): if e.returncode == 1 and "nothing changed" in unicodify(e.output): return error_message += f"\nOutput was:\n{unicodify(e.output)}" raise Exception(error_message)
def get_hgrc_path(repo_path): return os.path.join(repo_path, ".hg", "hgrc")
[docs]def create_hgrc_file(app, repository, repo_path): # Since we support both http and https, we set `push_ssl` to False to # override the default (which is True) in the Mercurial API. # The hg purge extension purges all files and directories not being tracked # by Mercurial in the current repository. It will remove unknown files and # empty directories. This is not currently used because it is not supported # in the Mercurial API. hgrc_path = get_hgrc_path(repo_path) with open(hgrc_path, "w") as fp: fp.write("[paths]\n") fp.write("default = .\n") fp.write("default-push = .\n") fp.write("[web]\n") if app.config.config_hg_for_dev: allow_push = "*" else: allow_push = repository.user.username fp.write(f"allow_push = {allow_push}\n") fp.write(f"name = {repository.name}\n") fp.write("push_ssl = false\n") fp.write("[extensions]\n") fp.write("hgext.purge=")
[docs]def get_named_tmpfile_from_ctx(ctx, filename, dir): """ Return a named temporary file created from a specified file with a given name included in a repository changeset revision. """ filename = basic_util.strip_path(filename) for ctx_file in ctx.files(): ctx_file_name = basic_util.strip_path(unicodify(ctx_file)) if filename == ctx_file_name: try: # If the file was moved, its destination file contents will be returned here. fctx = ctx[ctx_file] except LookupError: # Continue looking in case the file was moved. fctx = None continue if fctx: fh = tempfile.NamedTemporaryFile("wb", prefix="tmp-toolshed-gntfc", dir=dir) tmp_filename = fh.name fh.close() fh = open(tmp_filename, "wb") fh.write(fctx.data()) fh.close() return tmp_filename return None
[docs]def get_readable_ctx_date(ctx): """Convert the date of the changeset (the received ctx) to a human-readable date.""" t, tz = ctx.date() date = datetime(*gmtime(float(t) - tz)[:6]) ctx_date = date.strftime("%Y-%m-%d") return ctx_date
[docs]def get_repository_heads(repo): """Return current repository heads, which are changesets with no child changesets.""" heads = [repo[h] for h in repo.heads(None)] return heads
[docs]def get_reversed_changelog_changesets(repo): """Return a list of changesets in reverse order from that provided by the repository manifest.""" reversed_changelog = [] for changeset in repo.changelog: reversed_changelog.insert(0, changeset) return reversed_changelog
[docs]def get_revision_label(app, repository, changeset_revision, include_date=True, include_hash=True): """ Return a string consisting of the human readable changeset rev and the changeset revision string which includes the revision date if the receive include_date is True. """ repo = repository.hg_repo if ctx := get_changectx_for_changeset(repo, changeset_revision): return get_revision_label_from_ctx(ctx, include_date=include_date, include_hash=include_hash) else: if include_hash: return f"-1:{changeset_revision}" else: return "-1"
[docs]def get_rev_label_changeset_revision_from_repository_metadata( app, repository_metadata, repository=None, include_date=True, include_hash=True ): if repository is None: repository = repository_metadata.repository repo = repository.hg_repo changeset_revision = repository_metadata.changeset_revision if ctx := get_changectx_for_changeset(repo, changeset_revision): rev = f"{ctx.rev():04d}" if include_date: changeset_revision_date = get_readable_ctx_date(ctx) if include_hash: label = f"{ctx.rev()}:{changeset_revision} ({changeset_revision_date})" else: label = f"{ctx.rev()} ({changeset_revision_date})" else: if include_hash: label = f"{ctx.rev()}:{changeset_revision}" else: label = f"{ctx.rev()}" else: rev = "-1" if include_hash: label = f"-1:{changeset_revision}" else: label = "-1" return rev, label, changeset_revision
[docs]def get_revision_label_from_ctx(ctx, include_date=True, include_hash=True): if include_date: if include_hash: return f'{ctx.rev()}:{ctx} <i><font color="#666666">({get_readable_ctx_date(ctx)})</font></i>' else: return f'{ctx.rev()} <i><font color="#666666">({get_readable_ctx_date(ctx)})</font></i>' else: if include_hash: return f"{ctx.rev()}:{ctx}" else: return str(ctx.rev())
[docs]def get_rev_label_from_changeset_revision(repo, changeset_revision, include_date=True, include_hash=True): """ Given a changeset revision hash, return two strings, the changeset rev and the changeset revision hash which includes the revision date if the receive include_date is True. """ if ctx := get_changectx_for_changeset(repo, changeset_revision): rev = f"{ctx.rev():04d}" label = get_revision_label_from_ctx(ctx, include_date=include_date) else: rev = "-1" label = f"-1:{changeset_revision}" return rev, label
[docs]def remove_path(repo_path, selected_file): cmd = ["hg", "remove", "--force", selected_file] try: subprocess.check_output(cmd, stderr=subprocess.STDOUT, cwd=repo_path) except Exception as e: error_message = f"Error removing path '{selected_file}': {unicodify(e)}" if isinstance(e, subprocess.CalledProcessError): output = unicodify(e.output) if "is untracked" in output: # That's ok, happens if we add a new file or directory via tarball upload, # just delete the file or dir on disk selected_file_path = os.path.join(repo_path, selected_file) if os.path.isdir(selected_file_path): shutil.rmtree(selected_file_path) else: os.remove(selected_file_path) return error_message += f"\nOutput was:\n{output}" raise Exception(error_message)
[docs]def init_repository(repo_path): """ Create a new Mercurial repository in the given directory. """ try: subprocess.check_output(["hg", "init"], stderr=subprocess.STDOUT, cwd=repo_path) except Exception as e: error_message = f"Error initializing repository: {unicodify(e)}" if isinstance(e, subprocess.CalledProcessError): error_message += f"\nOutput was:\n{unicodify(e.output)}" raise Exception(error_message)
[docs]def changeset2rev(repo_path, changeset_revision): """ Return the revision number (as an int) corresponding to a specified changeset revision. """ try: rev = subprocess.check_output( ["hg", "id", "-r", changeset_revision, "-n"], stderr=subprocess.STDOUT, cwd=repo_path ) except Exception as e: error_message = f"Error looking for changeset '{changeset_revision}': {unicodify(e)}" if isinstance(e, subprocess.CalledProcessError): error_message += f"\nOutput was:\n{unicodify(e.output)}" raise Exception(error_message) return int(rev.strip())
__all__ = ( "add_changeset", "archive_repository_revision", "clone_repository", "commit_changeset", "copy_file_from_manifest", "create_hgrc_file", "get_changectx_for_changeset", "get_config_from_disk", "get_ctx_file_path_from_manifest", "get_file_context_from_ctx", "get_named_tmpfile_from_ctx", "get_readable_ctx_date", "get_repository_heads", "get_reversed_changelog_changesets", "get_revision_label", "get_rev_label_changeset_revision_from_repository_metadata", "get_revision_label_from_ctx", "get_rev_label_from_changeset_revision", "pull_repository", "remove_path", "reversed_lower_upper_bounded_changelog", "reversed_upper_bounded_changelog", "update_repository", "init_repository", "changeset2rev", )