Source code for galaxy.datatypes.molecules

import logging
import os
import re

from galaxy.datatypes import metadata
from galaxy.datatypes.binary import Binary
from galaxy.datatypes.data import (
    get_file_peek,
    Text,
)
from galaxy.datatypes.metadata import MetadataElement
from galaxy.datatypes.sniff import (
    build_sniff_from_prefix,
    get_headers,
    iter_headers
)
from galaxy.datatypes.tabular import Tabular
from galaxy.datatypes.util.generic_util import count_special_lines
from galaxy.datatypes.xml import GenericXml
from galaxy.util import (
    commands,
    unicodify
)

log = logging.getLogger(__name__)


[docs]def count_lines(filename, non_empty=False): """ counting the number of lines from the 'filename' file """ if non_empty: cmd = ['grep', '-cve', r'^\s*$', filename] else: cmd = ['wc', '-l', filename] try: out = commands.execute(cmd) except commands.CommandLineException as e: log.error(unicodify(e)) return 0 return int(out.split()[0])
[docs]class GenericMolFile(Text): """ Abstract class for most of the molecule files. """ MetadataElement(name="number_of_molecules", default=0, desc="Number of molecules", readonly=True, visible=True, optional=True, no_value=0)
[docs] def set_peek(self, dataset, is_multi_byte=False): if not dataset.dataset.purged: if (dataset.metadata.number_of_molecules == 1): dataset.blurb = "1 molecule" else: dataset.blurb = f"{dataset.metadata.number_of_molecules} molecules" dataset.peek = get_file_peek(dataset.file_name) else: dataset.peek = 'file does not exist' dataset.blurb = 'file purged from disk'
[docs] def get_mime(self): return 'text/plain'
[docs]class MOL(GenericMolFile): file_ext = "mol"
[docs] def set_meta(self, dataset, **kwd): """ Set the number molecules, in the case of MOL its always one. """ dataset.metadata.number_of_molecules = 1
[docs]@build_sniff_from_prefix class SDF(GenericMolFile): file_ext = "sdf"
[docs] def sniff_prefix(self, file_prefix): """ Try to guess if the file is a SDF2 file. An SDfile (structure-data file) can contain multiple compounds. Each compound starts with a block in V2000 or V3000 molfile format, which ends with a line equal to 'M END'. This is followed by a non-structural data block, which ends with a line equal to '$$$$'. >>> from galaxy.datatypes.sniff import get_test_fname >>> fname = get_test_fname('drugbank_drugs.sdf') >>> SDF().sniff(fname) True >>> fname = get_test_fname('github88.v3k.sdf') >>> SDF().sniff(fname) True >>> fname = get_test_fname('chebi_57262.v3k.mol') >>> SDF().sniff(fname) False """ m_end_found = False limit = 10000 idx = 0 for line in file_prefix.line_iterator(): idx += 1 line = line.rstrip('\n\r') if idx < 4: continue elif idx == 4: if len(line) != 39 or not(line.endswith(' V2000') or line.endswith(' V3000')): return False elif not m_end_found: if line == 'M END': m_end_found = True elif line == '$$$$': return True if idx == limit: break return False
[docs] def set_meta(self, dataset, **kwd): """ Set the number of molecules in dataset. """ dataset.metadata.number_of_molecules = count_special_lines(r"^\$\$\$\$$", dataset.file_name)
[docs] @classmethod def split(cls, input_datasets, subdir_generator_function, split_params): """ Split the input files by molecule records. """ if split_params is None: return None if len(input_datasets) > 1: raise Exception("SD-file splitting does not support multiple files") input_files = [ds.file_name for ds in input_datasets] chunk_size = None if split_params['split_mode'] == 'number_of_parts': raise Exception(f"Split mode \"{split_params['split_mode']}\" is currently not implemented for SD-files.") elif split_params['split_mode'] == 'to_size': chunk_size = int(split_params['split_size']) else: raise Exception(f"Unsupported split mode {split_params['split_mode']}") def _read_sdf_records(filename): lines = [] with open(filename) as handle: for line in handle: lines.append(line) if line.startswith("$$$$"): yield lines lines = [] def _write_part_sdf_file(accumulated_lines): part_dir = subdir_generator_function() part_path = os.path.join(part_dir, os.path.basename(input_files[0])) with open(part_path, 'w') as part_file: part_file.writelines(accumulated_lines) try: sdf_records = _read_sdf_records(input_files[0]) sdf_lines_accumulated = [] for counter, sdf_record in enumerate(sdf_records, start=1): sdf_lines_accumulated.extend(sdf_record) if counter % chunk_size == 0: _write_part_sdf_file(sdf_lines_accumulated) sdf_lines_accumulated = [] if sdf_lines_accumulated: _write_part_sdf_file(sdf_lines_accumulated) except Exception as e: log.error('Unable to split files: %s', unicodify(e)) raise
[docs]@build_sniff_from_prefix class MOL2(GenericMolFile): file_ext = "mol2"
[docs] def sniff_prefix(self, file_prefix): """ Try to guess if the file is a MOL2 file. >>> from galaxy.datatypes.sniff import get_test_fname >>> fname = get_test_fname('drugbank_drugs.mol2') >>> MOL2().sniff(fname) True >>> fname = get_test_fname('drugbank_drugs.cml') >>> MOL2().sniff(fname) False """ limit = 60 idx = 0 for line in file_prefix.line_iterator(): line = line.rstrip('\n\r') if line == '@<TRIPOS>MOLECULE': return True idx += 1 if idx == limit: break return False
[docs] def set_meta(self, dataset, **kwd): """ Set the number of lines of data in dataset. """ dataset.metadata.number_of_molecules = count_special_lines("@<TRIPOS>MOLECULE", dataset.file_name)
[docs] @classmethod def split(cls, input_datasets, subdir_generator_function, split_params): """ Split the input files by molecule records. """ if split_params is None: return None if len(input_datasets) > 1: raise Exception("MOL2-file splitting does not support multiple files") input_files = [ds.file_name for ds in input_datasets] chunk_size = None if split_params['split_mode'] == 'number_of_parts': raise Exception(f"Split mode \"{split_params['split_mode']}\" is currently not implemented for MOL2-files.") elif split_params['split_mode'] == 'to_size': chunk_size = int(split_params['split_size']) else: raise Exception(f"Unsupported split mode {split_params['split_mode']}") def _read_mol2_records(filename): lines = [] start = True with open(filename) as handle: for line in handle: if line.startswith("@<TRIPOS>MOLECULE"): if start: start = False else: yield lines lines = [] lines.append(line) def _write_part_mol2_file(accumulated_lines): part_dir = subdir_generator_function() part_path = os.path.join(part_dir, os.path.basename(input_files[0])) with open(part_path, 'w') as part_file: part_file.writelines(accumulated_lines) try: mol2_records = _read_mol2_records(input_files[0]) mol2_lines_accumulated = [] for counter, mol2_record in enumerate(mol2_records, start=1): mol2_lines_accumulated.extend(mol2_record) if counter % chunk_size == 0: _write_part_mol2_file(mol2_lines_accumulated) mol2_lines_accumulated = [] if mol2_lines_accumulated: _write_part_mol2_file(mol2_lines_accumulated) except Exception as e: log.error('Unable to split files: %s', unicodify(e)) raise
[docs]@build_sniff_from_prefix class FPS(GenericMolFile): """ chemfp fingerprint file: http://code.google.com/p/chem-fingerprints/wiki/FPS """ file_ext = "fps"
[docs] def sniff_prefix(self, file_prefix): """ Try to guess if the file is a FPS file. >>> from galaxy.datatypes.sniff import get_test_fname >>> fname = get_test_fname('q.fps') >>> FPS().sniff(fname) True >>> fname = get_test_fname('drugbank_drugs.cml') >>> FPS().sniff(fname) False """ header = get_headers(file_prefix, sep='\t', count=1) if header[0][0].strip() == '#FPS1': return True else: return False
[docs] def set_meta(self, dataset, **kwd): """ Set the number of lines of data in dataset. """ dataset.metadata.number_of_molecules = count_special_lines('^#', dataset.file_name, invert=True)
[docs] @classmethod def split(cls, input_datasets, subdir_generator_function, split_params): """ Split the input files by fingerprint records. """ if split_params is None: return None if len(input_datasets) > 1: raise Exception("FPS-file splitting does not support multiple files") input_files = [ds.file_name for ds in input_datasets] chunk_size = None if split_params['split_mode'] == 'number_of_parts': raise Exception(f"Split mode \"{split_params['split_mode']}\" is currently not implemented for MOL2-files.") elif split_params['split_mode'] == 'to_size': chunk_size = int(split_params['split_size']) else: raise Exception(f"Unsupported split mode {split_params['split_mode']}") def _write_part_fingerprint_file(accumulated_lines): part_dir = subdir_generator_function() part_path = os.path.join(part_dir, os.path.basename(input_files[0])) with open(part_path, 'w') as part_file: part_file.writelines(accumulated_lines) try: header_lines = [] lines_accumulated = [] fingerprint_counter = 0 for line in open(input_files[0]): if not line.strip(): continue if line.startswith('#'): header_lines.append(line) else: fingerprint_counter += 1 lines_accumulated.append(line) if fingerprint_counter != 0 and fingerprint_counter % chunk_size == 0: _write_part_fingerprint_file(header_lines + lines_accumulated) lines_accumulated = [] if lines_accumulated: _write_part_fingerprint_file(header_lines + lines_accumulated) except Exception as e: log.error('Unable to split files: %s', unicodify(e)) raise
[docs] @staticmethod def merge(split_files, output_file): """ Merging fps files requires merging the header manually. We take the header from the first file. """ if len(split_files) == 1: # For one file only, use base class method (move/copy) return Text.merge(split_files, output_file) if not split_files: raise ValueError("No fps files given, %r, to merge into %s" % (split_files, output_file)) with open(output_file, "w") as out: first = True for filename in split_files: with open(filename) as handle: for line in handle: if line.startswith('#'): if first: out.write(line) else: # line is no header and not a comment, we assume the first header is written to out and we set 'first' to False first = False out.write(line)
[docs]class OBFS(Binary): """OpenBabel Fastsearch format (fs).""" file_ext = 'obfs' composite_type = 'basic' MetadataElement(name="base_name", default='OpenBabel Fastsearch Index', readonly=True, visible=True, optional=True,)
[docs] def __init__(self, **kwd): """ A Fastsearch Index consists of a binary file with the fingerprints and a pointer the actual molecule file. """ super().__init__(**kwd) self.add_composite_file('molecule.fs', is_binary=True, description='OpenBabel Fastsearch Index') self.add_composite_file('molecule.sdf', optional=True, is_binary=False, description='Molecule File') self.add_composite_file('molecule.smi', optional=True, is_binary=False, description='Molecule File') self.add_composite_file('molecule.inchi', optional=True, is_binary=False, description='Molecule File') self.add_composite_file('molecule.mol2', optional=True, is_binary=False, description='Molecule File') self.add_composite_file('molecule.cml', optional=True, is_binary=False, description='Molecule File')
[docs] def set_peek(self, dataset, is_multi_byte=False): """Set the peek and blurb text.""" if not dataset.dataset.purged: dataset.peek = "OpenBabel Fastsearch Index" dataset.blurb = "OpenBabel Fastsearch Index" else: dataset.peek = "file does not exist" dataset.blurb = "file purged from disk"
[docs] def display_peek(self, dataset): """Create HTML content, used for displaying peek.""" try: return dataset.peek except Exception: return "OpenBabel Fastsearch Index"
[docs] def get_mime(self): """Returns the mime type of the datatype (pretend it is text for peek)""" return 'text/plain'
[docs] def merge(split_files, output_file, extra_merge_args): """Merging Fastsearch indices is not supported.""" raise NotImplementedError("Merging Fastsearch indices is not supported.")
[docs] def split(cls, input_datasets, subdir_generator_function, split_params): """Splitting Fastsearch indices is not supported.""" if split_params is None: return None raise NotImplementedError("Splitting Fastsearch indices is not possible.")
[docs]class DRF(GenericMolFile): file_ext = "drf"
[docs] def set_meta(self, dataset, **kwd): """ Set the number of lines of data in dataset. """ dataset.metadata.number_of_molecules = count_special_lines('\"ligand id\"', dataset.file_name, invert=True)
[docs]class PHAR(GenericMolFile): """ Pharmacophore database format from silicos-it. """ file_ext = "phar"
[docs] def set_peek(self, dataset, is_multi_byte=False): if not dataset.dataset.purged: dataset.peek = get_file_peek(dataset.file_name) dataset.blurb = "pharmacophore" else: dataset.peek = 'file does not exist' dataset.blurb = 'file purged from disk'
[docs]@build_sniff_from_prefix class PDB(GenericMolFile): """ Protein Databank format. http://www.wwpdb.org/documentation/format33/v3.3.html """ file_ext = "pdb" MetadataElement(name="chain_ids", default=[], desc="Chain IDs", readonly=False, visible=True)
[docs] def sniff_prefix(self, file_prefix): """ Try to guess if the file is a PDB file. >>> from galaxy.datatypes.sniff import get_test_fname >>> fname = get_test_fname('5e5z.pdb') >>> PDB().sniff(fname) True >>> fname = get_test_fname('drugbank_drugs.cml') >>> PDB().sniff(fname) False """ headers = iter_headers(file_prefix, sep=' ', count=300) h = t = c = s = k = e = False for line in headers: section_name = line[0].strip() if section_name == 'HEADER': h = True elif section_name == 'TITLE': t = True elif section_name == 'COMPND': c = True elif section_name == 'SOURCE': s = True elif section_name == 'KEYWDS': k = True elif section_name == 'EXPDTA': e = True if h * t * c * s * k * e: return True else: return False
[docs] def set_meta(self, dataset, **kwd): """ Find Chain_IDs for metadata. """ try: chain_ids = set() with open(dataset.file_name) as fh: for line in fh: if line.startswith('ATOM ') or line.startswith('HETATM'): if line[21] != ' ': chain_ids.add(line[21]) dataset.metadata.chain_ids = list(chain_ids) except Exception as e: log.error('Error finding chain_ids: %s', unicodify(e)) raise
[docs] def set_peek(self, dataset, is_multi_byte=False): if not dataset.dataset.purged: atom_numbers = count_special_lines("^ATOM", dataset.file_name) hetatm_numbers = count_special_lines("^HETATM", dataset.file_name) chain_ids = ','.join(dataset.metadata.chain_ids) if len(dataset.metadata.chain_ids) > 0 else 'None' dataset.peek = get_file_peek(dataset.file_name) dataset.blurb = f"{atom_numbers} atoms and {hetatm_numbers} HET-atoms\nchain_ids: {chain_ids}" else: dataset.peek = 'file does not exist' dataset.blurb = 'file purged from disk'
[docs]@build_sniff_from_prefix class PDBQT(GenericMolFile): """ PDBQT Autodock and Autodock Vina format http://autodock.scripps.edu/faqs-help/faq/what-is-the-format-of-a-pdbqt-file """ file_ext = "pdbqt"
[docs] def sniff_prefix(self, file_prefix): """ Try to guess if the file is a PDBQT file. >>> from galaxy.datatypes.sniff import get_test_fname >>> fname = get_test_fname('NuBBE_1_obabel_3D.pdbqt') >>> PDBQT().sniff(fname) True >>> fname = get_test_fname('drugbank_drugs.cml') >>> PDBQT().sniff(fname) False """ headers = iter_headers(file_prefix, sep=' ', count=300) h = t = c = s = k = False for line in headers: section_name = line[0].strip() if section_name == 'REMARK': h = True elif section_name == 'ROOT': t = True elif section_name == 'ENDROOT': c = True elif section_name == 'BRANCH': s = True elif section_name == 'TORSDOF': k = True if h * t * c * s * k: return True else: return False
[docs] def set_peek(self, dataset, is_multi_byte=False): if not dataset.dataset.purged: root_numbers = count_special_lines("^ROOT", dataset.file_name) branch_numbers = count_special_lines("^BRANCH", dataset.file_name) dataset.peek = get_file_peek(dataset.file_name) dataset.blurb = f"{root_numbers} roots and {branch_numbers} branches" else: dataset.peek = 'file does not exist' dataset.blurb = 'file purged from disk'
[docs]@build_sniff_from_prefix class PQR(GenericMolFile): """ Protein Databank format. https://apbs-pdb2pqr.readthedocs.io/en/latest/formats/pqr.html """ file_ext = "pqr" MetadataElement(name="chain_ids", default=[], desc="Chain IDs", readonly=False, visible=True)
[docs] def get_matcher(self): """ Atom and HETATM line fields are space separated, match group: 0: Field_name A string which specifies the type of PQR entry: ATOM or HETATM. 1: Atom_number An integer which provides the atom index. 2: Atom_name A string which provides the atom name. 3: Residue_name A string which provides the residue name. 5: Chain_ID (Optional, group 4 is whole field) An optional string which provides the chain ID of the atom. Note that chain ID support is a new feature of APBS 0.5.0 and later versions. 6: Residue_number An integer which provides the residue index. 7: X 8: Y 9: Z 3 floats which provide the atomic coordinates (in angstroms) 10: Charge A float which provides the atomic charge (in electrons). 11: Radius A float which provides the atomic radius (in angstroms). """ pat = r'(ATOM|HETATM)\s+' +\ r'(\d+)\s+' +\ r'([A-Z0-9]+)\s+' +\ r'([A-Z0-9]+)\s+' +\ r'(([A-Z]?)\s+)?' +\ r'([-+]?\d*\.\d+|\d+)\s+' +\ r'([-+]?\d*\.\d+|\d+)\s+' +\ r'([-+]?\d*\.\d+|\d+)\s+' +\ r'([-+]?\d*\.\d+|\d+)\s+' +\ r'([-+]?\d*\.\d+|\d+)\s+' return re.compile(pat)
[docs] def sniff_prefix(self, file_prefix): """ Try to guess if the file is a PQR file. >>> from galaxy.datatypes.sniff import get_test_fname >>> fname = get_test_fname('5e5z.pqr') >>> PQR().sniff(fname) True >>> fname = get_test_fname('drugbank_drugs.cml') >>> PQR().sniff(fname) False """ prog = self.get_matcher() headers = iter_headers(file_prefix, sep=None, comment_designator='REMARK 5', count=3000) h = a = False for line in headers: section_name = line[0].strip() if section_name == 'REMARK': h = True elif section_name == 'ATOM' or section_name == 'HETATM': if prog.match(' '.join(line)): a = True break if h * a: return True else: return False
[docs] def set_meta(self, dataset, **kwd): """ Find Optional Chain_IDs for metadata. """ try: prog = self.get_matcher() chain_ids = set() with open(dataset.file_name) as fh: for line in fh: if line.startswith('REMARK'): continue match = prog.match(line.rstrip()) if match and match.groups()[5]: chain_ids.add(match.groups()[5]) dataset.metadata.chain_ids = list(chain_ids) except Exception as e: log.error('Error finding chain_ids: %s', unicodify(e)) raise
[docs] def set_peek(self, dataset, is_multi_byte=False): if not dataset.dataset.purged: atom_numbers = count_special_lines("^ATOM", dataset.file_name) hetatm_numbers = count_special_lines("^HETATM", dataset.file_name) chain_ids = ','.join(dataset.metadata.chain_ids) if len(dataset.metadata.chain_ids) > 0 else 'None' dataset.peek = get_file_peek(dataset.file_name) dataset.blurb = f"{atom_numbers} atoms and {hetatm_numbers} HET-atoms\nchain_ids: {str(chain_ids)}" else: dataset.peek = 'file does not exist' dataset.blurb = 'file purged from disk'
[docs]class grd(Text): file_ext = "grd"
[docs] def set_peek(self, dataset, is_multi_byte=False): if not dataset.dataset.purged: dataset.peek = get_file_peek(dataset.file_name) dataset.blurb = "grids for docking" else: dataset.peek = 'file does not exist' dataset.blurb = 'file purged from disk'
[docs]class grdtgz(Binary): file_ext = "grd.tgz"
[docs] def set_peek(self, dataset, is_multi_byte=False): if not dataset.dataset.purged: dataset.peek = 'binary data' dataset.blurb = "compressed grids for docking" else: dataset.peek = 'file does not exist' dataset.blurb = 'file purged from disk'
[docs]@build_sniff_from_prefix class InChI(Tabular): file_ext = "inchi" column_names = ['InChI'] MetadataElement(name="columns", default=2, desc="Number of columns", readonly=True, visible=False) MetadataElement(name="column_types", default=['str'], param=metadata.ColumnTypesParameter, desc="Column types", readonly=True, visible=False) MetadataElement(name="number_of_molecules", default=0, desc="Number of molecules", readonly=True, visible=True, optional=True, no_value=0)
[docs] def set_meta(self, dataset, **kwd): """ Set the number of lines of data in dataset. """ dataset.metadata.number_of_molecules = self.count_data_lines(dataset)
[docs] def set_peek(self, dataset, is_multi_byte=False): if not dataset.dataset.purged: if (dataset.metadata.number_of_molecules == 1): dataset.blurb = "1 molecule" else: dataset.blurb = f"{dataset.metadata.number_of_molecules} molecules" dataset.peek = get_file_peek(dataset.file_name) else: dataset.peek = 'file does not exist' dataset.blurb = 'file purged from disk'
[docs] def sniff_prefix(self, file_prefix): """ Try to guess if the file is a InChI file. >>> from galaxy.datatypes.sniff import get_test_fname >>> fname = get_test_fname('drugbank_drugs.inchi') >>> InChI().sniff(fname) True >>> fname = get_test_fname('drugbank_drugs.cml') >>> InChI().sniff(fname) False """ inchi_lines = iter_headers(file_prefix, sep=' ', count=10) found_lines = False for inchi in inchi_lines: if not inchi[0].startswith('InChI='): return False found_lines = True return found_lines
[docs]class SMILES(Tabular): file_ext = "smi" column_names = ['SMILES', 'TITLE'] MetadataElement(name="columns", default=2, desc="Number of columns", readonly=True, visible=False) MetadataElement(name="column_types", default=['str', 'str'], param=metadata.ColumnTypesParameter, desc="Column types", readonly=True, visible=False) MetadataElement(name="number_of_molecules", default=0, desc="Number of molecules", readonly=True, visible=True, optional=True, no_value=0)
[docs] def set_meta(self, dataset, **kwd): """ Set the number of lines of data in dataset. """ dataset.metadata.number_of_molecules = self.count_data_lines(dataset)
[docs] def set_peek(self, dataset, is_multi_byte=False): if not dataset.dataset.purged: if dataset.metadata.number_of_molecules == 1: dataset.blurb = "1 molecule" else: dataset.blurb = f"{dataset.metadata.number_of_molecules} molecules" dataset.peek = get_file_peek(dataset.file_name) else: dataset.peek = 'file does not exist' dataset.blurb = 'file purged from disk'
''' def sniff(self, filename): """ Its hard or impossible to sniff a SMILES File. We can try to import the first SMILES and check if it is a molecule, but currently its not possible to use external libraries in datatype definition files. Moreover it seems mpossible to inlcude OpenBabel as python library because OpenBabel is GPL licensed. """ self.molecule_number = count_lines(filename, non_empty = True) word_count = count_lines(filename) if self.molecule_number != word_count: return False if self.molecule_number > 0: # test first 3 SMILES smiles_lines = get_headers(filename, sep='\t', count=3) for smiles_line in smiles_lines: if len(smiles_line) > 2: return False smiles = smiles_line[0] try: # if we have atoms, we have a molecule if not len(pybel.readstring('smi', smiles).atoms) > 0: return False except Exception: # if convert fails its not a smiles string return False return True else: return False '''
[docs]@build_sniff_from_prefix class CML(GenericXml): """ Chemical Markup Language http://cml.sourceforge.net/ """ file_ext = "cml" MetadataElement(name="number_of_molecules", default=0, desc="Number of molecules", readonly=True, visible=True, optional=True, no_value=0)
[docs] def set_meta(self, dataset, **kwd): """ Set the number of lines of data in dataset. """ dataset.metadata.number_of_molecules = count_special_lines(r'^\s*<molecule', dataset.file_name)
[docs] def set_peek(self, dataset, is_multi_byte=False): if not dataset.dataset.purged: if (dataset.metadata.number_of_molecules == 1): dataset.blurb = "1 molecule" else: dataset.blurb = f"{dataset.metadata.number_of_molecules} molecules" dataset.peek = get_file_peek(dataset.file_name) else: dataset.peek = 'file does not exist' dataset.blurb = 'file purged from disk'
[docs] def sniff_prefix(self, file_prefix): """ Try to guess if the file is a CML file. >>> from galaxy.datatypes.sniff import get_test_fname >>> fname = get_test_fname('interval.interval') >>> CML().sniff(fname) False >>> fname = get_test_fname('drugbank_drugs.cml') >>> CML().sniff(fname) True """ for expected_string in ['<?xml version="1.0"?>', 'http://www.xml-cml.org/schema']: if expected_string not in file_prefix.contents_header: return False return True
[docs] @classmethod def split(cls, input_datasets, subdir_generator_function, split_params): """ Split the input files by molecule records. """ if split_params is None: return None if len(input_datasets) > 1: raise Exception("CML-file splitting does not support multiple files") input_files = [ds.file_name for ds in input_datasets] chunk_size = None if split_params['split_mode'] == 'number_of_parts': raise Exception(f"Split mode \"{split_params['split_mode']}\" is currently not implemented for CML-files.") elif split_params['split_mode'] == 'to_size': chunk_size = int(split_params['split_size']) else: raise Exception(f"Unsupported split mode {split_params['split_mode']}") def _read_cml_records(filename): lines = [] with open(filename) as handle: for line in handle: if line.lstrip().startswith('<?xml version="1.0"?>') or \ line.lstrip().startswith('<cml xmlns="http://www.xml-cml.org/schema') or \ line.lstrip().startswith('</cml>'): continue lines.append(line) if line.lstrip().startswith('</molecule>'): yield lines lines = [] header_lines = ['<?xml version="1.0"?>\n', '<cml xmlns="http://www.xml-cml.org/schema">\n'] footer_line = ['</cml>\n'] def _write_part_cml_file(accumulated_lines): part_dir = subdir_generator_function() part_path = os.path.join(part_dir, os.path.basename(input_files[0])) with open(part_path, 'w') as part_file: part_file.writelines(header_lines) part_file.writelines(accumulated_lines) part_file.writelines(footer_line) try: cml_records = _read_cml_records(input_files[0]) cml_lines_accumulated = [] for counter, cml_record in enumerate(cml_records, start=1): cml_lines_accumulated.extend(cml_record) if counter % chunk_size == 0: _write_part_cml_file(cml_lines_accumulated) cml_lines_accumulated = [] if cml_lines_accumulated: _write_part_cml_file(cml_lines_accumulated) except Exception as e: log.error('Unable to split files: %s', unicodify(e)) raise
[docs] @staticmethod def merge(split_files, output_file): """ Merging CML files. """ if len(split_files) == 1: # For one file only, use base class method (move/copy) return Text.merge(split_files, output_file) if not split_files: raise ValueError("Given no CML files, %r, to merge into %s" % (split_files, output_file)) with open(output_file, "w") as out: for filename in split_files: with open(filename) as handle: header = handle.readline() if not header: raise ValueError(f"CML file {filename} was empty") if not header.lstrip().startswith('<?xml version="1.0"?>'): out.write(header) raise ValueError(f"{filename} is not a valid XML file!") line = handle.readline() header += line if not line.lstrip().startswith('<cml xmlns="http://www.xml-cml.org/schema'): out.write(header) raise ValueError(f"{filename} is not a CML file!") molecule_found = False for line in handle.readlines(): # We found two required header lines, the next line should start with <molecule > if line.lstrip().startswith('</cml>'): continue if line.lstrip().startswith('<molecule'): molecule_found = True if molecule_found: out.write(line) out.write("</cml>\n")