Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.tools.actions.history_imp_exp

import datetime
import logging
import os
import tempfile
from typing import Optional

from galaxy.job_execution.setup import create_working_directory_for_job
from galaxy.model import (
    History,
    Job,
)
from galaxy.model.base import transaction
from galaxy.model.dataset_collections.matching import MatchingCollections
from galaxy.tools._types import ToolStateJobInstancePopulatedT
from galaxy.tools.actions import (
    ToolAction,
    ToolActionExecuteResult,
)
from galaxy.tools.execute import (
    DatasetCollectionElementsSliceT,
    DEFAULT_DATASET_COLLECTION_ELEMENTS,
    DEFAULT_JOB_CALLBACK,
    DEFAULT_PREFERRED_OBJECT_STORE_ID,
    DEFAULT_RERUN_REMAP_JOB_ID,
    DEFAULT_SET_OUTPUT_HID,
    JobCallbackT,
)
from galaxy.tools.execution_helpers import ToolExecutionCache
from galaxy.tools.imp_exp import (
    JobExportHistoryArchiveWrapper,
    JobImportHistoryArchiveWrapper,
)
from galaxy.util import ready_name_for_url

log = logging.getLogger(__name__)


[docs]class ImportHistoryToolAction(ToolAction): """Tool action used for importing a history to an archive.""" produces_real_jobs: bool = True
[docs] def execute( self, tool, trans, incoming: Optional[ToolStateJobInstancePopulatedT] = None, history: Optional[History] = None, job_params=None, rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID, execution_cache: Optional[ToolExecutionCache] = None, dataset_collection_elements: Optional[DatasetCollectionElementsSliceT] = DEFAULT_DATASET_COLLECTION_ELEMENTS, completed_job: Optional[Job] = None, collection_info: Optional[MatchingCollections] = None, job_callback: Optional[JobCallbackT] = DEFAULT_JOB_CALLBACK, preferred_object_store_id: Optional[str] = DEFAULT_PREFERRED_OBJECT_STORE_ID, set_output_hid: bool = DEFAULT_SET_OUTPUT_HID, flush_job: bool = True, skip: bool = False, ) -> ToolActionExecuteResult: # # Create job. # incoming = incoming or {} trans.check_user_activation() job = trans.app.model.Job() job.galaxy_version = trans.app.config.version_major session = trans.get_galaxy_session() job.session_id = session and session.id if history: history_id = history.id elif trans.history: history_id = trans.history.id else: history_id = None job.history_id = history_id job.tool_id = tool.id job.user_id = trans.user.id start_job_state = job.state # should be job.states.NEW job.state = ( job.states.WAITING ) # we need to set job state to something other than NEW, or else when tracking jobs in db it will be picked up before we have added input / output parameters trans.sa_session.add(job) with transaction(trans.sa_session): # ensure job.id are available trans.sa_session.commit() # # Setup job and job wrapper. # # Add association for keeping track of job, history relationship. # Use abspath because mkdtemp() does not, contrary to the documentation, # always return an absolute path. archive_dir = os.path.abspath(tempfile.mkdtemp()) jiha = trans.app.model.JobImportHistoryArchive(job=job, archive_dir=archive_dir) trans.sa_session.add(jiha) job_wrapper = JobImportHistoryArchiveWrapper(trans.app, job) job_wrapper.setup_job(jiha, incoming["__ARCHIVE_SOURCE__"], incoming["__ARCHIVE_TYPE__"]) # # Add parameters to job_parameter table. # # Set additional parameters. incoming["__DEST_DIR__"] = jiha.archive_dir for name, value in tool.params_to_strings(incoming, trans.app).items(): job.add_parameter(name, value) job.state = start_job_state # job inputs have been configured, restore initial job state return job, {}
[docs]class ExportHistoryToolAction(ToolAction): """Tool action used for exporting a history to an archive.""" produces_real_jobs: bool = True
[docs] def execute( self, tool, trans, incoming: Optional[ToolStateJobInstancePopulatedT] = None, history: Optional[History] = None, job_params=None, rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID, execution_cache: Optional[ToolExecutionCache] = None, dataset_collection_elements: Optional[DatasetCollectionElementsSliceT] = DEFAULT_DATASET_COLLECTION_ELEMENTS, completed_job: Optional[Job] = None, collection_info: Optional[MatchingCollections] = None, job_callback: Optional[JobCallbackT] = DEFAULT_JOB_CALLBACK, preferred_object_store_id: Optional[str] = DEFAULT_PREFERRED_OBJECT_STORE_ID, set_output_hid: bool = DEFAULT_SET_OUTPUT_HID, flush_job: bool = True, skip: bool = False, ) -> ToolActionExecuteResult: trans.check_user_activation() # # Get history to export. # incoming = incoming or {} history = None for name, value in incoming.items(): if isinstance(value, trans.app.model.History): history_param_name = name history = value del incoming[history_param_name] break if not history: raise Exception("There is no history to export.") # # Create the job and output dataset objects # job = trans.app.model.Job() job.galaxy_version = trans.app.config.version_major session = trans.get_galaxy_session() job.session_id = session and session.id history_id = history.id job.history_id = history_id job.tool_id = tool.id if trans.user: # If this is an actual user, run the job as that individual. Otherwise we're running as guest. job.user_id = trans.user.id job.state = ( job.states.WAITING ) # we need to set job state to something other than NEW, or else when tracking jobs in db it will be picked up before we have added input / output parameters trans.sa_session.add(job) compressed = incoming["compress"] exporting_to_uri = "directory_uri" in incoming if not exporting_to_uri: # see comment below about how this should be transitioned to occuring in a # job handler or detached MQ-driven thread jeha = trans.app.model.JobExportHistoryArchive.create_for_history( history, job, trans.sa_session, trans.app.object_store, compressed ) store_directory = jeha.temp_directory else: # creating a job directory in the web thread is bad (it is slow, bypasses # dynamic objectstore assignment, etc..) but it is arguably less bad than # creating a dataset (like above for dataset export case). # ensure job.id is available with transaction(trans.sa_session): trans.sa_session.commit() job_directory = create_working_directory_for_job(trans.app.object_store, job) store_directory = os.path.join(job_directory, "working", "_object_export") os.makedirs(store_directory) # # Setup job and job wrapper. # cmd_line = f"--galaxy-version '{job.galaxy_version}'" if compressed: cmd_line += " -G" cmd_line = f"{cmd_line} {store_directory}" # # Add parameters to job_parameter table. # # Set additional parameters. incoming["__HISTORY_TO_EXPORT__"] = history.id incoming["__EXPORT_HISTORY_COMMAND_INPUTS_OPTIONS__"] = cmd_line if exporting_to_uri: directory_uri = incoming["directory_uri"] file_name = incoming.get("file_name") if file_name is None: hname = ready_name_for_url(history.name) human_timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S") if compressed: extension = ".tar.gz" else: extension = ".tar" file_name = f"Galaxy-History-{hname}-{human_timestamp}.{extension}" file_name = os.path.basename(os.path.abspath(file_name)) sep = "" if directory_uri.endswith("/") else "/" incoming["__EXPORT_TO_URI__"] = f"{directory_uri}{sep}{file_name}" for name, value in tool.params_to_strings(incoming, trans.app).items(): job.add_parameter(name, value) with transaction(trans.sa_session): trans.sa_session.commit() job_wrapper = JobExportHistoryArchiveWrapper(trans.app, job.id) job_wrapper.setup_job( history, store_directory, include_hidden=incoming["include_hidden"], include_deleted=incoming["include_deleted"], compressed=compressed, user=trans.user, ) return job, {}