Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.datatypes.isa

"""
ISA datatype

See https://github.com/ISA-tools
"""

import json
import logging
import os
import os.path
import re
import shutil
import tempfile

# Import isatools only after raising the "isatools.isatab" logger level to ERROR, so that pandas warnings do not make uploads fail.
logging.getLogger("isatools.isatab").setLevel(logging.ERROR)
from isatools import (
    isajson,
    isatab_meta
)
from markupsafe import escape

from galaxy import util
from galaxy.datatypes import data
from galaxy.util.compression_utils import CompressedFile
from galaxy.util.sanitize_html import sanitize_html

# CONSTANTS {{{1
################################################################

# Main files regex: patterns used to recognise the main file of an ISA archive.
# ISA-JSON: any file name ending in ".json" (case-insensitive).
JSON_FILE_REGEX = re.compile(r"^.*\.json$", flags=re.IGNORECASE)
# ISA-Tab: investigation file named "i_<word characters>.txt" (case-insensitive).
INVESTIGATION_FILE_REGEX = re.compile(r"^i_\w+\.txt$", flags=re.IGNORECASE)

# The name of the ISA archive (compressed file) as saved inside Galaxy
ISA_ARCHIVE_NAME = "archive"

# Maximum number of lines of the main file kept for the history peek
_MAX_LINES_HISTORY_PEEK = 11

# Configure logger {{{1
################################################################

# Module-level logger, named after this module
logger = logging.getLogger(__name__)

# Function for opening correctly a CSV file for csv.reader() for both Python 2 and 3 {{{1
################################################################


# ISA class {{{1
################################################################

class _Isa(data.Data):
    """Base class for implementing ISA datatypes.

    Subclasses provide the regular expression identifying the main file of
    the archive and implement `_make_investigation_instance()`.
    """
    # Composite type declared for this datatype.
    # NOTE(review): presumably tells Galaxy to build the primary file via generate_primary_file() - confirm.
    composite_type = 'auto_primary_file'
    # The datatype is flagged as binary
    is_binary = True
    # Compiled regex matching the main file name; assigned per instance in __init__()
    _main_file_regex = None

    # Make investigation instance {{{2
    ################################################################

    def _make_investigation_instance(self, filename):
        """Build the investigation object from the main file `filename`.

        This base implementation is only a placeholder: it always raises, and
        is meant to be overridden by the concrete ISA datatypes.

        :param filename: path of the main file of the ISA archive.
        :raises NotImplementedError: always.
        """
        raise NotImplementedError

    # Constructor {{{2
    ################################################################

    def __init__(self, main_file_regex, **kwd):
        """Initialise the datatype and declare its composite file.

        :param main_file_regex: compiled pattern stored on the instance and
            matched against file names to locate the main file.
        :param kwd: keyword arguments handed over unchanged to the parent constructor.
        """
        super().__init__(**kwd)
        self._main_file_regex = main_file_regex

        # The archive is the one and only composite file: binary and optional
        self.add_composite_file(ISA_ARCHIVE_NAME, optional=True, is_binary=True)

    # Get ISA folder path {{{2
    ################################################################

    def _get_isa_folder_path(self, dataset):
        """Return the extra files path of `dataset`, where the ISA files live.

        :param dataset: object exposing an `extra_files_path` attribute.
        :returns: the value of `dataset.extra_files_path`.
        :raises Exception: if that value is empty or otherwise falsy.
        """
        extra_files_dir = dataset.extra_files_path
        if extra_files_dir:
            return extra_files_dir
        raise Exception('Unvalid dataset object, or no extra files path found for this dataset.')

    # Get main file {{{2
    ################################################################

    def _get_main_file(self, dataset):
        """Locate the main file of the ISA archive.

        The main file is either the investigation file i_*.txt for ISA-Tab,
        or the JSON file for ISA-JSON.

        :param dataset: dataset whose ISA folder is inspected.
        :returns: full path of the main file, or ``None`` when the ISA folder
            does not exist on disk.
        :raises Exception: if the folder exists but contains no main file.
        """
        isa_folder = self._get_isa_folder_path(dataset)

        # Nothing to inspect as long as the folder is not present on disk
        if not os.path.exists(isa_folder):
            return None

        # Search the entries of the ISA archive folder for the main file
        candidate = self._find_main_file_in_archive(os.listdir(isa_folder))
        if candidate is None:
            raise Exception('Invalid ISA archive. No main file found.')

        # Turn the bare file name into a full path
        return os.path.join(isa_folder, candidate)

    # Get investigation {{{2
    ################################################################

    def _get_investigation(self, dataset):
        """Create the investigation object specific to the exact ISA type (Tab or Json).

        The returned object is used to parse and access information from the archive.

        :param dataset: dataset whose main file is to be parsed.
        :returns: whatever `_make_investigation_instance()` returns for the
            main file, or ``None`` when no main file could be located.
        """
        main_file = self._get_main_file(dataset)
        if main_file is None:
            return None
        return self._make_investigation_instance(main_file)

    # Find main file in archive {{{2
    ################################################################

    def _find_main_file_in_archive(self, files_list):
        """Find the main file inside the ISA archive.

        :param files_list: iterable of file names to test against
            ``self._main_file_regex``.
        :returns: the single matching name, or ``None`` if nothing matches.
        :raises Exception: if more than one name matches the pattern.
        """
        found_file = None

        for name in files_list:
            match = self._main_file_regex.match(name)
            if not match:
                continue
            if found_file is not None:
                # Build a single message string: several positional arguments
                # to Exception() would be rendered as a tuple, and str() of a
                # compiled regex gives its repr rather than the pattern itself.
                raise Exception(
                    f'More than one file match the pattern "{self._main_file_regex.pattern}" '
                    'to identify the investigation file'
                )
            found_file = match.group()

        return found_file

    # Set peek {{{2
    ################################################################

    def set_peek(self, dataset):
        """Set the peek and blurb text from the first lines of the main file.

        :param dataset: dataset whose `peek` and `blurb` attributes are assigned.
        :raises RuntimeError: if no main file can be located.
        """
        main_file = self._get_main_file(dataset)
        if main_file is None:
            raise RuntimeError("Unable to find the main file within the 'files_path' folder")

        # Keep at most _MAX_LINES_HISTORY_PEEK lines from the top of the main file
        head_lines = []
        with open(main_file, encoding='utf-8') as handle:
            for line in handle:
                if len(head_lines) >= _MAX_LINES_HISTORY_PEEK:
                    break
                head_lines.append(line)

        if dataset.dataset.purged or not head_lines:
            dataset.peek = 'file does not exist'
            dataset.blurb = 'file purged from disk'
        else:
            # The peek is stored as JSON so that display_peek() can read the lines back
            dataset.peek = json.dumps({"data": head_lines})
            dataset.blurb = 'data'

    # Display peek {{{2
    ################################################################

    def display_peek(self, dataset):
        """Render the peek stored on `dataset` as an HTML table.

        The peek text is expected to be the JSON produced by set_peek(); when
        it is empty, `dataset.set_peek()` is called first.

        :param dataset: dataset carrying the `peek` attribute.
        :returns: the HTML table as a string, or an error message string if
            anything goes wrong while building it.
        """
        rows = ['<table cellspacing="0" cellpadding="3">']
        try:
            if not dataset.peek:
                dataset.set_peek()
            for raw_line in json.loads(dataset.peek)["data"]:
                text = raw_line.strip()
                # Blank lines produce no table row
                if text:
                    rows.append(f"<tr><td>{escape(util.unicodify(text, 'utf-8'))}</td></tr>")
            rows.append('</table>')
            return "".join(rows)
        except Exception as exc:
            return f"Can't create peek: {util.unicodify(exc)}"

    # Generate primary file {{{2
    ################################################################

    def generate_primary_file(self, dataset=None):
        """Generate the primary file.

        It is an HTML file containing a description of the composite dataset
        as well as a list of the files found in its extra files path.

        :param dataset: dataset to describe; may be ``None``.
        :returns: the HTML text, or a short placeholder when no dataset is given.
        """
        if not dataset:
            return "<div>No dataset available</div>"

        rval = ['<html><head><title>ISA Dataset </title></head><p/>']
        if hasattr(dataset, "extra_files_path"):
            rval.append('<div>ISA Dataset composed of the following files:<p/><ul>')
            for cmp_file in os.listdir(dataset.extra_files_path):
                # File names come from the listed folder and are not trusted:
                # escape them in the href attribute as well as in the link
                # text, otherwise a quote in a name breaks out of the attribute.
                safe_name = escape(cmp_file)
                rval.append(f'<li><a href="{safe_name}" type="text/plain">{safe_name}</a></li>')
            rval.append('</ul></div></html>')
        else:
            # Close every opened element so the page stays well-formed
            rval.append('<div>ISA Dataset is empty!<p/></div></html>')
        return "\n".join(rval)

    # Dataset content needs grooming {{{2
    ################################################################

    def dataset_content_needs_grooming(self, file_name):
        """Tell whether `file_name` is the ISA archive, i.e. needs grooming.

        This function is called on an output dataset file after the content is initially generated.

        :param file_name: path of the file to test.
        :returns: ``True`` when the last path component equals ISA_ARCHIVE_NAME.
        """
        _, last_component = os.path.split(file_name)
        return last_component == ISA_ARCHIVE_NAME

    # Groom dataset content {{{2
    ################################################################

    def groom_dataset_content(self, file_name):
        """Extract the ISA archive in place of the folder that contains it.

        This method is called by Galaxy to extract files contained in a composite data type.
        Files whose base name is not ISA_ARCHIVE_NAME are left untouched.

        :param file_name: path of the file whose content has to be groomed.
        """
        # XXX Is the right place to extract files? Should this step not be a cleaning step instead?
        # Could extracting be done earlier and composite files declared as files contained inside the archive
        # instead of the archive itself?

        # Only the ISA archive itself is groomed
        if os.path.basename(file_name) != ISA_ARCHIVE_NAME:
            return

        output_path = os.path.dirname(file_name)

        # For some ZIP files CompressedFile::extract() extracts the file inside <output_folder>/<file_name>
        # instead of outputting it inside <output_folder>. So we first create a temporary folder, extract
        # inside it, and move content to final destination.
        temp_folder = tempfile.mkdtemp()
        try:
            CompressedFile(file_name).extract(temp_folder)
            # The folder holding the archive is replaced by the extracted content
            shutil.rmtree(output_path)
            extracted_files = os.listdir(temp_folder)
            logger.debug(' '.join(extracted_files))
            if not extracted_files:
                # Empty archive: recreate an empty destination folder
                os.makedirs(output_path)
            elif len(extracted_files) == 1 and os.path.isdir(os.path.join(temp_folder, extracted_files[0])):
                # Single top-level folder: that folder becomes the destination
                shutil.move(os.path.join(temp_folder, extracted_files[0]), output_path)
            else:
                # Several entries: the temporary folder itself becomes the destination
                shutil.move(temp_folder, output_path)
        finally:
            # mkdtemp() never cleans up after itself: remove the temporary folder
            # whenever it still exists, including when extraction or a move failed.
            if os.path.isdir(temp_folder):
                shutil.rmtree(temp_folder, ignore_errors=True)

    # Display data {{{2
    ################################################################

    def display_data(self, trans, dataset, preview=False, filename=None, to_ext=None, offset=None, ck_size=None, **kwd):
        """Download the ISA dataset, or return an HTML preview of it.

        If `preview` is `False` the call is delegated to the parent class;
        if `preview` is `True`, a preview of the ISA dataset is returned as a HTML page.
        The preview is triggered when user clicks on the eye icon of the composite dataset.

        :param trans: passed to the parent implementation and to `_clean_and_set_mime_type()`.
        :param dataset: the ISA dataset to display.
        :param preview: whether to build the HTML preview.
        :param filename: forwarded to the parent implementation.
        :param to_ext: forwarded to the parent implementation.
        :param offset: accepted but not used by this method.
        :param ck_size: accepted but not used by this method.
        :param kwd: extra keyword arguments; ``headers`` is read from it.
        :returns: for a preview, a tuple of the sanitized HTML encoded as UTF-8 and the headers;
            otherwise whatever the parent implementation returns.
        """
        headers = kwd.get("headers", {})
        # if it is not required a preview use the default behaviour of `display_data`
        if not preview:
            return super().display_data(trans, dataset, preview, filename, to_ext, **kwd)

        # prepare the preview of the ISA dataset
        investigation = self._get_investigation(dataset)
        if investigation is None:
            html = """<html><header><title>Error while reading ISA archive.</title></header>
                   <body>
                        <h1>An error occurred while reading content of ISA archive.</h1>
                        <p>If you have tried to load your archive with the uploader by selecting isa-tab as composite data type, then try to load it again with isa-json instead. Conversely, if you have tried to load your archive with the uploader by selecting isa-json as composite data type, then try isa-tab instead.</p>
                        <p>You may also try to look into your zip file in order to find out if this is a proper ISA archive. If you see a file i_Investigation.txt inside, then it is an ISA-Tab archive. If you see a file with extension .json inside, then it is an ISA-JSON archive. If you see nothing like that, then either your ISA archive is corrupted, or it is not an ISA archive.</p>
                   </body></html>"""
        else:
            # Every value read from the archive is escaped before being placed in
            # the markup, consistently with what is done for the data files below.
            parts = ['<html><body>']
            parts.append(f'<h1>{escape(investigation.title)} {escape(investigation.identifier)}</h1>')

            # Loop on all studies
            for study in investigation.studies:
                parts.append(f'<h2>Study {escape(study.identifier)}</h2>')
                parts.append(f'<h3>{escape(study.title)}</h3>')
                parts.append(f'<p>{escape(study.description)}</p>')
                parts.append(f'<p>Submitted the {escape(study.submission_date)}</p>')
                parts.append(f'<p>Released on {escape(study.public_release_date)}</p>')

                factor_names = ', '.join(x.name for x in study.factors)
                parts.append(f"<p>Experimental factors used: {escape(factor_names)}</p>")

                # Loop on all assays of this study
                for assay in study.assays:
                    parts.append(f'<h3>Assay {escape(assay.filename)}</h3>')
                    parts.append(f'<p>Measurement type: {escape(assay.measurement_type.term)}</p>')  # OntologyAnnotation
                    parts.append(f'<p>Technology type: {escape(assay.technology_type.term)}</p>')    # OntologyAnnotation
                    parts.append(f'<p>Technology platform: {escape(assay.technology_platform)}</p>')
                    if assay.data_files is not None:
                        parts.append('<p>Data files:</p>')
                        parts.append('<ul>')
                        for data_file in assay.data_files:
                            # Entries without a file name are not listed
                            if data_file.filename != '':
                                parts.append(f"<li>{escape(util.unicodify(str(data_file.filename), 'utf-8'))} - {escape(util.unicodify(str(data_file.label), 'utf-8'))}</li>")
                        parts.append('</ul>')

            parts.append('</body></html>')
            html = ''.join(parts)

        # Set mime type
        mime = 'text/html'
        self._clean_and_set_mime_type(trans, mime, headers)

        # The page is additionally run through sanitize_html() before being returned
        return sanitize_html(html).encode('utf-8'), headers


# ISA-Tab class {{{1
################################################################

class IsaTab(_Isa):
    """ISA-Tab datatype: the main file is the investigation file matched by INVESTIGATION_FILE_REGEX."""
    file_ext = "isa-tab"

    # Constructor {{{2
    ################################################################

    def __init__(self, **kwd):
        """Initialise the base class with the investigation file pattern.

        :param kwd: keyword arguments forwarded to the base class constructor.
        """
        super().__init__(main_file_regex=INVESTIGATION_FILE_REGEX, **kwd)

    # Make investigation instance {{{2
    ################################################################

    def _make_investigation_instance(self, filename):
        """Parse the ISA-Tab investigation file and its study and assay tables.

        :param filename: path of the investigation file; study and assay files
            are looked up in the same folder.
        :returns: the `isa` object held by the investigation parser.
        """
        # Parse ISA-Tab investigation file
        parser = isatab_meta.InvestigationParser()
        isa_dir = os.path.dirname(filename)
        with open(filename, newline='', encoding='utf8') as fp:
            parser.parse(fp)

        # Parse the sample table of each study, then the table of each of its assays
        for study in parser.isa.studies:
            s_parser = isatab_meta.LazyStudySampleTableParser(parser.isa)
            s_parser.parse(os.path.join(isa_dir, study.filename))
            for assay in study.assays:
                a_parser = isatab_meta.LazyAssayTableParser(parser.isa)
                a_parser.parse(os.path.join(isa_dir, assay.filename))

        return parser.isa

# ISA-JSON class {{{1
################################################################

class IsaJson(_Isa):
    """ISA-JSON datatype: the main file is the JSON file matched by JSON_FILE_REGEX."""
    file_ext = "isa-json"

    # Constructor {{{2
    ################################################################

    def __init__(self, **kwd):
        """Initialise the base class with the JSON file pattern.

        :param kwd: keyword arguments forwarded to the base class constructor.
        """
        super().__init__(main_file_regex=JSON_FILE_REGEX, **kwd)

    # Make investigation instance {{{2
    ################################################################

    def _make_investigation_instance(self, filename):
        """Load the ISA-JSON main file.

        :param filename: path of the JSON file to load.
        :returns: the object returned by `isajson.load()`.
        """
        # Parse JSON file
        with open(filename, newline='', encoding='utf8') as fp:
            return isajson.load(fp)