Source code for cwltool.stdfsaccess

"""Abstracted IO access."""

import glob
import os
import urllib
from typing import IO, Any, List

from schema_salad.ref_resolver import file_uri, uri_file_path

def abspath(src: str, basedir: str) -> str:
    if src.startswith("file://"):
        abpath = uri_file_path(src)
    elif urllib.parse.urlsplit(src).scheme in ["http", "https"]:
        return src
        if basedir.startswith("file://"):
            abpath = src if os.path.isabs(src) else basedir + "/" + src
            abpath = src if os.path.isabs(src) else os.path.join(basedir, src)
    return abpath

[docs]class StdFsAccess: """Local filesystem implementation."""
[docs] def __init__(self, basedir: str) -> None: """Perform operations with respect to a base directory.""" self.basedir = basedir
def _abs(self, p: str) -> str: return abspath(p, self.basedir)
[docs] def glob(self, pattern: str) -> List[str]: return [file_uri(str(self._abs(line))) for line in glob.glob(self._abs(pattern))]
[docs] def open(self, fn: str, mode: str) -> IO[Any]: return open(self._abs(fn), mode)
[docs] def exists(self, fn: str) -> bool: return os.path.exists(self._abs(fn))
[docs] def size(self, fn: str) -> int: return os.stat(self._abs(fn)).st_size
[docs] def isfile(self, fn: str) -> bool: return os.path.isfile(self._abs(fn))
[docs] def isdir(self, fn: str) -> bool: return os.path.isdir(self._abs(fn))
[docs] def listdir(self, fn: str) -> List[str]: return [abspath(urllib.parse.quote(entry), fn) for entry in os.listdir(self._abs(fn))]
[docs] def join(self, path, *paths): # type: (str, *str) -> str return os.path.join(path, *paths)
[docs] def realpath(self, path: str) -> str: return os.path.realpath(path)