Source code for galaxy.tools.imp_exp
import datetime
import json
import logging
import os
import shutil
import tempfile
from json import dumps, load
from sqlalchemy.orm import eagerload_all
from sqlalchemy.sql import expression
from galaxy import model
from galaxy.exceptions import MalformedContents
from galaxy.model.item_attrs import UsesAnnotations
from galaxy.web.framework.helpers import to_unicode
log = logging.getLogger(__name__)
class JobImportHistoryArchiveWrapper(UsesAnnotations):
"""
Class provides support for performing jobs that import a history from
an archive.
"""
def __init__(self, app, job_id):
self.app = app
self.job_id = job_id
self.sa_session = self.app.model.context
def cleanup_after_job(self):
""" Set history, datasets, and jobs' attributes and clean up archive directory. """
#
# Helper methods.
#
def file_in_dir(file_path, a_dir):
""" Returns true if file is in directory. """
abs_file_path = os.path.abspath(file_path)
return os.path.split(abs_file_path)[0] == a_dir
def get_tag_str(tag, value):
""" Builds a tag string for a tag, value pair. """
if not value:
return tag
else:
return tag + ":" + value
#
# Import history.
#
jiha = self.sa_session.query(model.JobImportHistoryArchive).filter_by(job_id=self.job_id).first()
if jiha:
try:
archive_dir = jiha.archive_dir
archive_dir = os.path.realpath(archive_dir)
user = jiha.job.user
# Bioblend versions prior to 17.01 exported histories with an extra subdirectory.
if not os.path.exists(os.path.join(archive_dir, 'history_attrs.txt')):
for d in os.listdir(archive_dir):
if os.path.isdir(os.path.join(archive_dir, d)):
archive_dir = os.path.join(archive_dir, d)
break
#
# Create history.
#
history_attr_file_name = os.path.join(archive_dir, 'history_attrs.txt')
history_attrs = load(open(history_attr_file_name))
# Create history.
new_history = model.History(name='imported from archive: %s' % history_attrs['name'],
user=user)
new_history.importing = True
new_history.hid_counter = history_attrs['hid_counter']
new_history.genome_build = history_attrs['genome_build']
self.sa_session.add(new_history)
jiha.history = new_history
self.sa_session.flush()
# Add annotation, tags.
if user:
self.add_item_annotation(self.sa_session, user, new_history, history_attrs['annotation'])
"""
TODO: figure out how to add tags to items.
for tag, value in history_attrs[ 'tags' ].items():
trans.app.tag_handler.apply_item_tags( trans, trans.user, new_history, get_tag_str( tag, value ) )
"""
#
# Create datasets.
#
datasets_attrs_file_name = os.path.join(archive_dir, 'datasets_attrs.txt')
datasets_attrs = load(open(datasets_attrs_file_name))
provenance_file_name = datasets_attrs_file_name + ".provenance"
if os.path.exists(provenance_file_name):
provenance_attrs = load(open(provenance_file_name))
datasets_attrs += provenance_attrs
# Get counts of how often each dataset file is used; a file can
# be linked to multiple dataset objects (HDAs).
datasets_usage_counts = {}
for dataset_attrs in datasets_attrs:
temp_dataset_file_name = \
os.path.realpath(os.path.join(archive_dir, dataset_attrs['file_name']))
if (temp_dataset_file_name not in datasets_usage_counts):
datasets_usage_counts[temp_dataset_file_name] = 0
datasets_usage_counts[temp_dataset_file_name] += 1
# Create datasets.
for dataset_attrs in datasets_attrs:
metadata = dataset_attrs['metadata']
# Create dataset and HDA.
hda = model.HistoryDatasetAssociation(name=dataset_attrs['name'],
extension=dataset_attrs['extension'],
info=dataset_attrs['info'],
blurb=dataset_attrs['blurb'],
peek=dataset_attrs['peek'],
designation=dataset_attrs['designation'],
visible=dataset_attrs['visible'],
dbkey=metadata['dbkey'],
metadata=metadata,
history=new_history,
create_dataset=True,
sa_session=self.sa_session)
if 'uuid' in dataset_attrs:
hda.dataset.uuid = dataset_attrs["uuid"]
if dataset_attrs.get('exported', True) is False:
hda.state = hda.states.DISCARDED
hda.deleted = True
hda.purged = True
else:
hda.state = hda.states.OK
self.sa_session.add(hda)
self.sa_session.flush()
new_history.add_dataset(hda, genome_build=None)
hda.hid = dataset_attrs['hid'] # Overwrite default hid set when HDA added to history.
# TODO: Is there a way to recover permissions? Is this needed?
# permissions = trans.app.security_agent.history_get_default_permissions( new_history )
# trans.app.security_agent.set_all_dataset_permissions( hda.dataset, permissions )
self.sa_session.flush()
if dataset_attrs.get('exported', True) is True:
# Do security check and move/copy dataset data.
temp_dataset_file_name = \
os.path.realpath(os.path.abspath(os.path.join(archive_dir, dataset_attrs['file_name'])))
if not file_in_dir(temp_dataset_file_name, os.path.join(archive_dir, "datasets")):
raise MalformedContents("Invalid dataset path: %s" % temp_dataset_file_name)
if datasets_usage_counts[temp_dataset_file_name] == 1:
self.app.object_store.update_from_file(hda.dataset, file_name=temp_dataset_file_name, create=True)
# Import additional files if present. Histories exported by earlier versions might not have this attribute set.
dataset_extra_files_path = dataset_attrs.get('extra_files_path', None)
if dataset_extra_files_path:
try:
file_list = os.listdir(os.path.join(archive_dir, dataset_extra_files_path))
except OSError:
file_list = []
if file_list:
for extra_file in file_list:
self.app.object_store.update_from_file(
hda.dataset, extra_dir='dataset_%s_files' % hda.dataset.id,
alt_name=extra_file, file_name=os.path.join(archive_dir, dataset_extra_files_path, extra_file),
create=True)
else:
datasets_usage_counts[temp_dataset_file_name] -= 1
shutil.copyfile(temp_dataset_file_name, hda.file_name)
hda.dataset.set_total_size() # update the filesize record in the database
# Set tags, annotations.
if user:
self.add_item_annotation(self.sa_session, user, hda, dataset_attrs['annotation'])
# TODO: Set tags.
"""
for tag, value in dataset_attrs[ 'tags' ].items():
trans.app.tag_handler.apply_item_tags( trans, trans.user, hda, get_tag_str( tag, value ) )
self.sa_session.flush()
"""
# Although metadata is set above, it needs to be set again here to regenerate the BAI index for BAM datasets.
if hda.extension == 'bam':
self.app.datatypes_registry.set_external_metadata_tool.tool_action.execute_via_app(
self.app.datatypes_registry.set_external_metadata_tool, self.app, jiha.job.session_id,
new_history.id, jiha.job.user, incoming={'input1': hda}, overwrite=False
)
#
# Create jobs.
#
# Decode jobs attributes.
def as_hda(obj_dct):
""" Hook to 'decode' an HDA; method uses history and HID to get the HDA represented by
the encoded object. This only works because HDAs are created above. """
if obj_dct.get('__HistoryDatasetAssociation__', False):
return self.sa_session.query(model.HistoryDatasetAssociation) \
.filter_by(history=new_history, hid=obj_dct['hid']).first()
return obj_dct
jobs_attr_file_name = os.path.join(archive_dir, 'jobs_attrs.txt')
jobs_attrs = load(open(jobs_attr_file_name), object_hook=as_hda)
# Create each job.
for job_attrs in jobs_attrs:
imported_job = model.Job()
imported_job.user = user
# TODO: set session?
# imported_job.session = trans.get_galaxy_session().id
imported_job.history = new_history
imported_job.imported = True
imported_job.tool_id = job_attrs['tool_id']
imported_job.tool_version = job_attrs['tool_version']
imported_job.set_state(job_attrs['state'])
imported_job.info = job_attrs.get('info', None)
imported_job.exit_code = job_attrs.get('exit_code', None)
imported_job.traceback = job_attrs.get('traceback', None)
imported_job.stdout = job_attrs.get('stdout', None)
imported_job.stderr = job_attrs.get('stderr', None)
imported_job.command_line = job_attrs.get('command_line', None)
try:
imported_job.create_time = datetime.datetime.strptime(job_attrs["create_time"], "%Y-%m-%dT%H:%M:%S.%f")
imported_job.update_time = datetime.datetime.strptime(job_attrs["update_time"], "%Y-%m-%dT%H:%M:%S.%f")
except Exception:
pass
self.sa_session.add(imported_job)
self.sa_session.flush()
class HistoryDatasetAssociationIDEncoder(json.JSONEncoder):
""" Custom JSONEncoder for a HistoryDatasetAssociation that encodes an HDA as its ID. """
def default(self, obj):
""" Encode an HDA, default encoding for everything else. """
if isinstance(obj, model.HistoryDatasetAssociation):
return obj.id
return json.JSONEncoder.default(self, obj)
# Set parameters. May be useful to look at metadata.py for creating parameters.
# TODO: there may be a better way to set parameters, e.g.:
# for name, value in tool.params_to_strings( incoming, trans.app ).items():
# job.add_parameter( name, value )
# to make this work, we'd need to flesh out the HDA objects. The code below is
# relatively similar.
for name, value in job_attrs['params'].items():
# Transform parameter values when necessary.
if isinstance(value, model.HistoryDatasetAssociation):
# HDA input: use hid to find input.
input_hda = self.sa_session.query(model.HistoryDatasetAssociation) \
.filter_by(history=new_history, hid=value.hid).first()
value = input_hda.id
imported_job.add_parameter(name, dumps(value, cls=HistoryDatasetAssociationIDEncoder))
# TODO: Connect jobs to input datasets.
# Connect jobs to output datasets.
for output_hid in job_attrs['output_datasets']:
output_hda = self.sa_session.query(model.HistoryDatasetAssociation) \
.filter_by(history=new_history, hid=output_hid).first()
if output_hda:
imported_job.add_output_dataset(output_hda.name, output_hda)
# Connect jobs to input datasets.
if 'input_mapping' in job_attrs:
for input_name, input_hid in job_attrs['input_mapping'].items():
input_hda = self.sa_session.query(model.HistoryDatasetAssociation) \
.filter_by(history=new_history, hid=input_hid).first()
if input_hda:
imported_job.add_input_dataset(input_name, input_hda)
self.sa_session.flush()
# Done importing.
new_history.importing = False
self.sa_session.flush()
# Cleanup.
if os.path.exists(archive_dir):
shutil.rmtree(archive_dir)
except Exception as e:
jiha.job.stderr += "Error cleaning up history import job: %s" % e
self.sa_session.flush()
raise
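# --- Illustrative usage sketch (not part of the original module) ---
# A minimal sketch of how the import wrapper above is typically driven once a
# history-import job has finished. The names `app` and `import_job_id` are
# hypothetical placeholders: `app` stands for the Galaxy application object and
# `import_job_id` for the id of the finished job recorded in the
# JobImportHistoryArchive table.
def _example_run_import_cleanup(app, import_job_id):
    """Hypothetical helper showing the intended call sequence."""
    wrapper = JobImportHistoryArchiveWrapper(app, import_job_id)
    # cleanup_after_job() rebuilds the history, its datasets, and its jobs from
    # the extracted archive directory, then removes that directory.
    wrapper.cleanup_after_job()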
class JobExportHistoryArchiveWrapper(UsesAnnotations):
"""
Class provides support for performing jobs that export a history to an
archive.
"""
def get_history_datasets(self, trans, history):
"""
Returns history's datasets.
"""
query = (trans.sa_session.query(trans.model.HistoryDatasetAssociation)
.filter(trans.model.HistoryDatasetAssociation.history == history)
.join("dataset")
.options(eagerload_all("dataset.actions"))
.order_by(trans.model.HistoryDatasetAssociation.hid)
.filter(trans.model.HistoryDatasetAssociation.deleted == expression.false())
.filter(trans.model.Dataset.purged == expression.false()))
return query.all()
# TODO: should use db_session rather than trans in this method.
def setup_job(self, trans, jeha, include_hidden=False, include_deleted=False):
""" Perform setup for job to export a history into an archive. Method generates
attribute files for export, sets the corresponding attributes in the jeha
object, and returns a command line for running the job. The command line
includes the command, inputs, and options; it does not include the output
file because it must be set at runtime. """
#
# Helper methods/classes.
#
def get_item_tag_dict(item):
""" Create dictionary of an item's tags. """
tags = {}
for tag in item.tags:
tag_user_tname = to_unicode(tag.user_tname)
tag_user_value = to_unicode(tag.user_value)
tags[tag_user_tname] = tag_user_value
return tags
def prepare_metadata(metadata):
""" Prepare metatdata for exporting. """
for name, value in list(metadata.items()):
# Metadata files are not needed for export because they can be
# regenerated.
if isinstance(value, trans.app.model.MetadataFile):
del metadata[name]
return metadata
class HistoryDatasetAssociationEncoder(json.JSONEncoder):
""" Custom JSONEncoder for a HistoryDatasetAssociation. """
def default(self, obj):
""" Encode an HDA, default encoding for everything else. """
if isinstance(obj, trans.app.model.HistoryDatasetAssociation):
rval = {
"__HistoryDatasetAssociation__": True,
"create_time": obj.create_time.__str__(),
"update_time": obj.update_time.__str__(),
"hid": obj.hid,
"name": to_unicode(obj.name),
"info": to_unicode(obj.info),
"blurb": obj.blurb,
"peek": obj.peek,
"extension": obj.extension,
"metadata": prepare_metadata(dict(obj.metadata.items())),
"parent_id": obj.parent_id,
"designation": obj.designation,
"deleted": obj.deleted,
"visible": obj.visible,
"file_name": obj.file_name,
"uuid": (lambda uuid: str(uuid) if uuid else None)(obj.dataset.uuid),
"annotation": to_unicode(getattr(obj, 'annotation', '')),
"tags": get_item_tag_dict(obj),
"extra_files_path": obj.extra_files_path
}
if not obj.visible and not include_hidden:
rval['exported'] = False
elif obj.deleted and not include_deleted:
rval['exported'] = False
else:
rval['exported'] = True
return rval
return json.JSONEncoder.default(self, obj)
#
# Create attributes/metadata files for export.
#
temp_output_dir = tempfile.mkdtemp()
# Write history attributes to file.
history = jeha.history
history_attrs = {
"create_time": history.create_time.__str__(),
"update_time": history.update_time.__str__(),
"name": to_unicode(history.name),
"hid_counter": history.hid_counter,
"genome_build": history.genome_build,
"annotation": to_unicode(self.get_item_annotation_str(trans.sa_session, history.user, history)),
"tags": get_item_tag_dict(history),
"includes_hidden_datasets": include_hidden,
"includes_deleted_datasets": include_deleted
}
history_attrs_filename = tempfile.NamedTemporaryFile(dir=temp_output_dir).name
history_attrs_out = open(history_attrs_filename, 'w')
history_attrs_out.write(dumps(history_attrs))
history_attrs_out.close()
jeha.history_attrs_filename = history_attrs_filename
# Write datasets' attributes to file.
datasets = self.get_history_datasets(trans, history)
included_datasets = []
datasets_attrs = []
provenance_attrs = []
for dataset in datasets:
dataset.annotation = self.get_item_annotation_str(trans.sa_session, history.user, dataset)
if (not dataset.visible and not include_hidden) or (dataset.deleted and not include_deleted):
provenance_attrs.append(dataset)
else:
datasets_attrs.append(dataset)
included_datasets.append(dataset)
datasets_attrs_filename = tempfile.NamedTemporaryFile(dir=temp_output_dir).name
datasets_attrs_out = open(datasets_attrs_filename, 'w')
datasets_attrs_out.write(dumps(datasets_attrs, cls=HistoryDatasetAssociationEncoder))
datasets_attrs_out.close()
jeha.datasets_attrs_filename = datasets_attrs_filename
provenance_attrs_out = open(datasets_attrs_filename + ".provenance", 'w')
provenance_attrs_out.write(dumps(provenance_attrs, cls=HistoryDatasetAssociationEncoder))
provenance_attrs_out.close()
#
# Write jobs attributes file.
#
# Get all jobs associated with included HDAs.
jobs_dict = {}
for hda in included_datasets:
# Get the associated job, if any. If this HDA was copied from another,
# we need to find the job that created the original HDA.
job_hda = hda
while job_hda.copied_from_history_dataset_association: # should this check library datasets as well?
job_hda = job_hda.copied_from_history_dataset_association
if not job_hda.creating_job_associations:
# No viable HDA found.
continue
# Get the job object.
job = None
for assoc in job_hda.creating_job_associations:
job = assoc.job
break
if not job:
# No viable job.
continue
jobs_dict[job.id] = job
# Get jobs' attributes.
jobs_attrs = []
for id, job in jobs_dict.items():
job_attrs = {}
job_attrs['tool_id'] = job.tool_id
job_attrs['tool_version'] = job.tool_version
job_attrs['state'] = job.state
job_attrs['info'] = job.info
job_attrs['traceback'] = job.traceback
job_attrs['command_line'] = job.command_line
job_attrs['stderr'] = job.stderr
job_attrs['stdout'] = job.stdout
job_attrs['exit_code'] = job.exit_code
job_attrs['create_time'] = job.create_time.isoformat()
job_attrs['update_time'] = job.update_time.isoformat()
# Get the job's parameters
try:
params_objects = job.get_param_values(trans.app)
except Exception:
# Could not get job params.
continue
params_dict = {}
for name, value in params_objects.items():
params_dict[name] = value
job_attrs['params'] = params_dict
# -- Get input, output datasets. --
input_datasets = []
input_mapping = {}
for assoc in job.input_datasets:
# Optional data inputs will not have a dataset.
if assoc.dataset:
input_datasets.append(assoc.dataset.hid)
input_mapping[assoc.name] = assoc.dataset.hid
job_attrs['input_datasets'] = input_datasets
job_attrs['input_mapping'] = input_mapping
output_datasets = [assoc.dataset.hid for assoc in job.output_datasets]
job_attrs['output_datasets'] = output_datasets
jobs_attrs.append(job_attrs)
jobs_attrs_filename = tempfile.NamedTemporaryFile(dir=temp_output_dir).name
jobs_attrs_out = open(jobs_attrs_filename, 'w')
jobs_attrs_out.write(dumps(jobs_attrs, cls=HistoryDatasetAssociationEncoder))
jobs_attrs_out.close()
jeha.jobs_attrs_filename = jobs_attrs_filename
#
# Create and return command line for running tool.
#
options = ""
if jeha.compressed:
options = "-G"
return "%s %s %s %s" % (options, history_attrs_filename,
datasets_attrs_filename,
jobs_attrs_filename)
def cleanup_after_job(self, db_session):
""" Remove temporary directory and attribute files generated during setup for this job. """
# Get jeha for job.
jeha = db_session.query(model.JobExportHistoryArchive).filter_by(job_id=self.job_id).first()
if jeha:
for filename in [jeha.history_attrs_filename, jeha.datasets_attrs_filename, jeha.jobs_attrs_filename]:
try:
os.remove(filename)
except Exception as e:
log.debug('Failed to cleanup attributes file (%s): %s' % (filename, e))
temp_dir = os.path.split(jeha.history_attrs_filename)[0]
try:
shutil.rmtree(temp_dir)
except Exception as e:
log.debug('Error deleting directory containing attribute files (%s): %s' % (temp_dir, e))
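# --- Illustrative usage sketch (not part of the original module) ---
# A minimal sketch of the export lifecycle, assuming `export_job_id` is the id
# of the export job, `trans` is a Galaxy web transaction, `jeha` is the
# corresponding JobExportHistoryArchive record, and `db_session` is a
# SQLAlchemy session; all of these names are placeholders supplied by the
# caller, not part of this module.
def _example_run_export(export_job_id, trans, jeha, db_session):
    """Hypothetical helper showing the intended call sequence."""
    wrapper = JobExportHistoryArchiveWrapper(export_job_id)
    # setup_job() writes the history, dataset, and job attribute files into a
    # temporary directory and returns the command-line arguments (options plus
    # the three attribute file names) for the export tool; the output archive
    # path is appended at runtime.
    command_line = wrapper.setup_job(trans, jeha)
    # ... the export tool runs with `command_line` and writes the archive ...
    # Afterwards, remove the attribute files and their temporary directory.
    wrapper.cleanup_after_job(db_session)
    return command_line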