Source code for galaxy.tools.imp_exp
import datetime
import json
import logging
import os
import shutil
import tempfile
from json import dumps, load
from sqlalchemy.orm import eagerload_all
from sqlalchemy.sql import expression
from galaxy import model
from galaxy.exceptions import MalformedContents
from galaxy.model.item_attrs import UsesAnnotations
from galaxy.web.framework.helpers import to_unicode
log = logging.getLogger(__name__)
class JobImportHistoryArchiveWrapper(UsesAnnotations):
"""
Class provides support for performing jobs that import a history from
an archive.
"""
def __init__(self, app, job_id):
self.app = app
self.job_id = job_id
self.sa_session = self.app.model.context
def cleanup_after_job(self):
""" Set history, datasets, and jobs' attributes and clean up archive directory. """
#
# Helper methods.
#
def file_in_dir(file_path, a_dir):
""" Returns true if file is in directory. """
abs_file_path = os.path.abspath(file_path)
return os.path.split(abs_file_path)[0] == a_dir
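# Note that the check above compares only the immediate parent directory, so a
# file nested one level deeper is not considered "in" the directory. For example
# (hypothetical paths), file_in_dir('/a/b/data.txt', '/a/b') is True, while
# file_in_dir('/a/b/sub/data.txt', '/a/b') is False.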
def get_tag_str(tag, value):
""" Builds a tag string for a tag, value pair. """
if not value:
return tag
else:
return tag + ":" + value
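# Illustrative usage (hypothetical tag names): get_tag_str('group', 'rna-seq')
# returns 'group:rna-seq', while get_tag_str('favorite', '') returns 'favorite'.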
#
# Import history.
#
jiha = self.sa_session.query(model.JobImportHistoryArchive).filter_by(job_id=self.job_id).first()
if jiha:
try:
archive_dir = jiha.archive_dir
archive_dir = os.path.realpath(archive_dir)
user = jiha.job.user
# Bioblend releases prior to 17.01 exported histories with an extra subdirectory.
if not os.path.exists(os.path.join(archive_dir, 'history_attrs.txt')):
for d in os.listdir(archive_dir):
if os.path.isdir(os.path.join(archive_dir, d)):
archive_dir = os.path.join(archive_dir, d)
break
#
# Create history.
#
history_attr_file_name = os.path.join(archive_dir, 'history_attrs.txt')
history_attrs = load(open(history_attr_file_name))
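# history_attrs is the dictionary written to history_attrs.txt by
# JobExportHistoryArchiveWrapper.setup_job() below; the 'name', 'hid_counter',
# 'genome_build', and 'annotation' keys are consumed here.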
# Create history.
new_history = model.History(name='imported from archive: %s' % history_attrs['name'],
user=user)
new_history.importing = True
new_history.hid_counter = history_attrs['hid_counter']
new_history.genome_build = history_attrs['genome_build']
self.sa_session.add(new_history)
jiha.history = new_history
self.sa_session.flush()
# Add annotation, tags.
if user:
self.add_item_annotation(self.sa_session, user, new_history, history_attrs['annotation'])
"""
TODO: figure out how to add tags to an item.
for tag, value in history_attrs[ 'tags' ].items():
trans.app.tag_handler.apply_item_tags( trans, trans.user, new_history, get_tag_str( tag, value ) )
"""
#
# Create datasets.
#
datasets_attrs_file_name = os.path.join(archive_dir, 'datasets_attrs.txt')
datasets_attrs = load(open(datasets_attrs_file_name))
provenance_file_name = datasets_attrs_file_name + ".provenance"
if os.path.exists(provenance_file_name):
provenance_attrs = load(open(provenance_file_name))
datasets_attrs += provenance_attrs
# Get counts of how often each dataset file is used; a file can
# be linked to multiple dataset objects (HDAs).
datasets_usage_counts = {}
for dataset_attrs in datasets_attrs:
temp_dataset_file_name = \
os.path.realpath(os.path.join(archive_dir, dataset_attrs['file_name']))
if (temp_dataset_file_name not in datasets_usage_counts):
datasets_usage_counts[temp_dataset_file_name] = 0
datasets_usage_counts[temp_dataset_file_name] += 1
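# datasets_usage_counts now maps each real dataset path to the number of HDAs
# referencing it. Below, the last (or only) HDA referencing a file ingests it
# into the object store, while earlier references to a shared file receive a
# plain copy of the data instead.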
# Create datasets.
for dataset_attrs in datasets_attrs:
metadata = dataset_attrs['metadata']
# Create dataset and HDA.
hda = model.HistoryDatasetAssociation(name=dataset_attrs['name'],
extension=dataset_attrs['extension'],
info=dataset_attrs['info'],
blurb=dataset_attrs['blurb'],
peek=dataset_attrs['peek'],
designation=dataset_attrs['designation'],
visible=dataset_attrs['visible'],
dbkey=metadata['dbkey'],
metadata=metadata,
history=new_history,
create_dataset=True,
sa_session=self.sa_session)
if 'uuid' in dataset_attrs:
hda.dataset.uuid = dataset_attrs["uuid"]
if dataset_attrs.get('exported', True) is False:
hda.state = hda.states.DISCARDED
hda.deleted = True
hda.purged = True
else:
hda.state = hda.states.OK
self.sa_session.add(hda)
self.sa_session.flush()
new_history.add_dataset(hda, genome_build=None)
hda.hid = dataset_attrs['hid'] # Overwrite default hid set when HDA added to history.
# TODO: Is there a way to recover permissions? Is this needed?
# permissions = trans.app.security_agent.history_get_default_permissions( new_history )
# trans.app.security_agent.set_all_dataset_permissions( hda.dataset, permissions )
self.sa_session.flush()
if dataset_attrs.get('exported', True) is True:
# Do security check and move/copy dataset data.
temp_dataset_file_name = \
os.path.realpath(os.path.abspath(os.path.join(archive_dir, dataset_attrs['file_name'])))
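# The check below rejects archive entries whose 'file_name' resolves outside
# the archive's datasets directory (e.g., a hypothetical value such as
# '../../outside.dat'), raising MalformedContents for such paths.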
if not file_in_dir(temp_dataset_file_name, os.path.join(archive_dir, "datasets")):
raise MalformedContents("Invalid dataset path: %s" % temp_dataset_file_name)
if datasets_usage_counts[temp_dataset_file_name] == 1:
self.app.object_store.update_from_file(hda.dataset, file_name=temp_dataset_file_name, create=True)
# Import additional files if present. Histories exported by earlier Galaxy versions might not have this attribute set.
dataset_extra_files_path = dataset_attrs.get('extra_files_path', None)
if dataset_extra_files_path:
try:
file_list = os.listdir(os.path.join(archive_dir, dataset_extra_files_path))
except OSError:
file_list = []
if file_list:
for extra_file in file_list:
self.app.object_store.update_from_file(
hda.dataset, extra_dir='dataset_%s_files' % hda.dataset.id,
alt_name=extra_file, file_name=os.path.join(archive_dir, dataset_extra_files_path, extra_file),
create=True)
else:
datasets_usage_counts[temp_dataset_file_name] -= 1
shutil.copyfile(temp_dataset_file_name, hda.file_name)
hda.dataset.set_total_size() # update the filesize record in the database
# Set tags, annotations.
if user:
self.add_item_annotation(self.sa_session, user, hda, dataset_attrs['annotation'])
# TODO: Set tags.
"""
for tag, value in dataset_attrs[ 'tags' ].items():
trans.app.tag_handler.apply_item_tags( trans, trans.user, hda, get_tag_str( tag, value ) )
self.sa_session.flush()
"""
# Although metadata is set above, it must be set again here to regenerate the BAI index for BAM datasets.
if hda.extension == 'bam':
self.app.datatypes_registry.set_external_metadata_tool.tool_action.execute_via_app(
self.app.datatypes_registry.set_external_metadata_tool, self.app, jiha.job.session_id,
new_history.id, jiha.job.user, incoming={'input1': hda}, overwrite=False
)
#
# Create jobs.
#
# Decode jobs attributes.
def as_hda(obj_dct):
""" Hook to 'decode' an HDA; method uses history and HID to get the HDA represented by
the encoded object. This only works because HDAs are created above. """
if obj_dct.get('__HistoryDatasetAssociation__', False):
return self.sa_session.query(model.HistoryDatasetAssociation) \
.filter_by(history=new_history, hid=obj_dct['hid']).first()
return obj_dct
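# as_hda() reverses the encoding performed by HistoryDatasetAssociationEncoder
# in setup_job() below, which serializes an HDA as a dictionary containing
# '__HistoryDatasetAssociation__': True along with its 'hid'.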
jobs_attr_file_name = os.path.join(archive_dir, 'jobs_attrs.txt')
jobs_attrs = load(open(jobs_attr_file_name), object_hook=as_hda)
# Create each job.
for job_attrs in jobs_attrs:
imported_job = model.Job()
imported_job.user = user
# TODO: set session?
# imported_job.session = trans.get_galaxy_session().id
imported_job.history = new_history
imported_job.imported = True
imported_job.tool_id = job_attrs['tool_id']
imported_job.tool_version = job_attrs['tool_version']
imported_job.set_state(job_attrs['state'])
imported_job.info = job_attrs.get('info', None)
imported_job.exit_code = job_attrs.get('exit_code', None)
imported_job.traceback = job_attrs.get('traceback', None)
imported_job.stdout = job_attrs.get('stdout', None)
imported_job.stderr = job_attrs.get('stderr', None)
imported_job.command_line = job_attrs.get('command_line', None)
try:
imported_job.create_time = datetime.datetime.strptime(job_attrs["create_time"], "%Y-%m-%dT%H:%M:%S.%f")
imported_job.update_time = datetime.datetime.strptime(job_attrs["update_time"], "%Y-%m-%dT%H:%M:%S.%f")
except Exception:
pass
self.sa_session.add(imported_job)
self.sa_session.flush()
class HistoryDatasetAssociationIDEncoder(json.JSONEncoder):
""" Custom JSONEncoder for a HistoryDatasetAssociation that encodes an HDA as its ID. """
def default(self, obj):
""" Encode an HDA, default encoding for everything else. """
if isinstance(obj, model.HistoryDatasetAssociation):
return obj.id
return json.JSONEncoder.default(self, obj)
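# With this encoder, dumps(some_hda, cls=HistoryDatasetAssociationIDEncoder)
# serializes an HDA parameter value as its integer database id.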
# Set parameters. May be useful to look at metadata.py for creating parameters.
# TODO: there may be a better way to set parameters, e.g.:
# for name, value in tool.params_to_strings( incoming, trans.app ).items():
# job.add_parameter( name, value )
# to make this work, we'd need to flesh out the HDA objects. The code below is
# relatively similar.
for name, value in job_attrs['params'].items():
# Transform parameter values when necessary.
if isinstance(value, model.HistoryDatasetAssociation):
# HDA input: use hid to find input.
input_hda = self.sa_session.query(model.HistoryDatasetAssociation) \
.filter_by(history=new_history, hid=value.hid).first()
value = input_hda.id
imported_job.add_parameter(name, dumps(value, cls=HistoryDatasetAssociationIDEncoder))
# TODO: Connect jobs to input datasets.
# Connect jobs to output datasets.
for output_hid in job_attrs['output_datasets']:
output_hda = self.sa_session.query(model.HistoryDatasetAssociation) \
.filter_by(history=new_history, hid=output_hid).first()
if output_hda:
imported_job.add_output_dataset(output_hda.name, output_hda)
# Connect jobs to input datasets.
if 'input_mapping' in job_attrs:
for input_name, input_hid in job_attrs['input_mapping'].items():
input_hda = self.sa_session.query(model.HistoryDatasetAssociation) \
.filter_by(history=new_history, hid=input_hid).first()
if input_hda:
imported_job.add_input_dataset(input_name, input_hda)
self.sa_session.flush()
# Done importing.
new_history.importing = False
self.sa_session.flush()
# Cleanup.
if os.path.exists(archive_dir):
shutil.rmtree(archive_dir)
except Exception as e:
jiha.job.stderr += "Error cleaning up history import job: %s" % e
self.sa_session.flush()
raise
class JobExportHistoryArchiveWrapper(UsesAnnotations):
"""
Class provides support for performing jobs that export a history to an
archive.
"""
def __init__(self, job_id):
# Store the id of the export job; cleanup_after_job() below relies on self.job_id.
self.job_id = job_id
def get_history_datasets(self, trans, history):
"""
Returns history's datasets.
"""
query = (trans.sa_session.query(trans.model.HistoryDatasetAssociation)
.filter(trans.model.HistoryDatasetAssociation.history == history)
.join("dataset")
.options(eagerload_all("dataset.actions"))
.order_by(trans.model.HistoryDatasetAssociation.hid)
.filter(trans.model.HistoryDatasetAssociation.deleted == expression.false())
.filter(trans.model.Dataset.purged == expression.false()))
return query.all()
# TODO: should use db_session rather than trans in this method.
def setup_job(self, trans, jeha, include_hidden=False, include_deleted=False):
""" Perform setup for job to export a history into an archive. Method generates
attribute files for export, sets the corresponding attributes in the jeha
object, and returns a command line for running the job. The command line
includes the command, inputs, and options; it does not include the output
file because it must be set at runtime. """
#
# Helper methods/classes.
#
def get_item_tag_dict(item):
""" Create dictionary of an item's tags. """
tags = {}
for tag in item.tags:
tag_user_tname = to_unicode(tag.user_tname)
tag_user_value = to_unicode(tag.user_value)
tags[tag_user_tname] = tag_user_value
return tags
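# Illustrative example (hypothetical tags): a history tagged 'group:rna-seq'
# contributes the entry {'group': 'rna-seq'} to the returned dictionary.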
def prepare_metadata(metadata):
""" Prepare metatdata for exporting. """
for name, value in list(metadata.items()):
# Metadata files are not needed for export because they can be
# regenerated.
if isinstance(value, trans.app.model.MetadataFile):
del metadata[name]
return metadata
class HistoryDatasetAssociationEncoder(json.JSONEncoder):
""" Custom JSONEncoder for a HistoryDatasetAssociation. """
def default(self, obj):
""" Encode an HDA, default encoding for everything else. """
if isinstance(obj, trans.app.model.HistoryDatasetAssociation):
rval = {
"__HistoryDatasetAssociation__": True,
"create_time": obj.create_time.__str__(),
"update_time": obj.update_time.__str__(),
"hid": obj.hid,
"name": to_unicode(obj.name),
"info": to_unicode(obj.info),
"blurb": obj.blurb,
"peek": obj.peek,
"extension": obj.extension,
"metadata": prepare_metadata(dict(obj.metadata.items())),
"parent_id": obj.parent_id,
"designation": obj.designation,
"deleted": obj.deleted,
"visible": obj.visible,
"file_name": obj.file_name,
"uuid": (lambda uuid: str(uuid) if uuid else None)(obj.dataset.uuid),
"annotation": to_unicode(getattr(obj, 'annotation', '')),
"tags": get_item_tag_dict(obj),
"extra_files_path": obj.extra_files_path
}
if not obj.visible and not include_hidden:
rval['exported'] = False
elif obj.deleted and not include_deleted:
rval['exported'] = False
else:
rval['exported'] = True
return rval
return json.JSONEncoder.default(self, obj)
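# The 'exported' flag written here is what JobImportHistoryArchiveWrapper above
# checks on import: datasets with exported=False are recreated as discarded
# placeholders and no file data is copied for them.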
#
# Create attributes/metadata files for export.
#
temp_output_dir = tempfile.mkdtemp()
# Write history attributes to file.
history = jeha.history
history_attrs = {
"create_time": history.create_time.__str__(),
"update_time": history.update_time.__str__(),
"name": to_unicode(history.name),
"hid_counter": history.hid_counter,
"genome_build": history.genome_build,
"annotation": to_unicode(self.get_item_annotation_str(trans.sa_session, history.user, history)),
"tags": get_item_tag_dict(history),
"includes_hidden_datasets": include_hidden,
"includes_deleted_datasets": include_deleted
}
history_attrs_filename = tempfile.NamedTemporaryFile(dir=temp_output_dir).name
history_attrs_out = open(history_attrs_filename, 'w')
history_attrs_out.write(dumps(history_attrs))
history_attrs_out.close()
jeha.history_attrs_filename = history_attrs_filename
# Write datasets' attributes to file.
datasets = self.get_history_datasets(trans, history)
included_datasets = []
datasets_attrs = []
provenance_attrs = []
for dataset in datasets:
dataset.annotation = self.get_item_annotation_str(trans.sa_session, history.user, dataset)
if (not dataset.visible and not include_hidden) or (dataset.deleted and not include_deleted):
provenance_attrs.append(dataset)
else:
datasets_attrs.append(dataset)
included_datasets.append(dataset)
datasets_attrs_filename = tempfile.NamedTemporaryFile(dir=temp_output_dir).name
datasets_attrs_out = open(datasets_attrs_filename, 'w')
datasets_attrs_out.write(dumps(datasets_attrs, cls=HistoryDatasetAssociationEncoder))
datasets_attrs_out.close()
jeha.datasets_attrs_filename = datasets_attrs_filename
provenance_attrs_out = open(datasets_attrs_filename + ".provenance", 'w')
provenance_attrs_out.write(dumps(provenance_attrs, cls=HistoryDatasetAssociationEncoder))
provenance_attrs_out.close()
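# The '.provenance' companion file mirrors datasets_attrs for the hidden or
# deleted datasets excluded from the archive; on import it is re-read from
# datasets_attrs_file_name + '.provenance' and appended to datasets_attrs.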
#
# Write jobs attributes file.
#
# Get all jobs associated with included HDAs.
jobs_dict = {}
for hda in included_datasets:
# Get the associated job, if any. If this hda was copied from another,
# we need to find the job that created the original hda
job_hda = hda
while job_hda.copied_from_history_dataset_association: # should this check library datasets as well?
job_hda = job_hda.copied_from_history_dataset_association
if not job_hda.creating_job_associations:
# No viable HDA found.
continue
# Get the job object.
job = None
for assoc in job_hda.creating_job_associations:
job = assoc.job
break
if not job:
# No viable job.
continue
jobs_dict[job.id] = job
# Get jobs' attributes.
jobs_attrs = []
for id, job in jobs_dict.items():
job_attrs = {}
job_attrs['tool_id'] = job.tool_id
job_attrs['tool_version'] = job.tool_version
job_attrs['state'] = job.state
job_attrs['info'] = job.info
job_attrs['traceback'] = job.traceback
job_attrs['command_line'] = job.command_line
job_attrs['stderr'] = job.stderr
job_attrs['stdout'] = job.stdout
job_attrs['exit_code'] = job.exit_code
job_attrs['create_time'] = job.create_time.isoformat()
job_attrs['update_time'] = job.update_time.isoformat()
# Get the job's parameters
try:
params_objects = job.get_param_values(trans.app)
except Exception:
# Could not get job params.
continue
params_dict = {}
for name, value in params_objects.items():
params_dict[name] = value
job_attrs['params'] = params_dict
# -- Get input, output datasets. --
input_datasets = []
input_mapping = {}
for assoc in job.input_datasets:
# Optional data inputs will not have a dataset.
if assoc.dataset:
input_datasets.append(assoc.dataset.hid)
input_mapping[assoc.name] = assoc.dataset.hid
job_attrs['input_datasets'] = input_datasets
job_attrs['input_mapping'] = input_mapping
output_datasets = [assoc.dataset.hid for assoc in job.output_datasets]
job_attrs['output_datasets'] = output_datasets
jobs_attrs.append(job_attrs)
jobs_attrs_filename = tempfile.NamedTemporaryFile(dir=temp_output_dir).name
jobs_attrs_out = open(jobs_attrs_filename, 'w')
jobs_attrs_out.write(dumps(jobs_attrs, cls=HistoryDatasetAssociationEncoder))
jobs_attrs_out.close()
jeha.jobs_attrs_filename = jobs_attrs_filename
#
# Create and return command line for running tool.
#
options = ""
if jeha.compressed:
options = "-G"
return "%s %s %s %s" % (options, history_attrs_filename,
datasets_attrs_filename,
jobs_attrs_filename)
def cleanup_after_job(self, db_session):
""" Remove temporary directory and attribute files generated during setup for this job. """
# Get jeha for job.
jeha = db_session.query(model.JobExportHistoryArchive).filter_by(job_id=self.job_id).first()
if jeha:
for filename in [jeha.history_attrs_filename, jeha.datasets_attrs_filename, jeha.jobs_attrs_filename]:
try:
os.remove(filename)
except Exception as e:
log.debug('Failed to cleanup attributes file (%s): %s' % (filename, e))
temp_dir = os.path.split(jeha.history_attrs_filename)[0]
try:
shutil.rmtree(temp_dir)
except Exception as e:
log.debug('Error deleting directory containing attribute files (%s): %s' % (temp_dir, e))