Source code for galaxy.webapps.galaxy.api.webhooks

"""
API Controller providing Galaxy Webhooks
"""

import importlib.util
import logging
from typing import Any

from galaxy.web import expose_api_anonymous_and_sessionless
from galaxy.webapps.base.webapp import GalaxyWebTransaction
from . import BaseGalaxyAPIController

log = logging.getLogger(__name__)


[docs]class WebhooksController(BaseGalaxyAPIController):
[docs] @expose_api_anonymous_and_sessionless def all_webhooks(self, trans: GalaxyWebTransaction, **kwd): """ GET /api/webhooks/ Return all webhooks. """ return [webhook.to_dict() for webhook in self.app.webhooks_registry.webhooks]
[docs] @expose_api_anonymous_and_sessionless def webhook_data(self, trans: Any, webhook_id, **kwd): """ GET /api/webhooks/{webhook_id}/data/{params} Return the result of executing helper function. """ params = {} for key, value in kwd.items(): params[key] = value webhook = next(webhook for webhook in self.app.webhooks_registry.webhooks if webhook.id == webhook_id) if webhook and webhook.helper != "": spec = importlib.util.spec_from_file_location(webhook.path, webhook.helper) assert spec module = importlib.util.module_from_spec(spec) assert spec.loader spec.loader.exec_module(module) return module.main( trans, webhook, params, ) else: return {}