This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.tools.error_reports.plugins.base_git

"""This module defines the common functions for error reporting for Galaxy jobs towards Git applications (e.g. Github/GitLab).

import logging
from abc import (
from typing import Dict

from galaxy.tools.errors import EmailErrorReporter
from galaxy.util import (
from . import ErrorPlugin

log = logging.getLogger(__name__)

[docs]class BaseGitPlugin(ErrorPlugin, metaclass=ABCMeta): """Base definition to send error reports to a Git repository provider""" issue_cache: Dict[str, Dict] = {} ts_urls: Dict[str, str] = {} ts_repo_cache: Dict[str, Dict] = {} git_project_cache: Dict[str, Dict] = {} label_cache: Dict[str, Dict] = {} git_username_id_cache: Dict[str, str] = {} # Git variables git_default_repo_owner = False git_default_repo_name = False git_default_repo_only = True def _determine_ts_url(self, tool): if not tool.tool_shed or self.git_default_repo_only: return None try: if tool.tool_shed not in self.ts_urls: ts_url_request = requests.get(f"http://{tool.tool_shed}", timeout=DEFAULT_SOCKET_TIMEOUT) self.ts_urls[tool.tool_shed] = ts_url_request.url return self.ts_urls[tool.tool_shed] except Exception: return None def _get_gitrepo_from_ts(self, job, ts_url): if not ts_url or self.git_default_repo_only: return None try: if job.tool_id not in self.ts_repo_cache: ts_repo_request_data = requests.get( f"{ts_url}/api/repositories?tool_ids={str(job.tool_id)}", timeout=DEFAULT_SOCKET_TIMEOUT ).json() for repoinfo in ts_repo_request_data.values(): if isinstance(repoinfo, dict): self.ts_repo_cache[job.tool_id] = repoinfo.get("repository", {}).get( "remote_repository_url", None ) return self.ts_repo_cache[job.tool_id] except Exception: return None def _get_issue_cache_key(self, job, ts_repourl): return job.tool_id if ts_repourl else "default" def _generate_error_message(self, dataset, job, kwargs): # We'll re-use the email error reporter's template since most Git providers supports HTML error_reporter = EmailErrorReporter(dataset.id, self.app) error_reporter.create_report( job.get_user(), email=kwargs.get("email", None), message=kwargs.get("message", None), redact_user_details_in_bugreport=self.redact_user_details_in_bugreport, ) # Return the HTML report return error_reporter.html_report def _generate_error_title(self, job): tool_kw = {"tool_id": unicodify(job.tool_id), "tool_version": unicodify(job.tool_version)} return """Galaxy Job Error: {tool_id} v{tool_version}""".format(**tool_kw) @abstractmethod def _create_issue(self, issue_cache_key, error_title, error_mesage, project, **kwargs): raise NotImplementedError("Method _create_issue is required") @abstractmethod def _append_issue(self, issue_cache_key, error_title, error_message, **kwargs): raise NotImplementedError("Method _append_issue is required") @abstractmethod def _fill_issue_cache(self, git_project, issue_cache_key): raise NotImplementedError("Method _fill_issue_cache is required")