Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.tool_util.deps.mulled.mulled_list

#!/usr/bin/env python

import argparse
import logging
from glob import glob
from html.parser import HTMLParser

from galaxy.util import requests
from .util import MULLED_SOCKET_TIMEOUT

QUAY_API_ENDPOINT = "https://quay.io/api/v1/repository"


[docs]def get_quay_containers(repository="biocontainers"): """ Get all quay containers in the biocontainers repo """ containers = [] repos_parameters = {"public": "true", "namespace": repository} repos_headers = {"Accept-encoding": "gzip", "Accept": "application/json"} repos_response = requests.get(QUAY_API_ENDPOINT, headers=repos_headers, params=repos_parameters, timeout=12) repos = repos_response.json()["repositories"] for repo in repos: logging.info(repo) tags_response = requests.get(f"{QUAY_API_ENDPOINT}/{repository}/{repo['name']}", timeout=MULLED_SOCKET_TIMEOUT) tags = tags_response.json()["tags"] for tag in tags: containers.append(f"{repo['name']}:{tag}") return containers
[docs]def get_singularity_containers(): """ Get all existing singularity containers from "https://depot.galaxyproject.org/singularity/" """ class GetContainerNames(HTMLParser): # small parser which gets list of containers def __init__(self): HTMLParser.__init__(self) self.containers = [] def handle_starttag(self, tag, attrs): try: for attr in attrs: if attr[0] == "href" and attr[1] != "../": self.containers.append(attr[1].replace("%3A", ":")) except IndexError: pass parser = GetContainerNames() index = requests.get("https://depot.galaxyproject.org/singularity/", timeout=MULLED_SOCKET_TIMEOUT) parser.feed(index.text) return parser.containers
[docs]def get_conda_envs(filepath): """ Get list of already existing envs """ return [n.split("__")[-1].replace("@", ":") for n in glob(f"{filepath}/*")]
[docs]def get_missing_containers(quay_list, singularity_list, blocklist_file=None): r""" Return list of quay containers that do not exist as singularity containers. Files stored in a blocklist will be ignored """ blocklist = [] if blocklist_file: with open(blocklist_file) as fh: blocklist = fh.read().split("\n") return [n for n in quay_list if n not in singularity_list and n not in blocklist]
[docs]def get_missing_envs(quay_list, conda_list, blocklist_file=None): r""" Compares list of conda envs and docker containers and returns missing conda envs """ blocklist = [] if blocklist_file: with open(blocklist_file) as fh: blocklist = fh.read().split("\n") return [n for n in quay_list if n.split("--")[0] not in conda_list and n.split("--")[0] not in blocklist]
[docs]def main(): parser = argparse.ArgumentParser( description="Returns list of Docker containers in the quay.io biocontainers repository." ) parser.add_argument("--source", "-s", help="Docker, Singularity or Conda.") parser.add_argument( "--not-singularity", dest="not_singularity", action="store_true", help="Exclude Docker containers from which Singularity containers have already been built.", ) parser.add_argument( "--not-conda", dest="not_conda", action="store_true", help="Exclude Docker containers from which Conda environments have already been extracted.", ) parser.add_argument( "--conda-filepath", dest="conda_filepath", default=None, help="If searching for conda environments or employing the --not-conda option, a filepath where the environments are located.", ) parser.add_argument( "-b", "--blocklist", "--blacklist", dest="blocklist", default=None, help="Provide a 'blocklist file' containing containers which should be excluded from the list.", ) parser.add_argument( "-f", "--file", dest="output", default=None, help="File to write list to. If not given output will be returned on the command line.", ) args = parser.parse_args() if args.source == "docker": containers = get_quay_containers() if args.not_singularity: containers = get_missing_containers(containers, get_singularity_containers(), args.blocklist) if args.not_conda: containers = get_missing_envs(containers, get_conda_envs(args.conda_filepath), args.blocklist) elif args.source == "singularity": containers = get_singularity_containers() elif args.source == "conda": containers = get_conda_envs(args.conda_filepath) else: print("The 'source' argument was not understood.") return if args.output: with open(args.output, "a") as f: for container in containers: f.write(f"{container}\n") else: print(containers)
if __name__ == "__main__": main()