Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.tools.deps.brew_exts
#!/usr/bin/env python
# % brew vinstall samtools 1.0
# % brew vinstall samtools 0.1.19
# % brew vinstall samtools 1.1
# % brew env samtools 1.1
# PATH=/home/john/.linuxbrew/Cellar/htslib/1.1/bin:/home/john/.linuxbrew/Cellar/samtools/1.1/bin:$PATH
# export PATH
# LD_LIBRARY_PATH=/home/john/.linuxbrew/Cellar/htslib/1.1/lib:/home/john/.linuxbrew/Cellar/samtools/1.1/lib:$LD_LIBRARY_PATH
# export LD_LIBRARY_PATH
# % . <(brew env samtools 1.1)
# % which samtools
# /home/john/.linuxbrew/Cellar/samtools/1.1/bin/samtools
# % . <(brew env samtools 0.1.19)
# % which samtools
# /home/john/.linuxbrew/Cellar/samtools/0.1.19/bin/samtools
# % brew vuninstall samtools 1.0
# % brew vdeps samtools 1.1
# htslib@1.1
# % brew vdeps samtools 0.1.19
from __future__ import print_function
import argparse
import contextlib
import glob
import json
import os
import re
import string
import subprocess
import sys
WHITESPACE_PATTERN = re.compile(r"[\s]+")
DESCRIPTION = "Script built on top of linuxbrew to operate on isolated, versioned brew installed environments."
if sys.platform == "darwin":
DEFAULT_HOMEBREW_ROOT = "/usr/local"
else:
DEFAULT_HOMEBREW_ROOT = os.path.join(os.path.expanduser("~"), ".linuxbrew")
NO_BREW_ERROR_MESSAGE = "Could not find brew on PATH, please place on path or pass to script with --brew argument."
CANNOT_DETERMINE_TAP_ERROR_MESSAGE = "Cannot determine tap of specified recipe - please use fully qualified recipe (e.g. homebrew/science/samtools)."
VERBOSE = False
RELAXED = False
BREW_ARGS = []
[docs]class BrewContext(object):
[docs] def __init__(self, args=None):
ensure_brew_on_path(args)
raw_config = brew_execute(["config"])
config_lines = [l.strip().split(":", 1) for l in raw_config.split("\n") if l]
config = dict([(p[0].strip(), p[1].strip()) for p in config_lines])
# unset if "/usr/local" -> https://github.com/Homebrew/homebrew/blob/master/Library/Homebrew/cmd/config.rb
homebrew_prefix = config.get("HOMEBREW_PREFIX", "/usr/local")
homebrew_cellar = config.get("HOMEBREW_CELLAR", os.path.join(homebrew_prefix, "Cellar"))
self.homebrew_prefix = homebrew_prefix
self.homebrew_cellar = homebrew_cellar
[docs]class RecipeContext(object):
[docs] @staticmethod
def from_args(args, brew_context=None):
return RecipeContext(args.recipe, args.version, brew_context)
[docs] def __init__(self, recipe, version, brew_context=None):
self.recipe = recipe
self.version = version
self.brew_context = brew_context or BrewContext()
@property
def cellar_path(self):
return recipe_cellar_path(self.brew_context.homebrew_cellar, self.recipe, self.version)
@property
def tap_path(self):
return os.path.join(self.brew_context.homebrew_prefix, "Library", "Taps", self.__tap_path(self.recipe))
def __tap_path(self, recipe):
parts = recipe.split("/")
if len(parts) == 1:
info = brew_info(self.recipe)
from_url = info["from_url"]
if not from_url:
raise Exception(CANNOT_DETERMINE_TAP_ERROR_MESSAGE)
from_url_parts = from_url.split("/")
blob_index = from_url_parts.index("blob") # comes right after username and repository
if blob_index < 2:
raise Exception(CANNOT_DETERMINE_TAP_ERROR_MESSAGE)
username = from_url_parts[blob_index - 2]
repository = from_url_parts[blob_index - 1]
else:
assert len(parts) == 3
parts = recipe.split("/")
username = parts[0]
repository = "homebrew-%s" % parts[1]
path = os.path.join(username, repository)
return path
[docs]def main():
global VERBOSE
global RELAXED
global BREW_ARGS
parser = argparse.ArgumentParser(description=DESCRIPTION)
parser.add_argument("--brew", help="Path to linuxbrew 'brew' executable to target")
actions = ["vinstall", "vuninstall", "vdeps", "vinfo", "env"]
action = __action(sys)
if not action:
parser.add_argument('action', metavar='action', help="Versioned action to perform.", choices=actions)
parser.add_argument('recipe', metavar='recipe', help="Recipe for action - should be absolute (e.g. homebrew/science/samtools).")
parser.add_argument('version', metavar='version', help="Version for action (e.g. 0.1.19).")
parser.add_argument('--relaxed', action='store_true', help="Relaxed processing - for instance allow use of env on non-vinstall-ed recipes.")
parser.add_argument('--verbose', action='store_true', help="Verbose output")
parser.add_argument('restargs', nargs=argparse.REMAINDER)
args = parser.parse_args()
if args.verbose:
VERBOSE = True
if args.relaxed:
RELAXED = True
BREW_ARGS = args.restargs
if not action:
action = args.action
brew_context = BrewContext(args)
recipe_context = RecipeContext.from_args(args, brew_context)
if action == "vinstall":
versioned_install(recipe_context, args.recipe, args.version)
elif action == "vuninstall":
brew_execute(["switch", args.recipe, args.version])
brew_execute(["uninstall", args.recipe])
elif action == "vdeps":
print_versioned_deps(recipe_context, args.recipe, args.version)
elif action == "env":
env_statements = build_env_statements_from_recipe_context(recipe_context)
print(env_statements)
elif action == "vinfo":
with brew_head_at_version(recipe_context, args.recipe, args.version):
print(brew_info(args.recipe))
else:
raise NotImplementedError()
[docs]class CommandLineException(Exception):
[docs] def __init__(self, command, stdout, stderr):
self.command = command
self.stdout = stdout
self.stderr = stderr
self.message = ("Failed to execute command-line %s, stderr was:\n"
"-------->>begin stderr<<--------\n"
"%s\n"
"-------->>end stderr<<--------\n"
"-------->>begin stdout<<--------\n"
"%s\n"
"-------->>end stdout<<--------\n"
) % (command, stderr, stdout)
def __str__(self):
return self.message
[docs]def versioned_install(recipe_context, package=None, version=None, installed_deps=[]):
if package is None:
package = recipe_context.recipe
version = recipe_context.version
attempt_unlink(package)
with brew_head_at_version(recipe_context, package, version):
deps = brew_deps(package)
deps_metadata = []
dep_to_version = {}
for dep in deps:
version_info = brew_versions_info(dep, recipe_context.tap_path)[0]
dep_version = version_info[0]
dep_to_version[dep] = dep_version
versioned = version_info[2]
if versioned:
dep_to_version[dep] = dep_version
if dep in installed_deps:
continue
versioned_install(recipe_context, dep, dep_version)
installed_deps.append(dep)
else:
# Install latest.
dep_to_version[dep] = None
if dep in installed_deps:
continue
unversioned_install(dep)
try:
for dep in deps:
dep_version = dep_to_version[dep]
if dep_version:
brew_execute(["switch", dep, dep_version])
else:
brew_execute(["link", dep])
# dep_version obtained from brew versions doesn't
# include revision. This linked_keg attribute does.
keg_verion = brew_info(dep)["linked_keg"]
dep_metadata = {
'name': dep,
'version': keg_verion,
'versioned': versioned
}
deps_metadata.append(dep_metadata)
cellar_root = recipe_context.brew_context.homebrew_cellar
cellar_path = recipe_context.cellar_path
env_actions = build_env_actions(deps_metadata, cellar_root, cellar_path, custom_only=True)
env = EnvAction.build_env(env_actions)
args = ["install"]
if VERBOSE:
args.append("--verbose")
args.extend(BREW_ARGS)
args.append(package)
brew_execute(args, env=env)
deps = brew_execute(["deps", package])
deps = [d.strip() for d in deps.split("\n") if d]
metadata = {
'deps': deps_metadata
}
cellar_root = recipe_context.brew_context.homebrew_cellar
cellar_path = recipe_cellar_path(cellar_root, package, version)
v_metadata_path = os.path.join(cellar_path, "INSTALL_RECEIPT_VERSIONED.json")
with open(v_metadata_path, "w") as f:
json.dump(metadata, f)
finally:
attempt_unlink_all(package, deps)
[docs]def commit_for_version(recipe_context, package, version):
tap_path = recipe_context.tap_path
commit = None
with brew_head_at_commit("master", tap_path):
version_to_commit = brew_versions_info(package, tap_path)
if version is None:
version = version_to_commit[0][0]
commit = version_to_commit[0][1]
else:
for mapping in version_to_commit:
if mapping[0] == version:
commit = mapping[1]
if commit is None:
raise Exception("Failed to find commit for version %s" % version)
return commit
[docs]def print_versioned_deps(recipe_context, recipe, version):
deps = load_versioned_deps(recipe_context.cellar_path)
for dep in deps:
val = dep['name']
if dep['versioned']:
val += "@%s" % dep['version']
print(val)
[docs]def load_versioned_deps(cellar_path, relaxed=None):
if relaxed is None:
relaxed = RELAXED
v_metadata_path = os.path.join(cellar_path, "INSTALL_RECEIPT_VERSIONED.json")
if not os.path.isfile(v_metadata_path):
if RELAXED:
return []
else:
raise IOError("Could not locate versioned receipt file: {}".format(v_metadata_path))
with open(v_metadata_path, "r") as f:
metadata = json.load(f)
return metadata['deps']
[docs]def unversioned_install(package):
try:
deps = brew_deps(package)
for dep in deps:
brew_execute(["link", dep])
brew_execute(["install", package])
finally:
attempt_unlink_all(package, deps)
[docs]def attempt_unlink_all(package, deps):
for dep in deps:
attempt_unlink(dep)
attempt_unlink(package)
[docs]def attempt_unlink(package):
try:
brew_execute(["unlink", package])
except Exception:
# TODO: warn
pass
[docs]def brew_execute(args, env=None):
os.environ["HOMEBREW_NO_EMOJI"] = "1" # simplify brew parsing.
cmds = ["brew"] + args
return execute(cmds, env=env)
[docs]def build_env_statements_from_recipe_context(recipe_context, **kwds):
cellar_root = recipe_context.brew_context.homebrew_cellar
env_statements = build_env_statements(cellar_root, recipe_context.cellar_path, **kwds)
return env_statements
[docs]def build_env_statements(cellar_root, cellar_path, relaxed=None, custom_only=False):
deps = load_versioned_deps(cellar_path, relaxed=relaxed)
actions = build_env_actions(deps, cellar_root, cellar_path, relaxed, custom_only)
env_statements = []
for action in actions:
env_statements.extend(action.to_statements())
return "\n".join(env_statements)
[docs]def build_env_actions(deps, cellar_root, cellar_path, relaxed=None, custom_only=False):
path_appends = []
ld_path_appends = []
actions = []
def handle_keg(cellar_path):
bin_path = os.path.join(cellar_path, "bin")
if os.path.isdir(bin_path):
path_appends.append(bin_path)
lib_path = os.path.join(cellar_path, "lib")
if os.path.isdir(lib_path):
ld_path_appends.append(lib_path)
env_path = os.path.join(cellar_path, "platform_environment.json")
if os.path.exists(env_path):
with open(env_path, "r") as f:
env_metadata = json.load(f)
if "actions" in env_metadata:
def to_action(desc):
return EnvAction(cellar_path, desc)
actions.extend(map(to_action, env_metadata["actions"]))
for dep in deps:
package = dep['name']
version = dep['version']
dep_cellar_path = recipe_cellar_path(cellar_root, package, version)
handle_keg(dep_cellar_path)
handle_keg(cellar_path)
if not custom_only:
if path_appends:
actions.append(EnvAction(cellar_path, {"action": "prepend", "variable": "PATH", "value": ":".join(path_appends)}))
if ld_path_appends:
actions.append(EnvAction(cellar_path, {"action": "prepend", "variable": "LD_LIBRARY_PATH", "value": ":".join(path_appends)}))
return actions
[docs]class EnvAction(object):
[docs] def __init__(self, keg_root, action_description):
self.variable = action_description["variable"]
self.action = action_description["action"]
self.value = string.Template(action_description["value"]).safe_substitute({
'KEG_ROOT': keg_root,
})
[docs] @staticmethod
def build_env(env_actions):
new_env = os.environ.copy()
map(lambda env_action: env_action.modify_environ(new_env), env_actions)
return new_env
[docs] def modify_environ(self, environ):
if self.action == "set" or not environ.get(self.variable, ""):
environ[self.variable] = self.__eval("${value}")
elif self.action == "prepend":
environ[self.variable] = self.__eval("${value}:%s" % environ[self.variable])
else:
environ[self.variable] = self.__eval("%s:${value}" % environ[self.variable])
def __eval(self, template):
return string.Template(template).safe_substitute(
variable=self.variable,
value=self.value,
)
[docs] def to_statements(self):
if self.action == "set":
template = '''${variable}="${value}"'''
elif self.action == "prepend":
template = '''${variable}="${value}:$$${variable}"'''
else:
template = '''${variable}="$$${variable}:${value}"'''
return [
self.__eval(template),
"export %s" % self.variable
]
[docs]@contextlib.contextmanager
def brew_head_at_version(recipe_context, package, version):
commit = commit_for_version(recipe_context, package, version)
tap_path = recipe_context.tap_path
with brew_head_at_commit(commit, tap_path):
yield
[docs]@contextlib.contextmanager
def brew_head_at_commit(commit, tap_path):
try:
os.chdir(tap_path)
current_commit = git_execute(["rev-parse", "HEAD"]).strip()
try:
git_execute(["checkout", commit])
yield
finally:
git_execute(["checkout", current_commit])
finally:
# TODO: restore chdir - or better yet just don't chdir
# shouldn't be needed.
pass
[docs]def execute(cmds, env=None):
subprocess_kwds = dict(
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
if env:
subprocess_kwds["env"] = env
p = subprocess.Popen(cmds, **subprocess_kwds)
# log = p.stdout.read()
global VERBOSE
stdout, stderr = p.communicate()
if p.returncode != 0:
raise CommandLineException(" ".join(cmds), stdout, stderr)
if VERBOSE:
print(stdout)
return stdout
[docs]def brew_deps(package):
args = ["deps"]
args.extend(BREW_ARGS)
args.append(package)
stdout = brew_execute(args)
return [p.strip() for p in stdout.split("\n") if p]
[docs]def brew_info(recipe):
info_json = brew_execute(["info", "--json=v1", recipe])
info = json.loads(info_json)[0]
info.update(extended_brew_info(recipe))
return info
[docs]def extended_brew_info(recipe):
# Extract more info from non-json variant. JSON variant should
# include this in a backward compatible way (TODO: Open PR).
raw_info = brew_execute(["info", recipe])
extra_info = dict(
from_url=None,
build_dependencies=[],
required_dependencies=[],
recommended_dependencies=[],
optional_dependencies=[],
)
for line in raw_info.split("\n"):
if line.startswith("From: "):
extra_info["from_url"] = line[len("From: "):].strip()
for dep_type in ["Build", "Required", "Recommended", "Optional"]:
if line.startswith("%s: " % dep_type):
key = "%s_dependencies" % dep_type.lower()
raw_val = line[len("%s: " % dep_type):]
extra_info[key].extend(raw_val.split(", "))
return extra_info
[docs]def brew_versions_info(package, tap_path):
def versioned(recipe_path):
if not os.path.isabs(recipe_path):
recipe_path = os.path.join(os.getcwd(), recipe_path)
# Dependencies in the same repository should be versioned,
# core dependencies (presumably in base homebrew) are not
# versioned.
return tap_path in recipe_path
# TODO: Also use tags.
stdout = brew_execute(["versions", package])
version_parts = [l for l in stdout.split("\n") if l and "git checkout" in l]
version_parts = map(lambda l: WHITESPACE_PATTERN.split(l), version_parts)
info = [(p[0], p[3], versioned(p[4])) for p in version_parts]
return info
def __action(sys):
script_name = os.path.basename(sys.argv[0])
if script_name.startswith("brew-"):
return script_name[len("brew-"):]
else:
return None
[docs]def recipe_cellar_path(cellar_path, recipe, version):
recipe_base = recipe.split("/")[-1]
recipe_base_path = os.path.join(cellar_path, recipe_base, version)
revision_paths = glob.glob(recipe_base_path + "_*")
if revision_paths:
revisions = map(lambda x: int(x.rsplit("_", 1)[-1]), revision_paths)
max_revision = max(revisions)
recipe_path = "%s_%d" % (recipe_base_path, max_revision)
else:
recipe_path = recipe_base_path
return recipe_path
[docs]def ensure_brew_on_path(args):
brew_on_path = which("brew")
if brew_on_path:
brew_on_path = os.path.abspath(brew_on_path)
def ensure_on_path(brew):
if brew != brew_on_path:
os.environ["PATH"] = "%s:%s" % (os.path.dirname(brew), os.environ["PATH"])
default_brew_path = os.path.join(DEFAULT_HOMEBREW_ROOT, "bin", "brew")
if args and args.brew:
user_brew_path = os.path.abspath(args.brew)
ensure_on_path(user_brew_path)
elif brew_on_path:
return brew_on_path
elif os.path.exists(default_brew_path):
ensure_on_path(default_brew_path)
else:
raise Exception(NO_BREW_ERROR_MESSAGE)
[docs]def which(file):
# http://stackoverflow.com/questions/5226958/which-equivalent-function-in-python
for path in os.environ["PATH"].split(":"):
if os.path.exists(path + "/" + file):
return path + "/" + file
return None
if __name__ == "__main__":
main()